/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.metadata.etcd;

import com.google.common.base.Strings;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;
import io.etcd.jetcd.EtcdConnectionManager;
import io.etcd.jetcd.api.WatchCancelRequest;
import io.etcd.jetcd.api.WatchCreateRequest;
import io.etcd.jetcd.api.WatchGrpc;
import io.etcd.jetcd.api.WatchRequest;
import io.etcd.jetcd.api.WatchResponse;
import io.etcd.jetcd.common.exception.ErrorCode;
import io.etcd.jetcd.common.exception.EtcdException;
import io.etcd.jetcd.common.exception.EtcdExceptionFactory;
import io.etcd.jetcd.options.WatchOption;
import io.etcd.jetcd.watch.WatchResponseWithError;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import java.util.LinkedList;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.metadata.etcd.EtcdWatcher;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashSet;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class EtcdWatchClient
implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(EtcdWatchClient.class);
    private final EtcdConnectionManager connMgr;
    private final WatchGrpc.WatchStub stub;
    private volatile StreamObserver<WatchRequest> grpcWatchStreamObserver;
    private final ConcurrentLongHashMap<EtcdWatcher> watchers = ConcurrentLongHashMap.newBuilder().build();
    private final LinkedList<EtcdWatcher> pendingWatchers = new LinkedList();
    private final ConcurrentLongHashSet cancelSet = ConcurrentLongHashSet.newBuilder().build();
    private final OrderedScheduler scheduler;
    private final ScheduledExecutorService watchExecutor;
    private CompletableFuture<Void> closeFuture = null;

    public EtcdWatchClient(Client client) {
        this.connMgr = new EtcdConnectionManager(client);
        this.stub = this.connMgr.newWatchStub();
        this.scheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().name("etcd-watcher-scheduler").numThreads(Runtime.getRuntime().availableProcessors()).build();
        this.watchExecutor = this.scheduler.chooseThread();
    }

    public synchronized boolean isClosed() {
        return this.closeFuture != null;
    }

    public CompletableFuture<EtcdWatcher> watch(ByteSequence key, BiConsumer<io.etcd.jetcd.watch.WatchResponse, Throwable> consumer) {
        return this.watch(key, WatchOption.DEFAULT, consumer);
    }

    public CompletableFuture<EtcdWatcher> watch(ByteSequence key, WatchOption watchOption, BiConsumer<io.etcd.jetcd.watch.WatchResponse, Throwable> consumer) {
        return CompletableFuture.supplyAsync(() -> {
            if (this.isClosed()) {
                throw EtcdExceptionFactory.newClosedWatchClientException();
            }
            EtcdWatcher watcher = new EtcdWatcher(key, watchOption, (ScheduledExecutorService)this.scheduler.chooseThread(), this);
            watcher.addConsumer(consumer);
            this.pendingWatchers.add(watcher);
            if (this.pendingWatchers.size() == 1) {
                WatchRequest request = EtcdWatchClient.toWatchCreateRequest(watcher);
                this.getGrpcWatchStreamObserver().onNext((Object)request);
            }
            return watcher;
        }, this.watchExecutor);
    }

    private void notifyWatchers(EtcdException e) {
        WatchResponseWithError wre = new WatchResponseWithError(e);
        this.pendingWatchers.forEach(watcher -> watcher.notifyWatchResponse(wre));
        this.pendingWatchers.clear();
        this.watchers.values().forEach(watcher -> watcher.notifyWatchResponse(wre));
        this.watchers.clear();
    }

    public CompletableFuture<Void> unwatch(EtcdWatcher watcher) {
        return CompletableFuture.runAsync(() -> this.cancelWatcher(watcher.getWatchID()), this.watchExecutor);
    }

    private void cancelWatcher(long watchID) {
        if (this.isClosed()) {
            return;
        }
        if (this.cancelSet.contains(watchID)) {
            return;
        }
        this.watchers.remove(watchID);
        this.cancelSet.add(watchID);
        WatchCancelRequest watchCancelRequest = WatchCancelRequest.newBuilder().setWatchId(watchID).build();
        WatchRequest cancelRequest = WatchRequest.newBuilder().setCancelRequest(watchCancelRequest).build();
        this.getGrpcWatchStreamObserver().onNext((Object)cancelRequest);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public CompletableFuture<Void> closeAsync() {
        CompletableFuture<Void> future;
        EtcdWatchClient etcdWatchClient = this;
        synchronized (etcdWatchClient) {
            if (null == this.closeFuture) {
                log.info("Closing watch client");
                this.closeFuture = CompletableFuture.runAsync(() -> {
                    this.notifyWatchers((EtcdException)EtcdExceptionFactory.newClosedWatchClientException());
                    this.closeGrpcWatchStreamObserver();
                }, this.watchExecutor);
            }
            future = this.closeFuture;
        }
        return future.whenComplete((ignored, cause) -> this.scheduler.shutdown());
    }

    @Override
    public void close() {
        try {
            FutureUtils.result(this.closeAsync());
        }
        catch (Exception e) {
            log.warn("Encountered exceptions on closing watch client", (Throwable)e);
        }
        this.scheduler.forceShutdown(10L, TimeUnit.SECONDS);
    }

    private StreamObserver<WatchResponse> createWatchStreamObserver() {
        return new StreamObserver<WatchResponse>(){

            public void onNext(WatchResponse watchResponse) {
                if (EtcdWatchClient.this.isClosed()) {
                    return;
                }
                EtcdWatchClient.this.watchExecutor.submit(() -> EtcdWatchClient.this.processWatchResponse(watchResponse));
            }

            public void onError(Throwable t) {
                if (EtcdWatchClient.this.isClosed()) {
                    return;
                }
                EtcdWatchClient.this.watchExecutor.submit(() -> EtcdWatchClient.this.processError(t));
            }

            public void onCompleted() {
            }
        };
    }

    private void processWatchResponse(WatchResponse watchResponse) {
        if (this.isClosed()) {
            return;
        }
        if (watchResponse.getCreated()) {
            this.processCreate(watchResponse);
        } else if (watchResponse.getCanceled()) {
            this.processCanceled(watchResponse);
        } else {
            this.processEvents(watchResponse);
        }
    }

    private void processError(Throwable t) {
        block5: {
            Status status;
            block4: {
                if (this.isClosed()) {
                    return;
                }
                status = Status.fromThrowable((Throwable)t);
                if (EtcdWatchClient.isHaltError(status)) break block4;
                if (!EtcdWatchClient.isNoLeaderError(status)) break block5;
            }
            this.notifyWatchers(EtcdExceptionFactory.toEtcdException((Status)status));
            this.closeGrpcWatchStreamObserver();
            this.cancelSet.clear();
            return;
        }
        this.scheduler.schedule(this::resume, 500L, TimeUnit.MILLISECONDS);
    }

    private void resume() {
        this.closeGrpcWatchStreamObserver();
        this.cancelSet.clear();
        this.resumeWatchers();
    }

    private synchronized StreamObserver<WatchRequest> getGrpcWatchStreamObserver() {
        if (this.grpcWatchStreamObserver == null) {
            this.grpcWatchStreamObserver = this.stub.watch(this.createWatchStreamObserver());
        }
        return this.grpcWatchStreamObserver;
    }

    private void closeGrpcWatchStreamObserver() {
        if (this.grpcWatchStreamObserver == null) {
            return;
        }
        this.grpcWatchStreamObserver.onCompleted();
        this.grpcWatchStreamObserver = null;
    }

    private void processCreate(WatchResponse response) {
        EtcdWatcher watcher = this.pendingWatchers.poll();
        this.sendNextWatchCreateRequest();
        if (watcher == null) {
            log.warn("Watch client receives watch create response but find no corresponding watcher");
            return;
        }
        if (watcher.isClosed()) {
            return;
        }
        if (response.getWatchId() == -1L) {
            watcher.notifyWatchResponse(new WatchResponseWithError(EtcdExceptionFactory.newEtcdException((ErrorCode)ErrorCode.INTERNAL, (String)"etcd server failed to create watch id")));
            return;
        }
        if (watcher.getRevision() == 0L) {
            watcher.setRevision(response.getHeader().getRevision());
        }
        watcher.setWatchID(response.getWatchId());
        this.watchers.put(watcher.getWatchID(), (Object)watcher);
    }

    private Optional<WatchRequest> nextResume() {
        EtcdWatcher pendingWatcher = this.pendingWatchers.peek();
        if (pendingWatcher != null) {
            return Optional.of(EtcdWatchClient.toWatchCreateRequest(pendingWatcher));
        }
        return Optional.empty();
    }

    private void sendNextWatchCreateRequest() {
        this.nextResume().ifPresent(nextWatchRequest -> this.getGrpcWatchStreamObserver().onNext(nextWatchRequest));
    }

    private void processEvents(WatchResponse response) {
        EtcdWatcher watcher = (EtcdWatcher)this.watchers.get(response.getWatchId());
        if (watcher == null) {
            this.cancelWatcher(response.getWatchId());
            return;
        }
        if (response.getCompactRevision() != 0L) {
            watcher.notifyWatchResponse(new WatchResponseWithError((EtcdException)EtcdExceptionFactory.newCompactedException((long)response.getCompactRevision())));
            return;
        }
        if (response.getEventsCount() == 0) {
            watcher.setRevision(response.getHeader().getRevision());
            return;
        }
        watcher.notifyWatchResponse(new WatchResponseWithError(new io.etcd.jetcd.watch.WatchResponse(response)));
        watcher.setRevision(response.getEvents(response.getEventsCount() - 1).getKv().getModRevision() + 1L);
    }

    private void resumeWatchers() {
        this.watchers.values().forEach(watcher -> {
            if (watcher.isClosed()) {
                return;
            }
            watcher.setWatchID(-1L);
            this.pendingWatchers.add((EtcdWatcher)watcher);
        });
        this.watchers.clear();
        this.sendNextWatchCreateRequest();
    }

    private void processCanceled(WatchResponse response) {
        EtcdWatcher watcher = (EtcdWatcher)this.watchers.get(response.getWatchId());
        this.cancelSet.remove(response.getWatchId());
        if (watcher == null) {
            return;
        }
        String reason = response.getCancelReason();
        if (Strings.isNullOrEmpty((String)reason)) {
            watcher.notifyWatchResponse(new WatchResponseWithError(EtcdExceptionFactory.newEtcdException((ErrorCode)ErrorCode.OUT_OF_RANGE, (String)"etcdserver: mvcc: required revision is a future revision")));
        } else {
            watcher.notifyWatchResponse(new WatchResponseWithError(EtcdExceptionFactory.newEtcdException((ErrorCode)ErrorCode.FAILED_PRECONDITION, (String)reason)));
        }
    }

    private static boolean isNoLeaderError(Status status) {
        return status.getCode() == Status.Code.UNAVAILABLE && "etcdserver: no leader".equals(status.getDescription());
    }

    private static boolean isHaltError(Status status) {
        return status.getCode() != Status.Code.UNAVAILABLE && status.getCode() != Status.Code.INTERNAL;
    }

    private static WatchRequest toWatchCreateRequest(EtcdWatcher watcher) {
        ByteString key = UnsafeByteOperations.unsafeWrap((byte[])watcher.getKey().getBytes());
        WatchOption option = watcher.getWatchOption();
        WatchCreateRequest.Builder builder = WatchCreateRequest.newBuilder().setKey(key).setPrevKv(option.isPrevKV()).setProgressNotify(option.isProgressNotify()).setStartRevision(watcher.getRevision());
        option.getEndKey().ifPresent(endKey -> builder.setRangeEnd(UnsafeByteOperations.unsafeWrap((byte[])endKey.getBytes())));
        if (option.isNoDelete()) {
            builder.addFilters(WatchCreateRequest.FilterType.NODELETE);
        }
        if (option.isNoPut()) {
            builder.addFilters(WatchCreateRequest.FilterType.NOPUT);
        }
        return WatchRequest.newBuilder().setCreateRequest(builder).build();
    }
}

