/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.proto;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.security.cert.Certificate;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.BiPredicate;
import javax.net.ssl.SSLException;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.pulsar.shade.com.google.common.base.Joiner;
import org.apache.pulsar.shade.com.google.common.collect.Sets;
import org.apache.pulsar.shade.com.google.protobuf.ByteString;
import org.apache.pulsar.shade.com.google.protobuf.ExtensionRegistry;
import org.apache.pulsar.shade.com.google.protobuf.UnsafeByteOperations;
import org.apache.pulsar.shade.io.netty.bootstrap.Bootstrap;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.ByteBufAllocator;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.pulsar.shade.io.netty.channel.Channel;
import org.apache.pulsar.shade.io.netty.channel.ChannelFuture;
import org.apache.pulsar.shade.io.netty.channel.ChannelFutureListener;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandler;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.pulsar.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.pulsar.shade.io.netty.channel.ChannelInitializer;
import org.apache.pulsar.shade.io.netty.channel.ChannelOption;
import org.apache.pulsar.shade.io.netty.channel.ChannelPipeline;
import org.apache.pulsar.shade.io.netty.channel.ChannelPromise;
import org.apache.pulsar.shade.io.netty.channel.DefaultEventLoopGroup;
import org.apache.pulsar.shade.io.netty.channel.EventLoopGroup;
import org.apache.pulsar.shade.io.netty.channel.WriteBufferWaterMark;
import org.apache.pulsar.shade.io.netty.channel.epoll.EpollEventLoopGroup;
import org.apache.pulsar.shade.io.netty.channel.epoll.EpollSocketChannel;
import org.apache.pulsar.shade.io.netty.channel.local.LocalAddress;
import org.apache.pulsar.shade.io.netty.channel.local.LocalChannel;
import org.apache.pulsar.shade.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.pulsar.shade.io.netty.channel.unix.Errors;
import org.apache.pulsar.shade.io.netty.handler.codec.CorruptedFrameException;
import org.apache.pulsar.shade.io.netty.handler.codec.DecoderException;
import org.apache.pulsar.shade.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.pulsar.shade.io.netty.handler.codec.TooLongFrameException;
import org.apache.pulsar.shade.io.netty.handler.flush.FlushConsolidationHandler;
import org.apache.pulsar.shade.io.netty.handler.ssl.SslHandler;
import org.apache.pulsar.shade.io.netty.util.Recycler;
import org.apache.pulsar.shade.io.netty.util.concurrent.Future;
import org.apache.pulsar.shade.io.netty.util.concurrent.GenericFutureListener;
import org.apache.pulsar.shade.org.apache.bookkeeper.auth.BookKeeperPrincipal;
import org.apache.pulsar.shade.org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookieInfoReader;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.MdcUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.shade.org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.shade.org.apache.bookkeeper.net.BookieId;
import org.apache.pulsar.shade.org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.AuthHandler;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookieProtoEncoding;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.BookkeeperProtocol;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.ClientConnectionPeer;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.LocalBookiesRegistry;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.PerChannelBookieClientPool;
import org.apache.pulsar.shade.org.apache.bookkeeper.proto.ReadLastConfirmedAndEntryContext;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.Counter;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.StatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.annotations.StatsDoc;
import org.apache.pulsar.shade.org.apache.bookkeeper.tls.SecurityException;
import org.apache.pulsar.shade.org.apache.bookkeeper.tls.SecurityHandlerFactory;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.AvailabilityOfEntriesOfLedger;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ByteBufList;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.MathUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.StringUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.collections.SynchronizedHashMultiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

@StatsDoc(name="per_channel_bookie_client", help="Per channel bookie client stats")
@ChannelHandler.Sharable
public class PerChannelBookieClient
extends ChannelInboundHandlerAdapter {
    // Class-wide SLF4J logger.
    static final Logger LOG = LoggerFactory.getLogger(PerChannelBookieClient.class);
    // Unmodifiable set of integer result codes.
    // NOTE(review): presumably BookKeeper error codes that are considered "expected" when
    // reported by a bookie; the consumer of this set is outside this excerpt — confirm.
    private static final Set<Integer> expectedBkOperationErrors = Collections.unmodifiableSet(Sets.newHashSet(-8, -13, -7, -101, -20, -22, -104));
    // NOTE(review): addEntry sets header priority to the literal 100 for high-priority writes;
    // this constant appears to be the source value that the compiler inlined — confirm.
    private static final int DEFAULT_HIGH_PRIORITY_VALUE = 100;
    // Static counter shared by all instances; also handed to the client-side auth handler in connect().
    private static final AtomicLong txnIdGenerator = new AtomicLong(0L);
    // Identifier of the bookie this client talks to.
    final BookieId bookieId;
    // Resolves bookieId to a network address at connect time.
    final BookieAddressResolver bookieAddressResolver;
    // Event loop group used by the Netty bootstrap; replaced by a DefaultEventLoopGroup for local bookies.
    final EventLoopGroup eventLoopGroup;
    // Buffer allocator set as the channel's ALLOCATOR option.
    final ByteBufAllocator allocator;
    // Executor used to run callbacks ordered by ledger id (see forceLedger / addEntry).
    final OrderedExecutor executor;
    // Add-entry timeout, converted from the configured seconds to nanoseconds in the constructor.
    final long addEntryTimeoutNanos;
    // Read-entry timeout, converted from the configured seconds to nanoseconds in the constructor.
    final long readEntryTimeoutNanos;
    // Maximum frame size passed to the LengthFieldBasedFrameDecoder.
    final int maxFrameSize;
    // Get-bookie-info timeout, converted from the configured seconds to nanoseconds in the constructor.
    final long getBookieInfoTimeoutNanos;
    // Value of conf.getStartTLSTimeout(); the unit is not visible in this excerpt.
    final int startTLSTimeout;
    // Outstanding requests: completion key -> completion value awaiting a response.
    private final ConcurrentOpenHashMap<CompletionKey, CompletionValue> completionObjects = new ConcurrentOpenHashMap();
    // NOTE(review): presumably holds V2 completions whose key collides with an entry already in
    // completionObjects; the code that populates it is outside this excerpt — confirm.
    private final SynchronizedHashMultiMap<CompletionKey, CompletionValue> completionObjectsV2Conflicts = new SynchronizedHashMultiMap();
    // Stats logger scoped to "per_channel_bookie_client" and labelled with the bookie id
    // (see the primary constructor). Every logger/counter below is obtained from it under the
    // name given in its @StatsDoc annotation.
    private final StatsLogger statsLogger;
    @StatsDoc(name="READ_ENTRY", help="channel stats of read entries requests")
    private final OpStatsLogger readEntryOpLogger;
    @StatsDoc(name="TIMEOUT_READ_ENTRY", help="timeout stats of read entries requests")
    private final OpStatsLogger readTimeoutOpLogger;
    @StatsDoc(name="ADD_ENTRY", help="channel stats of add entries requests")
    private final OpStatsLogger addEntryOpLogger;
    @StatsDoc(name="WRITE_LAC", help="channel stats of write_lac requests")
    private final OpStatsLogger writeLacOpLogger;
    @StatsDoc(name="FORCE", help="channel stats of force requests")
    private final OpStatsLogger forceLedgerOpLogger;
    @StatsDoc(name="READ_LAC", help="channel stats of read_lac requests")
    private final OpStatsLogger readLacOpLogger;
    @StatsDoc(name="TIMEOUT_ADD_ENTRY", help="timeout stats of add entries requests")
    private final OpStatsLogger addTimeoutOpLogger;
    @StatsDoc(name="TIMEOUT_WRITE_LAC", help="timeout stats of write_lac requests")
    private final OpStatsLogger writeLacTimeoutOpLogger;
    @StatsDoc(name="TIMEOUT_FORCE", help="timeout stats of force requests")
    private final OpStatsLogger forceLedgerTimeoutOpLogger;
    @StatsDoc(name="TIMEOUT_READ_LAC", help="timeout stats of read_lac requests")
    private final OpStatsLogger readLacTimeoutOpLogger;
    @StatsDoc(name="GET_BOOKIE_INFO", help="channel stats of get_bookie_info requests")
    private final OpStatsLogger getBookieInfoOpLogger;
    @StatsDoc(name="TIMEOUT_GET_BOOKIE_INFO", help="timeout stats of get_bookie_info requests")
    private final OpStatsLogger getBookieInfoTimeoutOpLogger;
    @StatsDoc(name="START_TLS", help="channel stats of start_tls requests")
    private final OpStatsLogger startTLSOpLogger;
    @StatsDoc(name="TIMEOUT_START_TLS", help="timeout stats of start_tls requests")
    private final OpStatsLogger startTLSTimeoutOpLogger;
    @StatsDoc(name="CLIENT_CONNECT_TIMER", help="channel stats of connect requests")
    private final OpStatsLogger connectTimer;
    // These two loggers were previously undocumented although the constructor registers them
    // under the names below; annotated for consistency with the other stats fields.
    @StatsDoc(name="GET_LIST_OF_ENTRIES_OF_LEDGER", help="channel stats of get_list_of_entries_of_ledger requests")
    private final OpStatsLogger getListOfEntriesOfLedgerCompletionOpLogger;
    @StatsDoc(name="TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER", help="timeout stats of get_list_of_entries_of_ledger requests")
    private final OpStatsLogger getListOfEntriesOfLedgerCompletionTimeoutOpLogger;
    @StatsDoc(name="NETTY_EXCEPTION_CNT", help="the number of exceptions received from this channel")
    private final Counter exceptionCounter;
    @StatsDoc(name="ADD_OP_OUTSTANDING", help="the number of outstanding add_entry requests")
    private final Counter addEntryOutstanding;
    // Help text corrected: this counter tracks read requests, not add_entry requests.
    @StatsDoc(name="READ_OP_OUTSTANDING", help="the number of outstanding read_entry requests")
    private final Counter readEntryOutstanding;
    @StatsDoc(name="NETTY_OPS", help="channel stats for all operations flowing through netty pipeline")
    private final OpStatsLogger nettyOpLogger;
    @StatsDoc(name="ACTIVE_NON_TLS_CHANNEL_COUNTER", help="the number of active non-tls channels")
    private final Counter activeNonTlsChannelCounter;
    @StatsDoc(name="ACTIVE_TLS_CHANNEL_COUNTER", help="the number of active tls channels")
    private final Counter activeTlsChannelCounter;
    @StatsDoc(name="FAILED_CONNECTION_COUNTER", help="the number of failed connections")
    private final Counter failedConnectionCounter;
    @StatsDoc(name="FAILED_TLS_HANDSHAKE_COUNTER", help="the number of failed tls handshakes")
    private final Counter failedTlsHandshakeCounter;
    // Selects the V2 wire protocol (true) or the protobuf-based V3 protocol (false) when building requests.
    private final boolean useV2WireProtocol;
    // Copied from conf.getPreserveMdcForTaskExecution(); consumers are outside this excerpt.
    private final boolean preserveMdcForTaskExecution;
    // Operations queued by connectIfNeededAndDoOp while no connected channel is available.
    private volatile Queue<BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>> pendingOps = new ArrayDeque<BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>>();
    // Current Netty channel, or null when none has been established.
    volatile Channel channel = null;
    // View of this connection handed to the client-side auth handler (built in the constructor).
    private final ClientConnectionPeer connectionPeer;
    // Principal set through connectionPeer.setAuthorizedId; starts as ANONYMOUS.
    private volatile BookKeeperPrincipal authorizedId = BookKeeperPrincipal.ANONYMOUS;
    // Connection lifecycle state; initialised to DISCONNECTED in the constructor.
    volatile ConnectionState state;
    // completeOperation holds the read lock while invoking callbacks.
    // NOTE(review): presumably close() takes the write lock — that code is outside this excerpt, confirm.
    final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    private final ClientConfiguration conf;
    // Owning pool; may be null (the convenience constructors pass null).
    private final PerChannelBookieClientPool pcbcPool;
    private final ClientAuthProvider.Factory authProviderFactory;
    private final ExtensionRegistry extRegistry;
    // TLS handler factory; null means no factory was supplied (see the shFactory != null checks).
    private final SecurityHandlerFactory shFactory;
    // Writability flag exposed via isWritable()/setWritable(); reset to true by makeWritable().
    private volatile boolean isWritable = true;
    // NOTE(review): presumably used to rate-limit "bookie unavailable" log lines — usage is outside this excerpt.
    private long lastBookieUnavailableLogTimestamp = 0L;
    // Predicate over (key, value) that delegates to value.maybeTimeout().
    private static final BiPredicate<CompletionKey, CompletionValue> timeoutCheck = (key, value) -> value.maybeTimeout();
    // Netty recycler that supplies AddCompletion instances; a new one is built around the
    // recycler handle whenever the recycler asks for a fresh object.
    private final Recycler<AddCompletion> addCompletionRecycler = new Recycler<AddCompletion>() {
        @Override
        protected AddCompletion newObject(Recycler.Handle<AddCompletion> recyclerHandle) {
            return new AddCompletion(recyclerHandle);
        }
    };
    // Netty recycler that supplies V2CompletionKey instances; a new one is built around the
    // recycler handle whenever the recycler asks for a fresh object.
    private final Recycler<V2CompletionKey> v2KeyRecycler = new Recycler<V2CompletionKey>() {
        @Override
        protected V2CompletionKey newObject(Recycler.Handle<V2CompletionKey> recyclerHandle) {
            return new V2CompletionKey(recyclerHandle);
        }
    };

    /**
     * Turns an address-resolution failure into an already-failed channel future and
     * immediately notifies a connection listener with it.
     *
     * @param startTime the timestamp passed to the {@code ConnectionFutureListener}
     * @param err the resolution failure to wrap
     * @return the failed future wrapping {@code err}
     */
    private FailedChannelFutureImpl processBookieNotResolvedError(long startTime, BookieAddressResolver.BookieIdNotResolvedException err) {
        final FailedChannelFutureImpl failure = new FailedChannelFutureImpl(err);
        // Drive the normal connection-completion path by hand, since no real connect was attempted.
        contextPreservingListener(new ConnectionFutureListener(startTime)).operationComplete(failure);
        return failure;
    }

    /**
     * Convenience constructor: default {@link ClientConfiguration}, no stats, no auth provider
     * factory, no extension registry and no owning pool.
     *
     * @param executor executor used for ordered callbacks
     * @param eventLoopGroup Netty event loop group
     * @param addr id of the bookie to connect to
     * @param bookieAddressResolver resolver used to map the bookie id to an address
     * @throws SecurityException propagated from the delegated constructor
     */
    public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup, BookieId addr, BookieAddressResolver bookieAddressResolver) throws SecurityException {
        this(
            new ClientConfiguration(),
            executor,
            eventLoopGroup,
            addr,
            NullStatsLogger.INSTANCE,
            null,
            null,
            null,
            bookieAddressResolver);
    }

    /**
     * Convenience constructor: default {@link ClientConfiguration}, no stats and no owning pool,
     * with a caller-supplied auth provider factory and extension registry.
     *
     * @param executor executor used for ordered callbacks
     * @param eventLoopGroup Netty event loop group
     * @param bookieId id of the bookie to connect to
     * @param authProviderFactory factory handed to the client-side auth handler
     * @param extRegistry protobuf extension registry used by the encoder/decoder
     * @param bookieAddressResolver resolver used to map the bookie id to an address
     * @throws SecurityException propagated from the delegated constructor
     */
    public PerChannelBookieClient(OrderedExecutor executor, EventLoopGroup eventLoopGroup, BookieId bookieId, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry, BookieAddressResolver bookieAddressResolver) throws SecurityException {
        this(
            new ClientConfiguration(),
            executor,
            eventLoopGroup,
            bookieId,
            NullStatsLogger.INSTANCE,
            authProviderFactory,
            extRegistry,
            null,
            bookieAddressResolver);
    }

    /**
     * Creates a client using the unpooled default allocator and no TLS handler factory.
     *
     * <p>The supplied {@code parentStatsLogger} is forwarded to the primary constructor.
     * Previously it was ignored and {@link NullStatsLogger#INSTANCE} was always used, so
     * callers of this overload never got any metrics. A {@code null} logger is still accepted
     * and falls back to the null stats logger, matching the old tolerance for {@code null}.
     *
     * @param conf client configuration
     * @param executor executor used for ordered callbacks
     * @param eventLoopGroup Netty event loop group
     * @param bookieId id of the bookie to connect to
     * @param parentStatsLogger parent stats logger; {@code null} disables stats
     * @param authProviderFactory factory handed to the client-side auth handler
     * @param extRegistry protobuf extension registry used by the encoder/decoder
     * @param pcbcPool owning pool, may be {@code null}
     * @param bookieAddressResolver resolver used to map the bookie id to an address
     * @throws SecurityException propagated from the delegated constructor
     */
    public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor, EventLoopGroup eventLoopGroup, BookieId bookieId, StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry, PerChannelBookieClientPool pcbcPool, BookieAddressResolver bookieAddressResolver) throws SecurityException {
        this(conf, executor, eventLoopGroup, UnpooledByteBufAllocator.DEFAULT, bookieId, parentStatsLogger != null ? parentStatsLogger : NullStatsLogger.INSTANCE, authProviderFactory, extRegistry, pcbcPool, null, bookieAddressResolver);
    }

    public PerChannelBookieClient(ClientConfiguration conf, OrderedExecutor executor, EventLoopGroup eventLoopGroup, ByteBufAllocator allocator, BookieId bookieId, StatsLogger parentStatsLogger, ClientAuthProvider.Factory authProviderFactory, ExtensionRegistry extRegistry, PerChannelBookieClientPool pcbcPool, SecurityHandlerFactory shFactory, BookieAddressResolver bookieAddressResolver) throws SecurityException {
        this.maxFrameSize = conf.getNettyMaxFrameSizeBytes();
        this.conf = conf;
        this.bookieId = bookieId;
        this.bookieAddressResolver = bookieAddressResolver;
        this.executor = executor;
        this.eventLoopGroup = LocalBookiesRegistry.isLocalBookie(bookieId) ? new DefaultEventLoopGroup() : eventLoopGroup;
        this.allocator = allocator;
        this.state = ConnectionState.DISCONNECTED;
        this.addEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getAddEntryTimeout());
        this.readEntryTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getReadEntryTimeout());
        this.getBookieInfoTimeoutNanos = TimeUnit.SECONDS.toNanos(conf.getBookieInfoTimeout());
        this.startTLSTimeout = conf.getStartTLSTimeout();
        this.useV2WireProtocol = conf.getUseV2WireProtocol();
        this.preserveMdcForTaskExecution = conf.getPreserveMdcForTaskExecution();
        this.authProviderFactory = authProviderFactory;
        this.extRegistry = extRegistry;
        this.shFactory = shFactory;
        if (shFactory != null) {
            shFactory.init(SecurityHandlerFactory.NodeType.Client, conf, allocator);
        }
        this.statsLogger = parentStatsLogger.scope("per_channel_bookie_client").scopeLabel("bookie", bookieId.toString());
        this.readEntryOpLogger = this.statsLogger.getOpStatsLogger("READ_ENTRY");
        this.addEntryOpLogger = this.statsLogger.getOpStatsLogger("ADD_ENTRY");
        this.writeLacOpLogger = this.statsLogger.getOpStatsLogger("WRITE_LAC");
        this.forceLedgerOpLogger = this.statsLogger.getOpStatsLogger("FORCE");
        this.readLacOpLogger = this.statsLogger.getOpStatsLogger("READ_LAC");
        this.getBookieInfoOpLogger = this.statsLogger.getOpStatsLogger("GET_BOOKIE_INFO");
        this.getListOfEntriesOfLedgerCompletionOpLogger = this.statsLogger.getOpStatsLogger("GET_LIST_OF_ENTRIES_OF_LEDGER");
        this.readTimeoutOpLogger = this.statsLogger.getOpStatsLogger("TIMEOUT_READ_ENTRY");
        this.addTimeoutOpLogger = this.statsLogger.getOpStatsLogger("TIMEOUT_ADD_ENTRY");
        this.writeLacTimeoutOpLogger = this.statsLogger.getOpStatsLogger("TIMEOUT_WRITE_LAC");
        this.forceLedgerTimeoutOpLogger = this.statsLogger.getOpStatsLogger("TIMEOUT_FORCE");
        this.readLacTimeoutOpLogger = this.statsLogger.getOpStatsLogger("TIMEOUT_READ_LAC");
        this.getBookieInfoTimeoutOpLogger = this.statsLogger.getOpStatsLogger("TIMEOUT_GET_BOOKIE_INFO");
        this.startTLSOpLogger = this.statsLogger.getOpStatsLogger("START_TLS");
        this.startTLSTimeoutOpLogger = this.statsLogger.getOpStatsLogger("TIMEOUT_START_TLS");
        this.getListOfEntriesOfLedgerCompletionTimeoutOpLogger = this.statsLogger.getOpStatsLogger("TIMEOUT_GET_LIST_OF_ENTRIES_OF_LEDGER");
        this.exceptionCounter = this.statsLogger.getCounter("NETTY_EXCEPTION_CNT");
        this.connectTimer = this.statsLogger.getOpStatsLogger("CLIENT_CONNECT_TIMER");
        this.addEntryOutstanding = this.statsLogger.getCounter("ADD_OP_OUTSTANDING");
        this.readEntryOutstanding = this.statsLogger.getCounter("READ_OP_OUTSTANDING");
        this.nettyOpLogger = this.statsLogger.getOpStatsLogger("NETTY_OPS");
        this.activeNonTlsChannelCounter = this.statsLogger.getCounter("ACTIVE_NON_TLS_CHANNEL_COUNTER");
        this.activeTlsChannelCounter = this.statsLogger.getCounter("ACTIVE_TLS_CHANNEL_COUNTER");
        this.failedConnectionCounter = this.statsLogger.getCounter("FAILED_CONNECTION_COUNTER");
        this.failedTlsHandshakeCounter = this.statsLogger.getCounter("FAILED_TLS_HANDSHAKE_COUNTER");
        this.pcbcPool = pcbcPool;
        this.connectionPeer = new ClientConnectionPeer(){

            @Override
            public SocketAddress getRemoteAddr() {
                Channel c = PerChannelBookieClient.this.channel;
                if (c != null) {
                    return c.remoteAddress();
                }
                return null;
            }

            @Override
            public Collection<Object> getProtocolPrincipals() {
                Channel c = PerChannelBookieClient.this.channel;
                if (c == null) {
                    return Collections.emptyList();
                }
                SslHandler ssl = c.pipeline().get(SslHandler.class);
                if (ssl == null) {
                    return Collections.emptyList();
                }
                try {
                    Certificate[] certificates = ssl.engine().getSession().getPeerCertificates();
                    if (certificates == null) {
                        return Collections.emptyList();
                    }
                    ArrayList<Object> result = new ArrayList<Object>();
                    result.addAll(Arrays.asList(certificates));
                    return result;
                }
                catch (SSLPeerUnverifiedException err) {
                    return Collections.emptyList();
                }
            }

            @Override
            public void disconnect() {
                Channel c = PerChannelBookieClient.this.channel;
                if (c != null) {
                    c.close().addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)x -> PerChannelBookieClient.this.makeWritable()));
                }
                LOG.info("authplugin disconnected channel {}", (Object)PerChannelBookieClient.this.channel);
            }

            @Override
            public void setAuthorizedId(BookKeeperPrincipal principal) {
                PerChannelBookieClient.this.authorizedId = principal;
                LOG.info("connection {} authenticated as {}", (Object)PerChannelBookieClient.this.channel, (Object)principal);
            }

            @Override
            public BookKeeperPrincipal getAuthorizedId() {
                return PerChannelBookieClient.this.authorizedId;
            }

            @Override
            public boolean isSecure() {
                Channel c = PerChannelBookieClient.this.channel;
                if (c == null) {
                    return false;
                }
                return c.pipeline().get(SslHandler.class) != null;
            }
        };
    }

    /**
     * Invokes {@code op} while holding the close read lock, overriding the result code
     * when this client is already closed.
     *
     * @param op callback to complete
     * @param rc result code to report when the client is not closed
     */
    private void completeOperation(BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient> op, int rc) {
        closeLock.readLock().lock();
        try {
            // -19 replaces the caller's code once the client is CLOSED.
            // NOTE(review): presumably the "client closed" BookKeeper code — confirm.
            final int reportedRc = (ConnectionState.CLOSED == state) ? -19 : rc;
            op.operationComplete(reportedRc, this);
        } finally {
            closeLock.readLock().unlock();
        }
    }

    /**
     * Reports how many entries are currently held in the completion map.
     *
     * @return the size of {@code completionObjects}
     */
    protected long getNumPendingCompletionRequests() {
        final long pending = completionObjects.size();
        return pending;
    }

    /**
     * Resolves the bookie address, configures a Netty bootstrap for the matching transport
     * (epoll, in-VM local, or NIO) and starts an asynchronous connect.
     *
     * <p>If the bookie id cannot be resolved, no connection is attempted and an already-failed
     * future is returned instead.
     *
     * @return the future of the connect attempt (or the failed future on resolution error)
     */
    protected ChannelFuture connect() {
        final long connectStartNanos = MathUtils.nowInNano();
        if (LOG.isDebugEnabled()) {
            LOG.debug("Connecting to bookie: {}", bookieId);
        }

        final BookieSocketAddress resolvedAddress;
        try {
            resolvedAddress = bookieAddressResolver.resolve(bookieId);
        } catch (BookieAddressResolver.BookieIdNotResolvedException err) {
            LOG.error("Cannot connect to {} as endpoint resolution failed (probably bookie is down) err {}", bookieId, err.toString());
            return processBookieNotResolvedError(connectStartNanos, err);
        }

        // A DefaultEventLoopGroup means the in-VM local transport is in use.
        final boolean localTransport = eventLoopGroup instanceof DefaultEventLoopGroup;

        final Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(eventLoopGroup);
        if (eventLoopGroup instanceof EpollEventLoopGroup) {
            bootstrap.channel(EpollSocketChannel.class);
        } else if (localTransport) {
            bootstrap.channel(LocalChannel.class);
        } else {
            bootstrap.channel(NioSocketChannel.class);
        }

        bootstrap.option(ChannelOption.ALLOCATOR, allocator);
        bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, conf.getClientConnectTimeoutMillis());
        bootstrap.option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(conf.getClientWriteBufferLowWaterMark(), conf.getClientWriteBufferHighWaterMark()));

        // Socket-level options are only applied to real network transports.
        if (!localTransport) {
            bootstrap.option(ChannelOption.TCP_NODELAY, conf.getClientTcpNoDelay());
            bootstrap.option(ChannelOption.SO_KEEPALIVE, conf.getClientSockKeepalive());
            if (conf.getClientSendBufferSize() > 0) {
                bootstrap.option(ChannelOption.SO_SNDBUF, conf.getClientSendBufferSize());
            }
            if (conf.getClientReceiveBufferSize() > 0) {
                bootstrap.option(ChannelOption.SO_RCVBUF, conf.getClientReceiveBufferSize());
            }
        }

        bootstrap.handler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel newChannel) throws Exception {
                final ChannelPipeline p = newChannel.pipeline();
                p.addLast("consolidation", new FlushConsolidationHandler(1024, true));
                p.addLast("bytebufList", ByteBufList.ENCODER);
                p.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(PerChannelBookieClient.this.maxFrameSize, 0, 4, 0, 4));
                p.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(PerChannelBookieClient.this.extRegistry));
                p.addLast("bookieProtoDecoder", new BookieProtoEncoding.ResponseDecoder(PerChannelBookieClient.this.extRegistry, PerChannelBookieClient.this.useV2WireProtocol, PerChannelBookieClient.this.shFactory != null));
                p.addLast("authHandler", new AuthHandler.ClientSideHandler(PerChannelBookieClient.this.authProviderFactory, txnIdGenerator, PerChannelBookieClient.this.connectionPeer, PerChannelBookieClient.this.useV2WireProtocol));
                // This client itself is the final inbound handler.
                p.addLast("mainhandler", PerChannelBookieClient.this);
            }
        });

        SocketAddress target = resolvedAddress.getSocketAddress();
        if (localTransport) {
            // The local transport addresses peers by the bookie id string rather than a socket address.
            target = new LocalAddress(bookieId.toString());
        }

        final ChannelFuture connectFuture = bootstrap.connect(target);
        connectFuture.addListener(contextPreservingListener(new ConnectionFutureListener(connectStartNanos)));
        connectFuture.addListener(done -> makeWritable());
        return connectFuture;
    }

    /**
     * Disconnects the channel and then closes this client, in that order.
     */
    void cleanDisconnectAndClose() {
        disconnect();
        close();
    }

    /**
     * Returns the current value of the writability flag.
     *
     * @return {@code true} if this client is flagged writable
     */
    public boolean isWritable() {
        return isWritable;
    }

    /**
     * Updates the writability flag.
     *
     * @param val the new flag value
     */
    public void setWritable(boolean val) {
        isWritable = val;
    }

    /**
     * Marks this client writable again; used as a listener action after connect and channel close.
     */
    private void makeWritable() {
        setWritable(true);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Runs {@code op} against this client, connecting first if necessary.
     *
     * <p>If a connected channel exists, the op is completed immediately. If the client is
     * CLOSED, the op is completed immediately with an error code. Otherwise the op is queued
     * in {@code pendingOps}; when no connection attempt is already in flight (state is neither
     * CONNECTING nor START_TLS), the state is switched to CONNECTING and {@link #connect()}
     * is called.
     *
     * @param op the operation callback to complete now or queue until the connection resolves
     */
    void connectIfNeededAndDoOp(BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient> op) {
        boolean completeOpNow = false;
        int opRc = 0;
        // Fast path: check the volatile channel/state without taking the monitor.
        if (this.channel != null && this.state == ConnectionState.CONNECTED) {
            completeOpNow = true;
        } else {
            PerChannelBookieClient perChannelBookieClient = this;
            synchronized (perChannelBookieClient) {
                // Re-check under the monitor; the connection may have completed in the meantime.
                if (this.channel != null && this.state == ConnectionState.CONNECTED) {
                    completeOpNow = true;
                    opRc = 0;
                } else if (this.state == ConnectionState.CLOSED) {
                    completeOpNow = true;
                    // NOTE(review): -8 is presumably the "bookie handle not available" code — confirm.
                    // completeOperation reports -19 instead if the state is still CLOSED when it runs.
                    opRc = -8;
                } else {
                    // Not connected and not closed: queue the op for later.
                    this.pendingOps.add(op);
                    if (this.state == ConnectionState.CONNECTING || this.state == ConnectionState.START_TLS) {
                        // A connection attempt is already in progress; nothing more to do here.
                        return;
                    }
                    this.state = ConnectionState.CONNECTING;
                }
            }
            // connect() is invoked outside the synchronized block.
            if (!completeOpNow) {
                this.connect();
            }
        }
        if (completeOpNow) {
            this.completeOperation(op, opRc);
        }
    }

    /**
     * Sends a V3 WRITE_LAC request for the given ledger and registers a completion for it.
     *
     * @param ledgerId ledger whose LAC is written
     * @param masterKey ledger master key, wrapped without copying
     * @param lac last-add-confirmed value to write
     * @param toSend payload buffers used as the request body
     * @param cb callback held by the registered completion
     * @param ctx opaque context passed along with the callback
     */
    void writeLac(long ledgerId, byte[] masterKey, long lac, ByteBufList toSend, BookkeeperInternalCallbacks.WriteLacCallback cb, Object ctx) {
        final long txnId = getTxnId();
        final V3CompletionKey key = new V3CompletionKey(txnId, BookkeeperProtocol.OperationType.WRITE_LAC);
        // Register the completion before anything is written to the channel.
        completionObjects.put(key, new WriteLacCompletion(key, cb, ctx, ledgerId));

        final BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder()
            .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
            .setOperation(BookkeeperProtocol.OperationType.WRITE_LAC)
            .setTxnId(txnId);

        // Wrap the payload without copying where possible; only the multi-buffer case
        // goes through toArray().
        final ByteString body;
        if (toSend.hasArray()) {
            body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
        } else if (toSend.size() == 1) {
            body = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(0).nioBuffer());
        } else {
            body = UnsafeByteOperations.unsafeWrap(toSend.toArray());
        }

        final BookkeeperProtocol.WriteLacRequest.Builder payload = BookkeeperProtocol.WriteLacRequest.newBuilder()
            .setLedgerId(ledgerId)
            .setLac(lac)
            .setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
            .setBody(body);

        final BookkeeperProtocol.Request message = withRequestContext(BookkeeperProtocol.Request.newBuilder())
            .setHeader(header)
            .setWriteLacRequest(payload)
            .build();
        writeAndFlush(channel, key, message);
    }

    /**
     * Sends a V3 FORCE_LEDGER request and registers a completion for it.
     *
     * <p>With the V2 wire protocol the request is rejected: an error is logged and the
     * callback is completed with code -100 on the ordered executor.
     *
     * @param ledgerId ledger to force
     * @param cb callback to notify
     * @param ctx opaque context passed along with the callback
     */
    void forceLedger(long ledgerId, BookkeeperInternalCallbacks.ForceLedgerCallback cb, Object ctx) {
        if (useV2WireProtocol) {
            LOG.error("force is not allowed with v2 protocol");
            // NOTE(review): -100 is presumably the "illegal operation" code — confirm.
            executor.executeOrdered(ledgerId, () -> cb.forceLedgerComplete(-100, ledgerId, this.bookieId, ctx));
            return;
        }

        final long txnId = getTxnId();
        final V3CompletionKey key = new V3CompletionKey(txnId, BookkeeperProtocol.OperationType.FORCE_LEDGER);
        completionObjects.put(key, new ForceLedgerCompletion(key, cb, ctx, ledgerId));

        final BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder()
            .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
            .setOperation(BookkeeperProtocol.OperationType.FORCE_LEDGER)
            .setTxnId(txnId);
        final BookkeeperProtocol.ForceLedgerRequest.Builder forceBuilder = BookkeeperProtocol.ForceLedgerRequest.newBuilder()
            .setLedgerId(ledgerId);

        final BookkeeperProtocol.Request message = withRequestContext(BookkeeperProtocol.Request.newBuilder())
            .setHeader(header)
            .setForceLedgerRequest(forceBuilder)
            .build();
        writeAndFlush(channel, key, message);
    }

    /**
     * Builds an add-entry request in the configured wire protocol, registers its completion
     * and writes it to the channel.
     *
     * <p>With the V2 protocol, {@code DEFERRED_SYNC} is rejected: an error is logged and the
     * callback is completed with code -100 on the ordered executor. If no channel is
     * available after the completion is registered, the completion is errored out and
     * {@code toSend} is released.
     *
     * @param ledgerId target ledger
     * @param masterKey ledger master key
     * @param entryId id of the entry being added
     * @param toSend entry payload buffers
     * @param cb callback to notify
     * @param ctx opaque context passed along with the callback
     * @param options bit flags; bit value 4 sets header priority 100 and bit value 2 marks a
     *     recovery add (both only evaluated on the V3 path here)
     * @param allowFastFail forwarded to {@code writeAndFlush}
     * @param writeFlags write flags for the entry
     */
    void addEntry(long ledgerId, byte[] masterKey, long entryId, ByteBufList toSend, BookkeeperInternalCallbacks.WriteCallback cb, Object ctx, int options, boolean allowFastFail, EnumSet<WriteFlag> writeFlags) {
        final Object request;
        final CompletionKey completionKey;

        if (!useV2WireProtocol) {
            final long txnId = getTxnId();
            completionKey = new V3CompletionKey(txnId, BookkeeperProtocol.OperationType.ADD_ENTRY);

            final BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder()
                .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
                .setOperation(BookkeeperProtocol.OperationType.ADD_ENTRY)
                .setTxnId(txnId);
            if (((short) options & 4) == 4) {
                header.setPriority(100);
            }

            // Wrap the payload without copying; multiple buffers are stitched together with concat.
            ByteString body = null;
            if (toSend.hasArray()) {
                body = UnsafeByteOperations.unsafeWrap(toSend.array(), toSend.arrayOffset(), toSend.readableBytes());
            } else {
                for (int idx = 0; idx < toSend.size(); idx++) {
                    final ByteString chunk = UnsafeByteOperations.unsafeWrap(toSend.getBuffer(idx).nioBuffer());
                    if (body == null) {
                        body = chunk;
                    } else {
                        body = body.concat(chunk);
                    }
                }
            }

            final BookkeeperProtocol.AddRequest.Builder add = BookkeeperProtocol.AddRequest.newBuilder()
                .setLedgerId(ledgerId)
                .setEntryId(entryId)
                .setMasterKey(UnsafeByteOperations.unsafeWrap(masterKey))
                .setBody(body);
            if (((short) options & 2) == 2) {
                add.setFlag(BookkeeperProtocol.AddRequest.Flag.RECOVERY_ADD);
            }
            if (!writeFlags.isEmpty()) {
                add.setWriteFlags(WriteFlag.getWriteFlagsValue(writeFlags));
            }

            request = withRequestContext(BookkeeperProtocol.Request.newBuilder())
                .setHeader(header)
                .setAddRequest(add)
                .build();
        } else {
            if (writeFlags.contains(WriteFlag.DEFERRED_SYNC)) {
                LOG.error("invalid writeflags {} for v2 protocol", writeFlags);
                // NOTE(review): -100 is presumably the "illegal operation" code — confirm.
                executor.executeOrdered(ledgerId, () -> cb.writeComplete(-100, ledgerId, entryId, this.bookieId, ctx));
                return;
            }
            completionKey = acquireV2Key(ledgerId, entryId, BookkeeperProtocol.OperationType.ADD_ENTRY);
            request = BookieProtocol.AddRequest.create((byte) 2, ledgerId, entryId, (short) options, masterKey, toSend);
        }

        putCompletionKeyValue(completionKey, acquireAddCompletion(completionKey, cb, ctx, ledgerId, entryId));

        // Snapshot the volatile channel so the null check and the write use the same reference.
        final Channel current = channel;
        if (current != null) {
            writeAndFlush(current, completionKey, request, allowFastFail);
            return;
        }
        // No channel: fail the registered completion and release the payload ourselves.
        errorOut(completionKey);
        toSend.release();
    }

    /**
     * Sends a read-last-add-confirmed (LAC) request for a ledger over the current channel.
     *
     * <p>When the v2 wire protocol is enabled a {@code BookieProtocol.ReadRequest} is built and
     * keyed by a recycled v2 key; otherwise a v3 protobuf {@code READ_LAC} request is built and
     * keyed by a fresh transaction id. The completion is registered before the request is
     * written.
     *
     * @param ledgerId ledger whose LAC is requested
     * @param cb callback passed to the registered {@code ReadLacCompletion}
     * @param ctx caller context passed to the registered {@code ReadLacCompletion}
     */
    public void readLac(long ledgerId, BookkeeperInternalCallbacks.ReadLacCallback cb, Object ctx) {
        final Object lacRequest;
        final CompletionKey lacKey;
        if (!this.useV2WireProtocol) {
            long txnId = this.getTxnId();
            lacKey = new V3CompletionKey(txnId, BookkeeperProtocol.OperationType.READ_LAC);
            BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder()
                    .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
                    .setOperation(BookkeeperProtocol.OperationType.READ_LAC)
                    .setTxnId(txnId);
            BookkeeperProtocol.ReadLacRequest.Builder body = BookkeeperProtocol.ReadLacRequest.newBuilder().setLedgerId(ledgerId);
            lacRequest = this.withRequestContext(BookkeeperProtocol.Request.newBuilder())
                    .setHeader(header)
                    .setReadLacRequest(body)
                    .build();
        } else {
            lacRequest = new BookieProtocol.ReadRequest(2, ledgerId, 0L, 0, null);
            lacKey = this.acquireV2Key(ledgerId, 0L, BookkeeperProtocol.OperationType.READ_LAC);
        }
        ReadLacCompletion completion = new ReadLacCompletion(lacKey, cb, ctx, ledgerId);
        this.putCompletionKeyValue(lacKey, completion);
        this.writeAndFlush(this.channel, lacKey, lacRequest);
    }

    /**
     * Sends a v3 {@code GET_LIST_OF_ENTRIES_OF_LEDGER} request for a ledger over the current
     * channel.
     *
     * <p>The completion is registered under a fresh transaction id before the request is
     * written. The request builder is passed through {@link #withRequestContext} like the other
     * v3 requests in this class, so the MDC context is attached when context preservation is
     * enabled.
     *
     * @param ledgerId ledger whose entry list is requested
     * @param cb callback passed to the registered {@code GetListOfEntriesOfLedgerCompletion}
     */
    public void getListOfEntriesOfLedger(long ledgerId, BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback cb) {
        long txnId = this.getTxnId();
        V3CompletionKey completionKey = new V3CompletionKey(txnId, BookkeeperProtocol.OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER);
        this.completionObjects.put(completionKey, new GetListOfEntriesOfLedgerCompletion(completionKey, cb, ledgerId));
        BookkeeperProtocol.BKPacketHeader.Builder headerBuilder = BookkeeperProtocol.BKPacketHeader.newBuilder().setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE).setOperation(BookkeeperProtocol.OperationType.GET_LIST_OF_ENTRIES_OF_LEDGER).setTxnId(txnId);
        BookkeeperProtocol.GetListOfEntriesOfLedgerRequest.Builder getListOfEntriesOfLedgerRequestBuilder = BookkeeperProtocol.GetListOfEntriesOfLedgerRequest.newBuilder().setLedgerId(ledgerId);
        // Consistency with the sibling request builders: propagate the request context too.
        BookkeeperProtocol.Request getListOfEntriesOfLedgerRequest = this.withRequestContext(BookkeeperProtocol.Request.newBuilder()).setHeader(headerBuilder).setGetListOfEntriesOfLedgerRequest(getListOfEntriesOfLedgerRequestBuilder).build();
        this.writeAndFlush(this.channel, completionKey, getListOfEntriesOfLedgerRequest);
    }

    /**
     * Issues a read that carries the caller's previous LAC and a timeout, delegating to
     * {@code readEntryInternal} with flags {@code 0}, no master key and fast-fail disabled.
     *
     * @param ledgerId ledger to read from
     * @param entryId entry to read
     * @param previousLAC last-add-confirmed value already known to the caller
     * @param timeOutInMillis timeout forwarded in the request
     * @param piggyBackEntry whether the entry-piggyback flag should be set on the request
     * @param cb read callback
     * @param ctx caller context passed back through the callback
     */
    public void readEntryWaitForLACUpdate(long ledgerId, long entryId, long previousLAC, long timeOutInMillis, boolean piggyBackEntry, BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx) {
        final Long knownLac = Long.valueOf(previousLAC);
        final Long waitMillis = Long.valueOf(timeOutInMillis);
        this.readEntryInternal(ledgerId, entryId, knownLac, waitMillis, piggyBackEntry, cb, ctx, 0, null, false);
    }

    /**
     * Issues a plain entry read (no previous LAC, no timeout, no piggy-backing) by delegating to
     * {@code readEntryInternal}.
     *
     * @param ledgerId ledger to read from
     * @param entryId entry to read
     * @param cb read callback
     * @param ctx caller context passed back through the callback
     * @param flags request flags; narrowed to {@code short} before being forwarded
     * @param masterKey ledger master key, forwarded unchanged (may be {@code null})
     * @param allowFastFail forwarded to the write path to permit failing fast on an unwritable channel
     */
    public void readEntry(long ledgerId, long entryId, BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey, boolean allowFastFail) {
        // Keep the narrowing conversion: only the low 16 bits (sign-extended) are forwarded.
        final short wireFlags = (short)flags;
        this.readEntryInternal(ledgerId, entryId, null, null, false, cb, ctx, wireFlags, masterKey, allowFastFail);
    }

    /**
     * Builds a read request (v2 or v3, depending on {@code useV2WireProtocol}), registers a
     * {@code ReadCompletion} for it and writes it to the current channel.
     *
     * <p>On the v3 path the arguments are validated first; an invalid combination completes
     * {@code cb} directly with rc {@code -14} and nothing is registered or sent:
     * <ul>
     *   <li>a timeout or piggy-backing was requested without a previous LAC;</li>
     *   <li>flag bit {@code 1} is set but {@code masterKey} is {@code null}.</li>
     * </ul>
     * On the v2 path {@code previousLAC}, {@code timeOutInMillis} and {@code piggyBackEntry} are
     * not used.
     *
     * @param ledgerId ledger to read from
     * @param entryId entry to read
     * @param previousLAC previous LAC to send, or {@code null}
     * @param timeOutInMillis timeout to send, or {@code null}
     * @param piggyBackEntry whether to set the entry-piggyback flag
     * @param cb read callback
     * @param ctx caller context passed back through the callback
     * @param flags request flags; bit {@code 4} sets header priority 100, bit {@code 1} sets the fence flag
     * @param masterKey ledger master key; required when bit {@code 1} of {@code flags} is set
     * @param allowFastFail forwarded to {@code writeAndFlush}
     */
    private void readEntryInternal(long ledgerId, long entryId, Long previousLAC, Long timeOutInMillis, boolean piggyBackEntry, BookkeeperInternalCallbacks.ReadEntryCallback cb, Object ctx, int flags, byte[] masterKey, boolean allowFastFail) {
        final Object readRequest;
        final CompletionKey readKey;
        if (this.useV2WireProtocol) {
            readRequest = new BookieProtocol.ReadRequest(2, ledgerId, entryId, (short)flags, masterKey);
            readKey = this.acquireV2Key(ledgerId, entryId, BookkeeperProtocol.OperationType.READ_ENTRY);
        } else {
            long txnId = this.getTxnId();
            readKey = new V3CompletionKey(txnId, BookkeeperProtocol.OperationType.READ_ENTRY);
            BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder()
                    .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
                    .setOperation(BookkeeperProtocol.OperationType.READ_ENTRY)
                    .setTxnId(txnId);
            if (((short)flags & 4) == 4) {
                header.setPriority(100);
            }
            BookkeeperProtocol.ReadRequest.Builder body = BookkeeperProtocol.ReadRequest.newBuilder().setLedgerId(ledgerId).setEntryId(entryId);

            // A timeout or piggy-backing only makes sense together with a previous LAC.
            final boolean lacMissing = null == previousLAC;
            if (lacMissing && (null != timeOutInMillis || piggyBackEntry)) {
                cb.readEntryComplete(-14, ledgerId, entryId, null, ctx);
                return;
            }
            // A fencing read must carry the master key.
            final boolean fencing = ((short)flags & 1) == 1;
            if (fencing && masterKey == null) {
                cb.readEntryComplete(-14, ledgerId, entryId, null, ctx);
                return;
            }

            if (!lacMissing) {
                body.setPreviousLAC(previousLAC);
            }
            if (null != timeOutInMillis) {
                body.setTimeOut(timeOutInMillis);
            }
            if (piggyBackEntry) {
                body.setFlag(BookkeeperProtocol.ReadRequest.Flag.ENTRY_PIGGYBACK);
            }
            if (fencing) {
                body.setFlag(BookkeeperProtocol.ReadRequest.Flag.FENCE_LEDGER);
                body.setMasterKey(ByteString.copyFrom(masterKey));
            }
            readRequest = this.withRequestContext(BookkeeperProtocol.Request.newBuilder())
                    .setHeader(header)
                    .setReadRequest(body)
                    .build();
        }
        ReadCompletion completion = new ReadCompletion(readKey, cb, ctx, ledgerId, entryId);
        this.putCompletionKeyValue(readKey, completion);
        this.writeAndFlush(this.channel, readKey, readRequest, allowFastFail);
    }

    /**
     * Sends a v3 {@code GET_BOOKIE_INFO} request over the current channel.
     *
     * <p>The completion is registered under a fresh transaction id before the request is
     * written.
     *
     * @param requested value placed in the request's {@code requested} field
     * @param cb callback passed to the registered {@code GetBookieInfoCompletion}
     * @param ctx caller context passed to the registered {@code GetBookieInfoCompletion}
     */
    public void getBookieInfo(long requested, BookkeeperInternalCallbacks.GetBookieInfoCallback cb, Object ctx) {
        final long txnId = this.getTxnId();
        final V3CompletionKey infoKey = new V3CompletionKey(txnId, BookkeeperProtocol.OperationType.GET_BOOKIE_INFO);
        this.completionObjects.put(infoKey, new GetBookieInfoCompletion(infoKey, cb, ctx));

        BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder()
                .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
                .setOperation(BookkeeperProtocol.OperationType.GET_BOOKIE_INFO)
                .setTxnId(txnId);
        BookkeeperProtocol.GetBookieInfoRequest.Builder body = BookkeeperProtocol.GetBookieInfoRequest.newBuilder().setRequested(requested);
        BookkeeperProtocol.Request infoRequest = this.withRequestContext(BookkeeperProtocol.Request.newBuilder())
                .setHeader(header)
                .setGetBookieInfoRequest(body)
                .build();
        this.writeAndFlush(this.channel, infoKey, infoRequest);
    }

    /**
     * Applies {@code timeoutCheck} to both completion maps, removing every entry it matches,
     * and logs the total number of removed operations when that total is positive.
     */
    public void checkTimeoutOnPendingOperations() {
        int removed = this.completionObjects.removeIf(timeoutCheck);
        removed += this.completionObjectsV2Conflicts.removeIf(timeoutCheck);
        if (removed <= 0) {
            return;
        }
        LOG.info("Timed-out {} operations to channel {} for {}", new Object[]{removed, this.channel, this.bookieId});
    }

    /**
     * Disconnects the current channel and waits for the close to finish; equivalent to
     * {@code disconnect(true)}.
     */
    public void disconnect() {
        final boolean waitForClose = true;
        this.disconnect(waitForClose);
    }

    /**
     * Logs the disconnect and closes the current channel without closing the client
     * permanently.
     *
     * @param wait whether to block until the channel close has completed
     */
    public void disconnect(boolean wait) {
        LOG.info("Disconnecting the per channel bookie client for {}", (Object)this.bookieId);
        final boolean permanent = false;
        this.closeInternal(permanent, wait);
    }

    /**
     * Permanently closes this client and waits for the channel close to finish; equivalent to
     * {@code close(true)}.
     */
    public void close() {
        final boolean waitForClose = true;
        this.close(waitForClose);
    }

    /**
     * Permanently closes this client.
     *
     * <p>Under the close write-lock the state is moved to {@code CLOSED} and every outstanding
     * completion is failed with rc {@code -19}; a second call returns immediately. Afterwards
     * the active-channel counter matching the channel type (TLS when an {@code SslHandler} is in
     * the pipeline, non-TLS otherwise, including when there is no channel) is decremented and
     * the channel is closed.
     *
     * @param wait whether to block until the channel close has completed
     */
    public void close(boolean wait) {
        LOG.info("Closing the per channel bookie client for {}", (Object)this.bookieId);
        this.closeLock.writeLock().lock();
        try {
            if (ConnectionState.CLOSED == this.state) {
                return;
            }
            this.state = ConnectionState.CLOSED;
            this.errorOutOutstandingEntries(-19);
        }
        finally {
            this.closeLock.writeLock().unlock();
        }
        // Read the field once: closeInternal() on another thread may null it between a null
        // check and the dereference, which would otherwise throw a NullPointerException here.
        final Channel current = this.channel;
        if (current != null && current.pipeline().get(SslHandler.class) != null) {
            this.activeTlsChannelCounter.dec();
        } else {
            this.activeNonTlsChannelCounter.dec();
        }
        this.closeInternal(true, wait);
    }

    /**
     * Detaches the current channel under this object's monitor, updates the connection state,
     * and then closes the detached channel outside the monitor.
     *
     * @param permanent {@code true} moves the state to {@code CLOSED}; {@code false} moves it to
     *     {@code DISCONNECTED} unless it is already {@code CLOSED}
     * @param wait whether to block until the channel close has completed
     */
    private void closeInternal(boolean permanent, boolean wait) {
        final Channel detached;
        synchronized (this) {
            if (!permanent) {
                if (this.state != ConnectionState.CLOSED) {
                    this.state = ConnectionState.DISCONNECTED;
                }
            } else {
                this.state = ConnectionState.CLOSED;
            }
            detached = this.channel;
            this.channel = null;
            this.makeWritable();
        }
        if (detached == null) {
            return;
        }
        final ChannelFuture closing = this.closeChannel(detached);
        if (wait) {
            closing.awaitUninterruptibly();
        }
    }

    /**
     * Closes the given channel and calls {@code makeWritable()} once the close future completes.
     *
     * @param c channel to close
     * @return the future returned after attaching the completion listener to the close future
     */
    private ChannelFuture closeChannel(Channel c) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Closing channel {}", (Object)c);
        }
        final ChannelFuture closing = c.close();
        return closing.addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)done -> this.makeWritable()));
    }

    /**
     * Calls {@code makeWritable()} when there is no current channel or the current channel
     * reports itself writable, then forwards the event to the superclass.
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        final Channel current = this.channel;
        final boolean goneOrWritable = current == null || current.isWritable();
        if (goneOrWritable) {
            this.makeWritable();
        }
        super.channelWritabilityChanged(ctx);
    }

    /**
     * Writes and flushes a request with fast-fail disabled.
     *
     * @param channel channel to write to (may be {@code null}; handled by the delegate)
     * @param key completion key registered for the request
     * @param request request object to send
     */
    private void writeAndFlush(Channel channel, CompletionKey key, Object request) {
        final boolean allowFastFail = false;
        this.writeAndFlush(channel, key, request, allowFastFail);
    }

    /**
     * Writes and flushes a request on the given channel, recording the outcome in
     * {@code nettyOpLogger}.
     *
     * <ul>
     *   <li>A {@code null} channel fails the completion via {@code errorOut(key)}.</li>
     *   <li>The cached {@code isWritable} flag is synchronised with the channel. If fast-fail is
     *       allowed and the flag is false, the completion is failed with rc {@code -105} and a
     *       v2 {@code AddRequest} is recycled.</li>
     *   <li>Otherwise the request is written with a promise whose listener marks the completion
     *       (if still registered in {@code completionObjects}) as outstanding on success.</li>
     *   <li>Any {@code Throwable} thrown while issuing the write fails the completion via
     *       {@code errorOut(key)}.</li>
     * </ul>
     *
     * @param channel channel to write to, or {@code null}
     * @param key completion key registered for the request
     * @param request request object to send
     * @param allowFastFail whether to fail immediately when the channel is not writable
     */
    private void writeAndFlush(Channel channel, CompletionKey key, Object request, boolean allowFastFail) {
        if (channel == null) {
            LOG.warn("Operation {} failed: channel == null", (Object)StringUtils.requestToString(request));
            this.errorOut(key);
            return;
        }

        // Only touch the field when the value actually changes.
        final boolean writableNow = channel.isWritable();
        if (writableNow != this.isWritable) {
            this.isWritable = writableNow;
        }

        if (allowFastFail && !this.isWritable) {
            LOG.warn("Operation {} failed: TooManyRequestsException", (Object)StringUtils.requestToString(request));
            this.errorOut(key, -105);
            if (request instanceof BookieProtocol.AddRequest) {
                ((BookieProtocol.AddRequest)request).recycle();
            }
            return;
        }

        try {
            final long writeStartNanos = MathUtils.nowInNano();
            ChannelPromise writePromise = channel.newPromise().addListener((GenericFutureListener<? extends Future<? super Void>>)((GenericFutureListener<Future>)outcome -> {
                if (!outcome.isSuccess()) {
                    this.nettyOpLogger.registerFailedEvent(MathUtils.elapsedNanos(writeStartNanos), TimeUnit.NANOSECONDS);
                    return;
                }
                this.nettyOpLogger.registerSuccessfulEvent(MathUtils.elapsedNanos(writeStartNanos), TimeUnit.NANOSECONDS);
                CompletionValue pending = this.completionObjects.get(key);
                if (pending != null) {
                    pending.setOutstanding();
                }
            }));
            channel.writeAndFlush(request, writePromise);
        }
        catch (Throwable e) {
            LOG.warn("Operation {} failed", (Object)StringUtils.requestToString(request), (Object)e);
            this.errorOut(key);
        }
    }

    /**
     * Removes the completion registered under {@code key} and fails it with its default error.
     *
     * <p>The primary map is consulted first; only when it holds nothing for the key is one
     * entry taken from the v2 conflict map instead.
     *
     * @param key key of the completion to fail
     */
    void errorOut(CompletionKey key) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Removing completion key: {}", (Object)key);
        }
        final CompletionValue primary = this.completionObjects.remove(key);
        if (primary == null) {
            this.completionObjectsV2Conflicts.removeAny(key).ifPresent(conflict -> conflict.errorOut());
            return;
        }
        primary.errorOut();
    }

    /**
     * Removes the completion registered under {@code key} and fails it with the given code.
     *
     * <p>The primary map is consulted first; only when it holds nothing for the key is one
     * entry taken from the v2 conflict map instead.
     *
     * @param key key of the completion to fail
     * @param rc return code handed to the completion
     */
    void errorOut(CompletionKey key, int rc) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Removing completion key: {}", (Object)key);
        }
        final CompletionValue primary = this.completionObjects.remove(key);
        if (primary == null) {
            this.completionObjectsV2Conflicts.removeAny(key).ifPresent(conflict -> conflict.errorOut(rc));
            return;
        }
        primary.errorOut(rc);
    }

    /**
     * Swaps the pending-operation queue for a fresh empty one under this object's monitor and
     * then, outside the monitor, completes every drained callback with {@code rc} and this
     * client.
     *
     * @param rc return code handed to each pending callback
     */
    void errorOutPendingOps(int rc) {
        final Queue<BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>> drained;
        synchronized (this) {
            drained = this.pendingOps;
            this.pendingOps = new ArrayDeque<BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>>();
        }
        for (BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient> pending : drained) {
            pending.operationComplete(rc, this);
        }
    }

    /**
     * Fails every registered completion with {@code rc}.
     *
     * <p>Keys are first pulled one at a time from the v2 conflict map until it reports none,
     * then every key of the primary map is failed.
     *
     * @param rc return code handed to each completion
     */
    void errorOutOutstandingEntries(int rc) {
        for (Optional<CompletionKey> next = this.completionObjectsV2Conflicts.getAnyKey();
                next.isPresent();
                next = this.completionObjectsV2Conflicts.getAnyKey()) {
            this.errorOut(next.get(), rc);
        }
        for (CompletionKey primaryKey : this.completionObjects.keys()) {
            this.errorOut(primaryKey, rc);
        }
    }

    /**
     * Forwards an error notification to the owning pool, if this client has one.
     */
    void recordError() {
        if (this.pcbcPool == null) {
            return;
        }
        this.pcbcPool.recordError();
    }

    /**
     * Handles the channel becoming inactive.
     *
     * <p>The context's channel (when present) is closed and the matching active-channel counter
     * is decremented. All outstanding completions and pending operations are then failed with
     * rc {@code -8}. Finally, if the inactive channel is still this client's current channel and
     * the client is not {@code CLOSED}, the state becomes {@code DISCONNECTED}.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final Channel inactive = ctx.channel();
        LOG.info("Disconnected from bookie channel {}", (Object)inactive);
        if (inactive != null) {
            this.closeChannel(inactive);
            final boolean tls = inactive.pipeline().get(SslHandler.class) != null;
            if (tls) {
                this.activeTlsChannelCounter.dec();
            } else {
                this.activeNonTlsChannelCounter.dec();
            }
        }
        this.errorOutOutstandingEntries(-8);
        this.errorOutPendingOps(-8);
        synchronized (this) {
            if (this.channel == inactive && this.state != ConnectionState.CLOSED) {
                this.state = ConnectionState.DISCONNECTED;
            }
        }
    }

    /**
     * Handles an exception raised in the channel pipeline. The exception counter is always
     * incremented; the reaction then depends on the cause:
     * <ul>
     *   <li>corrupted or over-long frame: log and close the context;</li>
     *   <li>authentication failure: fail outstanding completions with rc {@code -102} and close
     *       the channel;</li>
     *   <li>{@code DecoderException} caused by an {@code SSLException}: fail pending operations
     *       with rc {@code -24} and close the channel;</li>
     *   <li>any other {@code IOException}: log (message only for native I/O errors) and close
     *       the context;</li>
     *   <li>anything else: log at error level, or at debug level if the client is already
     *       {@code CLOSED}, and close the context.</li>
     * </ul>
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        this.exceptionCounter.inc();
        if (cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException) {
            LOG.error("Corrupted frame received from bookie: {}", (Object)ctx.channel());
            ctx.close();
        } else if (cause instanceof AuthHandler.AuthenticationException) {
            LOG.error("Error authenticating connection", cause);
            this.errorOutOutstandingEntries(-102);
            final Channel authChannel = ctx.channel();
            if (authChannel != null) {
                this.closeChannel(authChannel);
            }
        } else if (cause instanceof DecoderException && cause.getCause() instanceof SSLException) {
            LOG.error("TLS handshake failed", cause);
            this.errorOutPendingOps(-24);
            final Channel tlsChannel = ctx.channel();
            if (tlsChannel != null) {
                this.closeChannel(tlsChannel);
            }
        } else if (cause instanceof IOException) {
            if (cause instanceof Errors.NativeIoException) {
                LOG.warn("Exception caught on:{} cause: {}", (Object)ctx.channel(), (Object)cause.getMessage());
            } else {
                LOG.warn("Exception caught on:{} cause:", (Object)ctx.channel(), (Object)cause);
            }
            ctx.close();
        } else {
            synchronized (this) {
                if (this.state != ConnectionState.CLOSED) {
                    LOG.error("Unexpected exception caught by bookie client channel handler", cause);
                } else if (LOG.isDebugEnabled()) {
                    LOG.debug("Unexpected exception caught by bookie client channel handler, but the client is closed, so it isn't important", cause);
                }
            }
            ctx.close();
        }
    }

    /**
     * Dispatches an inbound message: v2 responses go to {@code readV2Response}, v3 protobuf
     * responses go to {@code readV3Response}, and anything else is passed on down the pipeline.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof BookieProtocol.Response) {
            this.readV2Response((BookieProtocol.Response)msg);
            return;
        }
        if (msg instanceof BookkeeperProtocol.Response) {
            this.readV3Response((BookkeeperProtocol.Response)msg);
            return;
        }
        ctx.fireChannelRead(msg);
    }

    /**
     * Matches a v2 response to its registered completion and schedules the completion's
     * callback on the ordered executor, keyed by the completion's ledger id.
     *
     * <p>A temporary v2 key is acquired for the lookup and released straight afterwards. When
     * no completion is registered the response is released (and the event logged at debug
     * level).
     *
     * @param response decoded v2 response
     */
    private void readV2Response(BookieProtocol.Response response) {
        final BookkeeperProtocol.OperationType opType = PerChannelBookieClient.getOperationType(response.getOpCode());
        final BookkeeperProtocol.StatusCode status = PerChannelBookieClient.getStatusCodeFromErrorCode(response.errorCode);

        final CompletionKey lookupKey = this.acquireV2Key(response.ledgerId, response.entryId, opType);
        final CompletionValue matched = this.getCompletionValue(lookupKey);
        lookupKey.release();

        if (null != matched) {
            this.executor.executeOrdered(matched.ledgerId, (SafeRunnable)ReadV2ResponseCallback.create(matched, response.ledgerId, response.entryId, status, response));
            return;
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Unexpected response received from bookie : " + this.bookieId + " for type : " + opType + " and ledger:entry : " + response.ledgerId + ":" + response.entryId);
        }
        response.release();
    }

    /**
     * Maps a v2 wire op-code to the corresponding {@code OperationType}.
     *
     * @param opCode v2 op-code, expected to be in the range 1..6
     * @return the matching operation type
     * @throws IllegalArgumentException if the op-code is not one of the known values
     */
    private static BookkeeperProtocol.OperationType getOperationType(byte opCode) {
        switch (opCode) {
            case 1:
                return BookkeeperProtocol.OperationType.ADD_ENTRY;
            case 2:
                return BookkeeperProtocol.OperationType.READ_ENTRY;
            case 3:
                return BookkeeperProtocol.OperationType.AUTH;
            case 4:
                return BookkeeperProtocol.OperationType.READ_LAC;
            case 5:
                return BookkeeperProtocol.OperationType.WRITE_LAC;
            case 6:
                return BookkeeperProtocol.OperationType.GET_BOOKIE_INFO;
            default:
                throw new IllegalArgumentException("Invalid operation type " + opCode);
        }
    }

    /**
     * Maps a v2 wire error code to the corresponding {@code StatusCode}.
     *
     * @param errorCode v2 error code (0, 1, 2 or 100..106)
     * @return the matching status code
     * @throws IllegalArgumentException if the error code is not one of the known values
     */
    private static BookkeeperProtocol.StatusCode getStatusCodeFromErrorCode(int errorCode) {
        switch (errorCode) {
            case 0:
                return BookkeeperProtocol.StatusCode.EOK;
            case 1:
                return BookkeeperProtocol.StatusCode.ENOLEDGER;
            case 2:
                return BookkeeperProtocol.StatusCode.ENOENTRY;
            case 100:
                return BookkeeperProtocol.StatusCode.EBADREQ;
            case 101:
                return BookkeeperProtocol.StatusCode.EIO;
            case 102:
                return BookkeeperProtocol.StatusCode.EUA;
            case 103:
                return BookkeeperProtocol.StatusCode.EBADVERSION;
            case 104:
                return BookkeeperProtocol.StatusCode.EFENCED;
            case 105:
                return BookkeeperProtocol.StatusCode.EREADONLY;
            case 106:
                return BookkeeperProtocol.StatusCode.ETOOMANYREQUESTS;
            default:
                throw new IllegalArgumentException("Invalid error code: " + errorCode);
        }
    }

    /**
     * Matches a v3 response to its registered completion (by transaction id and operation) and
     * schedules the completion's {@code handleV3Response} on the ordered executor, keyed by the
     * completion's ledger id.
     *
     * <p>The completion is claimed with a single {@code remove} rather than a {@code get}
     * followed by a later {@code remove}. With the two-step form, a concurrent timeout sweep or
     * {@code errorOut} could remove and fail the same completion after it had already been
     * picked up here, so the callback would be completed twice. A response with no registered
     * completion is logged at debug level and dropped.
     *
     * @param response decoded v3 response
     */
    private void readV3Response(final BookkeeperProtocol.Response response) {
        final BookkeeperProtocol.BKPacketHeader header = response.getHeader();
        CompletionKey key = this.newCompletionKey(header.getTxnId(), header.getOperation());
        // Atomically take ownership of the completion so nobody else can complete it as well.
        final CompletionValue completionValue = this.completionObjects.remove(key);
        if (null == completionValue) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Unexpected response received from bookie : " + this.bookieId + " for type : " + header.getOperation() + " and txnId : " + header.getTxnId());
            }
            return;
        }
        long orderingKey = completionValue.ledgerId;
        this.executor.executeOrdered(orderingKey, (SafeRunnable)new org.apache.pulsar.shade.org.apache.bookkeeper.util.SafeRunnable(){

            @Override
            public void safeRun() {
                completionValue.restoreMdcContext();
                completionValue.handleV3Response(response);
            }

            @Override
            public String toString() {
                return String.format("HandleResponse(Txn=%d, Type=%s, Entry=(%d, %d))", header.getTxnId(), header.getOperation(), completionValue.ledgerId, completionValue.entryId);
            }
        });
    }

    /**
     * Installs a TLS handler at the head of the current channel's pipeline and registers a
     * listener on its handshake future.
     *
     * <p>The target host and port are taken from the channel's remote address. A
     * {@code LocalAddress} is parsed from its string form (fields 1 and 2 of a {@code ':'}
     * split); any address type other than {@code LocalAddress} / {@code InetSocketAddress} is
     * rejected with a {@code RuntimeException}.
     *
     * <p>When the handshake future completes, the listener decides the outcome under this
     * object's monitor, based on success and the connection state at that moment, then drains
     * {@code pendingOps} and completes each drained callback with the resulting code:
     * <ul>
     *   <li>success while {@code CONNECTING}: rc -8, channel closed and cleared;</li>
     *   <li>success while {@code START_TLS}: rc 0, state becomes {@code CONNECTED};</li>
     *   <li>success while {@code CLOSED} or {@code DISCONNECTED}: rc -8, channel closed and cleared;</li>
     *   <li>success while {@code CONNECTED}: channel closed, listener returns without touching
     *       {@code pendingOps};</li>
     *   <li>otherwise (handshake failed): rc -24, channel closed and cleared, failure counter
     *       incremented.</li>
     * </ul>
     */
    void initTLSHandshake() {
        InetSocketAddress address;
        PerChannelBookieClient parentObj = this;
        SocketAddress socketAddress = this.channel.remoteAddress();
        if (socketAddress instanceof LocalAddress) {
            // Derive host and port from the textual form of the local address.
            String[] addr = socketAddress.toString().split(":");
            String hostname = addr[1];
            int port = Integer.parseInt(addr[2]);
            address = new InetSocketAddress(hostname, port);
        } else if (socketAddress instanceof InetSocketAddress) {
            address = (InetSocketAddress)socketAddress;
        } else {
            throw new RuntimeException("Unexpected socket address type");
        }
        SslHandler handler = parentObj.shFactory.newTLSHandler(address.getHostName(), address.getPort());
        // The TLS handler goes first in the pipeline.
        this.channel.pipeline().addFirst(parentObj.shFactory.getHandlerName(), (ChannelHandler)handler);
        handler.handshakeFuture().addListener(new GenericFutureListener<Future<Channel>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void operationComplete(Future<Channel> future) throws Exception {
                Queue oldPendingOps;
                int rc;
                PerChannelBookieClient perChannelBookieClient = PerChannelBookieClient.this;
                // State inspection, state transition and the pendingOps swap all happen under the client's monitor.
                synchronized (perChannelBookieClient) {
                    if (future.isSuccess() && PerChannelBookieClient.this.state == ConnectionState.CONNECTING) {
                        // Handshake succeeded but the state is not START_TLS: treat as a failed connection.
                        LOG.error("Connection state changed before TLS handshake completed {}/{}", (Object)PerChannelBookieClient.this.bookieId, (Object)PerChannelBookieClient.this.state);
                        rc = -8;
                        PerChannelBookieClient.this.closeChannel(PerChannelBookieClient.this.channel);
                        PerChannelBookieClient.this.channel = null;
                        if (PerChannelBookieClient.this.state != ConnectionState.CLOSED) {
                            PerChannelBookieClient.this.state = ConnectionState.DISCONNECTED;
                        }
                    } else if (future.isSuccess() && PerChannelBookieClient.this.state == ConnectionState.START_TLS) {
                        // Expected path: the connection is now usable over TLS.
                        rc = 0;
                        LOG.info("Successfully connected to bookie using TLS: " + PerChannelBookieClient.this.bookieId);
                        PerChannelBookieClient.this.state = ConnectionState.CONNECTED;
                        AuthHandler.ClientSideHandler authHandler = ((Channel)future.get()).pipeline().get(AuthHandler.ClientSideHandler.class);
                        authHandler.authProvider.onProtocolUpgrade();
                        PerChannelBookieClient.this.activeTlsChannelCounter.inc();
                    } else if (future.isSuccess() && (PerChannelBookieClient.this.state == ConnectionState.CLOSED || PerChannelBookieClient.this.state == ConnectionState.DISCONNECTED)) {
                        // The client was closed/disconnected while the handshake was in flight.
                        LOG.warn("Closed before TLS handshake completed, clean up: {}, current state {}", (Object)PerChannelBookieClient.this.channel, (Object)PerChannelBookieClient.this.state);
                        PerChannelBookieClient.this.closeChannel(PerChannelBookieClient.this.channel);
                        rc = -8;
                        PerChannelBookieClient.this.channel = null;
                    } else {
                        if (future.isSuccess() && PerChannelBookieClient.this.state == ConnectionState.CONNECTED) {
                            // NOTE(review): both log placeholders receive the same `channel` field, and that
                            // same field is what gets closed here - confirm this is the intended channel.
                            LOG.debug("Already connected with another channel({}), so close the new channel({})", (Object)PerChannelBookieClient.this.channel, (Object)PerChannelBookieClient.this.channel);
                            PerChannelBookieClient.this.closeChannel(PerChannelBookieClient.this.channel);
                            // Early return: pendingOps is left untouched in this case.
                            return;
                        }
                        // Remaining case: the handshake itself failed.
                        LOG.error("TLS handshake failed with bookie: {}/{}, current state {} : ", new Object[]{PerChannelBookieClient.this.channel, PerChannelBookieClient.this.bookieId, PerChannelBookieClient.this.state, future.cause()});
                        rc = -24;
                        PerChannelBookieClient.this.closeChannel(PerChannelBookieClient.this.channel);
                        PerChannelBookieClient.this.channel = null;
                        if (PerChannelBookieClient.this.state != ConnectionState.CLOSED) {
                            PerChannelBookieClient.this.state = ConnectionState.DISCONNECTED;
                        }
                        PerChannelBookieClient.this.failedTlsHandshakeCounter.inc();
                    }
                    // Take ownership of the queued operations; they are completed outside the monitor.
                    oldPendingOps = PerChannelBookieClient.this.pendingOps;
                    PerChannelBookieClient.this.pendingOps = new ArrayDeque();
                }
                PerChannelBookieClient.this.makeWritable();
                for (BookkeeperInternalCallbacks.GenericCallback pendingOp : oldPendingOps) {
                    pendingOp.operationComplete(rc, PerChannelBookieClient.this);
                }
            }
        });
    }

    /**
     * Obtains an {@code AddCompletion} from the recycler and initialises it for one add request.
     *
     * @param key completion key the add is registered under
     * @param originalCallback caller's write callback
     * @param originalCtx caller's context object
     * @param ledgerId ledger being written
     * @param entryId entry being written
     * @return the reset completion instance
     */
    AddCompletion acquireAddCompletion(CompletionKey key, BookkeeperInternalCallbacks.WriteCallback originalCallback, Object originalCtx, long ledgerId, long entryId) {
        final AddCompletion recycled = this.addCompletionRecycler.get();
        recycled.reset(key, originalCallback, originalCtx, ledgerId, entryId);
        return recycled;
    }

    /**
     * Creates a v3 completion key for a transaction id and operation type.
     *
     * @param txnId transaction id of the request/response
     * @param operationType operation the key belongs to
     * @return a new {@code V3CompletionKey}
     */
    CompletionKey newCompletionKey(long txnId, BookkeeperProtocol.OperationType operationType) {
        final V3CompletionKey v3Key = new V3CompletionKey(txnId, operationType);
        return v3Key;
    }

    /**
     * Translates a protocol {@code StatusCode} into the client's integer return code.
     *
     * @param status status code taken from a response
     * @return the mapped return code, or {@code 1} for any status without an explicit mapping
     */
    private int statusCodeToExceptionCode(BookkeeperProtocol.StatusCode status) {
        final int code;
        switch (status) {
            case EOK:
                code = 0;
                break;
            case ENOENTRY:
                code = -13;
                break;
            case ENOLEDGER:
                code = -7;
                break;
            case EBADVERSION:
                code = -16;
                break;
            case EUA:
                code = -102;
                break;
            case EFENCED:
                code = -101;
                break;
            case EREADONLY:
                code = -104;
                break;
            case ETOOMANYREQUESTS:
                code = -105;
                break;
            default:
                code = 1;
                break;
        }
        return code;
    }

    /**
     * Registers a completion under {@code key}. If the primary map already holds a value for
     * the key, the new value is stored in the v2 conflict map instead.
     *
     * @param key completion key
     * @param value completion to register
     */
    private void putCompletionKeyValue(CompletionKey key, CompletionValue value) {
        final boolean storedInPrimary = this.completionObjects.putIfAbsent(key, value) == null;
        if (storedInPrimary) {
            return;
        }
        this.completionObjectsV2Conflicts.put(key, value);
    }

    /**
     * Removes and returns the completion registered under {@code key}, looking in the primary
     * map first and falling back to one entry from the v2 conflict map.
     *
     * @param key completion key
     * @return the removed completion, or {@code null} if neither map holds one
     */
    private CompletionValue getCompletionValue(CompletionKey key) {
        final CompletionValue primary = this.completionObjects.remove(key);
        if (primary != null) {
            return primary;
        }
        return this.completionObjectsV2Conflicts.removeAny(key).orElse(null);
    }

    /**
     * Returns the next transaction id from the shared generator.
     *
     * @return the incremented generator value
     */
    private long getTxnId() {
        final long nextTxnId = txnIdGenerator.incrementAndGet();
        return nextTxnId;
    }

    /**
     * Obtains a {@code V2CompletionKey} from the recycler and initialises it.
     *
     * @param ledgerId ledger the request targets
     * @param entryId entry the request targets
     * @param operationType operation the key belongs to
     * @return the reset key instance
     */
    V2CompletionKey acquireV2Key(long ledgerId, long entryId, BookkeeperProtocol.OperationType operationType) {
        final V2CompletionKey recycled = this.v2KeyRecycler.get();
        recycled.reset(ledgerId, entryId, operationType);
        return recycled;
    }

    /**
     * Adds the current MDC context to the request builder when {@code preserveMdcForTaskExecution}
     * is set; otherwise returns the builder untouched.
     *
     * @param builder request builder being populated
     * @return the builder to continue populating
     */
    BookkeeperProtocol.Request.Builder withRequestContext(BookkeeperProtocol.Request.Builder builder) {
        return this.preserveMdcForTaskExecution
                ? PerChannelBookieClient.appendRequestContext(builder)
                : builder;
    }

    /**
     * Copies every entry of the current MDC context map into the request builder as a
     * {@code ContextPair}.
     *
     * <p>Uses the generic {@code Map<String, String>} type instead of a raw {@code Map} with
     * per-element casts.
     *
     * @param builder request builder to extend
     * @return the same builder; unchanged when the MDC context map is {@code null} or empty
     */
    static BookkeeperProtocol.Request.Builder appendRequestContext(BookkeeperProtocol.Request.Builder builder) {
        final Map<String, String> mdcContextMap = MDC.getCopyOfContextMap();
        if (mdcContextMap == null || mdcContextMap.isEmpty()) {
            return builder;
        }
        for (Map.Entry<String, String> kv : mdcContextMap.entrySet()) {
            BookkeeperProtocol.ContextPair context = BookkeeperProtocol.ContextPair.newBuilder()
                    .setKey(kv.getKey())
                    .setValue(kv.getValue())
                    .build();
            builder.addRequestContext(context);
        }
        return builder;
    }

    /**
     * Wraps the listener in a {@code ContextPreservingFutureListener} when
     * {@code preserveMdcForTaskExecution} is set; otherwise returns it as is.
     *
     * @param listener listener to (optionally) wrap
     * @return the wrapped or original listener
     */
    ChannelFutureListener contextPreservingListener(ChannelFutureListener listener) {
        if (this.preserveMdcForTaskExecution) {
            return new ContextPreservingFutureListener(listener);
        }
        return listener;
    }

    /**
     * Starts the TLS upgrade: registers a {@code StartTLSCompletion} under a fresh transaction
     * id, moves the state to {@code START_TLS} and sends a v3 {@code START_TLS} request on the
     * current channel. The state is asserted to be {@code CONNECTING} on entry.
     */
    private void initiateTLS() {
        LOG.info("Initializing TLS to {}", (Object)this.channel);
        assert (this.state == ConnectionState.CONNECTING);

        final long txnId = this.getTxnId();
        final V3CompletionKey tlsKey = new V3CompletionKey(txnId, BookkeeperProtocol.OperationType.START_TLS);
        this.completionObjects.put(tlsKey, new StartTLSCompletion(tlsKey));

        final BookkeeperProtocol.Request.Builder requestBuilder = this.withRequestContext(BookkeeperProtocol.Request.newBuilder());
        final BookkeeperProtocol.BKPacketHeader.Builder header = BookkeeperProtocol.BKPacketHeader.newBuilder()
                .setVersion(BookkeeperProtocol.ProtocolVersion.VERSION_THREE)
                .setOperation(BookkeeperProtocol.OperationType.START_TLS)
                .setTxnId(txnId);
        requestBuilder.setHeader(header.build());
        requestBuilder.setStartTLSRequest(BookkeeperProtocol.StartTLSRequest.newBuilder().build());

        // Switch state before the request goes out.
        this.state = ConnectionState.START_TLS;
        this.writeAndFlush(this.channel, tlsKey, requestBuilder.build());
    }

    /**
     * Handles a TLS failure: logs it, disconnects and drains the pending-operation queue under
     * this object's monitor, completes every drained callback with {@code rc} and a
     * {@code null} client, and finally increments the failed-handshake counter.
     *
     * @param rc return code handed to each pending callback
     */
    private void failTLS(int rc) {
        LOG.error("TLS failure on: {}, rc: {}", (Object)this.channel, (Object)rc);
        final Queue<BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>> drained;
        synchronized (this) {
            this.disconnect();
            drained = this.pendingOps;
            this.pendingOps = new ArrayDeque<BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient>>();
        }
        for (BookkeeperInternalCallbacks.GenericCallback<PerChannelBookieClient> pending : drained) {
            pending.operationComplete(rc, null);
        }
        this.failedTlsHandshakeCounter.inc();
    }

    /**
     * A {@link ChannelFuture} that is already done and failed with a fixed cause.
     *
     * <p>It has no channel ({@link #channel()} returns {@code null}), reports
     * {@code isDone() == true} and {@code isSuccess() == false}, and returns
     * immediately from every timed wait. Listener management, untimed waits,
     * {@code sync*}, {@code isVoid} and {@code getNow} are not supported and throw
     * {@link UnsupportedOperationException}.
     */
    private static class FailedChannelFutureImpl
    implements ChannelFuture {
        private final Throwable failureCause;

        public FailedChannelFutureImpl(Throwable failureCause) {
            this.failureCause = failureCause;
        }

        /** Creates the exception thrown by every unsupported operation. */
        private static UnsupportedOperationException unsupported() {
            return new UnsupportedOperationException("Not supported");
        }

        // ---- state -------------------------------------------------------

        @Override
        public Channel channel() {
            return null;
        }

        @Override
        public boolean isDone() {
            return true;
        }

        @Override
        public boolean isSuccess() {
            return false;
        }

        @Override
        public Throwable cause() {
            return this.failureCause;
        }

        @Override
        public boolean isCancellable() {
            return false;
        }

        @Override
        public boolean isCancelled() {
            return false;
        }

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            return false;
        }

        @Override
        public boolean isVoid() {
            throw unsupported();
        }

        // ---- listeners (unsupported) ---------------------------------------

        @Override
        public ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener) {
            throw unsupported();
        }

        @Override
        public ChannelFuture addListeners(GenericFutureListener<? extends Future<? super Void>> ... listeners) {
            throw unsupported();
        }

        @Override
        public ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> listener) {
            throw unsupported();
        }

        @Override
        public ChannelFuture removeListeners(GenericFutureListener<? extends Future<? super Void>> ... listeners) {
            throw unsupported();
        }

        // ---- untimed waits (unsupported) -----------------------------------

        @Override
        public ChannelFuture sync() throws InterruptedException {
            throw unsupported();
        }

        @Override
        public ChannelFuture syncUninterruptibly() {
            throw unsupported();
        }

        @Override
        public ChannelFuture await() throws InterruptedException {
            throw unsupported();
        }

        @Override
        public ChannelFuture awaitUninterruptibly() {
            throw unsupported();
        }

        // ---- timed waits: always report completion -------------------------

        @Override
        public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
            return true;
        }

        @Override
        public boolean await(long timeoutMillis) throws InterruptedException {
            return true;
        }

        @Override
        public boolean awaitUninterruptibly(long timeout, TimeUnit unit) {
            return true;
        }

        @Override
        public boolean awaitUninterruptibly(long timeoutMillis) {
            return true;
        }

        // ---- result access -------------------------------------------------

        @Override
        public Void getNow() {
            throw unsupported();
        }

        @Override
        public Void get() throws InterruptedException, ExecutionException {
            throw new ExecutionException(this.failureCause);
        }

        @Override
        public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            throw new ExecutionException(this.failureCause);
        }
    }

    class ConnectionFutureListener
    implements ChannelFutureListener {
        private final long startTime;

        ConnectionFutureListener(long startTime) {
            this.startTime = startTime;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void operationComplete(ChannelFuture future) {
            Queue oldPendingOps;
            int rc;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Channel connected ({}) {}", (Object)future.isSuccess(), (Object)future.channel());
            }
            if (future.isSuccess()) {
                PerChannelBookieClient.this.connectTimer.registerSuccessfulEvent(MathUtils.elapsedNanos(this.startTime), TimeUnit.NANOSECONDS);
            } else {
                PerChannelBookieClient.this.connectTimer.registerFailedEvent(MathUtils.elapsedNanos(this.startTime), TimeUnit.NANOSECONDS);
            }
            PerChannelBookieClient perChannelBookieClient = PerChannelBookieClient.this;
            synchronized (perChannelBookieClient) {
                if (future.isSuccess() && PerChannelBookieClient.this.state == ConnectionState.CONNECTING && future.channel().isActive()) {
                    rc = 0;
                    PerChannelBookieClient.this.channel = future.channel();
                    if (PerChannelBookieClient.this.shFactory != null) {
                        LOG.info("Successfully connected to bookie: {} {} initiate TLS", (Object)PerChannelBookieClient.this.bookieId, (Object)future.channel());
                        PerChannelBookieClient.this.makeWritable();
                        PerChannelBookieClient.this.initiateTLS();
                        return;
                    }
                    LOG.info("Successfully connected to bookie: {} {}", (Object)PerChannelBookieClient.this.bookieId, (Object)future.channel());
                    PerChannelBookieClient.this.state = ConnectionState.CONNECTED;
                    PerChannelBookieClient.this.activeNonTlsChannelCounter.inc();
                } else if (future.isSuccess() && PerChannelBookieClient.this.state == ConnectionState.START_TLS) {
                    rc = 0;
                    LOG.info("Successfully connected to bookie using TLS: " + PerChannelBookieClient.this.bookieId);
                    PerChannelBookieClient.this.state = ConnectionState.CONNECTED;
                    AuthHandler.ClientSideHandler authHandler = future.channel().pipeline().get(AuthHandler.ClientSideHandler.class);
                    authHandler.authProvider.onProtocolUpgrade();
                    PerChannelBookieClient.this.activeTlsChannelCounter.inc();
                } else if (future.isSuccess() && (PerChannelBookieClient.this.state == ConnectionState.CLOSED || PerChannelBookieClient.this.state == ConnectionState.DISCONNECTED)) {
                    LOG.warn("Closed before connection completed, clean up: {}, current state {}", (Object)future.channel(), (Object)PerChannelBookieClient.this.state);
                    PerChannelBookieClient.this.closeChannel(future.channel());
                    rc = -8;
                    PerChannelBookieClient.this.channel = null;
                } else {
                    if (future.isSuccess() && PerChannelBookieClient.this.state == ConnectionState.CONNECTED) {
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Already connected with another channel({}), so close the new channel({})", (Object)PerChannelBookieClient.this.channel, (Object)future.channel());
                        }
                        PerChannelBookieClient.this.closeChannel(future.channel());
                        return;
                    }
                    Throwable cause = future.cause();
                    if (cause instanceof UnknownHostException || cause instanceof Errors.NativeIoException) {
                        this.logBookieUnavailable(() -> LOG.warn("Could not connect to bookie: {}/{}, current state {} : {}", new Object[]{future.channel(), PerChannelBookieClient.this.bookieId, PerChannelBookieClient.this.state, future.cause().getMessage()}));
                    } else {
                        this.logBookieUnavailable(() -> LOG.error("Could not connect to bookie: {}/{}, current state {} : ", new Object[]{future.channel(), PerChannelBookieClient.this.bookieId, PerChannelBookieClient.this.state, future.cause()}));
                    }
                    rc = -8;
                    Channel failedChannel = future.channel();
                    if (failedChannel != null) {
                        PerChannelBookieClient.this.closeChannel(failedChannel);
                    }
                    PerChannelBookieClient.this.channel = null;
                    if (PerChannelBookieClient.this.state != ConnectionState.CLOSED) {
                        PerChannelBookieClient.this.state = ConnectionState.DISCONNECTED;
                    }
                    PerChannelBookieClient.this.failedConnectionCounter.inc();
                }
                oldPendingOps = PerChannelBookieClient.this.pendingOps;
                PerChannelBookieClient.this.pendingOps = new ArrayDeque();
            }
            for (BookkeeperInternalCallbacks.GenericCallback pendingOp : oldPendingOps) {
                pendingOp.operationComplete(rc, PerChannelBookieClient.this);
            }
            PerChannelBookieClient.this.makeWritable();
        }

        private void logBookieUnavailable(Runnable logger) {
            long now = System.currentTimeMillis();
            if (now - PerChannelBookieClient.this.lastBookieUnavailableLogTimestamp > PerChannelBookieClient.this.conf.getClientConnectBookieUnavailableLogThrottlingMs()) {
                logger.run();
                PerChannelBookieClient.this.lastBookieUnavailableLogTimestamp = now;
            }
        }
    }

    /**
     * Listener decorator that captures the MDC context map when it is constructed
     * and restores it around the delegate's {@code operationComplete} call. The
     * MDC is cleared afterwards, whether or not the delegate throws.
     */
    static class ContextPreservingFutureListener
    implements ChannelFutureListener {
        private final ChannelFutureListener listener;
        private final Map<String, String> mdcContextMap;

        ContextPreservingFutureListener(ChannelFutureListener listener) {
            // Snapshot the MDC of the thread that creates the listener.
            this.mdcContextMap = MDC.getCopyOfContextMap();
            this.listener = listener;
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            MdcUtils.restoreContext(this.mdcContextMap);
            try {
                this.listener.operationComplete(future);
            } finally {
                // Do not leave the restored context on the executing thread.
                MDC.clear();
            }
        }
    }

    /**
     * Recyclable completion key identified by ledger id, entry id and operation
     * type. The inherited {@code txnId} is fixed at {@code -1} and takes no part
     * in equality.
     */
    private class V2CompletionKey
    extends CompletionKey {
        private final Recycler.Handle<V2CompletionKey> recyclerHandle;
        long ledgerId;
        long entryId;

        private V2CompletionKey(Recycler.Handle<V2CompletionKey> handle) {
            super(-1L, null);
            this.recyclerHandle = handle;
        }

        /** Re-initialises this (recycled) key for a new operation. */
        void reset(long ledgerId, long entryId, BookkeeperProtocol.OperationType operationType) {
            this.operationType = operationType;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public boolean equals(Object object) {
            if (object instanceof V2CompletionKey) {
                V2CompletionKey other = (V2CompletionKey)object;
                return this.entryId == other.entryId
                        && this.ledgerId == other.ledgerId
                        && this.operationType == other.operationType;
            }
            return false;
        }

        /** Hash over ledger and entry id only; the operation type is left out. */
        @Override
        public int hashCode() {
            int ledgerHash = Long.hashCode(this.ledgerId);
            int entryHash = Long.hashCode(this.entryId);
            return ledgerHash * 31 + entryHash;
        }

        @Override
        public String toString() {
            return String.format("%d:%d %s", this.ledgerId, this.entryId, this.operationType);
        }

        /** Hands this key back to its recycler. */
        @Override
        public void release() {
            this.recyclerHandle.recycle(this);
        }
    }

    /**
     * Base type for keys that identify an in-flight request: a transaction id
     * plus a (mutable) operation type.
     */
    abstract class CompletionKey {
        final long txnId;
        BookkeeperProtocol.OperationType operationType;

        CompletionKey(long txnId, BookkeeperProtocol.OperationType operationType) {
            this.operationType = operationType;
            this.txnId = txnId;
        }

        /** Called when the key is no longer needed; does nothing by default. */
        public void release() {
            // intentionally empty - subclasses override when they hold resources
        }
    }

    class V3CompletionKey
    extends CompletionKey {
        public V3CompletionKey(long txnId, BookkeeperProtocol.OperationType operationType) {
            super(txnId, operationType);
        }

        public boolean equals(Object obj) {
            if (!(obj instanceof V3CompletionKey)) {
                return false;
            }
            V3CompletionKey that = (V3CompletionKey)obj;
            return this.txnId == that.txnId && this.operationType == that.operationType;
        }

        public int hashCode() {
            return (int)this.txnId;
        }

        public String toString() {
            return String.format("TxnId(%d), OperationType(%s)", this.txnId, this.operationType);
        }
    }

    /**
     * Recyclable completion for add-entry requests. It acts as the
     * {@code WriteCallback} itself: {@link #writeComplete} logs the result, calls
     * the original callback, releases the key and recycles this object.
     */
    class AddCompletion
    extends CompletionValue
    implements BookkeeperInternalCallbacks.WriteCallback {
        final Recycler.Handle<AddCompletion> handle;
        CompletionKey key;
        BookkeeperInternalCallbacks.WriteCallback originalCallback;

        AddCompletion(Recycler.Handle<AddCompletion> handle) {
            super("Add", null, -1L, -1L, PerChannelBookieClient.this.addEntryOpLogger, PerChannelBookieClient.this.addTimeoutOpLogger);
            this.handle = handle;
            this.key = null;
            this.originalCallback = null;
        }

        /** Re-initialises this (recycled) completion for a new add request and restarts its timer. */
        void reset(CompletionKey key, BookkeeperInternalCallbacks.WriteCallback originalCallback, Object originalCtx, long ledgerId, long entryId) {
            this.key = key;
            this.originalCallback = originalCallback;
            this.ctx = originalCtx;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.startTime = MathUtils.nowInNano();
        }

        @Override
        public void writeComplete(int rc, long ledgerId, long entryId, BookieId addr, Object ctx) {
            this.logOpResult(rc);
            this.originalCallback.writeComplete(rc, ledgerId, entryId, addr, ctx);
            // Only after the user callback has run are the key and this object recycled.
            this.key.release();
            this.handle.recycle(this);
        }

        /**
         * Times the request out once the elapsed time reaches {@code addEntryTimeoutNanos}.
         *
         * @return {@code true} if the timeout fired
         */
        @Override
        boolean maybeTimeout() {
            boolean expired = MathUtils.elapsedNanos(this.startTime) >= PerChannelBookieClient.this.addEntryTimeoutNanos;
            if (expired) {
                this.timeout();
            }
            return expired;
        }

        @Override
        public void errorOut() {
            // NOTE(review): -8 is a decompiler-inlined rc, presumably "bookie handle not available" - confirm.
            this.errorOut(-8);
        }

        @Override
        public void errorOut(int rc) {
            this.errorOutAndRunCallback(() -> this.writeComplete(rc, this.ledgerId, this.entryId, PerChannelBookieClient.this.bookieId, this.ctx));
        }

        @Override
        public void setOutstanding() {
            PerChannelBookieClient.this.addEntryOutstanding.inc();
        }

        @Override
        public void handleV2Response(long ledgerId, long entryId, BookkeeperProtocol.StatusCode status, BookieProtocol.Response response) {
            PerChannelBookieClient.this.addEntryOutstanding.dec();
            this.handleResponse(ledgerId, entryId, status);
        }

        @Override
        public void handleV3Response(BookkeeperProtocol.Response response) {
            PerChannelBookieClient.this.addEntryOutstanding.dec();
            BookkeeperProtocol.AddResponse addResponse = response.getAddResponse();
            // The envelope status wins unless it is EOK, in which case the add status is used.
            BookkeeperProtocol.StatusCode status = response.getStatus();
            if (status == BookkeeperProtocol.StatusCode.EOK) {
                status = addResponse.getStatus();
            }
            this.handleResponse(addResponse.getLedgerId(), addResponse.getEntryId(), status);
        }

        /** Converts the status to an rc and completes the write. */
        private void handleResponse(long ledgerId, long entryId, BookkeeperProtocol.StatusCode status) {
            if (LOG.isDebugEnabled()) {
                this.logResponse(status, "ledger", ledgerId, "entry", entryId);
            }
            // NOTE(review): -12 is passed as the second convertStatus argument, presumably a default rc - confirm.
            int rc = this.convertStatus(status, -12);
            this.writeComplete(rc, ledgerId, entryId, PerChannelBookieClient.this.bookieId, this.ctx);
        }
    }

    /**
     * Completion for {@code GetListOfEntriesOfLedger} requests. The wrapped
     * callback logs the result, calls the original callback and then releases
     * the completion key.
     */
    class GetListOfEntriesOfLedgerCompletion
    extends CompletionValue {
        final BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback cb;

        public GetListOfEntriesOfLedgerCompletion(final CompletionKey key, final BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback origCallback, long ledgerId) {
            super("GetListOfEntriesOfLedger", null, ledgerId, 0L, PerChannelBookieClient.this.getListOfEntriesOfLedgerCompletionOpLogger, PerChannelBookieClient.this.getListOfEntriesOfLedgerCompletionTimeoutOpLogger);
            this.cb = new BookkeeperInternalCallbacks.GetListOfEntriesOfLedgerCallback(){

                @Override
                public void getListOfEntriesOfLedgerComplete(int rc, long ledgerId, AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger) {
                    GetListOfEntriesOfLedgerCompletion.this.logOpResult(rc);
                    origCallback.getListOfEntriesOfLedgerComplete(rc, ledgerId, availabilityOfEntriesOfLedger);
                    key.release();
                }
            };
        }

        @Override
        public void errorOut() {
            // NOTE(review): -8 is a decompiler-inlined rc, presumably "bookie handle not available" - confirm.
            this.errorOut(-8);
        }

        @Override
        public void errorOut(int rc) {
            this.errorOutAndRunCallback(() -> this.cb.getListOfEntriesOfLedgerComplete(rc, this.ledgerId, null));
        }

        /**
         * Completes the callback from a V3 response. The availability payload is
         * decoded only when the converted rc is {@code 0}; otherwise {@code null}
         * is passed to the callback.
         *
         * @param response the V3 response carrying a {@code GetListOfEntriesOfLedgerResponse}
         */
        @Override
        public void handleV3Response(BookkeeperProtocol.Response response) {
            BookkeeperProtocol.GetListOfEntriesOfLedgerResponse getListOfEntriesOfLedgerResponse = response.getGetListOfEntriesOfLedgerResponse();
            ByteBuf availabilityOfEntriesOfLedgerBuffer = Unpooled.EMPTY_BUFFER;
            // The envelope status wins unless it is EOK, in which case the operation status is used.
            BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK ? getListOfEntriesOfLedgerResponse.getStatus() : response.getStatus();
            if (getListOfEntriesOfLedgerResponse.hasAvailabilityOfEntriesOfLedger()) {
                availabilityOfEntriesOfLedgerBuffer = Unpooled.wrappedBuffer(getListOfEntriesOfLedgerResponse.getAvailabilityOfEntriesOfLedger().asReadOnlyByteBuffer());
            }
            if (LOG.isDebugEnabled()) {
                this.logResponse(status, "ledgerId", this.ledgerId);
            }
            // NOTE(review): -1 is passed as the second convertStatus argument, presumably a default rc - confirm.
            int rc = this.convertStatus(status, -1);
            AvailabilityOfEntriesOfLedger availabilityOfEntriesOfLedger = null;
            if (rc == 0) {
                availabilityOfEntriesOfLedger = new AvailabilityOfEntriesOfLedger(availabilityOfEntriesOfLedgerBuffer.slice());
            }
            this.cb.getListOfEntriesOfLedgerComplete(rc, this.ledgerId, availabilityOfEntriesOfLedger);
        }
    }

    /**
     * Completion for {@code GetBookieInfo} requests. The wrapped callback logs
     * the result, calls the original callback with the original context and then
     * releases the completion key.
     */
    class GetBookieInfoCompletion
    extends CompletionValue {
        final BookkeeperInternalCallbacks.GetBookieInfoCallback cb;

        public GetBookieInfoCompletion(final CompletionKey key, final BookkeeperInternalCallbacks.GetBookieInfoCallback origCallback, final Object origCtx) {
            super("GetBookieInfo", origCtx, 0L, 0L, PerChannelBookieClient.this.getBookieInfoOpLogger, PerChannelBookieClient.this.getBookieInfoTimeoutOpLogger);
            this.cb = new BookkeeperInternalCallbacks.GetBookieInfoCallback(){

                @Override
                public void getBookieInfoComplete(int rc, BookieInfoReader.BookieInfo bInfo, Object ctx) {
                    GetBookieInfoCompletion.this.logOpResult(rc);
                    // The context captured at construction is forwarded, not the ctx argument.
                    origCallback.getBookieInfoComplete(rc, bInfo, origCtx);
                    key.release();
                }
            };
        }

        /**
         * Times the request out once the elapsed time reaches {@code getBookieInfoTimeoutNanos}.
         *
         * @return {@code true} if the timeout fired
         */
        @Override
        boolean maybeTimeout() {
            boolean expired = MathUtils.elapsedNanos(this.startTime) >= PerChannelBookieClient.this.getBookieInfoTimeoutNanos;
            if (expired) {
                this.timeout();
            }
            return expired;
        }

        @Override
        public void errorOut() {
            // NOTE(review): -8 is a decompiler-inlined rc, presumably "bookie handle not available" - confirm.
            this.errorOut(-8);
        }

        @Override
        public void errorOut(int rc) {
            this.errorOutAndRunCallback(() -> this.cb.getBookieInfoComplete(rc, new BookieInfoReader.BookieInfo(), this.ctx));
        }

        @Override
        public void handleV3Response(BookkeeperProtocol.Response response) {
            BookkeeperProtocol.GetBookieInfoResponse infoResponse = response.getGetBookieInfoResponse();
            // The envelope status wins unless it is EOK, in which case the operation status is used.
            BookkeeperProtocol.StatusCode status = response.getStatus();
            if (status == BookkeeperProtocol.StatusCode.EOK) {
                status = infoResponse.getStatus();
            }
            long freeDiskSpace = infoResponse.getFreeDiskSpace();
            long totalDiskSpace = infoResponse.getTotalDiskCapacity();
            if (LOG.isDebugEnabled()) {
                this.logResponse(status, "freeDisk", freeDiskSpace, "totalDisk", totalDiskSpace);
            }
            int rc = this.convertStatus(status, -1);
            BookieInfoReader.BookieInfo info = new BookieInfoReader.BookieInfo(totalDiskSpace, freeDiskSpace);
            this.cb.getBookieInfoComplete(rc, info, this.ctx);
        }
    }

    /**
     * Completion for the {@code START_TLS} request. Its callback only logs the
     * result and releases the key; the actual follow-up (handshake or failure)
     * is driven from {@link #handleV3Response}.
     */
    class StartTLSCompletion
    extends CompletionValue {
        final BookkeeperInternalCallbacks.StartTLSCallback cb;

        public StartTLSCompletion(final CompletionKey key) {
            super("StartTLS", null, -1L, -1L, PerChannelBookieClient.this.startTLSOpLogger, PerChannelBookieClient.this.startTLSTimeoutOpLogger);
            this.cb = new BookkeeperInternalCallbacks.StartTLSCallback(){

                @Override
                public void startTLSComplete(int rc, Object ctx) {
                    StartTLSCompletion.this.logOpResult(rc);
                    key.release();
                }
            };
        }

        @Override
        public void errorOut() {
            // NOTE(review): -8 is a decompiler-inlined rc, presumably "bookie handle not available" - confirm.
            this.errorOut(-8);
        }

        /** Fails the TLS setup directly via {@code failTLS}; the callback is not invoked here. */
        @Override
        public void errorOut(int rc) {
            PerChannelBookieClient.this.failTLS(rc);
        }

        /**
         * Runs the callback, then either starts the TLS handshake (state still
         * {@code START_TLS} and status {@code EOK}) or fails TLS.
         *
         * @param response the V3 response to the START_TLS request
         */
        @Override
        public void handleV3Response(BookkeeperProtocol.Response response) {
            final BookkeeperProtocol.StatusCode status = response.getStatus();
            if (LOG.isDebugEnabled()) {
                this.logResponse(status, new Object[0]);
            }
            int rc = this.convertStatus(status, -24);
            this.cb.startTLSComplete(rc, null);

            if (PerChannelBookieClient.this.state != ConnectionState.START_TLS) {
                LOG.error("Connection state changed before TLS response received");
                PerChannelBookieClient.this.failTLS(-8);
                return;
            }
            if (status != BookkeeperProtocol.StatusCode.EOK) {
                LOG.error("Client received error {} during TLS negotiation", (Object)status);
                // NOTE(review): -24 is a decompiler-inlined rc, presumably a security error code - confirm.
                PerChannelBookieClient.this.failTLS(-24);
                return;
            }
            PerChannelBookieClient.this.initTLSHandshake();
        }
    }

    /**
     * Completion for read-entry requests. The wrapped callback logs the result,
     * calls the original callback with the original context and then releases
     * the completion key.
     */
    class ReadCompletion
    extends CompletionValue {
        final BookkeeperInternalCallbacks.ReadEntryCallback cb;

        public ReadCompletion(final CompletionKey key, final BookkeeperInternalCallbacks.ReadEntryCallback originalCallback, final Object originalCtx, long ledgerId, long entryId) {
            super("Read", originalCtx, ledgerId, entryId, PerChannelBookieClient.this.readEntryOpLogger, PerChannelBookieClient.this.readTimeoutOpLogger);
            this.cb = new BookkeeperInternalCallbacks.ReadEntryCallback(){

                @Override
                public void readEntryComplete(int rc, long ledgerId, long entryId, ByteBuf buffer, Object ctx) {
                    ReadCompletion.this.logOpResult(rc);
                    // The context captured at construction is forwarded, not the ctx argument.
                    originalCallback.readEntryComplete(rc, ledgerId, entryId, buffer, originalCtx);
                    key.release();
                }
            };
        }

        @Override
        public void errorOut() {
            // NOTE(review): -8 is a decompiler-inlined rc, presumably "bookie handle not available" - confirm.
            this.errorOut(-8);
        }

        @Override
        public void errorOut(int rc) {
            this.errorOutAndRunCallback(() -> this.cb.readEntryComplete(rc, this.ledgerId, this.entryId, null, this.ctx));
        }

        @Override
        public void setOutstanding() {
            PerChannelBookieClient.this.readEntryOutstanding.inc();
        }

        /**
         * Handles a V2 response. The outstanding counter is always decremented;
         * a response that is not a {@code BookieProtocol.ReadResponse} is then
         * ignored without invoking the callback.
         */
        @Override
        public void handleV2Response(long ledgerId, long entryId, BookkeeperProtocol.StatusCode status, BookieProtocol.Response response) {
            PerChannelBookieClient.this.readEntryOutstanding.dec();
            if (!(response instanceof BookieProtocol.ReadResponse)) {
                return;
            }
            BookieProtocol.ReadResponse readResponse = (BookieProtocol.ReadResponse)response;
            this.handleReadResponse(ledgerId, entryId, status, readResponse.getData(), -1L, -1L);
        }

        /**
         * Handles a V3 response: wraps the body (if any), extracts the optional
         * max-LAC and LAC-update timestamp ({@code -1} when absent) and completes
         * the read. This method's reference to the wrapped buffer is released
         * afterwards, even if completing the read throws.
         *
         * @param response the V3 response carrying a {@code ReadResponse}
         */
        @Override
        public void handleV3Response(BookkeeperProtocol.Response response) {
            PerChannelBookieClient.this.readEntryOutstanding.dec();
            BookkeeperProtocol.ReadResponse readResponse = response.getReadResponse();
            // The envelope status wins unless it is EOK, in which case the read status is used.
            BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK ? readResponse.getStatus() : response.getStatus();
            ByteBuf buffer = Unpooled.EMPTY_BUFFER;
            if (readResponse.hasBody()) {
                buffer = Unpooled.wrappedBuffer(readResponse.getBody().asReadOnlyByteBuffer());
            }
            long maxLAC = -1L;
            if (readResponse.hasMaxLAC()) {
                maxLAC = readResponse.getMaxLAC();
            }
            long lacUpdateTimestamp = -1L;
            if (readResponse.hasLacUpdateTimestamp()) {
                lacUpdateTimestamp = readResponse.getLacUpdateTimestamp();
            }
            try {
                this.handleReadResponse(readResponse.getLedgerId(), readResponse.getEntryId(), status, buffer, maxLAC, lacUpdateTimestamp);
            } finally {
                // Drop our reference regardless of how the callback chain ended.
                buffer.release();
            }
        }

        /**
         * Converts the status to an rc, pushes LAC information into the context
         * when the context type supports it, and invokes the callback with a
         * slice of {@code buffer}.
         */
        private void handleReadResponse(long ledgerId, long entryId, BookkeeperProtocol.StatusCode status, ByteBuf buffer, long maxLAC, long lacUpdateTimestamp) {
            int readableBytes = buffer.readableBytes();
            if (LOG.isDebugEnabled()) {
                this.logResponse(status, "ledger", ledgerId, "entry", entryId, "entryLength", readableBytes);
            }
            // NOTE(review): -1 is passed as the second convertStatus argument, presumably a default rc - confirm.
            int rc = this.convertStatus(status, -1);
            if (maxLAC > -1L && this.ctx instanceof BookkeeperInternalCallbacks.ReadEntryCallbackCtx) {
                ((BookkeeperInternalCallbacks.ReadEntryCallbackCtx)this.ctx).setLastAddConfirmed(maxLAC);
            }
            if (lacUpdateTimestamp > -1L && this.ctx instanceof ReadLastConfirmedAndEntryContext) {
                ((ReadLastConfirmedAndEntryContext)this.ctx).setLacUpdateTimestamp(lacUpdateTimestamp);
            }
            this.cb.readEntryComplete(rc, ledgerId, entryId, buffer.slice(), this.ctx);
        }
    }

    /**
     * Completion for {@code ReadLAC} requests. The wrapped callback logs the
     * result, calls the original callback (forwarding the {@code ctx} argument it
     * receives) and then releases the completion key.
     */
    class ReadLacCompletion
    extends CompletionValue {
        final BookkeeperInternalCallbacks.ReadLacCallback cb;

        public ReadLacCompletion(final CompletionKey key, final BookkeeperInternalCallbacks.ReadLacCallback originalCallback, Object ctx, long ledgerId) {
            super("ReadLAC", ctx, ledgerId, -1L, PerChannelBookieClient.this.readLacOpLogger, PerChannelBookieClient.this.readLacTimeoutOpLogger);
            this.cb = new BookkeeperInternalCallbacks.ReadLacCallback(){

                @Override
                public void readLacComplete(int rc, long ledgerId, ByteBuf lacBuffer, ByteBuf lastEntryBuffer, Object ctx) {
                    ReadLacCompletion.this.logOpResult(rc);
                    originalCallback.readLacComplete(rc, ledgerId, lacBuffer, lastEntryBuffer, ctx);
                    key.release();
                }
            };
        }

        @Override
        public void errorOut() {
            // NOTE(review): -8 is a decompiler-inlined rc, presumably "bookie handle not available" - confirm.
            this.errorOut(-8);
        }

        @Override
        public void errorOut(int rc) {
            this.errorOutAndRunCallback(() -> this.cb.readLacComplete(rc, this.ledgerId, null, null, this.ctx));
        }

        /**
         * Completes the callback from a V3 response. Each of the LAC body and the
         * last-entry body is wrapped when present and is an empty buffer otherwise;
         * the callback receives slices of both.
         *
         * @param response the V3 response carrying a {@code ReadLacResponse}
         */
        @Override
        public void handleV3Response(BookkeeperProtocol.Response response) {
            BookkeeperProtocol.ReadLacResponse readLacResponse = response.getReadLacResponse();
            ByteBuf lacBuffer = Unpooled.EMPTY_BUFFER;
            ByteBuf lastEntryBuffer = Unpooled.EMPTY_BUFFER;
            // The envelope status wins unless it is EOK, in which case the operation status is used.
            BookkeeperProtocol.StatusCode status = response.getStatus() == BookkeeperProtocol.StatusCode.EOK ? readLacResponse.getStatus() : response.getStatus();
            if (readLacResponse.hasLacBody()) {
                lacBuffer = Unpooled.wrappedBuffer(readLacResponse.getLacBody().asReadOnlyByteBuffer());
            }
            if (readLacResponse.hasLastEntryBody()) {
                lastEntryBuffer = Unpooled.wrappedBuffer(readLacResponse.getLastEntryBody().asReadOnlyByteBuffer());
            }
            if (LOG.isDebugEnabled()) {
                this.logResponse(status, "ledgerId", this.ledgerId);
            }
            // NOTE(review): -1 is passed as the second convertStatus argument, presumably a default rc - confirm.
            int rc = this.convertStatus(status, -1);
            this.cb.readLacComplete(rc, this.ledgerId, lacBuffer.slice(), lastEntryBuffer.slice(), this.ctx);
        }
    }

    /**
     * Completion for {@code ForceLedger} requests. The wrapped callback logs the
     * result, calls the original callback with the original context and then
     * releases the completion key.
     */
    class ForceLedgerCompletion
    extends CompletionValue {
        final BookkeeperInternalCallbacks.ForceLedgerCallback cb;

        public ForceLedgerCompletion(final CompletionKey key, final BookkeeperInternalCallbacks.ForceLedgerCallback originalCallback, final Object originalCtx, long ledgerId) {
            super("ForceLedger", originalCtx, ledgerId, -1L, PerChannelBookieClient.this.forceLedgerOpLogger, PerChannelBookieClient.this.forceLedgerTimeoutOpLogger);
            this.cb = new BookkeeperInternalCallbacks.ForceLedgerCallback(){

                @Override
                public void forceLedgerComplete(int rc, long ledgerId, BookieId addr, Object ctx) {
                    ForceLedgerCompletion.this.logOpResult(rc);
                    // The context captured at construction is forwarded, not the ctx argument.
                    originalCallback.forceLedgerComplete(rc, ledgerId, addr, originalCtx);
                    key.release();
                }
            };
        }

        @Override
        public void errorOut() {
            // NOTE(review): -8 is a decompiler-inlined rc, presumably "bookie handle not available" - confirm.
            this.errorOut(-8);
        }

        @Override
        public void errorOut(int rc) {
            this.errorOutAndRunCallback(() -> this.cb.forceLedgerComplete(rc, this.ledgerId, PerChannelBookieClient.this.bookieId, this.ctx));
        }

        @Override
        public void handleV3Response(BookkeeperProtocol.Response response) {
            BookkeeperProtocol.ForceLedgerResponse forceResponse = response.getForceLedgerResponse();
            // The envelope status wins unless it is EOK, in which case the operation status is used.
            BookkeeperProtocol.StatusCode status = response.getStatus();
            if (status == BookkeeperProtocol.StatusCode.EOK) {
                status = forceResponse.getStatus();
            }
            long respondedLedgerId = forceResponse.getLedgerId();
            if (LOG.isDebugEnabled()) {
                this.logResponse(status, "ledger", respondedLedgerId);
            }
            int rc = this.convertStatus(status, -12);
            this.cb.forceLedgerComplete(rc, respondedLedgerId, PerChannelBookieClient.this.bookieId, this.ctx);
        }
    }

    /**
     * Completion for {@code WriteLAC} requests. The wrapped callback logs the
     * result, calls the original callback with the original context and then
     * releases the completion key.
     */
    class WriteLacCompletion
    extends CompletionValue {
        final BookkeeperInternalCallbacks.WriteLacCallback cb;

        public WriteLacCompletion(final CompletionKey key, final BookkeeperInternalCallbacks.WriteLacCallback originalCallback, final Object originalCtx, long ledgerId) {
            super("WriteLAC", originalCtx, ledgerId, -1L, PerChannelBookieClient.this.writeLacOpLogger, PerChannelBookieClient.this.writeLacTimeoutOpLogger);
            this.cb = new BookkeeperInternalCallbacks.WriteLacCallback(){

                @Override
                public void writeLacComplete(int rc, long ledgerId, BookieId addr, Object ctx) {
                    WriteLacCompletion.this.logOpResult(rc);
                    // The context captured at construction is forwarded, not the ctx argument.
                    originalCallback.writeLacComplete(rc, ledgerId, addr, originalCtx);
                    key.release();
                }
            };
        }

        @Override
        public void errorOut() {
            // NOTE(review): -8 is a decompiler-inlined rc, presumably "bookie handle not available" - confirm.
            this.errorOut(-8);
        }

        @Override
        public void errorOut(int rc) {
            this.errorOutAndRunCallback(() -> this.cb.writeLacComplete(rc, this.ledgerId, PerChannelBookieClient.this.bookieId, this.ctx));
        }

        @Override
        public void handleV3Response(BookkeeperProtocol.Response response) {
            BookkeeperProtocol.WriteLacResponse lacResponse = response.getWriteLacResponse();
            // The envelope status wins unless it is EOK, in which case the operation status is used.
            BookkeeperProtocol.StatusCode status = response.getStatus();
            if (status == BookkeeperProtocol.StatusCode.EOK) {
                status = lacResponse.getStatus();
            }
            long respondedLedgerId = lacResponse.getLedgerId();
            if (LOG.isDebugEnabled()) {
                this.logResponse(status, "ledger", respondedLedgerId);
            }
            int rc = this.convertStatus(status, -12);
            this.cb.writeLacComplete(rc, respondedLedgerId, PerChannelBookieClient.this.bookieId, this.ctx);
        }
    }

    /**
     * Base class for the state kept while a request is outstanding.
     *
     * <p>Holds the request identity (operation name, ledger id, entry id, caller context),
     * the start timestamp used for latency and timeout accounting, the stats loggers, and an
     * optional snapshot of the MDC context. Subclasses supply the error and V3 response
     * handling.
     */
    abstract class CompletionValue {
        private final OpStatsLogger opLogger;
        private final OpStatsLogger timeoutOpLogger;
        private final String operationName;
        /** MDC snapshot taken at construction, or {@code null} when MDC preservation is off. */
        private final Map<String, String> mdcContextMap;
        protected Object ctx;
        protected long ledgerId;
        protected long entryId;
        /** Construction time as returned by {@code MathUtils.nowInNano()}. */
        protected long startTime;

        /**
         * @param operationName name used in log messages
         * @param ctx caller context
         * @param ledgerId ledger the request targets
         * @param entryId entry the request targets
         * @param opLogger logger receiving success/failure latency events
         * @param timeoutOpLogger logger receiving an event whenever the request times out
         */
        public CompletionValue(String operationName, Object ctx, long ledgerId, long entryId, OpStatsLogger opLogger, OpStatsLogger timeoutOpLogger) {
            this.operationName = operationName;
            this.ctx = ctx;
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.startTime = MathUtils.nowInNano();
            this.opLogger = opLogger;
            this.timeoutOpLogger = timeoutOpLogger;

            Map<String, String> capturedMdc = null;
            if (PerChannelBookieClient.this.preserveMdcForTaskExecution) {
                capturedMdc = MDC.getCopyOfContextMap();
            }
            this.mdcContextMap = capturedMdc;
        }

        /** Returns the nanoseconds elapsed since {@link #startTime}. */
        private long latency() {
            return MathUtils.elapsedNanos(this.startTime);
        }

        /**
         * Records the outcome of the operation on {@code opLogger}.
         *
         * <p>A return code of {@code 0} counts as success. Any other code counts as a failure
         * and, unless it is listed in {@code expectedBkOperationErrors}, is also reported to
         * the enclosing client via {@code recordError()}.
         *
         * @param rc return code of the operation
         */
        void logOpResult(int rc) {
            if (rc == 0) {
                this.opLogger.registerSuccessfulEvent(this.latency(), TimeUnit.NANOSECONDS);
                return;
            }
            this.opLogger.registerFailedEvent(this.latency(), TimeUnit.NANOSECONDS);
            if (!expectedBkOperationErrors.contains(rc)) {
                PerChannelBookieClient.this.recordError();
            }
        }

        /**
         * Times the request out if it has been outstanding for at least the enclosing
         * client's {@code readEntryTimeoutNanos}.
         *
         * @return {@code true} when {@link #timeout()} was invoked, {@code false} otherwise
         */
        boolean maybeTimeout() {
            boolean expired = this.latency() >= PerChannelBookieClient.this.readEntryTimeoutNanos;
            if (expired) {
                this.timeout();
            }
            return expired;
        }

        /**
         * Errors the request out with code {@code -23} and then registers an event on
         * {@code timeoutOpLogger}.
         *
         * <p>NOTE(review): {@code -23} is presumably the BKException timeout code; confirm.
         */
        void timeout() {
            this.errorOut(-23);
            // The timeout logger counts occurrences, so the event is registered as "successful".
            this.timeoutOpLogger.registerSuccessfulEvent(this.latency(), TimeUnit.NANOSECONDS);
        }

        /**
         * Writes a debug log line describing a received response; does nothing when debug
         * logging is disabled.
         *
         * @param status status carried by the response
         * @param extraInfo additional values, joined with {@code ":"} in the log line
         */
        protected void logResponse(BookkeeperProtocol.StatusCode status, Object ... extraInfo) {
            if (!LOG.isDebugEnabled()) {
                return;
            }
            String details = Joiner.on(":").join(extraInfo);
            LOG.debug("Got {} response from bookie:{} rc:{}, {}", new Object[]{this.operationName, PerChannelBookieClient.this.bookieId, status, details});
        }

        /**
         * Maps a protocol status to a return code via {@code statusCodeToExceptionCode}.
         *
         * <p>When the mapping yields {@code 1} an error is logged and {@code defaultStatus} is
         * returned instead. NOTE(review): {@code 1} is presumably the "no mapping" sentinel;
         * confirm against {@code statusCodeToExceptionCode}.
         *
         * @param status protocol status to convert
         * @param defaultStatus return code used when the mapping yields {@code 1}
         * @return the mapped return code, or {@code defaultStatus}
         */
        protected int convertStatus(BookkeeperProtocol.StatusCode status, int defaultStatus) {
            int mapped = PerChannelBookieClient.this.statusCodeToExceptionCode(status);
            if (mapped != 1) {
                return mapped;
            }
            LOG.error("{} for failed on bookie {} code {}", new Object[]{this.operationName, PerChannelBookieClient.this.bookieId, status});
            return defaultStatus;
        }

        /** Passes the MDC snapshot taken at construction to {@code MdcUtils.restoreContext}. */
        public void restoreMdcContext() {
            MdcUtils.restoreContext(this.mdcContextMap);
        }

        /** Fails the request with the subclass's default return code. */
        public abstract void errorOut();

        /**
         * Fails the request with the given return code.
         *
         * @param var1 return code to report
         */
        public abstract void errorOut(int var1);

        /** Hook with an empty default implementation. */
        public void setOutstanding() {
        }

        /**
         * Submits a task to the client's ordered executor, keyed by {@link #ledgerId}, that
         * emits a debug log line (when enabled) and then runs {@code callback}.
         *
         * @param callback action to run on the executor
         */
        protected void errorOutAndRunCallback(final Runnable callback) {
            PerChannelBookieClient.this.executor.executeOrdered(this.ledgerId, (SafeRunnable)new org.apache.pulsar.shade.org.apache.bookkeeper.util.SafeRunnable(){

                @Override
                public void safeRun() {
                    Channel current = PerChannelBookieClient.this.channel;
                    // Fall back to the literal "null" when there is no channel or no remote address.
                    String bAddress = current != null && current.remoteAddress() != null
                            ? current.remoteAddress().toString()
                            : "null";
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Could not write {} request to bookie {} for ledger {}, entry {}", new Object[]{CompletionValue.this.operationName, bAddress, CompletionValue.this.ledgerId, CompletionValue.this.entryId});
                    }
                    callback.run();
                }
            });
        }

        /**
         * Default V2 response handler: only logs a warning that the response was not handled.
         *
         * @param ledgerId ledger id of the response
         * @param entryId entry id of the response
         * @param status status of the response
         * @param response the V2 response
         */
        public void handleV2Response(long ledgerId, long entryId, BookkeeperProtocol.StatusCode status, BookieProtocol.Response response) {
            LOG.warn("Unhandled V2 response {}", (Object)response);
        }

        /**
         * Handles a V3 response for this request.
         *
         * @param var1 the V3 response
         */
        public abstract void handleV3Response(BookkeeperProtocol.Response var1);
    }

    /**
     * Pooled task that delivers a V2 response to its {@link CompletionValue} and then gives
     * the response and itself back to their recyclers.
     *
     * <p>Instances are obtained through {@link #create} and must not be used after
     * {@link #safeRun()} has returned, because the instance is handed back to the pool there.
     */
    private static class ReadV2ResponseCallback
    extends org.apache.pulsar.shade.org.apache.bookkeeper.util.SafeRunnable {
        CompletionValue completionValue;
        long ledgerId;
        long entryId;
        BookkeeperProtocol.StatusCode status;
        BookieProtocol.Response response;
        private final Recycler.Handle<ReadV2ResponseCallback> recyclerHandle;
        private static final Recycler<ReadV2ResponseCallback> RECYCLER = new Recycler<ReadV2ResponseCallback>(){

            @Override
            protected ReadV2ResponseCallback newObject(Recycler.Handle<ReadV2ResponseCallback> handle) {
                return new ReadV2ResponseCallback(handle);
            }
        };

        /**
         * Takes an instance from the pool and populates it.
         *
         * @param completionValue completion that will receive the response
         * @param ledgerId ledger id of the response
         * @param entryId entry id of the response
         * @param status status of the response
         * @param response the V2 response; it is released and recycled by {@link #safeRun()}
         * @return a populated task ready to be executed once
         */
        static ReadV2ResponseCallback create(CompletionValue completionValue, long ledgerId, long entryId, BookkeeperProtocol.StatusCode status, BookieProtocol.Response response) {
            ReadV2ResponseCallback callback = RECYCLER.get();
            callback.completionValue = completionValue;
            callback.ledgerId = ledgerId;
            callback.entryId = entryId;
            callback.status = status;
            callback.response = response;
            return callback;
        }

        /**
         * Dispatches the response to the completion, then releases and recycles the response
         * and recycles this task.
         */
        @Override
        public void safeRun() {
            try {
                this.completionValue.handleV2Response(this.ledgerId, this.entryId, this.status, this.response);
            } finally {
                // Clean up even when the handler throws; otherwise the response is never
                // released and this task never goes back to the pool.
                this.response.release();
                this.response.recycle();
                this.recycle();
            }
        }

        /** Clears all per-use state and hands this instance back to the recycler. */
        void recycle() {
            this.completionValue = null;
            this.ledgerId = -1L;
            this.entryId = -1L;
            this.status = null;
            this.response = null;
            this.recyclerHandle.recycle(this);
        }

        private ReadV2ResponseCallback(Recycler.Handle<ReadV2ResponseCallback> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }
    }

    /**
     * Connection states declared for the enclosing client.
     *
     * <p>The declaration order is kept as-is because it determines each constant's ordinal.
     * NOTE(review): the transitions between these states are driven by code outside this
     * enum; see the enclosing class for when each one applies.
     */
    static enum ConnectionState {
        DISCONNECTED,
        CONNECTING,
        CONNECTED,
        CLOSED,
        START_TLS
    }
}

