/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.metadata.etcd.helpers;

import com.coreos.jetcd.Client;
import com.coreos.jetcd.common.exception.ClosedClientException;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.KeyValue;
import com.coreos.jetcd.kv.GetResponse;
import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.watch.WatchResponse;
import com.google.common.annotations.VisibleForTesting;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.metadata.etcd.EtcdWatchClient;
import org.apache.bookkeeper.metadata.etcd.EtcdWatcher;
import org.apache.bookkeeper.metadata.etcd.helpers.RevisionedConsumer;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Caches the latest value of a single etcd key together with the revision it was
 * observed at, and pushes every newer value to the registered consumers.
 *
 * <p>The value is obtained either by an explicit {@link #read()} or from watch
 * responses delivered to {@link #accept(WatchResponse, Throwable)}. Updates carrying a
 * revision that is not newer than the cached one are dropped.
 *
 * <p>Locking: the cached value, revision, watch future and close future are guarded by
 * this instance's monitor; the consumer map is guarded by its own monitor. Where both
 * are held, the instance monitor is taken first.
 *
 * @param <T> type the raw key value is converted to
 */
public class ValueStream<T>
implements BiConsumer<WatchResponse, Throwable>,
AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ValueStream.class);
    private final Client client;
    /** Whether {@link #watchClient} was created by this stream and must be closed with it. */
    private final boolean ownWatchClient;
    private final EtcdWatchClient watchClient;
    /** Converts the raw bytes stored under {@link #key} into a {@code T}. */
    private final Function<ByteSequence, T> encoder;
    private final ByteSequence key;
    /** Registered consumers, keyed by the caller-supplied callback. Guarded by its own monitor. */
    private final Map<Consumer<Versioned<T>>, RevisionedConsumer<T>> consumers = new HashMap<>();
    /** Last known value; {@code null} when the key is absent, deleted, or not yet read. */
    private volatile T localValue = null;
    /** Revision of {@link #localValue}; negative until the first successful read. */
    private volatile long revision = -1L;
    /** Pending or established watch; {@code null} when the key is not being watched. */
    private CompletableFuture<EtcdWatcher> watchFuture = null;
    /** Non-null once {@link #closeAsync()} has been called. */
    private CompletableFuture<Void> closeFuture = null;

    /**
     * Creates a stream that builds its own watch client on top of {@code client}.
     * That watch client is owned by the stream and is closed by {@link #closeAsync()}.
     *
     * @param client etcd client used for reads and for creating the watch client
     * @param encoder converts the raw key value into a {@code T}
     * @param key key to read and watch
     */
    public ValueStream(Client client, Function<ByteSequence, T> encoder, ByteSequence key) {
        this(client, new EtcdWatchClient(client), true, encoder, key);
    }

    /**
     * Creates a stream that uses a caller-supplied watch client. The caller keeps
     * ownership: closing the stream does not close {@code watchClient}.
     *
     * @param client etcd client used for reads
     * @param watchClient watch client used to register the key watch
     * @param encoder converts the raw key value into a {@code T}
     * @param key key to read and watch
     */
    public ValueStream(Client client, EtcdWatchClient watchClient, Function<ByteSequence, T> encoder, ByteSequence key) {
        this(client, watchClient, false, encoder, key);
    }

    /** Shared initializer; {@code ownWatchClient} records who is responsible for closing the watch client. */
    private ValueStream(Client client, EtcdWatchClient watchClient, boolean ownWatchClient, Function<ByteSequence, T> encoder, ByteSequence key) {
        this.client = client;
        this.watchClient = watchClient;
        this.ownWatchClient = ownWatchClient;
        this.encoder = encoder;
        this.key = key;
    }

    /**
     * Reads the key from etcd, refreshes the cached value if the response carries a
     * newer revision, and notifies the consumers when the cache was refreshed.
     *
     * @return future completed with the cached value and revision after the read
     */
    public CompletableFuture<Versioned<T>> read() {
        return this.client.getKVClient().get(this.key).thenApply(getResp -> {
            boolean updated = this.updateLocalValue(getResp);
            Versioned<T> current = this.getLocalValue();
            if (updated) {
                this.notifyConsumers(current);
            }
            return current;
        });
    }

    /**
     * @return number of currently registered consumers
     */
    @VisibleForTesting
    public int getNumConsumers() {
        synchronized (this.consumers) {
            return this.consumers.size();
        }
    }

    /** Hands {@code localValue} to every registered consumer while holding the consumer-map lock. */
    private void notifyConsumers(Versioned<T> localValue) {
        synchronized (this.consumers) {
            this.consumers.values().forEach(consumer -> consumer.accept(localValue));
        }
    }

    /**
     * Applies a get response to the cache if its header revision is newer than the cached one.
     * An empty response clears the cached value.
     *
     * @return {@code true} if the cache was updated
     */
    private synchronized boolean updateLocalValue(GetResponse response) {
        long responseRevision = response.getHeader().getRevision();
        if (this.revision >= responseRevision) {
            return false;
        }
        this.revision = responseRevision;
        this.localValue = response.getCount() > 0L
            ? this.encoder.apply(response.getKvs().get(0).getValue())
            : null;
        return true;
    }

    /**
     * Applies the events of a watch response to the cache.
     *
     * @return the new cached value, or {@code null} if the stream is closing or the
     *         response is not newer than the cached revision
     */
    private synchronized Versioned<T> processWatchResponse(WatchResponse response) {
        if (null != this.closeFuture) {
            return null;
        }
        if (log.isDebugEnabled()) {
            log.debug("Received watch response : revision = {}, {} events = {}",
                response.getHeader().getRevision(), response.getEvents().size(), response.getEvents());
        }
        long responseRevision = response.getHeader().getRevision();
        if (responseRevision <= this.revision) {
            return null;
        }
        this.revision = responseRevision;
        // Events are applied in order, so the last PUT/DELETE in the response wins.
        response.getEvents().forEach(event -> {
            switch (event.getEventType()) {
                case PUT: {
                    this.localValue = this.encoder.apply(event.getKeyValue().getValue());
                    break;
                }
                case DELETE: {
                    this.localValue = null;
                    break;
                }
                default: {
                    // Other event types do not affect the cached value.
                    break;
                }
            }
        });
        return this.getLocalValue();
    }

    /**
     * @return snapshot of the cached value paired with its revision as a {@link LongVersion}
     */
    @VisibleForTesting
    synchronized Versioned<T> getLocalValue() {
        return new Versioned<>(this.localValue, new LongVersion(this.revision));
    }

    /** Returns the cached value, issuing a {@link #read()} first if nothing has been read yet. */
    private CompletableFuture<Versioned<T>> getOrRead() {
        boolean shouldRead;
        synchronized (this) {
            shouldRead = this.revision < 0L;
        }
        if (shouldRead) {
            return this.read();
        }
        return FutureUtils.value(this.getLocalValue());
    }

    /**
     * @return {@code true} if a watch has been requested and not cleared
     */
    @VisibleForTesting
    synchronized boolean isWatcherSet() {
        return null != this.watchFuture;
    }

    private synchronized CompletableFuture<EtcdWatcher> getWatchFuture() {
        return this.watchFuture;
    }

    /**
     * Blocks the calling thread, polling every 100 ms, until a watch has been requested.
     *
     * <p>An interrupt does not abort the wait; the thread's interrupt status is restored
     * before this method returns so the caller can still observe it.
     *
     * @return the watch future (it may not be completed yet)
     */
    @VisibleForTesting
    public CompletableFuture<EtcdWatcher> waitUntilWatched() {
        boolean interrupted = false;
        try {
            CompletableFuture<EtcdWatcher> wf;
            while ((wf = this.getWatchFuture()) == null) {
                try {
                    TimeUnit.MILLISECONDS.sleep(100L);
                }
                catch (InterruptedException e) {
                    // Remember the interrupt instead of re-asserting it here: re-asserting
                    // would make every following sleep() throw immediately and spin.
                    interrupted = true;
                    if (log.isDebugEnabled()) {
                        log.debug("Interrupted at waiting until the key is watched", (Throwable)e);
                    }
                }
            }
            return wf;
        }
        finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Registers {@code consumer} and returns the current value.
     *
     * <p>If the same consumer was already registered, its wrapper is replaced and the
     * current value is returned without notifying or starting a watch. Otherwise the
     * current value is pushed to the registered consumers and a watch is started from
     * that value's revision unless one already exists.
     *
     * @param consumer callback to receive value updates
     * @return future completed with the value known at registration time
     */
    public CompletableFuture<Versioned<T>> readAndWatch(Consumer<Versioned<T>> consumer) {
        RevisionedConsumer<T> revisionedConsumer = new RevisionedConsumer<T>(consumer);
        boolean consumerExisted;
        synchronized (this.consumers) {
            consumerExisted = null != this.consumers.put(consumer, revisionedConsumer);
        }
        if (consumerExisted) {
            return this.getOrRead();
        }
        return this.getOrRead().thenCompose(versionedVal -> {
            long readRevision = ((LongVersion)versionedVal.getVersion()).getLongVersion();
            // Hold the instance lock so this notification cannot interleave with a
            // watch response being processed in accept().
            synchronized (this) {
                this.notifyConsumers(versionedVal);
            }
            return this.watchIfNeeded(readRevision).thenApply(ignored -> versionedVal);
        });
    }

    /**
     * Unregisters {@code consumer}. When it was the last one, the watch is closed.
     *
     * @param consumer callback previously passed to {@link #readAndWatch(Consumer)}
     * @return future completed with {@code true} if this call removed the last consumer
     *         (after the watcher close completes), {@code false} otherwise
     */
    public CompletableFuture<Boolean> unwatch(Consumer<Versioned<T>> consumer) {
        boolean lastConsumer;
        synchronized (this.consumers) {
            lastConsumer = null != this.consumers.remove(consumer) && this.consumers.isEmpty();
        }
        if (lastConsumer) {
            return this.closeOrRewatch(false).thenApply(ignored -> true);
        }
        return FutureUtils.value(false);
    }

    /** Returns the existing watch future, or starts a watch from {@code revision} if there is none. */
    private synchronized CompletableFuture<EtcdWatcher> watchIfNeeded(long revision) {
        if (null == this.watchFuture) {
            this.watchFuture = this.watch(revision);
        }
        return this.watchFuture;
    }

    /**
     * Registers this stream as the listener of a watch on the key starting at {@code revision}.
     * If the registration fails, the stored watch future is cleared so that a later call
     * can try again.
     */
    private CompletableFuture<EtcdWatcher> watch(long revision) {
        WatchOption option = WatchOption.newBuilder().withRevision(revision).build();
        return this.watchClient.watch(this.key, option, this).whenComplete((watcher, cause) -> {
            if (null != cause) {
                // NOTE(review): this clears whatever watch future is stored at that moment,
                // not necessarily the one that failed - confirm that is acceptable.
                synchronized (this) {
                    this.watchFuture = null;
                }
            }
        });
    }

    /**
     * Detaches the current watch and closes it, optionally starting a replacement first.
     * A replacement is only started when {@code rewatch} is set and the stream is not closing.
     *
     * @param rewatch whether to start a new watch from the cached revision
     * @return future for closing the previous watcher; already complete if there was none
     */
    private CompletableFuture<Void> closeOrRewatch(boolean rewatch) {
        CompletableFuture<EtcdWatcher> oldWatcherFuture;
        synchronized (this) {
            oldWatcherFuture = this.watchFuture;
            this.watchFuture = rewatch && null == this.closeFuture ? this.watch(this.revision) : null;
        }
        if (null == oldWatcherFuture) {
            return FutureUtils.Void();
        }
        return oldWatcherFuture.thenCompose(EtcdWatcher::closeAsync);
    }

    /**
     * Watch callback. A response is applied to the cache and, if it changed the cached
     * value, forwarded to the consumers. On an error the watch is closed, and re-established
     * unless the error is a {@link ClosedClientException}.
     *
     * @param watchResponse response delivered by the watch; used only when {@code throwable} is null
     * @param throwable failure reported by the watch, or {@code null}
     */
    @Override
    public void accept(WatchResponse watchResponse, Throwable throwable) {
        if (null != throwable) {
            this.closeOrRewatch(!(throwable instanceof ClosedClientException));
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("Received watch response : revision = {}, {} events = {}",
                watchResponse.getHeader().getRevision(), watchResponse.getEvents().size(), watchResponse.getEvents());
        }
        // Update and notify under one lock so consumers see values in revision order.
        synchronized (this) {
            Versioned<T> updated = this.processWatchResponse(watchResponse);
            if (null != updated) {
                this.notifyConsumers(updated);
            }
        }
    }

    /**
     * Closes the watch and, when this stream created the watch client itself, that client too.
     * Repeated calls return the same future.
     *
     * @return future completed once closing has finished
     */
    public CompletableFuture<Void> closeAsync() {
        synchronized (this) {
            if (null == this.closeFuture) {
                this.closeFuture = this.closeOrRewatch(false).thenCompose(ignored -> {
                    if (this.ownWatchClient) {
                        return this.watchClient.closeAsync();
                    }
                    return FutureUtils.Void();
                });
            }
            return this.closeFuture;
        }
    }

    /**
     * Blocking variant of {@link #closeAsync()}. Failures are logged, not thrown.
     */
    @Override
    public void close() {
        try {
            FutureUtils.result(this.closeAsync());
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Keep the interrupt visible to the caller.
                Thread.currentThread().interrupt();
            }
            // The trailing throwable makes SLF4J log the stack trace as well as the message.
            log.warn("Encountered exceptions on closing key reader : {}", (Object)e.getMessage(), e);
        }
    }
}

