/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.metadata.etcd;

import com.coreos.jetcd.Client;
import com.coreos.jetcd.KV;
import com.coreos.jetcd.Txn;
import com.coreos.jetcd.Watch;
import com.coreos.jetcd.data.ByteSequence;
import com.coreos.jetcd.data.KeyValue;
import com.coreos.jetcd.kv.DeleteResponse;
import com.coreos.jetcd.kv.GetResponse;
import com.coreos.jetcd.kv.TxnResponse;
import com.coreos.jetcd.op.Cmp;
import com.coreos.jetcd.op.CmpTarget;
import com.coreos.jetcd.op.Op;
import com.coreos.jetcd.options.DeleteOption;
import com.coreos.jetcd.options.GetOption;
import com.coreos.jetcd.options.PutOption;
import com.coreos.jetcd.options.WatchOption;
import com.coreos.jetcd.watch.WatchEvent;
import com.coreos.jetcd.watch.WatchResponse;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.UncheckedExecutionException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.discover.RegistrationManager;
import org.apache.bookkeeper.meta.LedgerLayout;
import org.apache.bookkeeper.metadata.etcd.EtcdBookieRegister;
import org.apache.bookkeeper.metadata.etcd.EtcdConstants;
import org.apache.bookkeeper.metadata.etcd.EtcdLedgerManagerFactory;
import org.apache.bookkeeper.metadata.etcd.EtcdUtils;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class EtcdRegistrationManager
implements RegistrationManager {
    private static final Logger log = LoggerFactory.getLogger(EtcdRegistrationManager.class);
    private final String scope;
    private final Client client;
    private final boolean ownClient;
    private final KV kvClient;
    private final EtcdBookieRegister bkRegister;

    @VisibleForTesting
    EtcdRegistrationManager(Client client, String scope) {
        this(client, scope, 60L);
    }

    @VisibleForTesting
    EtcdRegistrationManager(Client client, String scope, long ttlSeconds) {
        this(client, scope, ttlSeconds, () -> {});
    }

    @VisibleForTesting
    EtcdRegistrationManager(Client client, String scope, long ttlSeconds, RegistrationManager.RegistrationListener listener) {
        this(client, scope, new EtcdBookieRegister(client.getLeaseClient(), ttlSeconds, listener).start(), true);
    }

    EtcdRegistrationManager(Client client, String scope, EtcdBookieRegister bkRegister) {
        this(client, scope, bkRegister, false);
    }

    private EtcdRegistrationManager(Client client, String scope, EtcdBookieRegister bkRegister, boolean ownClient) {
        this.scope = scope;
        this.client = client;
        this.kvClient = client.getKVClient();
        this.bkRegister = bkRegister;
        this.ownClient = ownClient;
    }

    public void close() {
        if (this.ownClient) {
            log.info("Closing registration manager under scope '{}'", (Object)this.scope);
            this.bkRegister.close();
            this.client.close();
            log.info("Successfully closed registration manager under scope '{}'", (Object)this.scope);
        }
    }

    public void registerBookie(BookieId bookieId, boolean readOnly, BookieServiceInfo bookieServiceInfo) throws BookieException {
        if (readOnly) {
            this.doRegisterReadonlyBookie(bookieId, this.bkRegister.get());
        } else {
            this.doRegisterBookie(EtcdUtils.getWritableBookiePath(this.scope, bookieId), this.bkRegister.get());
        }
    }

    private boolean checkRegNodeAndWaitExpired(String regPath, long leaseId) throws BookieException.MetadataStoreException {
        ByteSequence regPathBs = ByteSequence.fromString((String)regPath);
        GetResponse getResp = (GetResponse)EtcdUtils.msResult(this.kvClient.get(regPathBs));
        if (getResp.getCount() <= 0L) {
            return false;
        }
        return this.waitUntilRegNodeExpired(regPath, leaseId);
    }

    private boolean waitUntilRegNodeExpired(String regPath, long leaseId) throws BookieException.MetadataStoreException {
        ByteSequence regPathBs = ByteSequence.fromString((String)regPath);
        GetResponse getResp = (GetResponse)EtcdUtils.msResult(this.kvClient.get(regPathBs));
        if (getResp.getCount() <= 0L) {
            return false;
        }
        KeyValue kv = (KeyValue)getResp.getKvs().get(0);
        if (kv.getLease() != leaseId) {
            Watch watchClient = this.client.getWatchClient();
            Watch.Watcher watcher = watchClient.watch(regPathBs, WatchOption.newBuilder().withRevision(getResp.getHeader().getRevision() + 1L).build());
            log.info("Previous bookie registration (lease = {}) still exists at {}, so new lease '{}' will be waiting previous lease for {} seconds to be expired", new Object[]{kv.getLease(), regPath, leaseId, this.bkRegister.getTtlSeconds()});
            CompletableFuture<Void> watchFuture = CompletableFuture.runAsync(() -> {
                try {
                    block2: while (true) {
                        WatchEvent event;
                        log.info("Listening on '{}' until it is expired", (Object)regPath);
                        WatchResponse response = watcher.listen();
                        Iterator iterator = response.getEvents().iterator();
                        do {
                            if (!iterator.hasNext()) continue block2;
                            event = (WatchEvent)iterator.next();
                            log.info("Received watch event on '{}' : EventType = {}", (Object)regPath, (Object)event.getEventType());
                        } while (WatchEvent.EventType.DELETE != event.getEventType());
                        break;
                    }
                    return;
                }
                catch (InterruptedException e) {
                    throw new UncheckedExecutionException("Interrupted at waiting previous registration under " + regPath + " (lease = " + kv.getLease() + ") to be expired", (Throwable)e);
                }
            });
            try {
                EtcdUtils.msResult(watchFuture, 2L * this.bkRegister.getTtlSeconds(), TimeUnit.SECONDS);
                boolean bl = false;
                return bl;
            }
            catch (TimeoutException e) {
                watchFuture.cancel(true);
                throw new BookieException.MetadataStoreException("Previous bookie registration still exists at " + regPath + " (lease = " + kv.getLease() + ") after " + 2L * this.bkRegister.getTtlSeconds() + " seconds elapsed");
            }
            catch (UncheckedExecutionException uee) {
                throw new BookieException.MetadataStoreException(uee.getMessage(), uee.getCause());
            }
            finally {
                watcher.close();
            }
        }
        return true;
    }

    private void doRegisterBookie(String regPath, long leaseId) throws BookieException.MetadataStoreException {
        if (this.checkRegNodeAndWaitExpired(regPath, leaseId)) {
            return;
        }
        ByteSequence regPathBs = ByteSequence.fromString((String)regPath);
        Txn txn = this.kvClient.txn().If(new Cmp[]{new Cmp(regPathBs, Cmp.Op.GREATER, (CmpTarget)CmpTarget.createRevision((long)0L))}).Then(new Op[]{Op.get((ByteSequence)regPathBs, (GetOption)GetOption.DEFAULT)}).Else(new Op[]{Op.put((ByteSequence)regPathBs, (ByteSequence)ByteSequence.fromBytes((byte[])new byte[0]), (PutOption)PutOption.newBuilder().withLeaseId(this.bkRegister.get().longValue()).build())});
        TxnResponse txnResp = (TxnResponse)EtcdUtils.msResult(txn.commit());
        if (txnResp.isSucceeded()) {
            GetResponse getResp = (GetResponse)txnResp.getGetResponses().get(0);
            if (getResp.getCount() <= 0L) {
                throw new BookieException.MetadataStoreException("Failed to register bookie under '" + regPath + "', but no bookie is registered there.");
            }
            KeyValue kv = (KeyValue)getResp.getKvs().get(0);
            throw new BookieException.MetadataStoreException("Another bookie already registered under '" + regPath + "': lease = " + kv.getLease());
        }
        log.info("Successfully registered bookie at {}", (Object)regPath);
    }

    private void doRegisterReadonlyBookie(BookieId bookieId, long leaseId) throws BookieException.MetadataStoreException {
        String readonlyRegPath = EtcdUtils.getReadonlyBookiePath(this.scope, bookieId);
        this.doRegisterBookie(readonlyRegPath, leaseId);
        String writableRegPath = EtcdUtils.getWritableBookiePath(this.scope, bookieId);
        EtcdUtils.msResult(this.kvClient.delete(ByteSequence.fromString((String)writableRegPath)));
    }

    public void unregisterBookie(BookieId bookieId, boolean readOnly) throws BookieException {
        String regPath = readOnly ? EtcdUtils.getReadonlyBookiePath(this.scope, bookieId) : EtcdUtils.getWritableBookiePath(this.scope, bookieId);
        DeleteResponse delResp = (DeleteResponse)EtcdUtils.msResult(this.kvClient.delete(ByteSequence.fromString((String)regPath)));
        if (delResp.getDeleted() > 0L) {
            log.info("Successfully unregistered bookie {} from {}", (Object)bookieId, (Object)regPath);
        } else {
            log.info("Bookie disappeared from {} before unregistering", (Object)regPath);
        }
    }

    public boolean isBookieRegistered(BookieId bookieId) throws BookieException {
        CompletableFuture getWritableFuture = this.kvClient.get(ByteSequence.fromString((String)EtcdUtils.getWritableBookiePath(this.scope, bookieId)), GetOption.newBuilder().withCountOnly(true).build());
        CompletableFuture getReadonlyFuture = this.kvClient.get(ByteSequence.fromString((String)EtcdUtils.getReadonlyBookiePath(this.scope, bookieId)), GetOption.newBuilder().withCountOnly(true).build());
        return ((GetResponse)EtcdUtils.msResult(getWritableFuture)).getCount() > 0L || ((GetResponse)EtcdUtils.msResult(getReadonlyFuture)).getCount() > 0L;
    }

    public void writeCookie(BookieId bookieId, Versioned<byte[]> cookieData) throws BookieException {
        ByteSequence cookiePath = ByteSequence.fromString((String)EtcdUtils.getCookiePath(this.scope, bookieId));
        Txn txn = this.kvClient.txn();
        if (Version.NEW == cookieData.getVersion()) {
            txn.If(new Cmp[]{new Cmp(cookiePath, Cmp.Op.GREATER, (CmpTarget)CmpTarget.createRevision((long)0L))}).Else(new Op[]{Op.put((ByteSequence)cookiePath, (ByteSequence)ByteSequence.fromBytes((byte[])((byte[])cookieData.getValue())), (PutOption)PutOption.DEFAULT)});
        } else {
            if (!(cookieData.getVersion() instanceof LongVersion)) {
                throw new BookieException.BookieIllegalOpException("Invalid version type, expected it to be LongVersion");
            }
            txn.If(new Cmp[]{new Cmp(cookiePath, Cmp.Op.EQUAL, (CmpTarget)CmpTarget.modRevision((long)((LongVersion)cookieData.getVersion()).getLongVersion()))}).Then(new Op[]{Op.put((ByteSequence)cookiePath, (ByteSequence)ByteSequence.fromBytes((byte[])((byte[])cookieData.getValue())), (PutOption)PutOption.DEFAULT)});
        }
        TxnResponse response = (TxnResponse)EtcdUtils.msResult(txn.commit());
        if (response.isSucceeded() != (Version.NEW != cookieData.getVersion())) {
            throw new BookieException.MetadataStoreException("Conflict on writing cookie for bookie " + bookieId);
        }
    }

    public Versioned<byte[]> readCookie(BookieId bookieId) throws BookieException {
        ByteSequence cookiePath = ByteSequence.fromString((String)EtcdUtils.getCookiePath(this.scope, bookieId));
        GetResponse resp = (GetResponse)EtcdUtils.msResult(this.kvClient.get(cookiePath));
        if (resp.getCount() <= 0L) {
            throw new BookieException.CookieNotFoundException(bookieId.toString());
        }
        KeyValue kv = (KeyValue)resp.getKvs().get(0);
        return new Versioned((Object)kv.getValue().getBytes(), (Version)new LongVersion(kv.getModRevision()));
    }

    public void removeCookie(BookieId bookieId, Version version) throws BookieException {
        ByteSequence cookiePath = ByteSequence.fromString((String)EtcdUtils.getCookiePath(this.scope, bookieId));
        Txn delTxn = this.kvClient.txn().If(new Cmp[]{new Cmp(cookiePath, Cmp.Op.EQUAL, (CmpTarget)CmpTarget.modRevision((long)((LongVersion)version).getLongVersion()))}).Then(new Op[]{Op.delete((ByteSequence)cookiePath, (DeleteOption)DeleteOption.DEFAULT)}).Else(new Op[]{Op.get((ByteSequence)cookiePath, (GetOption)GetOption.newBuilder().withCountOnly(true).build())});
        TxnResponse txnResp = (TxnResponse)EtcdUtils.msResult(delTxn.commit());
        if (!txnResp.isSucceeded()) {
            GetResponse getResp = (GetResponse)txnResp.getGetResponses().get(0);
            if (getResp.getCount() > 0L) {
                throw new BookieException.MetadataStoreException("Failed to remove cookie from " + cookiePath.toStringUtf8() + " for bookie " + bookieId + " : bad version '" + version + "'");
            }
            throw new BookieException.CookieNotFoundException(bookieId.toString());
        }
        log.info("Removed cookie from {} for bookie {}", (Object)cookiePath.toStringUtf8(), (Object)bookieId);
    }

    public String getClusterInstanceId() throws BookieException {
        GetResponse response = (GetResponse)EtcdUtils.msResult(this.kvClient.get(ByteSequence.fromString((String)EtcdUtils.getClusterInstanceIdPath(this.scope))));
        if (response.getCount() <= 0L) {
            log.error("BookKeeper metadata doesn't exist in Etcd. Has the cluster been initialized? Try running bin/bookkeeper shell initNewCluster");
            throw new BookieException.MetadataStoreException("BookKeeper is not initialized under '" + this.scope + "' yet");
        }
        KeyValue kv = (KeyValue)response.getKvs().get(0);
        return new String(kv.getValue().getBytes(), StandardCharsets.UTF_8);
    }

    public boolean prepareFormat() throws Exception {
        ByteSequence rootScopeKey = ByteSequence.fromString((String)this.scope);
        GetResponse resp = (GetResponse)EtcdUtils.msResult(this.kvClient.get(rootScopeKey));
        return resp.getCount() > 0L;
    }

    public boolean initNewCluster() throws Exception {
        return EtcdRegistrationManager.initNewCluster(this.kvClient, this.scope);
    }

    static boolean initNewCluster(KV kvClient, String scope) throws Exception {
        ByteSequence rootScopeKey = ByteSequence.fromString((String)scope);
        String instanceId = UUID.randomUUID().toString();
        LedgerLayout layout = new LedgerLayout(EtcdLedgerManagerFactory.class.getName(), 0);
        Txn initTxn = kvClient.txn().If(new Cmp[]{new Cmp(rootScopeKey, Cmp.Op.GREATER, (CmpTarget)CmpTarget.createRevision((long)0L))}).Else(new Op[]{Op.put((ByteSequence)rootScopeKey, (ByteSequence)EtcdConstants.EMPTY_BS, (PutOption)PutOption.DEFAULT), Op.put((ByteSequence)ByteSequence.fromString((String)EtcdUtils.getLayoutKey(scope)), (ByteSequence)ByteSequence.fromBytes((byte[])layout.serialize()), (PutOption)PutOption.DEFAULT), Op.put((ByteSequence)ByteSequence.fromString((String)EtcdUtils.getClusterInstanceIdPath(scope)), (ByteSequence)ByteSequence.fromString((String)instanceId), (PutOption)PutOption.DEFAULT), Op.put((ByteSequence)ByteSequence.fromString((String)EtcdUtils.getCookiesPath(scope)), (ByteSequence)EtcdConstants.EMPTY_BS, (PutOption)PutOption.DEFAULT), Op.put((ByteSequence)ByteSequence.fromString((String)EtcdUtils.getBookiesPath(scope)), (ByteSequence)EtcdConstants.EMPTY_BS, (PutOption)PutOption.DEFAULT), Op.put((ByteSequence)ByteSequence.fromString((String)EtcdUtils.getWritableBookiesPath(scope)), (ByteSequence)EtcdConstants.EMPTY_BS, (PutOption)PutOption.DEFAULT), Op.put((ByteSequence)ByteSequence.fromString((String)EtcdUtils.getReadonlyBookiesPath(scope)), (ByteSequence)EtcdConstants.EMPTY_BS, (PutOption)PutOption.DEFAULT), Op.put((ByteSequence)ByteSequence.fromString((String)EtcdUtils.getLedgersPath(scope)), (ByteSequence)EtcdConstants.EMPTY_BS, (PutOption)PutOption.DEFAULT), Op.put((ByteSequence)ByteSequence.fromString((String)EtcdUtils.getBucketsPath(scope)), (ByteSequence)EtcdConstants.EMPTY_BS, (PutOption)PutOption.DEFAULT), Op.put((ByteSequence)ByteSequence.fromString((String)EtcdUtils.getUnderreplicationPath(scope)), (ByteSequence)EtcdConstants.EMPTY_BS, (PutOption)PutOption.DEFAULT)});
        return !((TxnResponse)EtcdUtils.msResult(initTxn.commit())).isSucceeded();
    }

    public boolean format() throws Exception {
        return EtcdRegistrationManager.format(this.kvClient, this.scope);
    }

    static boolean format(KV kvClient, String scope) throws Exception {
        ByteSequence rootScopeKey = ByteSequence.fromString((String)scope);
        GetResponse resp = (GetResponse)EtcdUtils.msResult(kvClient.get(rootScopeKey));
        if (resp.getCount() <= 0L) {
            return EtcdRegistrationManager.initNewCluster(kvClient, scope);
        }
        if (EtcdRegistrationManager.nukeExistingCluster(kvClient, scope)) {
            return EtcdRegistrationManager.initNewCluster(kvClient, scope);
        }
        return false;
    }

    public boolean nukeExistingCluster() throws Exception {
        return EtcdRegistrationManager.nukeExistingCluster(this.kvClient, this.scope);
    }

    static boolean nukeExistingCluster(KV kvClient, String scope) throws Exception {
        ByteSequence rootScopeKey = ByteSequence.fromString((String)scope);
        GetResponse resp = (GetResponse)EtcdUtils.msResult(kvClient.get(rootScopeKey));
        if (resp.getCount() <= 0L) {
            log.info("There is no existing cluster with under scope '{}' in Etcd, so exiting nuke operation", (Object)scope);
            return true;
        }
        String bookiesPath = EtcdUtils.getBookiesPath(scope);
        String bookiesEndPath = EtcdUtils.getBookiesEndPath(scope);
        resp = (GetResponse)EtcdUtils.msResult(kvClient.get(ByteSequence.fromString((String)bookiesPath), GetOption.newBuilder().withRange(ByteSequence.fromString((String)bookiesEndPath)).withKeysOnly(true).build()));
        String writableBookiesPath = EtcdUtils.getWritableBookiesPath(scope);
        String readonlyBookiesPath = EtcdUtils.getReadonlyBookiesPath(scope);
        boolean hasBookiesAlive = false;
        for (KeyValue kv : resp.getKvs()) {
            String keyStr = new String(kv.getKey().getBytes(), StandardCharsets.UTF_8);
            if (keyStr.equals(bookiesPath) || keyStr.equals(writableBookiesPath) || keyStr.equals(readonlyBookiesPath)) continue;
            hasBookiesAlive = true;
            break;
        }
        if (hasBookiesAlive) {
            log.error("Bookies are still up and connected to this cluster, stop all bookies before nuking the cluster");
            return false;
        }
        DeleteResponse delResp = (DeleteResponse)EtcdUtils.msResult(kvClient.delete(rootScopeKey, DeleteOption.newBuilder().withRange(ByteSequence.fromString((String)EtcdUtils.getScopeEndKey(scope))).build()));
        log.info("Successfully nuked cluster under scope '{}' : {} kv pairs deleted", (Object)scope, (Object)delResp.getDeleted());
        return true;
    }

    Client getClient() {
        return this.client;
    }

    EtcdBookieRegister getBkRegister() {
        return this.bkRegister;
    }
}

