/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.metadata.etcd;

import com.coreos.jetcd.Client;
import com.coreos.jetcd.KV;
import com.coreos.jetcd.Txn;
import com.coreos.jetcd.common.exception.ClosedClientException;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.KeyValue;
import com.coreos.jetcd.kv.GetResponse;
import com.coreos.jetcd.op.Cmp;
import com.coreos.jetcd.op.CmpTarget;
import com.coreos.jetcd.op.Op;
import com.coreos.jetcd.options.DeleteOption;
import com.coreos.jetcd.options.GetOption;
import com.coreos.jetcd.options.PutOption;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Consumer;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.meta.LedgerMetadataSerDe;
import org.apache.bookkeeper.metadata.etcd.EtcdUtils;
import org.apache.bookkeeper.metadata.etcd.EtcdWatchClient;
import org.apache.bookkeeper.metadata.etcd.LedgerMetadataConsumer;
import org.apache.bookkeeper.metadata.etcd.helpers.KeyIterator;
import org.apache.bookkeeper.metadata.etcd.helpers.KeyStream;
import org.apache.bookkeeper.metadata.etcd.helpers.ValueStream;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.util.collections.ConcurrentLongHashMap;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.zookeeper.AsyncCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class EtcdLedgerManager
implements LedgerManager {
    // Class-wide SLF4J logger.
    private static final Logger log = LoggerFactory.getLogger(EtcdLedgerManager.class);
    // Serializes metadata before it is written and parses the bytes read back from etcd.
    private final LedgerMetadataSerDe serDe = new LedgerMetadataSerDe();
    // Passed to EtcdUtils.getLedgerKey together with a ledger id to build every ledger key.
    private final String scope;
    // Etcd client supplied by the caller; also handed to each ValueStream and to the watch client.
    private final Client client;
    // KV client obtained from `client`; used for all gets and transactions in this class.
    private final KV kvClient;
    // Watch client created in the constructor and closed by close().
    private final EtcdWatchClient watchClient;
    // One metadata value stream per watched ledger id; entries are added on listener registration
    // and removed when the stream is closed.
    private final ConcurrentLongHashMap<ValueStream<LedgerMetadata>> watchers = new ConcurrentLongHashMap();
    // Maps each registered listener to the consumer that was attached to a stream on its behalf.
    private final ConcurrentMap<BookkeeperInternalCallbacks.LedgerMetadataListener, LedgerMetadataConsumer> listeners = new ConcurrentHashMap<BookkeeperInternalCallbacks.LedgerMetadataListener, LedgerMetadataConsumer>();
    // Set to true exactly once by close(); volatile so isClosed() can read it without locking.
    private volatile boolean closed = false;

    /**
     * Builds a ledger manager on top of an already-created etcd client.
     *
     * @param client etcd client; its KV client serves reads and transactions, and a new
     *               {@link EtcdWatchClient} is created from it
     * @param scope  value passed to {@link EtcdUtils#getLedgerKey} when building ledger keys
     */
    EtcdLedgerManager(Client client, String scope) {
        this.scope = scope;
        this.client = client;
        this.kvClient = client.getKVClient();
        this.watchClient = new EtcdWatchClient(client);
    }

    /**
     * @return the current value of the {@code closed} flag
     */
    private boolean isClosed() {
        return closed;
    }

    /**
     * Looks up the metadata value stream currently tracked for a ledger.
     *
     * @param ledgerId ledger id used as the key into the watcher map
     * @return whatever the watcher map holds for {@code ledgerId}
     */
    ValueStream<LedgerMetadata> getLedgerMetadataStream(long ledgerId) {
        final ValueStream<LedgerMetadata> stream = watchers.get(ledgerId);
        return stream;
    }

    /**
     * Writes the metadata of a new ledger under its ledger key, failing if the key already exists.
     *
     * <p>The write is a single etcd transaction: when the key's create revision is greater than
     * zero (the key exists) a count-only get is issued, otherwise the serialized metadata is put.
     *
     * @param ledgerId id of the ledger being created
     * @param metadata metadata to serialize and store
     * @return a future completed with the metadata versioned by the response header revision, or
     *         completed exceptionally with {@code BKMetadataSerializationException} (serialization
     *         failed), {@code BKLedgerExistException} (key already present),
     *         {@code BKUnexpectedConditionException} (key reported as existing but the count is
     *         zero) or {@code MetaStoreException} (the transaction itself failed)
     */
    public CompletableFuture<Versioned<LedgerMetadata>> createLedgerMetadata(long ledgerId, LedgerMetadata metadata) {
        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        String ledgerKey = EtcdUtils.getLedgerKey(this.scope, ledgerId);
        log.info("Create ledger metadata under key {}", ledgerKey);
        ByteSequence ledgerKeyBs = ByteSequence.fromString(ledgerKey);
        ByteSequence valueBs;
        try {
            valueBs = ByteSequence.fromBytes(this.serDe.serialize(metadata));
        } catch (IOException ioe) {
            promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
            return promise;
        }
        this.kvClient.txn()
            .If(new Cmp(ledgerKeyBs, Cmp.Op.GREATER, CmpTarget.createRevision(0L)))
            .Then(Op.get(ledgerKeyBs, GetOption.newBuilder().withCountOnly(true).build()))
            .Else(Op.put(ledgerKeyBs, valueBs, PutOption.DEFAULT))
            .commit()
            .thenAccept(resp -> {
                if (!resp.isSucceeded()) {
                    // The "else" branch ran: the key did not exist and the put was applied.
                    promise.complete(new Versioned<>(metadata, new LongVersion(resp.getHeader().getRevision())));
                    return;
                }
                // The "then" branch ran: the key already existed.
                GetResponse getResp = resp.getGetResponses().get(0);
                if (getResp.getCount() <= 0L) {
                    promise.completeExceptionally(new BKException.BKUnexpectedConditionException());
                } else {
                    promise.completeExceptionally(new BKException.BKLedgerExistException());
                }
            })
            .exceptionally(cause -> {
                // Surface the underlying failure; the exception handed to the caller carries no cause.
                log.error("Failed to create ledger metadata under key {}", ledgerKey, cause);
                promise.completeExceptionally(new BKException.MetaStoreException());
                return null;
            });
        return promise;
    }

    /**
     * Deletes the metadata of a ledger, optionally only if it is still at a given version.
     *
     * <p>{@code Version.NEW} and any version that is neither {@code Version.ANY} nor a
     * {@link LongVersion} are rejected up front. With {@code Version.ANY} the delete is guarded
     * only by the key existing (create revision greater than zero); with a {@link LongVersion}
     * it is guarded by the key's mod revision being equal to that version.
     *
     * @param ledgerId id of the ledger whose metadata is removed
     * @param version  expected version of the stored metadata, or {@code Version.ANY}
     * @return a future completed with {@code null} on success, or completed exceptionally with
     *         {@code BKMetadataVersionException} (bad or mismatching version),
     *         {@code BKNoSuchLedgerExistsException} (key absent) or {@code MetaStoreException}
     *         (the transaction itself failed)
     */
    public CompletableFuture<Void> removeLedgerMetadata(long ledgerId, Version version) {
        CompletableFuture<Void> promise = new CompletableFuture<>();
        if (Version.NEW == version) {
            log.error("Request to delete ledger {} metadata with version set to the initial one", ledgerId);
            promise.completeExceptionally(new BKException.BKMetadataVersionException());
            return promise;
        }
        String ledgerKey = EtcdUtils.getLedgerKey(this.scope, ledgerId);
        ByteSequence ledgerKeyBs = ByteSequence.fromString(ledgerKey);
        // Pick the transaction guard directly from the version kind instead of encoding
        // "any version" as a magic revision number that a real LongVersion could collide with.
        final Cmp guard;
        if (Version.ANY == version) {
            guard = new Cmp(ledgerKeyBs, Cmp.Op.GREATER, CmpTarget.createRevision(0L));
        } else if (version instanceof LongVersion) {
            long revision = ((LongVersion) version).getLongVersion();
            guard = new Cmp(ledgerKeyBs, Cmp.Op.EQUAL, CmpTarget.modRevision(revision));
        } else {
            log.info("Not an instance of LongVersion : {}", ledgerId);
            promise.completeExceptionally(new BKException.BKMetadataVersionException());
            return promise;
        }
        this.kvClient.txn()
            .If(guard)
            .Then(Op.delete(ledgerKeyBs, DeleteOption.DEFAULT))
            .Else(Op.get(ledgerKeyBs, GetOption.DEFAULT))
            .commit()
            .thenAccept(txnResp -> {
                if (txnResp.isSucceeded()) {
                    promise.complete(null);
                    return;
                }
                // Guard failed: the get tells a version mismatch apart from a missing key.
                GetResponse getResp = txnResp.getGetResponses().get(0);
                if (getResp.getCount() > 0L) {
                    promise.completeExceptionally(new BKException.BKMetadataVersionException());
                } else {
                    log.warn("Deleting ledger {} failed due to : ledger key {} doesn't exist", ledgerId, ledgerKey);
                    promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
                }
            })
            .exceptionally(cause -> {
                // Surface the underlying failure; the exception handed to the caller carries no cause.
                log.error("Failed to delete ledger metadata under key {}", ledgerKey, cause);
                promise.completeExceptionally(new BKException.MetaStoreException());
                return null;
            });
        return promise;
    }

    /**
     * Reads and parses the metadata stored under a ledger's key.
     *
     * @param ledgerId id of the ledger to read
     * @return a future completed with the parsed metadata versioned by the key's mod revision, or
     *         completed exceptionally with {@code BKNoSuchLedgerExistsException} (no value under
     *         the key) or {@code MetaStoreException} (the get failed or the value could not be
     *         parsed)
     */
    public CompletableFuture<Versioned<LedgerMetadata>> readLedgerMetadata(long ledgerId) {
        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        String ledgerKey = EtcdUtils.getLedgerKey(this.scope, ledgerId);
        ByteSequence ledgerKeyBs = ByteSequence.fromString(ledgerKey);
        log.info("read ledger metadata under key {}", ledgerKey);
        this.kvClient.get(ledgerKeyBs)
            .thenAccept(getResp -> {
                if (getResp.getCount() <= 0L) {
                    promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
                    return;
                }
                KeyValue kv = getResp.getKvs().get(0);
                byte[] data = kv.getValue().getBytes();
                try {
                    LedgerMetadata metadata = this.serDe.parseConfig(data, ledgerId, Optional.empty());
                    promise.complete(new Versioned<>(metadata, new LongVersion(kv.getModRevision())));
                } catch (IOException ioe) {
                    log.error("Could not parse ledger metadata for ledger : {}", ledgerId, ioe);
                    promise.completeExceptionally(new BKException.MetaStoreException());
                }
            })
            .exceptionally(cause -> {
                // Surface the underlying failure; the exception handed to the caller carries no cause.
                log.error("Failed to read ledger metadata under key {}", ledgerKey, cause);
                promise.completeExceptionally(new BKException.MetaStoreException());
                return null;
            });
        return promise;
    }

    /**
     * Conditionally overwrites a ledger's metadata.
     *
     * <p>The put is applied only when the key's mod revision equals {@code currentVersion}.
     * {@code Version.NEW} and any version that is not a {@link LongVersion} are rejected up front.
     *
     * @param ledgerId       id of the ledger being updated
     * @param metadata       new metadata to serialize and store
     * @param currentVersion version the caller believes is currently stored
     * @return a future completed with the metadata versioned by the response header revision, or
     *         completed exceptionally with {@code BKMetadataVersionException} (bad or mismatching
     *         version), {@code BKMetadataSerializationException} (serialization failed),
     *         {@code BKNoSuchLedgerExistsException} (key absent) or {@code MetaStoreException}
     *         (the transaction itself failed)
     */
    public CompletableFuture<Versioned<LedgerMetadata>> writeLedgerMetadata(long ledgerId, LedgerMetadata metadata, Version currentVersion) {
        CompletableFuture<Versioned<LedgerMetadata>> promise = new CompletableFuture<>();
        if (Version.NEW == currentVersion || !(currentVersion instanceof LongVersion)) {
            promise.completeExceptionally(new BKException.BKMetadataVersionException());
            return promise;
        }
        LongVersion lv = (LongVersion) currentVersion;
        String ledgerKey = EtcdUtils.getLedgerKey(this.scope, ledgerId);
        ByteSequence ledgerKeyBs = ByteSequence.fromString(ledgerKey);
        ByteSequence valueBs;
        try {
            valueBs = ByteSequence.fromBytes(this.serDe.serialize(metadata));
        } catch (IOException ioe) {
            promise.completeExceptionally(new BKException.BKMetadataSerializationException(ioe));
            return promise;
        }
        this.kvClient.txn()
            .If(new Cmp(ledgerKeyBs, Cmp.Op.EQUAL, CmpTarget.modRevision(lv.getLongVersion())))
            .Then(Op.put(ledgerKeyBs, valueBs, PutOption.DEFAULT))
            .Else(Op.get(ledgerKeyBs, GetOption.DEFAULT))
            .commit()
            .thenAccept(resp -> {
                if (resp.isSucceeded()) {
                    promise.complete(new Versioned<>(metadata, new LongVersion(resp.getHeader().getRevision())));
                    return;
                }
                // Guard failed: the get tells a version mismatch apart from a missing key.
                GetResponse getResp = resp.getGetResponses().get(0);
                if (getResp.getCount() > 0L) {
                    // Expected = the version supplied by the caller; actual = what etcd holds now.
                    log.warn("Conditional update ledger metadata failed : expected version = {}, actual version = {}", lv, getResp.getKvs().get(0).getModRevision());
                    promise.completeExceptionally(new BKException.BKMetadataVersionException());
                } else {
                    promise.completeExceptionally(new BKException.BKNoSuchLedgerExistsException());
                }
            })
            .exceptionally(cause -> {
                // Surface the underlying failure; the exception handed to the caller carries no cause.
                log.error("Failed to write ledger metadata under key {}", ledgerKey, cause);
                promise.completeExceptionally(new BKException.MetaStoreException());
                return null;
            });
        return promise;
    }

    /**
     * Wraps a metadata listener into a {@link LedgerMetadataConsumer}.
     *
     * @param ledgerId          ledger the listener is interested in
     * @param listener          listener to wrap
     * @param onDeletedConsumer callback forwarded to the consumer's constructor
     * @return the newly created consumer
     */
    private LedgerMetadataConsumer listenerToConsumer(long ledgerId, BookkeeperInternalCallbacks.LedgerMetadataListener listener, Consumer<Long> onDeletedConsumer) {
        final LedgerMetadataConsumer consumer = new LedgerMetadataConsumer(ledgerId, listener, onDeletedConsumer);
        return consumer;
    }

    /**
     * Registers a listener for metadata changes of a ledger.
     *
     * <p>A listener that is already present in the listener map is ignored. Otherwise the value
     * stream for the ledger is looked up (or created), the listener is wrapped in a consumer and
     * the consumer is attached to the stream via {@code readAndWatch}.
     *
     * @param ledgerId ledger to watch
     * @param listener listener to notify
     */
    public void registerLedgerMetadataListener(long ledgerId, BookkeeperInternalCallbacks.LedgerMetadataListener listener) {
        if (this.listeners.containsKey(listener)) {
            return;
        }
        final ByteSequence watchedKey = ByteSequence.fromString(EtcdUtils.getLedgerKey(this.scope, ledgerId));
        final ValueStream<LedgerMetadata> stream = this.watchers.computeIfAbsent(ledgerId, lid -> new ValueStream<LedgerMetadata>(this.client, this.watchClient, bs -> {
            try {
                return this.serDe.parseConfig(bs.getBytes(), lid, Optional.empty());
            } catch (IOException ioe) {
                log.error("Could not parse ledger metadata : {}", bs.toStringUtf8(), ioe);
                throw new RuntimeException("Could not parse ledger metadata : " + bs.toStringUtf8(), ioe);
            }
        }, watchedKey));
        // When the consumer reports deletion, drop and close the stream if it is still the tracked one.
        final Consumer<Long> onDeleted = deletedId -> {
            if (this.watchers.remove(deletedId.longValue(), stream)) {
                log.info("Closed ledger metadata watcher on ledger {} deletion.", deletedId);
                stream.closeAsync();
            }
        };
        final LedgerMetadataConsumer consumer = this.listenerToConsumer(ledgerId, listener, onDeleted);
        if (null != this.listeners.putIfAbsent(listener, consumer)) {
            // Another caller registered the same listener in the meantime.
            return;
        }
        stream.readAndWatch(consumer).whenComplete((values, cause) -> {
            if (null == cause || cause instanceof ClosedClientException) {
                return;
            }
            // NOTE(review): the listener is still in `listeners` at this point, so this re-attempt
            // appears to return immediately at the containsKey check above — confirm whether the
            // stale entry should be removed before retrying.
            this.registerLedgerMetadataListener(ledgerId, listener);
        });
    }

    /**
     * Unregisters a previously registered metadata listener.
     *
     * <p>The listener's consumer is removed from the listener map and detached from the ledger's
     * value stream. A listener that was never registered (or was already unregistered) is ignored
     * instead of forwarding a {@code null} consumer to the stream.
     *
     * @param ledgerId ledger the listener was registered for
     * @param listener listener to remove
     */
    public void unregisterLedgerMetadataListener(long ledgerId, BookkeeperInternalCallbacks.LedgerMetadataListener listener) {
        LedgerMetadataConsumer lmConsumer = this.listeners.remove(listener);
        if (null == lmConsumer) {
            // Nothing was registered for this listener, so there is no consumer to unwatch.
            return;
        }
        this.unregisterLedgerMetadataListener(ledgerId, lmConsumer);
    }

    /**
     * Detaches a consumer from the ledger's value stream and closes the stream once it reports
     * that no consumers are left.
     *
     * @param ledgerId   ledger whose stream the consumer is attached to
     * @param lmConsumer consumer to detach
     */
    private void unregisterLedgerMetadataListener(long ledgerId, LedgerMetadataConsumer lmConsumer) {
        final ValueStream<LedgerMetadata> stream = this.watchers.get(ledgerId);
        if (stream != null) {
            stream.unwatch(lmConsumer)
                .thenAccept(noConsumers -> {
                    if (!noConsumers.booleanValue()) {
                        return;
                    }
                    // Only close the stream if it is still the one tracked for this ledger.
                    if (this.watchers.remove(ledgerId, stream)) {
                        log.info("Closed ledger metadata watcher on ledger {} since there are no listeners any more.", ledgerId);
                        stream.closeAsync();
                    }
                })
                .exceptionally(cause -> {
                    // NOTE(review): this retries only when the failure IS a ClosedClientException,
                    // the opposite of the condition used when registering — confirm intent. Also,
                    // failures propagated through thenAccept may arrive wrapped in a
                    // CompletionException, in which case this check would not match — verify.
                    if (cause instanceof ClosedClientException) {
                        this.unregisterLedgerMetadataListener(ledgerId, lmConsumer);
                    }
                    return null;
                });
        }
    }

    /**
     * Streams every ledger key between the keys built for id {@code 0} and
     * {@code Long.MAX_VALUE} and feeds the parsed ledger ids to {@code processor}.
     *
     * @param processor invoked for each ledger id
     * @param finalCb   callback that receives the overall result
     * @param context   opaque value forwarded to {@code finalCb}
     * @param successRc result code reported when the key stream is exhausted
     * @param failureRc result code reported when reading the key stream fails
     */
    public void asyncProcessLedgers(BookkeeperInternalCallbacks.Processor<Long> processor, AsyncCallback.VoidCallback finalCb, Object context, int successRc, int failureRc) {
        final ByteSequence firstKey = ByteSequence.fromString(EtcdUtils.getLedgerKey(this.scope, 0L));
        final ByteSequence lastKey = ByteSequence.fromString(EtcdUtils.getLedgerKey(this.scope, Long.MAX_VALUE));
        // The ledger id is taken from the low 64 bits of the UUID parsed out of the key.
        final KeyStream<Long> ledgerKeys = new KeyStream<Long>(this.kvClient, firstKey, lastKey,
            bs -> EtcdUtils.parseLedgerKey(bs.toStringUtf8()).getLeastSignificantBits());
        this.processLedgers(ledgerKeys, processor, finalCb, context, successRc, failureRc);
    }

    /**
     * Reads the next batch of ledger ids from the key stream, hands each id to the processor and
     * then recurses for the following batch until an empty batch or a read failure is seen.
     *
     * @param ks        key stream to read batches from
     * @param processor invoked for each ledger id in a batch
     * @param finalCb   receives {@code failureRc} on a read failure or {@code successRc} once an
     *                  empty batch is read
     * @param context   opaque value forwarded to {@code finalCb}
     * @param successRc result code for normal completion
     * @param failureRc result code for a failed read
     */
    private void processLedgers(KeyStream<Long> ks, BookkeeperInternalCallbacks.Processor<Long> processor, AsyncCallback.VoidCallback finalCb, Object context, int successRc, int failureRc) {
        ks.readNext().whenCompleteAsync((batch, error) -> {
            if (error != null) {
                finalCb.processResult(failureRc, null, context);
                return;
            }
            if (batch.isEmpty()) {
                finalCb.processResult(successRc, null, context);
                return;
            }
            // NOTE(review): finalCb is also passed as the per-ledger callback, so it may be
            // invoked once per ledger in addition to the final call — confirm this is intended.
            for (Long ledgerId : batch) {
                processor.process(ledgerId, finalCb);
            }
            this.processLedgers(ks, processor, finalCb, context, successRc, failureRc);
        });
    }

    /**
     * Returns an iterator over ranges of ledger ids, backed by a key stream spanning the keys
     * built for id {@code 0} through {@code Long.MAX_VALUE}.
     *
     * @param opTimeOutMs not used by this implementation
     * @return an iterator whose {@code hasNext}/{@code next} rethrow {@link IOException}s as-is
     *         and wrap any other exception in an {@link IOException}
     */
    public LedgerManager.LedgerRangeIterator getLedgerRanges(long opTimeOutMs) {
        final ByteSequence firstKey = ByteSequence.fromString(EtcdUtils.getLedgerKey(this.scope, 0L));
        final ByteSequence lastKey = ByteSequence.fromString(EtcdUtils.getLedgerKey(this.scope, Long.MAX_VALUE));
        // The ledger id is taken from the low 64 bits of the UUID parsed out of the key.
        final KeyStream<Long> ledgerKeys = new KeyStream<Long>(this.kvClient, firstKey, lastKey,
            bs -> EtcdUtils.parseLedgerKey(bs.toStringUtf8()).getLeastSignificantBits());
        final KeyIterator<Long> ki = new KeyIterator<Long>(ledgerKeys);
        return new LedgerManager.LedgerRangeIterator() {

            public boolean hasNext() throws IOException {
                try {
                    return ki.hasNext();
                } catch (IOException ioe) {
                    throw ioe;
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }

            public LedgerManager.LedgerRange next() throws IOException {
                try {
                    final List<Long> batch = ki.next();
                    // Sorted copy of the batch, as the range is built from an ordered set.
                    final Set<Long> sortedIds = new TreeSet<Long>(batch);
                    return new LedgerManager.LedgerRange(sortedIds);
                } catch (IOException ioe) {
                    throw ioe;
                } catch (Exception e) {
                    throw new IOException(e);
                }
            }
        };
    }

    /**
     * Marks this manager as closed and closes the watch client. Only the first call has any
     * effect; later calls return immediately.
     *
     * <p>NOTE: the decompiler reported "Removed try catching itself - possible behaviour change"
     * for this method, so the original source may have differed slightly.
     */
    public void close() {
        synchronized (this) {
            if (closed) {
                return;
            }
            closed = true;
        }
        // Closed outside the lock; the flag above guarantees this runs at most once.
        watchClient.close();
    }
}

