/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.metadata.etcd.helpers;

import com.coreos.jetcd.Client;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.KeyValue;
import com.coreos.jetcd.kv.GetResponse;
import com.coreos.jetcd.options.GetOption;
import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.watch.WatchResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.metadata.etcd.EtcdWatchClient;
import org.apache.bookkeeper.metadata.etcd.EtcdWatcher;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KeySetReader<T>
implements BiConsumer<WatchResponse, Throwable>,
AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(KeySetReader.class);
    private final Client client;
    private final boolean ownWatchClient;
    private final EtcdWatchClient watchClient;
    private final Function<ByteSequence, T> encoder;
    private final ByteSequence beginKey;
    private final ByteSequence endKey;
    private final Set<ByteSequence> keys;
    private final CopyOnWriteArraySet<Consumer<Versioned<Set<T>>>> consumers = new CopyOnWriteArraySet();
    private volatile long revision = -1L;
    private CompletableFuture<EtcdWatcher> watchFuture = null;
    private CompletableFuture<Void> closeFuture = null;

    public KeySetReader(Client client, Function<ByteSequence, T> encoder, ByteSequence beginKey, ByteSequence endKey) {
        this(client, new EtcdWatchClient(client), encoder, beginKey, endKey);
    }

    public KeySetReader(Client client, EtcdWatchClient watchClient, Function<ByteSequence, T> encoder, ByteSequence beginKey, ByteSequence endKey) {
        this.client = client;
        this.watchClient = watchClient;
        this.ownWatchClient = false;
        this.encoder = encoder;
        this.beginKey = beginKey;
        this.endKey = endKey;
        this.keys = Collections.synchronizedSet(Sets.newHashSet());
    }

    public CompletableFuture<Versioned<Set<T>>> read() {
        GetOption.Builder optionBuilder = GetOption.newBuilder().withKeysOnly(true);
        if (null != this.endKey) {
            optionBuilder.withRange(this.endKey);
        }
        return this.client.getKVClient().get(this.beginKey, optionBuilder.build()).thenApply(getResp -> {
            boolean updated = this.updateLocalValue((GetResponse)getResp);
            Versioned<Set<T>> localValue = this.getLocalValue();
            try {
                Versioned<Set<T>> versioned = localValue;
                return versioned;
            }
            finally {
                if (updated) {
                    this.notifyConsumers(localValue);
                }
            }
        });
    }

    @VisibleForTesting
    long getRevision() {
        return this.revision;
    }

    private void notifyConsumers(Versioned<Set<T>> localValue) {
        this.consumers.forEach((Consumer<Consumer<Versioned<Set<T>>>>)((Consumer<Consumer>)consumer -> consumer.accept(localValue)));
    }

    private synchronized boolean updateLocalValue(GetResponse response) {
        if (this.revision < response.getHeader().getRevision()) {
            this.revision = response.getHeader().getRevision();
            this.keys.clear();
            for (KeyValue kv : response.getKvs()) {
                ByteSequence key = kv.getKey();
                this.keys.add(key);
            }
            return true;
        }
        return false;
    }

    private synchronized Versioned<Set<T>> processWatchResponse(WatchResponse response) {
        if (null != this.closeFuture) {
            return null;
        }
        if (log.isDebugEnabled()) {
            log.debug("Received watch response : revision = {}, {} events = {}", new Object[]{response.getHeader().getRevision(), response.getEvents().size(), response.getEvents()});
        }
        if (response.getHeader().getRevision() <= this.revision) {
            return null;
        }
        this.revision = response.getHeader().getRevision();
        response.getEvents().forEach(event -> {
            switch (event.getEventType()) {
                case PUT: {
                    this.keys.add(event.getKeyValue().getKey());
                    break;
                }
                case DELETE: {
                    this.keys.remove(event.getKeyValue().getKey());
                    break;
                }
            }
        });
        return this.getLocalValue();
    }

    @VisibleForTesting
    synchronized Versioned<Set<T>> getLocalValue() {
        return new Versioned(this.keys.stream().map(this.encoder).collect(Collectors.toSet()), (Version)new LongVersion(this.revision));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Versioned<Set<T>>> getOrRead() {
        boolean shouldRead = false;
        KeySetReader keySetReader = this;
        synchronized (keySetReader) {
            if (this.revision < 0L) {
                shouldRead = true;
            }
        }
        if (shouldRead) {
            return this.read();
        }
        return FutureUtils.value(this.getLocalValue());
    }

    @VisibleForTesting
    synchronized boolean isWatcherSet() {
        return null != this.watchFuture;
    }

    public CompletableFuture<Versioned<Set<T>>> readAndWatch(Consumer<Versioned<Set<T>>> consumer) {
        if (!this.consumers.add(consumer) || this.isWatcherSet()) {
            return this.getOrRead();
        }
        return this.getOrRead().thenCompose(versionedKeys -> {
            long revision = ((LongVersion)versionedKeys.getVersion()).getLongVersion();
            return this.watch(revision).thenApply(ignored -> versionedKeys);
        });
    }

    public CompletableFuture<Void> unwatch(Consumer<Versioned<Set<T>>> consumer) {
        if (this.consumers.remove(consumer) && this.consumers.isEmpty()) {
            return this.closeOrRewatch(false);
        }
        return FutureUtils.Void();
    }

    private synchronized CompletableFuture<EtcdWatcher> watch(long revision) {
        if (null != this.watchFuture) {
            return this.watchFuture;
        }
        WatchOption.Builder optionBuilder = WatchOption.newBuilder().withRevision(revision);
        if (null != this.endKey) {
            optionBuilder.withRange(this.endKey);
        }
        this.watchFuture = this.watchClient.watch(this.beginKey, optionBuilder.build(), this);
        return this.watchFuture.whenComplete((watcher, cause) -> {
            if (null != cause) {
                KeySetReader keySetReader = this;
                synchronized (keySetReader) {
                    this.watchFuture = null;
                }
            }
        });
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private CompletableFuture<Void> closeOrRewatch(boolean rewatch) {
        CompletableFuture<EtcdWatcher> oldWatcherFuture;
        KeySetReader keySetReader = this;
        synchronized (keySetReader) {
            oldWatcherFuture = this.watchFuture;
            this.watchFuture = rewatch && null == this.closeFuture ? this.watch(this.revision) : null;
        }
        if (null != oldWatcherFuture) {
            return oldWatcherFuture.thenCompose(EtcdWatcher::closeAsync);
        }
        return FutureUtils.Void();
    }

    @Override
    public void accept(WatchResponse watchResponse, Throwable throwable) {
        if (null == throwable) {
            Versioned<Set<T>> localValue = this.processWatchResponse(watchResponse);
            if (null != localValue) {
                this.notifyConsumers(localValue);
            }
        } else {
            this.closeOrRewatch(true);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> future;
        KeySetReader keySetReader = this;
        synchronized (keySetReader) {
            if (null == this.closeFuture) {
                this.closeFuture = this.closeOrRewatch(false).thenCompose(ignored -> {
                    if (this.ownWatchClient) {
                        return this.watchClient.closeAsync();
                    }
                    return FutureUtils.Void();
                });
            }
            future = this.closeFuture;
        }
        return future;
    }

    @Override
    public void close() {
        try {
            FutureUtils.result(this.closeAsync());
        }
        catch (Exception e) {
            log.warn("Encountered exceptions on closing key reader : {}", (Object)e.getMessage());
        }
    }
}

