/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client;

import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.base.Preconditions;
import org.apache.pulsar.functions.runtime.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.ByteBufAllocator;
import org.apache.pulsar.functions.runtime.shaded.io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.pulsar.functions.runtime.shaded.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.HashedWheelTimer;
import org.apache.pulsar.functions.runtime.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.AsyncCallback;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BookKeeperClientStats;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BookieInfoReader;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BookieWatcher;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.BookieWatcherImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.ClientContext;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.ClientInternalConf;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.DefaultBookieAddressResolver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.LedgerCreateOp;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.LedgerDeleteOp;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.LedgerOpenOp;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.SyncCallbackUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.CreateBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.DeleteBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.LedgersIterator;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.ListLedgersResult;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.ListLedgersResultBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.OpenBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.allocator.ByteBufAllocatorBuilder;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.common.util.ReflectionUtils;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.feature.SettableFeatureProvider;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.CleanupLedgerManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerIdGenerator;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManager;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.LedgerManagerFactory;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MetadataClientDriver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.MetadataDrivers;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.meta.exceptions.MetadataException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.net.DNSToSwitchMapping;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieClient;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.BookieClientImpl;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.proto.DataFormats;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.EventLoopUtil;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.util.SafeRunnable;
import org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.functions.runtime.shaded.org.apache.commons.configuration.ConfigurationException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BookKeeper
implements org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.BookKeeper {
    // Class-wide logger.
    private static final Logger LOG = LoggerFactory.getLogger(BookKeeper.class);
    // Netty event loop group: either supplied by the caller or created by the main constructor.
    final EventLoopGroup eventLoopGroup;
    // Buffer allocator handed out through the ClientContext.
    private final ByteBufAllocator allocator;
    // Stats logger; the main constructor scopes it under "bookkeeper_client".
    private final StatsLogger statsLogger;
    private final BookKeeperClientStats clientStats;
    // Threshold compared against Math.random() in checkForFaultyBookies().
    private final double bookieQuarantineRatio;
    // True when the constructor created eventLoopGroup itself; close() only shuts the group down in that case.
    boolean ownEventLoopGroup = false;
    final BookieClient bookieClient;
    final BookieWatcherImpl bookieWatcher;
    final OrderedExecutor mainWorkerPool;
    final OrderedScheduler scheduler;
    final HashedWheelTimer requestTimer;
    // True when the constructor created requestTimer itself; close() only stops the timer in that case.
    final boolean ownTimer;
    final FeatureProvider featureProvider;
    // Non-null only when disk-weight based placement is enabled (see the main constructor).
    final ScheduledExecutorService bookieInfoScheduler;
    final MetadataClientDriver metadataDriver;
    final LedgerManagerFactory ledgerManagerFactory;
    final LedgerManager ledgerManager;
    final LedgerIdGenerator ledgerIdGenerator;
    final EnsemblePlacementPolicy placementPolicy;
    // Not final: left unassigned (null) by the no-arg test constructor.
    BookieInfoReader bookieInfoReader;
    final ClientConfiguration conf;
    final ClientInternalConf internalConf;
    // Set to true by close() while holding closeLock's write lock; async operations check it under the read lock.
    boolean closed = false;
    final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    // Adapter exposing this client's collaborators through the ClientContext interface.
    // The "BookKeeper.this." qualifiers are required: several methods below share their
    // name with the enclosing class's getter, so an unqualified call would recurse.
    private final ClientContext clientCtx = new ClientContext(){

        @Override
        public ClientInternalConf getConf() {
            return BookKeeper.this.internalConf;
        }

        @Override
        public LedgerManager getLedgerManager() {
            return BookKeeper.this.getLedgerManager();
        }

        @Override
        public BookieWatcher getBookieWatcher() {
            return BookKeeper.this.getBookieWatcher();
        }

        @Override
        public EnsemblePlacementPolicy getPlacementPolicy() {
            return BookKeeper.this.getPlacementPolicy();
        }

        @Override
        public BookieClient getBookieClient() {
            return BookKeeper.this.getBookieClient();
        }

        @Override
        public OrderedExecutor getMainWorkerPool() {
            return BookKeeper.this.getMainWorkerPool();
        }

        @Override
        public OrderedScheduler getScheduler() {
            return BookKeeper.this.getScheduler();
        }

        @Override
        public BookKeeperClientStats getClientStats() {
            return BookKeeper.this.clientStats;
        }

        // Reports the enclosing client's closed flag.
        @Override
        public boolean isClientClosed() {
            return BookKeeper.this.isClosed();
        }

        @Override
        public ByteBufAllocator getByteBufAllocator() {
            return BookKeeper.this.allocator;
        }
    };

    /**
     * Starts a {@code Builder} seeded with the given client configuration.
     *
     * @param conf client configuration passed to the builder
     * @return a new builder
     */
    public static Builder forConfig(ClientConfiguration conf) {
        Builder builder = new Builder(conf);
        return builder;
    }

    /**
     * Creates a client whose metadata service URI is derived from a server list:
     * {@code "zk+null://" + servers + "/ledgers"}.
     *
     * @param servers server list embedded verbatim into the metadata service URI
     */
    public BookKeeper(String servers) throws IOException, InterruptedException, BKException {
        this(
            (ClientConfiguration)new ClientConfiguration()
                .setMetadataServiceUri("zk+null://" + servers + "/ledgers"));
    }

    /**
     * Creates a client from a configuration only; every optional collaborator is
     * left {@code null} and statistics go to {@link NullStatsLogger#INSTANCE}.
     *
     * @param conf client configuration
     */
    public BookKeeper(ClientConfiguration conf) throws IOException, InterruptedException, BKException {
        this(
            conf,
            null,
            null,
            null,
            (StatsLogger)NullStatsLogger.INSTANCE,
            null,
            null,
            null);
    }

    /**
     * Ensures a caller-supplied ZooKeeper handle is non-null and currently connected.
     *
     * @param zk handle to validate
     * @return the same handle
     * @throws NullPointerException if {@code zk} is null
     * @throws IOException wrapping a CONNECTIONLOSS KeeperException when the handle is not connected
     */
    private static ZooKeeper validateZooKeeper(ZooKeeper zk) throws NullPointerException, IOException {
        Preconditions.checkNotNull(zk, "No zookeeper instance provided");
        if (zk.getState().isConnected()) {
            return zk;
        }
        LOG.error("Unconnected zookeeper handle passed to bookkeeper");
        throw new IOException(KeeperException.create(KeeperException.Code.CONNECTIONLOSS));
    }

    /**
     * Ensures a caller-supplied event loop group is non-null.
     *
     * @param eventLoopGroup group to validate
     * @return the same group
     * @throws NullPointerException if {@code eventLoopGroup} is null
     */
    private static EventLoopGroup validateEventLoopGroup(EventLoopGroup eventLoopGroup) throws NullPointerException {
        // checkNotNull hands back the reference it validated.
        return Preconditions.checkNotNull(eventLoopGroup, "No Event Loop Group provided");
    }

    /**
     * Creates a client that uses an externally managed ZooKeeper handle.
     *
     * @param conf client configuration
     * @param zk   connected ZooKeeper handle; validated before use
     */
    public BookKeeper(ClientConfiguration conf, ZooKeeper zk) throws IOException, InterruptedException, BKException {
        this(
            conf,
            BookKeeper.validateZooKeeper(zk),
            null,
            null,
            (StatsLogger)NullStatsLogger.INSTANCE,
            null,
            null,
            null);
    }

    /**
     * Creates a client that uses an externally managed ZooKeeper handle and
     * Netty event loop group.
     *
     * @param conf           client configuration
     * @param zk             connected ZooKeeper handle; validated before use
     * @param eventLoopGroup non-null event loop group; validated before use
     */
    public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLoopGroup) throws IOException, InterruptedException, BKException {
        this(
            conf,
            BookKeeper.validateZooKeeper(zk),
            BookKeeper.validateEventLoopGroup(eventLoopGroup),
            null,
            (StatsLogger)NullStatsLogger.INSTANCE,
            null,
            null,
            null);
    }

    /**
     * Full constructor that wires up every component of the client.
     *
     * <p>Any of {@code zkc}, {@code eventLoopGroup}, {@code byteBufAllocator},
     * {@code dnsResolver}, {@code requestTimer} and {@code featureProvider} may be
     * {@code null}; a default is created (or the component is skipped) in that case.
     * {@code zkc} must be non-null when the configuration carries no metadata service URI.
     *
     * <p>NOTE(review): components started early (scheduler, worker pool, metadata
     * driver, ...) are not released here if a later step throws.
     *
     * @throws IOException if the metadata client driver, the placement policy or the
     *         ledger manager factory cannot be initialized
     */
    @VisibleForTesting
    BookKeeper(ClientConfiguration conf, ZooKeeper zkc, EventLoopGroup eventLoopGroup, ByteBufAllocator byteBufAllocator, StatsLogger rootStatsLogger, DNSToSwitchMapping dnsResolver, HashedWheelTimer requestTimer, FeatureProvider featureProvider) throws IOException, InterruptedException, BKException {
        this.conf = conf;
        // Fall back to the DISABLE_ALL provider when the caller supplies none.
        this.featureProvider = null == featureProvider ? SettableFeatureProvider.DISABLE_ALL : featureProvider;
        this.internalConf = ClientInternalConf.fromConfigAndFeatureProvider(conf, this.featureProvider);
        // Single-threaded scheduler plus a configurable worker pool.
        this.scheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build();
        this.mainWorkerPool = OrderedExecutor.newBuilder().name("BookKeeperClientWorker").numThreads(conf.getNumWorkerThreads()).statsLogger(rootStatsLogger).traceTaskExecution(conf.getEnableTaskExecutionStats()).preserveMdcForTaskExecution(conf.getPreserveMdcForTaskExecution()).traceTaskWarnTimeMicroSec(conf.getTaskExecutionWarnTimeMicros()).enableBusyWait(conf.isBusyWaitEnabled()).build();
        this.statsLogger = rootStatsLogger.scope("bookkeeper_client");
        this.clientStats = BookKeeperClientStats.newInstance(this.statsLogger);
        // Resolve the metadata driver from the configured URI, or use the "zk" scheme
        // with the externally supplied ZooKeeper handle.
        try {
            String metadataServiceUriStr = conf.getMetadataServiceUri();
            if (null != metadataServiceUriStr) {
                this.metadataDriver = MetadataDrivers.getClientDriver(URI.create(metadataServiceUriStr));
            } else {
                Preconditions.checkNotNull(zkc, "No external zookeeper provided when no metadata service uri is found");
                this.metadataDriver = MetadataDrivers.getClientDriver("zk");
            }
            this.metadataDriver.initialize(conf, this.scheduler, rootStatsLogger, Optional.ofNullable(zkc));
        }
        catch (ConfigurationException ce) {
            LOG.error("Failed to initialize metadata client driver using invalid metadata service uri", (Throwable)ce);
            throw new IOException("Failed to initialize metadata client driver", ce);
        }
        catch (MetadataException me) {
            LOG.error("Encountered metadata exceptions on initializing metadata client driver", (Throwable)me);
            throw new IOException("Failed to initialize metadata client driver", me);
        }
        // Track ownership so close() only shuts down an event loop group created here.
        if (null == eventLoopGroup) {
            this.eventLoopGroup = EventLoopUtil.getClientEventLoopGroup(conf, new DefaultThreadFactory("bookkeeper-io"));
            this.ownEventLoopGroup = true;
        } else {
            this.eventLoopGroup = eventLoopGroup;
            this.ownEventLoopGroup = false;
        }
        // Use the supplied allocator, otherwise build one from the allocator settings in conf.
        this.allocator = byteBufAllocator != null ? byteBufAllocator : ByteBufAllocatorBuilder.create().poolingPolicy(conf.getAllocatorPoolingPolicy()).poolingConcurrency(conf.getAllocatorPoolingConcurrency()).outOfMemoryPolicy(conf.getAllocatorOutOfMemoryPolicy()).leakDetectionPolicy(conf.getAllocatorLeakDetectionPolicy()).build();
        // Same ownership rule for the request timer.
        if (null == requestTimer) {
            this.requestTimer = new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("BookieClientTimer-%d").build(), conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, conf.getTimeoutTimerNumTicks());
            this.ownTimer = true;
        } else {
            this.requestTimer = requestTimer;
            this.ownTimer = false;
        }
        DefaultBookieAddressResolver bookieAddressResolver = new DefaultBookieAddressResolver(this.metadataDriver.getRegistrationClient());
        if (dnsResolver != null) {
            dnsResolver.setBookieAddressResolver(bookieAddressResolver);
        }
        this.placementPolicy = this.initializeEnsemblePlacementPolicy(conf, dnsResolver, this.requestTimer, this.featureProvider, this.statsLogger, bookieAddressResolver);
        this.bookieWatcher = new BookieWatcherImpl(conf, this.placementPolicy, this.metadataDriver.getRegistrationClient(), bookieAddressResolver, this.statsLogger.scope("bookie_watcher"));
        this.bookieClient = new BookieClientImpl(conf, this.eventLoopGroup, this.allocator, this.mainWorkerPool, this.scheduler, rootStatsLogger, this.bookieWatcher.getBookieAddressResolver());
        // The bookie-info scheduler exists (and the reader is started) only when
        // disk-weight based placement is enabled.
        if (conf.getDiskWeightBasedPlacementEnabled()) {
            LOG.info("Weighted ledger placement enabled");
            ThreadFactoryBuilder tFBuilder = new ThreadFactoryBuilder().setNameFormat("BKClientMetaDataPollScheduler-%d");
            this.bookieInfoScheduler = Executors.newSingleThreadScheduledExecutor(tFBuilder.build());
            this.bookieInfoReader = new BookieInfoReader(this, conf, this.bookieInfoScheduler);
            this.bookieWatcher.initialBlockingBookieRead();
            this.bookieInfoReader.start();
        } else {
            LOG.info("Weighted ledger placement is not enabled");
            this.bookieInfoScheduler = null;
            this.bookieInfoReader = new BookieInfoReader(this, conf, null);
            this.bookieWatcher.initialBlockingBookieRead();
        }
        try {
            this.ledgerManagerFactory = this.metadataDriver.getLedgerManagerFactory();
        }
        catch (MetadataException e) {
            throw new IOException("Failed to initialize ledger manager factory", e);
        }
        // The ledger manager is wrapped in a CleanupLedgerManager; see getUnderlyingLedgerManager().
        this.ledgerManager = new CleanupLedgerManager(this.ledgerManagerFactory.newLedgerManager());
        this.ledgerIdGenerator = this.ledgerManagerFactory.newLedgerIdGenerator();
        this.bookieQuarantineRatio = conf.getBookieQuarantineRatio();
        this.scheduleBookieHealthCheckIfEnabled(conf);
    }

    /**
     * Test-only constructor: builds a mostly empty client with a default
     * configuration, a null stats logger and an unpooled allocator. All runtime
     * collaborators are {@code null}.
     */
    @VisibleForTesting
    BookKeeper() {
        // Configuration and statistics.
        conf = new ClientConfiguration();
        internalConf = ClientInternalConf.fromConfig(conf);
        statsLogger = NullStatsLogger.INSTANCE;
        clientStats = BookKeeperClientStats.newInstance(statsLogger);
        allocator = UnpooledByteBufAllocator.DEFAULT;
        bookieQuarantineRatio = 1.0;
        ownTimer = false;

        // Executors, timers and networking are absent.
        scheduler = null;
        mainWorkerPool = null;
        bookieInfoScheduler = null;
        requestTimer = null;
        eventLoopGroup = null;

        // Metadata and bookie access are absent.
        metadataDriver = null;
        ledgerManagerFactory = null;
        ledgerManager = null;
        ledgerIdGenerator = null;
        placementPolicy = null;
        featureProvider = null;
        bookieWatcher = null;
        bookieClient = null;
    }

    /**
     * Instantiates the placement policy class named in the configuration and
     * initializes it.
     *
     * @return the value returned by the policy's {@code initialize} call
     * @throws IOException wrapping a {@link ConfigurationException} raised while
     *         resolving or initializing the policy
     */
    private EnsemblePlacementPolicy initializeEnsemblePlacementPolicy(ClientConfiguration conf, DNSToSwitchMapping dnsResolver, HashedWheelTimer timer, FeatureProvider featureProvider, StatsLogger statsLogger, BookieAddressResolver bookieAddressResolver) throws IOException {
        try {
            Class<? extends EnsemblePlacementPolicy> configuredClass = conf.getEnsemblePlacementPolicy();
            EnsemblePlacementPolicy policy = ReflectionUtils.newInstance(configuredClass);
            Optional<DNSToSwitchMapping> optionalResolver = Optional.ofNullable(dnsResolver);
            return policy.initialize(conf, optionalResolver, timer, featureProvider, statsLogger, bookieAddressResolver);
        } catch (ConfigurationException e) {
            throw new IOException("Failed to initialize ensemble placement policy : ", e);
        }
    }

    /**
     * Maps a result code using this client's own bookie client; see the static
     * overload for the rule.
     */
    int getReturnRc(int rc) {
        return getReturnRc(bookieClient, rc);
    }

    /**
     * Maps a result code: a non-zero code becomes {@code -19} when the bookie
     * client is already closed; otherwise the code is returned unchanged.
     * The bookie client is not consulted when {@code rc} is zero.
     *
     * @param bookieClient client whose closed state is checked
     * @param rc           original result code
     * @return {@code rc}, or {@code -19} for a failure on a closed client
     */
    static int getReturnRc(BookieClient bookieClient, int rc) {
        // -19 is presumably the "client closed" code - confirm against BKException.Code.
        boolean failedOnClosedClient = rc != 0 && bookieClient.isClosed();
        return failedOnClosedClient ? -19 : rc;
    }

    /**
     * Schedules {@link #checkForFaultyBookies()} at a fixed rate on the client
     * scheduler when the bookie health check is enabled in {@code conf}. The
     * configured interval (seconds) is used for both the initial delay and the period.
     */
    void scheduleBookieHealthCheckIfEnabled(ClientConfiguration conf) {
        if (!conf.isBookieHealthCheckEnabled()) {
            return;
        }
        SafeRunnable healthCheck = new SafeRunnable(){

            @Override
            public void safeRun() {
                BookKeeper.this.checkForFaultyBookies();
            }
        };
        long initialDelaySeconds = conf.getBookieHealthCheckIntervalSeconds();
        long periodSeconds = conf.getBookieHealthCheckIntervalSeconds();
        scheduler.scheduleAtFixedRate(healthCheck, initialDelaySeconds, periodSeconds, TimeUnit.SECONDS);
    }

    /**
     * Walks the bookies the bookie client reports as faulty. Each one is
     * quarantined when a fresh {@code Math.random()} draw is at most
     * {@code bookieQuarantineRatio}, and skipped otherwise. A counter is bumped
     * for either outcome.
     */
    void checkForFaultyBookies() {
        for (BookieId suspect : bookieClient.getFaultyBookies()) {
            boolean quarantine = Math.random() <= bookieQuarantineRatio;
            if (quarantine) {
                bookieWatcher.quarantineBookie(suspect);
                statsLogger.getCounter("BOOKIE_QUARANTINE").inc();
            } else {
                statsLogger.getCounter("BOOKIE_QUARANTINE_SKIP").inc();
            }
        }
    }

    /** Returns the ledger manager held by this client. */
    @VisibleForTesting
    public LedgerManager getLedgerManager() {
        return ledgerManager;
    }

    /** Returns the factory the ledger manager was created from. */
    @VisibleForTesting
    public LedgerManagerFactory getLedgerManagerFactory() {
        return ledgerManagerFactory;
    }

    /**
     * Returns the ledger manager wrapped by the {@link CleanupLedgerManager}
     * stored in {@code ledgerManager}.
     */
    @VisibleForTesting
    LedgerManager getUnderlyingLedgerManager() {
        CleanupLedgerManager wrapper = (CleanupLedgerManager)ledgerManager;
        return wrapper.getUnderlying();
    }

    /** Returns the ledger id generator held by this client. */
    @VisibleForTesting
    LedgerIdGenerator getLedgerIdGenerator() {
        return ledgerIdGenerator;
    }

    /** Returns the read/write lock guarding the {@code closed} flag. */
    @VisibleForTesting
    ReentrantReadWriteLock getCloseLock() {
        return closeLock;
    }

    /** Returns the current value of the {@code closed} flag (read without taking the close lock). */
    @VisibleForTesting
    boolean isClosed() {
        return closed;
    }

    /** Returns the bookie watcher held by this client. */
    @VisibleForTesting
    BookieWatcher getBookieWatcher() {
        return bookieWatcher;
    }

    /** Returns the address resolver exposed by the bookie watcher. */
    public BookieAddressResolver getBookieAddressResolver() {
        BookieWatcherImpl watcher = bookieWatcher;
        return watcher.getBookieAddressResolver();
    }

    /** Returns the main worker pool held by this client. */
    public OrderedExecutor getMainWorkerPool() {
        return mainWorkerPool;
    }

    /** Returns the scheduler held by this client. */
    @VisibleForTesting
    OrderedScheduler getScheduler() {
        return scheduler;
    }

    /** Returns the ensemble placement policy held by this client. */
    @VisibleForTesting
    EnsemblePlacementPolicy getPlacementPolicy() {
        return placementPolicy;
    }

    /** Returns the metadata client driver held by this client. */
    @VisibleForTesting
    public MetadataClientDriver getMetadataClientDriver() {
        return metadataDriver;
    }

    /** Returns the client configuration this instance was built with. */
    protected ClientConfiguration getConf() {
        return conf;
    }

    /** Returns the stats logger held by this client. */
    StatsLogger getStatsLogger() {
        return statsLogger;
    }

    /** Returns the bookie client held by this client. */
    BookieClient getBookieClient() {
        return bookieClient;
    }

    /**
     * Returns per-bookie information as reported by the {@link BookieInfoReader}.
     *
     * @return map from bookie id to its info
     */
    public Map<BookieId, BookieInfoReader.BookieInfo> getBookieInfo() throws BKException, InterruptedException {
        BookieInfoReader reader = bookieInfoReader;
        return reader.getBookieInfo();
    }

    /**
     * Asynchronously creates a ledger whose ack quorum equals its write quorum
     * and which carries no custom metadata.
     */
    public void asyncCreateLedger(int ensSize, int writeQuorumSize, DigestType digestType, byte[] passwd, AsyncCallback.CreateCallback cb, Object ctx) {
        Map<String, byte[]> noCustomMetadata = Collections.emptyMap();
        asyncCreateLedger(ensSize, writeQuorumSize, writeQuorumSize, digestType, passwd, cb, ctx, noCustomMetadata);
    }

    /**
     * Asynchronously creates a ledger.
     *
     * <p>The closed check and the start of the create operation both happen under
     * the close lock's read lock. When the client is already closed, the callback
     * is completed immediately with code {@code -19} and no ledger handle.
     *
     * @throws IllegalArgumentException if {@code writeQuorumSize < ackQuorumSize}
     */
    public void asyncCreateLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, AsyncCallback.CreateCallback cb, Object ctx, Map<String, byte[]> customMetadata) {
        if (ackQuorumSize > writeQuorumSize) {
            throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
        }
        final ReentrantReadWriteLock.ReadLock guard = closeLock.readLock();
        guard.lock();
        try {
            if (!closed) {
                LedgerCreateOp createOp = new LedgerCreateOp(this, ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, cb, ctx, customMetadata, WriteFlag.NONE, clientStats);
                createOp.initiate();
            } else {
                cb.createComplete(-19, null, ctx);
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Synchronously creates a ledger with the default sizes used by this
     * overload: ensemble size 3 and quorum size 2.
     */
    public LedgerHandle createLedger(DigestType digestType, byte[] passwd) throws BKException, InterruptedException {
        final int defaultEnsembleSize = 3;
        final int defaultQuorumSize = 2;
        return createLedger(defaultEnsembleSize, defaultQuorumSize, digestType, passwd);
    }

    /**
     * Synchronously creates a ledger using {@code qSize} for both the write and
     * the ack quorum, with no custom metadata.
     */
    public LedgerHandle createLedger(int ensSize, int qSize, DigestType digestType, byte[] passwd) throws InterruptedException, BKException {
        Map<String, byte[]> noCustomMetadata = Collections.emptyMap();
        return createLedger(ensSize, qSize, qSize, digestType, passwd, noCustomMetadata);
    }

    /** Synchronously creates a ledger with explicit quorum sizes and no custom metadata. */
    public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd) throws InterruptedException, BKException {
        Map<String, byte[]> noCustomMetadata = Collections.emptyMap();
        return createLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, noCustomMetadata);
    }

    /**
     * Synchronously creates a ledger by running {@code asyncCreateLedger} and
     * blocking on its result.
     *
     * @return the handle of the newly created ledger, never {@code null}
     * @throws BKException if creation fails, or with code {@code -999} when the
     *         operation reports success but yields no handle
     * @throws InterruptedException if the wait for the result is interrupted
     */
    public LedgerHandle createLedger(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, Map<String, byte[]> customMetadata) throws InterruptedException, BKException {
        // Typed future: avoids the raw type and the unchecked cast on the result.
        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
        SyncCallbackUtils.SyncCreateCallback result = new SyncCallbackUtils.SyncCreateCallback(future);
        this.asyncCreateLedger(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, result, null, customMetadata);
        LedgerHandle lh = SyncCallbackUtils.waitForResult(future);
        if (lh == null) {
            LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
            throw BKException.create(-999);
        }
        return lh;
    }

    /** Synchronously creates an "adv" ledger with no custom metadata. */
    public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd) throws InterruptedException, BKException {
        Map<String, byte[]> noCustomMetadata = Collections.emptyMap();
        return createLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, noCustomMetadata);
    }

    /**
     * Synchronously creates an "adv" ledger by running
     * {@code asyncCreateLedgerAdv} and blocking on its result.
     *
     * @return the handle of the newly created ledger, never {@code null}
     * @throws BKException if creation fails, or with code {@code -999} when the
     *         operation reports success but yields no handle
     * @throws InterruptedException if the wait for the result is interrupted
     */
    public LedgerHandle createLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, Map<String, byte[]> customMetadata) throws InterruptedException, BKException {
        // Typed future: avoids the raw type and the unchecked cast on the result.
        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
        SyncCallbackUtils.SyncCreateAdvCallback result = new SyncCallbackUtils.SyncCreateAdvCallback(future);
        this.asyncCreateLedgerAdv(ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, result, null, customMetadata);
        LedgerHandle lh = SyncCallbackUtils.waitForResult(future);
        if (lh == null) {
            LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
            throw BKException.create(-999);
        }
        return lh;
    }

    /**
     * Asynchronously creates an "adv" ledger, passing {@code -1L} as the ledger
     * id to the create operation.
     *
     * <p>The closed check and the start of the create operation both happen under
     * the close lock's read lock. When the client is already closed, the callback
     * is completed immediately with code {@code -19} and no ledger handle.
     *
     * @throws IllegalArgumentException if {@code writeQuorumSize < ackQuorumSize}
     */
    public void asyncCreateLedgerAdv(int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, AsyncCallback.CreateCallback cb, Object ctx, Map<String, byte[]> customMetadata) {
        if (ackQuorumSize > writeQuorumSize) {
            throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
        }
        final ReentrantReadWriteLock.ReadLock guard = closeLock.readLock();
        guard.lock();
        try {
            if (!closed) {
                LedgerCreateOp createOp = new LedgerCreateOp(this, ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, cb, ctx, customMetadata, WriteFlag.NONE, clientStats);
                createOp.initiateAdv(-1L);
            } else {
                cb.createComplete(-19, null, ctx);
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Synchronously creates an "adv" ledger with a caller-chosen ledger id and
     * verifies that the returned handle carries that id.
     *
     * @return the handle of the newly created ledger, never {@code null}
     * @throws BKException if creation fails, or with code {@code -999} when no
     *         handle is returned or the handle's id differs from {@code ledgerId}
     * @throws InterruptedException if the wait for the result is interrupted
     */
    public LedgerHandle createLedgerAdv(long ledgerId, int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, Map<String, byte[]> customMetadata) throws InterruptedException, BKException {
        // Typed future: avoids the raw type and the unchecked cast on the result.
        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
        SyncCallbackUtils.SyncCreateAdvCallback result = new SyncCallbackUtils.SyncCreateAdvCallback(future);
        this.asyncCreateLedgerAdv(ledgerId, ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, result, null, customMetadata);
        LedgerHandle lh = SyncCallbackUtils.waitForResult(future);
        if (lh == null) {
            LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation");
            throw BKException.create(-999);
        }
        if (ledgerId != lh.getId()) {
            LOG.error("Unexpected condition : Expected ledgerId: {} but got: {}", (Object)ledgerId, (Object)lh.getId());
            throw BKException.create(-999);
        }
        LOG.info("Ensemble: {} for ledger: {}", lh.getLedgerMetadata().getEnsembleAt(0L), (Object)lh.getId());
        return lh;
    }

    /**
     * Asynchronously creates an "adv" ledger with a caller-chosen ledger id.
     *
     * <p>The closed check and the start of the create operation both happen under
     * the close lock's read lock. When the client is already closed, the callback
     * is completed immediately with code {@code -19} and no ledger handle.
     *
     * @throws IllegalArgumentException if {@code writeQuorumSize < ackQuorumSize}
     */
    public void asyncCreateLedgerAdv(long ledgerId, int ensSize, int writeQuorumSize, int ackQuorumSize, DigestType digestType, byte[] passwd, AsyncCallback.CreateCallback cb, Object ctx, Map<String, byte[]> customMetadata) {
        if (ackQuorumSize > writeQuorumSize) {
            throw new IllegalArgumentException("Write quorum must be larger than ack quorum");
        }
        final ReentrantReadWriteLock.ReadLock guard = closeLock.readLock();
        guard.lock();
        try {
            if (!closed) {
                LedgerCreateOp createOp = new LedgerCreateOp(this, ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, cb, ctx, customMetadata, WriteFlag.NONE, clientStats);
                createOp.initiateAdv(ledgerId);
            } else {
                cb.createComplete(-19, null, ctx);
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Asynchronously opens a ledger via {@code LedgerOpenOp.initiate()}.
     *
     * <p>The closed check and the start of the open operation both happen under
     * the close lock's read lock. When the client is already closed, the callback
     * is completed immediately with code {@code -19} and no ledger handle.
     */
    public void asyncOpenLedger(long lId, DigestType digestType, byte[] passwd, AsyncCallback.OpenCallback cb, Object ctx) {
        final ReentrantReadWriteLock.ReadLock guard = closeLock.readLock();
        guard.lock();
        try {
            if (!closed) {
                LedgerOpenOp openOp = new LedgerOpenOp(this, clientStats, lId, digestType, passwd, cb, ctx);
                openOp.initiate();
            } else {
                cb.openComplete(-19, null, ctx);
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Asynchronously opens a ledger via
     * {@code LedgerOpenOp.initiateWithoutRecovery()}.
     *
     * <p>The closed check and the start of the open operation both happen under
     * the close lock's read lock. When the client is already closed, the callback
     * is completed immediately with code {@code -19} and no ledger handle.
     */
    public void asyncOpenLedgerNoRecovery(long lId, DigestType digestType, byte[] passwd, AsyncCallback.OpenCallback cb, Object ctx) {
        final ReentrantReadWriteLock.ReadLock guard = closeLock.readLock();
        guard.lock();
        try {
            if (!closed) {
                LedgerOpenOp openOp = new LedgerOpenOp(this, clientStats, lId, digestType, passwd, cb, ctx);
                openOp.initiateWithoutRecovery();
            } else {
                cb.openComplete(-19, null, ctx);
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Synchronously opens a ledger by running {@code asyncOpenLedger} and
     * blocking on its result.
     *
     * @return the handle produced by the open operation
     * @throws BKException if the open fails
     * @throws InterruptedException if the wait for the result is interrupted
     */
    public LedgerHandle openLedger(long lId, DigestType digestType, byte[] passwd) throws BKException, InterruptedException {
        // Typed future: avoids the raw type and the unchecked cast on the result.
        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
        SyncCallbackUtils.SyncOpenCallback result = new SyncCallbackUtils.SyncOpenCallback(future);
        this.asyncOpenLedger(lId, digestType, passwd, result, null);
        return SyncCallbackUtils.waitForResult(future);
    }

    /**
     * Synchronously opens a ledger by running {@code asyncOpenLedgerNoRecovery}
     * and blocking on its result.
     *
     * @return the handle produced by the open operation
     * @throws BKException if the open fails
     * @throws InterruptedException if the wait for the result is interrupted
     */
    public LedgerHandle openLedgerNoRecovery(long lId, DigestType digestType, byte[] passwd) throws BKException, InterruptedException {
        // Typed future: avoids the raw type and the unchecked cast on the result.
        CompletableFuture<LedgerHandle> future = new CompletableFuture<>();
        SyncCallbackUtils.SyncOpenCallback result = new SyncCallbackUtils.SyncOpenCallback(future);
        this.asyncOpenLedgerNoRecovery(lId, digestType, passwd, result, null);
        return SyncCallbackUtils.waitForResult(future);
    }

    /**
     * Asynchronously deletes a ledger.
     *
     * <p>The closed check and the start of the delete operation both happen under
     * the close lock's read lock. When the client is already closed, the callback
     * is completed immediately with code {@code -19}.
     */
    public void asyncDeleteLedger(long lId, AsyncCallback.DeleteCallback cb, Object ctx) {
        final ReentrantReadWriteLock.ReadLock guard = closeLock.readLock();
        guard.lock();
        try {
            if (!closed) {
                LedgerDeleteOp deleteOp = new LedgerDeleteOp(this, clientStats, lId, cb, ctx);
                deleteOp.initiate();
            } else {
                cb.deleteComplete(-19, ctx);
            }
        } finally {
            guard.unlock();
        }
    }

    /**
     * Synchronously deletes a ledger by running {@code asyncDeleteLedger} and
     * blocking until it completes.
     */
    public void deleteLedger(long lId) throws InterruptedException, BKException {
        CompletableFuture<Void> done = new CompletableFuture<>();
        SyncCallbackUtils.SyncDeleteCallback callback = new SyncCallbackUtils.SyncDeleteCallback(done);
        asyncDeleteLedger(lId, callback, null);
        SyncCallbackUtils.waitForResult(done);
    }

    /**
     * Asynchronously reports whether a ledger is closed by reading its metadata
     * from the ledger manager.
     *
     * <p>On success the callback receives code {@code 0} and the metadata's
     * closed flag; on failure it receives the code derived from the exception
     * and {@code false}.
     */
    public void asyncIsClosed(long lId, AsyncCallback.IsClosedCallback cb, Object ctx) {
        ledgerManager.readLedgerMetadata(lId).whenComplete((metadata, exception) -> {
            if (exception != null) {
                cb.isClosedComplete(BKException.getExceptionCode(exception), false, ctx);
                return;
            }
            boolean ledgerClosed = ((LedgerMetadata)metadata.getValue()).isClosed();
            cb.isClosedComplete(0, ledgerClosed, ctx);
        });
    }

    /**
     * Synchronously reports whether a ledger is closed: runs
     * {@code asyncIsClosed} and blocks on a latch until its callback fires.
     *
     * @return the closed flag delivered to the callback
     * @throws BKException built from the callback's result code when it is non-zero
     * @throws InterruptedException if the wait on the latch is interrupted
     */
    public boolean isClosed(long lId) throws BKException, InterruptedException {
        // Single-element holders let the anonymous callback publish its results;
        // the latch orders those writes before the reads below.
        final int[] rcHolder = new int[1];
        final boolean[] closedHolder = new boolean[1];
        final CountDownLatch done = new CountDownLatch(1);
        AsyncCallback.IsClosedCallback cb = new AsyncCallback.IsClosedCallback(){

            @Override
            public void isClosedComplete(int rc, boolean isClosed, Object ctx) {
                closedHolder[0] = isClosed;
                rcHolder[0] = rc;
                done.countDown();
            }
        };
        asyncIsClosed(lId, cb, null);
        done.await();
        if (rcHolder[0] == 0) {
            return closedHolder[0];
        }
        throw BKException.create(rcHolder[0]);
    }

    /**
     * Shuts this client down. Calling it again after the closed flag has been
     * set is a no-op.
     *
     * <p>The closed flag is flipped under the write side of {@code closeLock};
     * everything else runs outside the lock, in this order: bookie client,
     * ledger manager, ledger id generator, scheduler, main worker pool,
     * bookie-info scheduler (if present), request timer and event loop group
     * (only when owned by this client), and finally the metadata driver.
     * Each executor is given up to 10 seconds to terminate; a timeout is only
     * logged as a warning.
     *
     * <p>The ledger manager and the ledger id generator are closed
     * independently, so an {@link IOException} from the former no longer
     * prevents the latter from being closed. Both failures are logged and
     * shutdown continues.
     *
     * @throws BKException declared by the interface being overridden
     * @throws InterruptedException if interrupted while awaiting executor termination
     */
    @Override
    public void close() throws BKException, InterruptedException {
        this.closeLock.writeLock().lock();
        try {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        finally {
            this.closeLock.writeLock().unlock();
        }
        this.bookieClient.close();
        try {
            this.ledgerManager.close();
        }
        catch (IOException ie) {
            LOG.error("Failed to close ledger manager : ", (Throwable)ie);
        }
        // Separate try block: a ledger manager failure must not leak the id generator.
        try {
            this.ledgerIdGenerator.close();
        }
        catch (IOException ie) {
            LOG.error("Failed to close ledger id generator : ", (Throwable)ie);
        }
        this.scheduler.shutdown();
        if (!this.scheduler.awaitTermination(10L, TimeUnit.SECONDS)) {
            LOG.warn("The scheduler did not shutdown cleanly");
        }
        this.mainWorkerPool.shutdown();
        if (!this.mainWorkerPool.awaitTermination(10L, TimeUnit.SECONDS)) {
            LOG.warn("The mainWorkerPool did not shutdown cleanly");
        }
        if (this.bookieInfoScheduler != null) {
            this.bookieInfoScheduler.shutdown();
            if (!this.bookieInfoScheduler.awaitTermination(10L, TimeUnit.SECONDS)) {
                LOG.warn("The bookieInfoScheduler did not shutdown cleanly");
            }
        }
        // Timer and event loop group are only torn down when this client created them.
        if (this.ownTimer) {
            this.requestTimer.stop();
        }
        if (this.ownEventLoopGroup) {
            this.eventLoopGroup.shutdownGracefully();
        }
        this.metadataDriver.close();
    }

    /**
     * Starts a fluent "create ledger" operation bound to this client.
     *
     * @return a fresh {@code LedgerCreateOp.CreateBuilderImpl} constructed with this client
     */
    @Override
    public CreateBuilder newCreateLedgerOp() {
        final CreateBuilder createBuilder = new LedgerCreateOp.CreateBuilderImpl(this);
        return createBuilder;
    }

    /**
     * Starts a fluent "open ledger" operation bound to this client.
     *
     * @return a fresh {@code LedgerOpenOp.OpenBuilderImpl} constructed with this client
     */
    @Override
    public OpenBuilder newOpenLedgerOp() {
        final OpenBuilder openBuilder = new LedgerOpenOp.OpenBuilderImpl(this);
        return openBuilder;
    }

    /**
     * Starts a fluent "delete ledger" operation bound to this client.
     *
     * @return a fresh {@code LedgerDeleteOp.DeleteBuilderImpl} constructed with this client
     */
    @Override
    public DeleteBuilder newDeleteLedgerOp() {
        final DeleteBuilder deleteBuilder = new LedgerDeleteOp.DeleteBuilderImpl(this);
        return deleteBuilder;
    }

    /**
     * Creates a "list ledgers" operation.
     *
     * <p>The returned builder is a lambda; the ledger range iterator is only
     * obtained from the ledger manager (starting at {@code 0L}) when the builder
     * is executed, and the result is handed back as an already-completed future.
     *
     * @return a builder producing a {@code ListLedgersResultImpl} over the ledger ranges
     */
    @Override
    public ListLedgersResultBuilder newListLedgersOp() {
        return () -> CompletableFuture.completedFuture(
                new ListLedgersResultImpl(this.getLedgerManager().getLedgerRanges(0L)));
    }

    /**
     * Reads the metadata of a ledger through the ledger manager and strips the
     * version wrapper from the result.
     *
     * @param ledgerId id of the ledger whose metadata is requested
     * @return a future yielding the value of the versioned metadata read
     */
    @Override
    public CompletableFuture<LedgerMetadata> getLedgerMetadata(long ledgerId) {
        return this.getLedgerManager()
                .readLedgerMetadata(ledgerId)
                .thenApply(holder -> (LedgerMetadata)holder.getValue());
    }

    /**
     * Package-private accessor for the client context held by this instance.
     *
     * @return the current value of the {@code clientCtx} field
     */
    ClientContext getClientCtx() {
        final ClientContext context = this.clientCtx;
        return context;
    }

    /**
     * {@link ListLedgersResult} backed by a ledger range iterator.
     *
     * <p>Exactly one view of the ledgers may be requested per instance, either
     * through {@link #iterator()} or through {@link #toIterable()}; a second
     * request fails with {@link IllegalStateException}. After {@link #close()}
     * every access fails the same way. The {@code closed} flag is a plain
     * (non-volatile) field.
     */
    private static final class ListLedgersResultImpl
    implements ListLedgersResult {
        private final LedgerManager.LedgerRangeIterator iterator;
        private boolean closed = false;
        private LedgersIterator ledgersIterator;

        /**
         * @param iterator source of ledger ranges to expose
         */
        public ListLedgersResultImpl(LedgerManager.LedgerRangeIterator iterator) {
            this.iterator = iterator;
        }

        /**
         * Fails fast once this result has been closed.
         *
         * @throws IllegalStateException if {@link #close()} was already called
         */
        void checkClosed() {
            if (!this.closed) {
                return;
            }
            throw new IllegalStateException("ListLedgersResult is closed");
        }

        /**
         * Creates the single ledgers iterator for this result.
         *
         * @throws IllegalStateException if an iterator was already created
         */
        private void initLedgersIterator() {
            if (this.ledgersIterator == null) {
                this.ledgersIterator = new SyncLedgerIterator(this.iterator, this);
                return;
            }
            throw new IllegalStateException("LedgersIterator must be requested once");
        }

        /**
         * @return the ledgers iterator, created on this call
         * @throws IllegalStateException if closed or if a view was already requested
         */
        @Override
        public LedgersIterator iterator() {
            this.checkClosed();
            this.initLedgersIterator();
            return this.ledgersIterator;
        }

        /**
         * Exposes the ledgers as an {@link Iterable}. Every {@link Iterator}
         * handed out wraps the same underlying ledgers iterator and rethrows its
         * {@link IOException}s wrapped in a {@link RuntimeException}.
         *
         * @return an iterable view over the ledger ids
         * @throws IllegalStateException if closed or if a view was already requested
         */
        @Override
        public Iterable<Long> toIterable() {
            this.checkClosed();
            this.initLedgersIterator();
            // The field is assigned exactly once, so capturing it here is equivalent
            // to re-reading it on every call.
            final LedgersIterator source = this.ledgersIterator;
            return () -> new Iterator<Long>(){

                @Override
                public boolean hasNext() {
                    try {
                        return source.hasNext();
                    }
                    catch (IOException ex) {
                        throw new RuntimeException(ex);
                    }
                }

                @Override
                public Long next() {
                    try {
                        return source.next();
                    }
                    catch (IOException ex) {
                        throw new RuntimeException(ex);
                    }
                }
            };
        }

        /**
         * Marks this result as closed; only the flag is updated here.
         */
        @Override
        public void close() throws Exception {
            this.closed = true;
        }
    }

    /**
     * Flattens a ledger range iterator into a stream of individual ledger ids.
     *
     * <p>{@link #hasNext()} and {@link #next()} both advance across range
     * boundaries and skip ranges that contain no ledgers, so {@code hasNext()}
     * returns {@code false} only once every remaining range has been consumed.
     * Every call first verifies that the owning result has not been closed.
     */
    private static final class SyncLedgerIterator
    implements LedgersIterator {
        private final LedgerManager.LedgerRangeIterator iterator;
        private final ListLedgersResultImpl parent;
        Iterator<Long> currentRange = null;

        /**
         * @param iterator source of ledger ranges
         * @param parent   owning result, consulted for its closed state
         */
        public SyncLedgerIterator(LedgerManager.LedgerRangeIterator iterator, ListLedgersResultImpl parent) {
            this.iterator = iterator;
            this.parent = parent;
        }

        /**
         * Positions {@code currentRange} on a range that still has ledgers left,
         * pulling further ranges from the underlying iterator as needed.
         *
         * @return {@code true} if a ledger id is available, {@code false} when
         *         all ranges are exhausted
         * @throws IOException if the underlying range iterator fails
         */
        private boolean advanceToAvailableRange() throws IOException {
            while (this.currentRange == null || !this.currentRange.hasNext()) {
                if (!this.iterator.hasNext()) {
                    return false;
                }
                this.currentRange = this.iterator.next().getLedgers().iterator();
            }
            return true;
        }

        /**
         * @return {@code true} if another ledger id can be returned by {@link #next()}
         * @throws IOException if the underlying range iterator fails
         * @throws IllegalStateException if the owning result is closed
         */
        @Override
        public boolean hasNext() throws IOException {
            this.parent.checkClosed();
            // Previously an exhausted current range ended iteration even though
            // further ranges were still available.
            return this.advanceToAvailableRange();
        }

        /**
         * @return the next ledger id
         * @throws IOException if the underlying range iterator fails
         * @throws IllegalStateException if the owning result is closed
         * @throws java.util.NoSuchElementException if no ledger ids remain
         */
        @Override
        public long next() throws IOException {
            this.parent.checkClosed();
            if (!this.advanceToAvailableRange()) {
                throw new java.util.NoSuchElementException("No more ledgers");
            }
            return this.currentRange.next();
        }
    }

    /**
     * Digest types of this client class, with conversions to and from the
     * {@code client.api} digest type enum and to the ledger metadata protobuf
     * digest type.
     */
    public static enum DigestType {
        MAC,
        CRC32,
        CRC32C,
        DUMMY;

        /**
         * Maps a {@code client.api} digest type onto the same-named constant of this enum.
         *
         * @param digestType the API-level digest type
         * @return the matching constant of this enum
         * @throws IllegalArgumentException if no case matches
         */
        public static DigestType fromApiDigestType(org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.DigestType digestType) {
            switch (digestType) {
                case MAC:
                    return MAC;
                case CRC32:
                    return CRC32;
                case CRC32C:
                    return CRC32C;
                case DUMMY:
                    return DUMMY;
                default:
                    throw new IllegalArgumentException("Unable to convert digest type " + digestType);
            }
        }

        /**
         * Maps a constant of this enum onto the ledger metadata protobuf digest
         * type; {@code MAC} becomes {@code HMAC}, all others keep their name.
         *
         * @param digestType the constant to convert
         * @return the protobuf digest type
         * @throws IllegalArgumentException if no case matches
         */
        public static DataFormats.LedgerMetadataFormat.DigestType toProtoDigestType(DigestType digestType) {
            switch (digestType) {
                case MAC:
                    return DataFormats.LedgerMetadataFormat.DigestType.HMAC;
                case CRC32:
                    return DataFormats.LedgerMetadataFormat.DigestType.CRC32;
                case CRC32C:
                    return DataFormats.LedgerMetadataFormat.DigestType.CRC32C;
                case DUMMY:
                    return DataFormats.LedgerMetadataFormat.DigestType.DUMMY;
                default:
                    throw new IllegalArgumentException("Unable to convert digest type " + digestType);
            }
        }

        /**
         * Maps this constant onto the same-named {@code client.api} digest type.
         *
         * @return the API-level digest type
         * @throws IllegalArgumentException if no case matches
         */
        public org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.DigestType toApiDigestType() {
            switch (this) {
                case MAC:
                    return org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.DigestType.MAC;
                case CRC32:
                    return org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.DigestType.CRC32;
                case CRC32C:
                    return org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.DigestType.CRC32C;
                case DUMMY:
                    return org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper.client.api.DigestType.DUMMY;
                default:
                    throw new IllegalArgumentException("Unable to convert digest type " + this);
            }
        }
    }

    /**
     * Fluent builder for {@code BookKeeper} instances.
     *
     * <p>Every optional collaborator starts out as {@code null}, except the
     * stats logger, which defaults to {@code NullStatsLogger.INSTANCE}. Each
     * setter stores its argument and returns this builder for chaining.
     */
    public static class Builder {
        final ClientConfiguration conf;
        ZooKeeper zk;
        EventLoopGroup eventLoopGroup;
        ByteBufAllocator allocator;
        StatsLogger statsLogger = NullStatsLogger.INSTANCE;
        DNSToSwitchMapping dnsResolver;
        HashedWheelTimer requestTimer;
        FeatureProvider featureProvider;

        /**
         * @param conf client configuration passed on to the client constructor
         */
        Builder(ClientConfiguration conf) {
            this.conf = conf;
        }

        /**
         * Deprecated variant performing the same assignment as
         * {@link #eventLoopGroup(EventLoopGroup)}.
         */
        @Deprecated
        public Builder setEventLoopGroup(EventLoopGroup f) {
            eventLoopGroup = f;
            return this;
        }

        /**
         * Deprecated variant performing the same assignment as {@link #zk(ZooKeeper)}.
         */
        @Deprecated
        public Builder setZookeeper(ZooKeeper zk) {
            this.zk = zk;
            return this;
        }

        /**
         * Deprecated variant performing the same assignment as
         * {@link #statsLogger(StatsLogger)}.
         */
        @Deprecated
        public Builder setStatsLogger(StatsLogger statsLogger) {
            this.statsLogger = statsLogger;
            return this;
        }

        /**
         * Sets the event loop group handed to the client.
         *
         * @param f event loop group to use
         * @return this builder
         */
        public Builder eventLoopGroup(EventLoopGroup f) {
            eventLoopGroup = f;
            return this;
        }

        /**
         * Sets the buffer allocator handed to the client.
         *
         * @param allocator allocator to use
         * @return this builder
         */
        public Builder allocator(ByteBufAllocator allocator) {
            this.allocator = allocator;
            return this;
        }

        /**
         * Sets the ZooKeeper handle handed to the client.
         *
         * @param zk ZooKeeper handle to use
         * @return this builder
         */
        @Deprecated
        public Builder zk(ZooKeeper zk) {
            this.zk = zk;
            return this;
        }

        /**
         * Sets the stats logger; {@link #build()} rejects {@code null}.
         *
         * @param statsLogger stats logger to use
         * @return this builder
         */
        public Builder statsLogger(StatsLogger statsLogger) {
            this.statsLogger = statsLogger;
            return this;
        }

        /**
         * Sets the DNS resolver handed to the client.
         *
         * @param dnsResolver resolver to use
         * @return this builder
         */
        public Builder dnsResolver(DNSToSwitchMapping dnsResolver) {
            this.dnsResolver = dnsResolver;
            return this;
        }

        /**
         * Sets the request timer handed to the client.
         *
         * @param requestTimer timer to use
         * @return this builder
         */
        public Builder requestTimer(HashedWheelTimer requestTimer) {
            this.requestTimer = requestTimer;
            return this;
        }

        /**
         * Sets the feature provider handed to the client.
         *
         * @param featureProvider feature provider to use
         * @return this builder
         */
        public Builder featureProvider(FeatureProvider featureProvider) {
            this.featureProvider = featureProvider;
            return this;
        }

        /**
         * Checks that a stats logger is present and constructs the client from
         * the values collected so far.
         *
         * @return a new {@code BookKeeper} instance
         * @throws IOException declared by the client constructor
         * @throws InterruptedException declared by the client constructor
         * @throws BKException declared by the client constructor
         */
        public BookKeeper build() throws IOException, InterruptedException, BKException {
            Preconditions.checkNotNull(statsLogger, "No stats logger provided");
            return new BookKeeper(
                    conf,
                    zk,
                    eventLoopGroup,
                    allocator,
                    statsLogger,
                    dnsResolver,
                    requestTimer,
                    featureProvider);
        }
    }
}

