/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.TransactionMetadataStoreService;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.authentication.AuthenticationProvider;
import org.apache.pulsar.broker.authentication.AuthenticationState;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
import org.apache.pulsar.broker.lookup.TopicLookupBase;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.ConnectionController;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.PulsarCommandSender;
import org.apache.pulsar.broker.service.PulsarCommandSenderImpl;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.schema.SchemaInfoUtil;
import org.apache.pulsar.functions.utils.Exceptions;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandler;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.pulsar.shade.io.netty.channel.ChannelOption;
import org.apache.pulsar.shade.io.netty.handler.codec.haproxy.HAProxyMessage;
import org.apache.pulsar.shade.io.netty.handler.ssl.SslHandler;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.io.netty.util.concurrent.Promise;
import org.apache.pulsar.shade.io.prometheus.client.Gauge;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Entry;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.mutable.MutableInt;
import org.apache.pulsar.shade.org.apache.commons.lang3.mutable.MutableLong;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAuthResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandCloseConsumer;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandCloseProducer;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandConsumerStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandEndTxn;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandEndTxnOnPartition;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscription;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandFlow;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandGetLastMessageId;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandGetOrCreateSchema;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandNewTxn;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSeek;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandTcClientConnectRequest;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandUnsubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ProducerAccessMode;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.shade.org.apache.pulsar.common.intercept.InterceptException;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.NamespaceOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.TopicOperation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.CommandUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.PulsarHandler;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServerCnx
extends PulsarHandler
implements TransportCnx {
    // Broker-wide collaborators, both obtained from the PulsarService in the constructor.
    private final BrokerService service;
    private final SchemaRegistryService schemaService;
    // Listener name this connection was created with; handleLookup falls back to it when the
    // lookup request does not carry its own advertised listener name.
    private final String listenerName;
    // Producers and consumers registered on this connection, keyed by their numeric id. The values
    // are futures, so an entry may still be pending or may have completed exceptionally.
    private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
    private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
    // Connection state; initialised to State.Start by the constructor.
    private State state;
    // Cleared in channelInactive once the underlying channel has gone away.
    private volatile boolean isActive = true;
    String authRole = null;
    private volatile AuthenticationDataSource authenticationData;
    AuthenticationProvider authenticationProvider;
    AuthenticationState authState;
    // When non-null, doAuthentication uses this state (and originalPrincipal) instead of authState/authRole.
    AuthenticationState originalAuthState;
    AuthenticationDataSource originalAuthData;
    private boolean pendingAuthChallengeResponse = false;
    // Taken from ServiceConfiguration#getMaxPendingPublishRequestsPerConnection().
    private final int maxPendingSendRequests;
    // Always half of maxPendingSendRequests (see constructor).
    private final int resumeReadsThreshold;
    private int pendingSendRequest = 0;
    private final String replicatorPrefix;
    private String clientVersion = null;
    private int nonPersistentPendingMessages = 0;
    private final int maxNonPersistentPendingMessages;
    private String originalPrincipal = null;
    // Roles configured as proxy roles; consulted by invalidOriginalPrincipal.
    private Set<String> proxyRoles;
    private boolean authenticateOriginalAuthData;
    private final boolean schemaValidationEnforced;
    private String authMethod = "none";
    // Sent to the client in the Connected response (see completeConnect).
    private final int maxMessageSize;
    private boolean preciseDispatcherFlowControl;
    private boolean preciseTopicPublishRateLimitingEnable;
    private boolean encryptionRequireOnProducer;
    private volatile boolean autoReadDisabledRateLimiting = false;
    private FeatureFlags features;
    // Created in channelActive once the connection has been admitted.
    private PulsarCommandSender commandSender;
    private final ConnectionController connectionController;
    private static final KeySharedMeta emptyKeySharedMeta = new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT);
    private boolean autoReadDisabledPublishBufferLimiting = false;
    // Configured publish buffer size (MB converted to bytes) divided by the number of IO threads.
    private final long maxPendingBytesPerThread;
    // Always half of maxPendingBytesPerThread (see constructor).
    private final long resumeThresholdPendingBytesPerThread;
    // Per-thread mutable counter, lazily created on first access from each thread.
    private static final FastThreadLocal<MutableLong> pendingBytesPerThread = new FastThreadLocal<MutableLong>(){

        @Override
        protected MutableLong initialValue() throws Exception {
            return new MutableLong();
        }
    };
    // Per-thread identity-based set of connections; a connection adds itself in channelActive and
    // removes itself in channelInactive.
    private static final FastThreadLocal<Set<ServerCnx>> cnxsPerThread = new FastThreadLocal<Set<ServerCnx>>(){

        @Override
        protected Set<ServerCnx> initialValue() throws Exception {
            return Collections.newSetFromMap(new IdentityHashMap());
        }
    };
    private static final byte[] emptyArray = new byte[0];
    // Prometheus gauges, registered once when the class is initialised.
    private static final Gauge throttledConnections = (Gauge)((Gauge.Builder)((Gauge.Builder)Gauge.build().name("pulsar_broker_throttled_connections")).help("Counter of connections throttled because of per-connection limit")).register();
    // NOTE(review): the help text below is identical to the per-connection gauge above although the
    // metric name says "global_limit" - looks like a copy/paste slip; confirm the intended wording.
    private static final Gauge throttledConnectionsGlobal = (Gauge)((Gauge.Builder)((Gauge.Builder)Gauge.build().name("pulsar_broker_throttled_connections_global_limit")).help("Counter of connections throttled because of per-connection limit")).register();
    private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);

    /**
     * Creates a connection handler without a listener name.
     *
     * <p>Delegates to {@link #ServerCnx(PulsarService, String)} with a {@code null} listener name.
     *
     * @param pulsar the service whose broker service, schema registry and configuration are used
     */
    public ServerCnx(PulsarService pulsar) {
        this(pulsar, null);
    }

    /**
     * Creates a connection handler bound to the given listener.
     *
     * <p>The keep-alive interval is taken from the broker service; every limit and feature switch
     * held by this connection is read once from the broker configuration at construction time.
     *
     * @param pulsar       the service whose broker service, schema registry and configuration are used
     * @param listenerName listener name used as the default for lookups; may be {@code null}
     */
    public ServerCnx(PulsarService pulsar, String listenerName) {
        super(pulsar.getBrokerService().getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);

        this.service = pulsar.getBrokerService();
        this.schemaService = pulsar.getSchemaRegistryService();
        this.listenerName = listenerName;
        this.state = State.Start;

        final ServiceConfiguration conf = pulsar.getConfiguration();

        this.producers = new ConcurrentLongHashMap<>(8, 1);
        this.consumers = new ConcurrentLongHashMap<>(8, 1);

        this.replicatorPrefix = conf.getReplicatorPrefix();
        this.maxNonPersistentPendingMessages = conf.getMaxConcurrentNonPersistentMessagePerConnection();
        this.proxyRoles = conf.getProxyRoles();
        this.authenticateOriginalAuthData = conf.isAuthenticateOriginalAuthData();
        this.schemaValidationEnforced = conf.isSchemaValidationEnforced();
        this.maxMessageSize = conf.getMaxMessageSize();

        // Reads resume once the number of pending publishes has dropped to half of the limit.
        this.maxPendingSendRequests = conf.getMaxPendingPublishRequestsPerConnection();
        this.resumeReadsThreshold = this.maxPendingSendRequests / 2;

        this.preciseDispatcherFlowControl = conf.isPreciseDispatcherFlowControl();
        this.preciseTopicPublishRateLimitingEnable = conf.isPreciseTopicPublishRateLimiterEnable();
        this.encryptionRequireOnProducer = conf.isEncryptionRequireOnProducer();

        // The configured publish buffer (in MB) is converted to bytes and shared evenly between IO threads.
        final long publishBufferBytes = (long) conf.getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L;
        final long ioThreads = (long) conf.getNumIOThreads();
        this.maxPendingBytesPerThread = publishBufferBytes / ioThreads;
        this.resumeThresholdPendingBytesPerThread = this.maxPendingBytesPerThread / 2L;

        this.connectionController = new ConnectionController.DefaultConnectionController(conf);
    }

    /**
     * Admits or rejects a newly opened channel.
     *
     * <p>The connection controller is asked to account for the new connection. If it answers with
     * anything other than {@code OK}, a {@code NotAllowedError} is written to the client and the
     * channel is closed without any further initialisation. Otherwise the handler context and the
     * command sender are set up, the connection is recorded in the broker stats and this instance
     * is added to the calling thread's connection set.
     *
     * @param ctx context of the channel that became active
     * @throws Exception if the superclass handler fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ConnectionController.Sate connectionState = this.connectionController.increaseConnection(this.remoteAddress);
        if (!connectionState.equals(ConnectionController.Sate.OK)) {
            // The per-address message previously ran straight into the address ("...on address/1.2.3.4");
            // a separating space keeps it readable.
            String reason = connectionState.equals(ConnectionController.Sate.REACH_MAX_CONNECTION)
                    ? "Reached the maximum number of connections"
                    : "Reached the maximum number of connections on address " + this.remoteAddress;
            ctx.channel().writeAndFlush(Commands.newError(-1L, ServerError.NotAllowedError, reason));
            ctx.channel().close();
            return;
        }
        log.info("New connection from {}", this.remoteAddress);
        this.ctx = ctx;
        this.commandSender = new PulsarCommandSenderImpl(this.getBrokerService().getInterceptor(), this);
        this.service.getPulsarStats().recordConnectionCreate();
        cnxsPerThread.get().add(this);
    }

    /**
     * Releases everything tied to a channel that has gone away.
     *
     * <p>The connection is un-counted in the connection controller, marked inactive, reported to the
     * broker interceptor (when one is configured) and removed from the calling thread's connection
     * set. Every producer and consumer whose creation future completed normally is then closed;
     * futures that are still pending or failed are left untouched. Finally the close is recorded in
     * the broker stats.
     *
     * @param ctx context of the channel that became inactive
     * @throws Exception if the superclass handler fails
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        this.connectionController.decreaseConnection(ctx.channel().remoteAddress());
        this.isActive = false;
        log.info("Closed connection from {}", this.remoteAddress);

        final BrokerInterceptor interceptor = this.getBrokerService().getInterceptor();
        if (interceptor != null) {
            interceptor.onConnectionClosed(this);
        }
        cnxsPerThread.get().remove(this);

        this.producers.forEach((producerId, pendingProducer) -> {
            if (!pendingProducer.isDone() || pendingProducer.isCompletedExceptionally()) {
                return;
            }
            pendingProducer.getNow(null).closeNow(true);
        });

        this.consumers.forEach((consumerId, pendingConsumer) -> {
            if (pendingConsumer.isDone() && !pendingConsumer.isCompletedExceptionally()) {
                final Consumer created = pendingConsumer.getNow(null);
                try {
                    created.close();
                } catch (BrokerServiceException closeFailure) {
                    log.warn("Consumer {} was already closed: {}", (Object) created, (Object) closeFailure);
                }
            }
        });

        this.service.getPulsarStats().recordConnectionClose();
    }

    /**
     * Logs the channel's new writability at debug level; does nothing else.
     *
     * @param ctx context of the channel whose writability changed
     * @throws Exception declared by the handler contract; not thrown by this implementation itself
     */
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        if (!log.isDebugEnabled()) {
            return;
        }
        final boolean writable = ctx.channel().isWritable();
        log.debug("Channel writability has changed to: {}", writable);
    }

    /**
     * Handles an exception raised in the pipeline by failing and closing the connection.
     *
     * <p>The first exception moves the connection to {@code State.Failed} and is logged at warn level
     * (known exceptions are logged as-is, anything else with its full stack trace). Exceptions that
     * arrive once the connection is already failed are only logged at debug level. The channel is
     * closed in every case.
     *
     * @param ctx   context of the channel on which the exception occurred
     * @param cause the exception that was caught
     * @throws Exception declared by the handler contract
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if (this.state == State.Failed) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Got exception: {}", (Object) this.remoteAddress, (Object) cause);
            }
        } else {
            final Object detail;
            if (ClientCnx.isKnownException(cause)) {
                detail = cause;
            } else {
                detail = ExceptionUtils.getStackTrace(cause);
            }
            log.warn("[{}] Got exception {}", (Object) this.remoteAddress, detail);
            this.state = State.Failed;
            if (log.isDebugEnabled()) {
                log.debug("[{}] connect state change to : [{}]", (Object) this.remoteAddress, (Object) State.Failed.name());
            }
        }
        ctx.close();
    }

    /**
     * Tells whether a connection authenticated with a proxy role is missing a usable original principal.
     *
     * <p>The check only applies when both authentication and authorization are enabled and the
     * connection's own role is one of the configured proxy roles. In that case the original
     * principal is invalid when it is blank or is itself a proxy role.
     *
     * @param originalPrincipal the principal forwarded for the connection; may be {@code null}
     * @return {@code true} if the original principal must be rejected
     */
    private boolean invalidOriginalPrincipal(String originalPrincipal) {
        if (!this.service.isAuthenticationEnabled() || !this.service.isAuthorizationEnabled()) {
            return false;
        }
        if (!this.proxyRoles.contains(this.authRole)) {
            return false;
        }
        return StringUtils.isBlank(originalPrincipal) || this.proxyRoles.contains(originalPrincipal);
    }

    /**
     * Checks asynchronously whether this connection may perform an operation on a topic.
     *
     * <p>With authorization disabled the answer is always {@code true}. Otherwise the connection's
     * own role is checked and, when an original principal is set, that principal is checked as
     * well; the operation is allowed only if both checks pass. Each failed check is logged at warn
     * level.
     *
     * @param topicName the topic the operation targets
     * @param operation the operation being attempted
     * @return a future holding {@code true} when the operation is allowed
     */
    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, TopicOperation operation) {
        if (!this.service.isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(true);
        }

        final CompletableFuture<Boolean> originalPrincipalCheck;
        if (this.originalPrincipal == null) {
            // No original principal to verify: treat that half of the check as passed.
            originalPrincipalCheck = CompletableFuture.completedFuture(true);
        } else {
            originalPrincipalCheck = this.service.getAuthorizationService()
                    .allowTopicOperationAsync(topicName, operation, this.originalPrincipal, this.getAuthenticationData());
        }
        final CompletableFuture<Boolean> roleCheck = this.service.getAuthorizationService()
                .allowTopicOperationAsync(topicName, operation, this.authRole, this.authenticationData);

        return originalPrincipalCheck.thenCombine(roleCheck, (originalAllowed, roleAllowed) -> {
            if (!originalAllowed) {
                log.warn("OriginalRole {} is not authorized to perform operation {} on topic {}",
                        this.originalPrincipal, operation, topicName);
            }
            if (!roleAllowed) {
                log.warn("Role {} is not authorized to perform operation {} on topic {}",
                        this.authRole, operation, topicName);
            }
            return originalAllowed && roleAllowed;
        });
    }

    /**
     * Checks asynchronously whether this connection may perform an operation on a topic subscription.
     *
     * <p>With authorization disabled the answer is always {@code true}. Otherwise the subscription
     * name is recorded on the connection's authentication data (creating it when absent) and on the
     * original authentication data when present, and the topic-level check is then performed.
     *
     * @param topicName        the topic the operation targets
     * @param subscriptionName the subscription the operation targets
     * @param operation        the operation being attempted
     * @return a future holding {@code true} when the operation is allowed
     */
    private CompletableFuture<Boolean> isTopicOperationAllowed(TopicName topicName, String subscriptionName, TopicOperation operation) {
        if (!this.service.isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(true);
        }

        if (this.authenticationData != null) {
            this.authenticationData.setSubscription(subscriptionName);
        } else {
            this.authenticationData = new AuthenticationDataCommand("", subscriptionName);
        }

        if (this.originalAuthData != null) {
            this.originalAuthData.setSubscription(subscriptionName);
        }

        return this.isTopicOperationAllowed(topicName, operation);
    }

    /**
     * Serves a topic lookup request.
     *
     * <p>The request is throttled through the broker-wide lookup semaphore: when no permit is
     * available a {@code TooManyRequests} error is returned at once. With a permit held, the
     * original principal is validated, the {@code LOOKUP} operation is authorized and the lookup
     * itself is performed asynchronously. Every path that acquired the permit releases it after
     * writing its response.
     *
     * @param lookup the lookup command received from the client
     */
    @Override
    protected void handleLookup(CommandLookupTopic lookup) {
        final long requestId = lookup.getRequestId();
        final boolean authoritative = lookup.isAuthoritative();
        // A listener name carried by the request takes precedence over the connection's own.
        final String advertisedListenerName;
        if (lookup.hasAdvertisedListenerName()) {
            advertisedListenerName = lookup.getAdvertisedListenerName();
        } else {
            advertisedListenerName = this.listenerName;
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Lookup from {} for {}", lookup.getTopic(), this.remoteAddress, requestId);
        }

        final TopicName topicName = this.validateTopicName(lookup.getTopic(), requestId, lookup);
        if (topicName == null) {
            // Nothing more to do here; presumably validateTopicName already answered the client - confirm.
            return;
        }

        final Semaphore lookupSemaphore = this.service.getLookupRequestSemaphore();
        if (!lookupSemaphore.tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed lookup due to too many lookup-requests {}", (Object) this.remoteAddress, (Object) topicName);
            }
            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId));
            return;
        }

        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            final String msg = "Valid Proxy Client role should be provided for lookup ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}",
                    this.remoteAddress, msg, this.authRole, this.originalPrincipal, topicName);
            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
            lookupSemaphore.release();
            return;
        }

        this.isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
            if (!isAuthorized) {
                final String msg = "Proxy Client is not authorized to Lookup";
                log.warn("[{}] {} with role {} on topic {}", this.remoteAddress, msg, this.getPrincipal(), topicName);
                this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
                lookupSemaphore.release();
                return null;
            }
            TopicLookupBase.lookupTopicAsync(this.getBrokerService().pulsar(), topicName, authoritative,
                    this.getPrincipal(), this.getAuthenticationData(), requestId, advertisedListenerName)
                    .handle((lookupResponse, failure) -> {
                        if (failure != null) {
                            log.warn("[{}] lookup failed with error {}, {}",
                                    this.remoteAddress, topicName, failure.getMessage(), failure);
                            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady, failure.getMessage(), requestId));
                        } else {
                            this.ctx.writeAndFlush(lookupResponse);
                        }
                        lookupSemaphore.release();
                        return null;
                    });
            return null;
        }).exceptionally(authFailure -> {
            ServerCnx.logAuthException(this.remoteAddress, "lookup", this.getPrincipal(), Optional.of(topicName), authFailure);
            final String msg = "Exception occurred while trying to authorize lookup";
            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(ServerError.AuthorizationError, msg, requestId));
            lookupSemaphore.release();
            return null;
        });
    }

    /**
     * Serves a partitioned-topic metadata request.
     *
     * <p>The request shares the broker-wide lookup semaphore with topic lookups: when no permit is
     * available a {@code TooManyRequests} error is returned at once. With a permit held, the
     * original principal is validated, the {@code LOOKUP} operation is authorized and the metadata
     * is fetched asynchronously. Every path that acquired the permit releases it after writing its
     * response.
     *
     * @param partitionMetadata the metadata command received from the client
     */
    @Override
    protected void handlePartitionMetadataRequest(CommandPartitionedTopicMetadata partitionMetadata) {
        final long requestId = partitionMetadata.getRequestId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup from {} for {}",
                    partitionMetadata.getTopic(), this.remoteAddress, requestId);
        }

        final TopicName topicName = this.validateTopicName(partitionMetadata.getTopic(), requestId, partitionMetadata);
        if (topicName == null) {
            // Nothing more to do here; presumably validateTopicName already answered the client - confirm.
            return;
        }

        final Semaphore lookupSemaphore = this.service.getLookupRequestSemaphore();
        if (!lookupSemaphore.tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", (Object) this.remoteAddress, (Object) topicName);
            }
            this.commandSender.sendPartitionMetadataResponse(ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId);
            return;
        }

        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            final String msg = "Valid Proxy Client role should be provided for getPartitionMetadataRequest ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}",
                    this.remoteAddress, msg, this.authRole, this.originalPrincipal, topicName);
            this.commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId);
            lookupSemaphore.release();
            return;
        }

        this.isTopicOperationAllowed(topicName, TopicOperation.LOOKUP).thenApply(isAuthorized -> {
            if (!isAuthorized) {
                final String msg = "Proxy Client is not authorized to Get Partition Metadata";
                log.warn("[{}] {} with role {} on topic {}", this.remoteAddress, msg, this.getPrincipal(), topicName);
                this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
                lookupSemaphore.release();
                return null;
            }
            PersistentTopicsBase.unsafeGetPartitionedTopicMetadataAsync(this.getBrokerService().pulsar(), topicName)
                    .handle((metadata, failure) -> {
                        if (failure == null) {
                            this.commandSender.sendPartitionMetadataResponse(metadata.partitions, requestId);
                        } else if (failure instanceof PulsarClientException) {
                            log.warn("Failed to authorize {} at [{}] on topic {} : {}",
                                    this.getRole(), this.remoteAddress, topicName, failure.getMessage());
                            this.commandSender.sendPartitionMetadataResponse(ServerError.AuthorizationError, failure.getMessage(), requestId);
                        } else {
                            log.warn("Failed to get Partitioned Metadata [{}] {}: {}",
                                    this.remoteAddress, topicName, failure.getMessage(), failure);
                            // REST failures with a status below 500 are reported as metadata errors;
                            // everything else as "service not ready".
                            final ServerError error;
                            if (failure instanceof RestException && ((RestException) failure).getResponse().getStatus() < 500) {
                                error = ServerError.MetadataError;
                            } else {
                                error = ServerError.ServiceNotReady;
                            }
                            this.commandSender.sendPartitionMetadataResponse(error, failure.getMessage(), requestId);
                        }
                        lookupSemaphore.release();
                        return null;
                    });
            return null;
        }).exceptionally(authFailure -> {
            ServerCnx.logAuthException(this.remoteAddress, "partition-metadata", this.getPrincipal(), Optional.of(topicName), authFailure);
            final String msg = "Exception occurred while trying to authorize get Partition Metadata";
            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(ServerError.AuthorizationError, msg, requestId));
            lookupSemaphore.release();
            return null;
        });
    }

    /**
     * Answers a consumer-stats request for a consumer registered on this connection.
     *
     * <p>A {@code ConsumerNotFound} error is returned when the consumer id is unknown to this
     * connection, or when the consumer's creation future is still pending or has failed. Otherwise
     * the current stats of the consumer are sent back.
     *
     * @param commandConsumerStats the stats command received from the client
     */
    @Override
    protected void handleConsumerStats(CommandConsumerStats commandConsumerStats) {
        if (log.isDebugEnabled()) {
            log.debug("Received CommandConsumerStats call from {}", this.remoteAddress);
        }
        final long requestId = commandConsumerStats.getRequestId();
        final long consumerId = commandConsumerStats.getConsumerId();

        // The id comes from the client and may not be registered here, so the map lookup can yield
        // null; getNow() would also throw if the future had completed exceptionally. Both cases are
        // treated as "consumer not found" instead of letting an exception tear down the connection.
        final CompletableFuture<Consumer> consumerFuture = this.consumers.get(consumerId);
        Consumer consumer = null;
        if (consumerFuture != null && consumerFuture.isDone() && !consumerFuture.isCompletedExceptionally()) {
            consumer = consumerFuture.getNow(null);
        }

        final ByteBuf msg;
        if (consumer == null) {
            log.error("Failed to get consumer-stats response - Consumer not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, consumerId = {}]", this.remoteAddress, requestId, consumerId);
            msg = Commands.newConsumerStatsResponse(ServerError.ConsumerNotFound, "Consumer " + consumerId + " not found", requestId);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", requestId, consumer);
            }
            msg = this.createConsumerStatsResponse(consumer, requestId);
        }
        this.ctx.writeAndFlush(msg);
    }

    /**
     * Builds the serialized consumer-stats response for a consumer.
     *
     * <p>The response command is first created through the error-response factory and its error
     * code is then cleared, after which the consumer's and its subscription's stats are copied in.
     *
     * @param consumer  the consumer whose stats are reported
     * @param requestId id of the request being answered
     * @return the response, serialized with its size prefix
     */
    ByteBuf createConsumerStatsResponse(Consumer consumer, long requestId) {
        final ConsumerStatsImpl stats = consumer.getStats();
        final Subscription sub = consumer.getSubscription();
        final BaseCommand response = Commands.newConsumerStatsResponseCommand(ServerError.UnknownError, null, requestId);
        response.getConsumerStatsResponse()
                .clearErrorCode()
                .setRequestId(requestId)
                .setMsgRateOut(stats.msgRateOut)
                .setMsgThroughputOut(stats.msgThroughputOut)
                .setMsgRateRedeliver(stats.msgRateRedeliver)
                .setConsumerName(stats.consumerName)
                .setAvailablePermits(stats.availablePermits)
                .setUnackedMessages(stats.unackedMessages)
                .setBlockedConsumerOnUnackedMsgs(stats.blockedConsumerOnUnackedMsgs)
                .setAddress(stats.getAddress())
                .setConnectedSince(stats.getConnectedSince())
                .setMsgBacklog(sub.getNumberOfEntriesInBacklog(false))
                .setMsgRateExpired(sub.getExpiredMessageRate())
                .setType(sub.getTypeString());
        return Commands.serializeWithSize(response);
    }

    /**
     * Finishes the connect handshake.
     *
     * <p>Sends the {@code Connected} response (advertising the maximum message size), moves the
     * connection to {@code State.Connected}, records the successful connection in the broker stats
     * and remembers the client's protocol version. The client version string is kept only when it
     * is non-blank and contains no space.
     *
     * @param clientProtoVersion protocol version announced by the client
     * @param clientVersion      version string announced by the client; may be {@code null}
     */
    private void completeConnect(int clientProtoVersion, String clientVersion) {
        this.ctx.writeAndFlush(Commands.newConnected(clientProtoVersion, this.maxMessageSize));
        this.state = State.Connected;
        this.service.getPulsarStats().recordConnectionCreateSuccess();
        if (log.isDebugEnabled()) {
            log.debug("[{}] connect state change to : [{}]", (Object) this.remoteAddress, (Object) State.Connected.name());
        }
        this.setRemoteEndpointProtocolVersion(clientProtoVersion);

        final boolean usableVersion = StringUtils.isNotBlank(clientVersion) && !clientVersion.contains(" ");
        if (usableVersion) {
            this.clientVersion = clientVersion.intern();
        }
    }

    /**
     * Runs one authentication step with the data supplied by the client.
     *
     * <p>When {@code originalAuthState} is set it is used together with {@code originalPrincipal};
     * otherwise {@code authState} and {@code authRole} are used. If the step does not complete the
     * authentication, a challenge is sent back and {@code State.Connecting} is returned. If it
     * completes, the authentication data is refreshed and then either the connect handshake is
     * finished (connection not yet connected) or the step is treated as a credential refresh, in
     * which case a changed principal causes the channel to be closed.
     *
     * @param clientData            authentication data received from the client
     * @param clientProtocolVersion protocol version announced by the client
     * @param clientVersion         version string announced by the client
     * @return {@code State.Connected} when authentication completed, {@code State.Connecting} otherwise
     * @throws Exception if the authentication state rejects the supplied data
     */
    private State doAuthentication(AuthData clientData, int clientProtocolVersion, String clientVersion) throws Exception {
        final boolean useOriginalAuthState = this.originalAuthState != null;
        final AuthenticationState activeAuthState;
        final String roleBeforeStep;
        if (useOriginalAuthState) {
            activeAuthState = this.originalAuthState;
            roleBeforeStep = this.originalPrincipal;
        } else {
            activeAuthState = this.authState;
            roleBeforeStep = this.authRole;
        }

        final AuthData brokerData = activeAuthState.authenticate(clientData);
        if (log.isDebugEnabled()) {
            log.debug("Authenticate using original auth state : {}, role = {}", (Object) useOriginalAuthState, (Object) roleBeforeStep);
        }

        if (!activeAuthState.isComplete()) {
            // More round trips are needed: hand the broker's data back to the client as a challenge.
            this.ctx.writeAndFlush(Commands.newAuthChallenge(this.authMethod, brokerData, clientProtocolVersion));
            if (log.isDebugEnabled()) {
                log.debug("[{}] Authentication in progress client by method {}.", (Object) this.remoteAddress, (Object) this.authMethod);
                log.debug("[{}] connect state change to : [{}]", (Object) this.remoteAddress, (Object) State.Connecting.name());
            }
            return State.Connecting;
        }

        final String roleAfterStep = activeAuthState.getAuthRole();
        this.authenticationData = activeAuthState.getAuthDataSource();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Auth data refreshed for role={}", (Object) this.remoteAddress, (Object) this.authRole);
        }
        // The connection's own role is only replaced when its own auth state was the one used.
        if (!useOriginalAuthState) {
            this.authRole = roleAfterStep;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}",
                    this.remoteAddress, this.authMethod, this.authRole, this.originalPrincipal);
        }

        if (this.state != State.Connected) {
            this.completeConnect(clientProtocolVersion, clientVersion);
        } else if (!StringUtils.isEmpty(roleBeforeStep)) {
            if (roleBeforeStep.equals(roleAfterStep)) {
                log.info("[{}] Refreshed authentication credentials for role {}", (Object) this.remoteAddress, (Object) roleBeforeStep);
            } else {
                log.warn("[{}] Principal cannot change during an authentication refresh expected={} got={}",
                        this.remoteAddress, roleBeforeStep, roleAfterStep);
                this.ctx.close();
            }
        }
        return State.Connected;
    }

    /**
     * Re-validates the credentials of this connection once they have expired.
     *
     * <p>Nothing happens when there is no authentication state, when the connection is not in
     * {@code State.Connected} / not active, or when the credentials have not expired yet.
     *
     * <p>If the connection carries an original principal but no original authentication state,
     * the credentials cannot be re-validated and the connection is closed. Otherwise a task is
     * submitted to the channel executor which closes the connection when the client does not
     * support authentication refresh, when a previous challenge is still unanswered, or when
     * refreshing fails; in the remaining case it sends a new auth challenge and marks it as
     * pending.
     */
    public void refreshAuthenticationCredentials() {
        final AuthenticationState authState = this.originalAuthState != null ? this.originalAuthState : this.authState;
        if (authState == null) {
            return;
        }
        if (this.getState() != State.Connected || !this.isActive) {
            return;
        }
        if (!authState.isExpired()) {
            return;
        }
        if (this.originalPrincipal != null && this.originalAuthState == null) {
            log.info("[{}] Cannot revalidate user credential when using proxy and not forwarding the credentials. Closing connection", (Object)this.remoteAddress);
            // Credentials are expired and cannot be refreshed: actually close the connection, as
            // the log message announces, instead of leaving it open.
            this.ctx.close();
            return;
        }
        this.ctx.executor().execute(SafeRun.safeRun(() -> {
            log.info("[{}] Refreshing authentication credentials for originalPrincipal {} and authRole {}", new Object[]{this.remoteAddress, this.originalPrincipal, this.authRole});
            if (!this.supportsAuthenticationRefresh()) {
                log.warn("[{}] Closing connection because client doesn't support auth credentials refresh", (Object)this.remoteAddress);
                this.ctx.close();
                return;
            }
            // A challenge from an earlier refresh round is still unanswered.
            if (this.pendingAuthChallengeResponse) {
                log.warn("[{}] Closing connection after timeout on refreshing auth credentials", (Object)this.remoteAddress);
                this.ctx.close();
                return;
            }
            try {
                AuthData brokerData = authState.refreshAuthentication();
                this.ctx.writeAndFlush(Commands.newAuthChallenge(this.authMethod, brokerData, this.getRemoteEndpointProtocolVersion()));
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Sent auth challenge to client to refresh credentials with method: {}.", (Object)this.remoteAddress, (Object)this.authMethod);
                }
                this.pendingAuthChallengeResponse = true;
            }
            catch (AuthenticationException e) {
                log.warn("[{}] Failed to refresh authentication: {}", (Object)this.remoteAddress, (Object)e);
                this.ctx.close();
            }
        }));
    }

    /**
     * Handles the CONNECT command, which is only accepted in {@code State.Start}.
     *
     * <p>Flow, as implemented below:
     * <ol>
     *   <li>Copy the client's feature flags.</li>
     *   <li>If authentication is disabled, complete the handshake immediately.</li>
     *   <li>Resolve the authentication method and its provider. Without a provider the
     *       anonymous role is used; if none is configured the connect fails.</li>
     *   <li>Create the authentication state and run the first authentication round.</li>
     *   <li>Resolve the original principal: either authenticated from the forwarded original
     *       auth data (when the broker is configured to do so) or taken as-is from the
     *       command.</li>
     * </ol>
     * Any exception is reported to the client as an {@code AuthenticationError} and the
     * connection is closed.
     *
     * @param connect the CONNECT command received from the client
     */
    @Override
    protected void handleConnect(CommandConnect connect) {
        Preconditions.checkArgument(this.state == State.Start);
        if (log.isDebugEnabled()) {
            log.debug("Received CONNECT from {}, auth enabled: {}: has original principal = {}, original principal = {}", new Object[]{this.remoteAddress, this.service.isAuthenticationEnabled(), connect.hasOriginalPrincipal(), connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null});
        }
        String clientVersion = connect.getClientVersion();
        int clientProtocolVersion = connect.getProtocolVersion();
        this.features = new FeatureFlags();
        if (connect.hasFeatureFlags()) {
            this.features.copyFrom(connect.getFeatureFlags());
        }
        if (!this.service.isAuthenticationEnabled()) {
            this.completeConnect(clientProtocolVersion, clientVersion);
            return;
        }
        try {
            byte[] authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray;
            AuthData clientData = AuthData.of(authData);
            if (connect.hasAuthMethodName()) {
                this.authMethod = connect.getAuthMethodName();
            } else if (connect.hasAuthMethod()) {
                // Derive the method name from the enum constant by dropping its first 10
                // characters (presumably an "AuthMethod" prefix - confirm against the enum).
                // Locale.ROOT keeps the result independent of the JVM's default locale.
                this.authMethod = connect.getAuthMethod().name().substring(10).toLowerCase(java.util.Locale.ROOT);
            } else {
                this.authMethod = "none";
            }
            this.authenticationProvider = this.getBrokerService().getAuthenticationService().getAuthenticationProvider(this.authMethod);
            if (this.authenticationProvider == null) {
                // No provider for this method: fall back to the anonymous role, if any.
                this.authRole = this.getBrokerService().getAuthenticationService().getAnonymousUserRole().orElseThrow(() -> new AuthenticationException("No anonymous role, and no authentication provider configured"));
                this.completeConnect(clientProtocolVersion, clientVersion);
                return;
            }
            ChannelHandler sslHandler = this.ctx.channel().pipeline().get("tls");
            SSLSession sslSession = null;
            if (sslHandler != null) {
                sslSession = ((SslHandler)sslHandler).engine().getSession();
            }
            this.authState = this.authenticationProvider.newAuthState(clientData, this.remoteAddress, sslSession);
            this.authenticationData = this.authState.getAuthDataSource();
            if (log.isDebugEnabled()) {
                log.debug("[{}] Authenticate role : {}", (Object)this.remoteAddress, (Object)this.authState.getAuthRole());
            }
            // Note: the first authentication round runs before the original principal below is
            // resolved.
            this.state = this.doAuthentication(clientData, clientProtocolVersion, clientVersion);
            if (connect.hasOriginalPrincipal() && this.service.getPulsar().getConfig().isAuthenticateOriginalAuthData()) {
                String originalAuthMethod = connect.hasOriginalAuthMethod() ? connect.getOriginalAuthMethod() : "none";
                AuthenticationProvider originalAuthenticationProvider = this.getBrokerService().getAuthenticationService().getAuthenticationProvider(originalAuthMethod);
                if (originalAuthenticationProvider == null) {
                    throw new AuthenticationException(String.format("Can't find AuthenticationProvider for original role using auth method [%s] is not available", originalAuthMethod));
                }
                // Encode with an explicit charset so the bytes do not depend on the platform
                // default encoding of the broker JVM.
                this.originalAuthState = originalAuthenticationProvider.newAuthState(AuthData.of(connect.getOriginalAuthData().getBytes(java.nio.charset.StandardCharsets.UTF_8)), this.remoteAddress, sslSession);
                this.originalAuthData = this.originalAuthState.getAuthDataSource();
                this.originalPrincipal = this.originalAuthState.getAuthRole();
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Authenticate original role : {}", (Object)this.remoteAddress, (Object)this.originalPrincipal);
                }
            } else {
                this.originalPrincipal = connect.hasOriginalPrincipal() ? connect.getOriginalPrincipal() : null;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Authenticate original role (forwarded from proxy): {}", (Object)this.remoteAddress, (Object)this.originalPrincipal);
                }
            }
        }
        catch (Exception e) {
            this.service.getPulsarStats().recordConnectionCreateFail();
            ServerCnx.logAuthException(this.remoteAddress, "connect", this.getPrincipal(), Optional.empty(), e);
            // The client only gets a generic message; details go to the broker log above.
            String msg = "Unable to authenticate";
            this.ctx.writeAndFlush(Commands.newError(-1L, ServerError.AuthenticationError, msg));
            this.close();
        }
    }

    /**
     * Handles the client's answer to an auth challenge.
     *
     * <p>The response must carry both auth data and an auth method name. The pending-challenge
     * flag is cleared and the data is passed to another authentication round. An
     * {@link AuthenticationException} is reported to the client as {@code AuthenticationError}
     * with the exception message; any other exception is reported as {@code UnknownError}.
     * In both cases the failure is counted in the broker stats and the connection is closed.
     *
     * @param authResponse the AUTH_RESPONSE command received from the client
     */
    @Override
    protected void handleAuthResponse(CommandAuthResponse authResponse) {
        Preconditions.checkArgument(authResponse.hasResponse());
        Preconditions.checkArgument(authResponse.getResponse().hasAuthData() && authResponse.getResponse().hasAuthMethodName());

        this.pendingAuthChallengeResponse = false;
        if (log.isDebugEnabled()) {
            log.debug("Received AuthResponse from {}, auth method: {}", this.remoteAddress, authResponse.getResponse().getAuthMethodName());
        }

        try {
            this.doAuthentication(AuthData.of(authResponse.getResponse().getAuthData()), authResponse.getProtocolVersion(), authResponse.getClientVersion());
        } catch (AuthenticationException authFailure) {
            this.service.getPulsarStats().recordConnectionCreateFail();
            String reason = authFailure.getMessage();
            log.warn("[{}] Authentication failed: {} ", this.remoteAddress, reason);
            this.ctx.writeAndFlush(Commands.newError(-1L, ServerError.AuthenticationError, reason));
            this.close();
        } catch (Exception unexpected) {
            this.service.getPulsarStats().recordConnectionCreateFail();
            String description = "Unable to handleAuthResponse";
            // The trailing exception argument makes the logger print the stack trace.
            log.warn("[{}] {} ", this.remoteAddress, description, unexpected);
            this.ctx.writeAndFlush(Commands.newError(-1L, ServerError.UnknownError, description));
            this.close();
        }
    }

    /**
     * Handles the SUBSCRIBE command, which is only accepted in {@code State.Connected}.
     *
     * <p>After validating the topic name and the original principal, the method checks that the
     * caller may {@code CONSUME} from the topic and then, asynchronously:
     * <ul>
     *   <li>validates the consumer metadata;</li>
     *   <li>registers a pending future for the consumer id on this connection, answering
     *       directly when an entry for that id already exists;</li>
     *   <li>looks up (or creates, when allowed) the topic, optionally registers/checks the
     *       schema and subscribes;</li>
     *   <li>sends a success or error response and cleans up the pending future on failure.</li>
     * </ul>
     *
     * @param subscribe the SUBSCRIBE command received from the client
     */
    @Override
    protected void handleSubscribe(CommandSubscribe subscribe) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = subscribe.getRequestId();
        long consumerId = subscribe.getConsumerId();
        TopicName topicName = this.validateTopicName(subscribe.getTopic(), requestId, subscribe);
        // A null result ends the handling here (presumably the helper already answered the
        // client - confirm in validateTopicName).
        if (topicName == null) {
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Handle subscribe command: auth role = {}, original auth role = {}", new Object[]{this.remoteAddress, this.authRole, this.originalPrincipal});
        }
        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            String msg = "Valid Proxy Client role should be provided while subscribing ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, "Valid Proxy Client role should be provided while subscribing ", this.authRole, this.originalPrincipal, topicName});
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, "Valid Proxy Client role should be provided while subscribing ");
            return;
        }
        // Extract every setting from the command up front; optional fields fall back to the
        // defaults spelled out on each line.
        String subscriptionName = subscribe.getSubscription();
        CommandSubscribe.SubType subType = subscribe.getSubType();
        String consumerName = subscribe.hasConsumerName() ? subscribe.getConsumerName() : "";
        boolean isDurable = subscribe.isDurable();
        BatchMessageIdImpl startMessageId = subscribe.hasStartMessageId() ? new BatchMessageIdImpl(subscribe.getStartMessageId().getLedgerId(), subscribe.getStartMessageId().getEntryId(), subscribe.getStartMessageId().getPartition(), subscribe.getStartMessageId().getBatchIndex()) : null;
        int priorityLevel = subscribe.hasPriorityLevel() ? subscribe.getPriorityLevel() : 0;
        boolean readCompacted = subscribe.hasReadCompacted() && subscribe.isReadCompacted();
        Map<String, String> metadata = CommandUtils.metadataFromCommand(subscribe);
        CommandSubscribe.InitialPosition initialPosition = subscribe.getInitialPosition();
        long startMessageRollbackDurationSec = subscribe.hasStartMessageRollbackDurationSec() ? subscribe.getStartMessageRollbackDurationSec() : -1L;
        SchemaData schema = subscribe.hasSchema() ? this.getSchema(subscribe.getSchema()) : null;
        boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.isReplicateSubscriptionState();
        boolean forceTopicCreation = subscribe.isForceTopicCreation();
        KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) : emptyKeySharedMeta;
        CompletableFuture<Boolean> isAuthorizedFuture = this.isTopicOperationAllowed(topicName, subscriptionName, TopicOperation.CONSUME);
        ((CompletableFuture)isAuthorizedFuture.thenApply(isAuthorized -> {
            if (isAuthorized.booleanValue()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Client is authorized to subscribe with role {}", (Object)this.remoteAddress, (Object)this.getPrincipal());
                }
                log.info("[{}] Subscribing on topic {} / {}", new Object[]{this.remoteAddress, topicName, subscriptionName});
                // Invalid metadata is rejected before any consumer state is registered.
                try {
                    Metadata.validateMetadata(metadata);
                }
                catch (IllegalArgumentException iae) {
                    String msg = iae.getMessage();
                    this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
                    return null;
                }
                // Register a pending future for this consumer id; an existing entry means the
                // id is already in use on this connection.
                CompletableFuture consumerFuture = new CompletableFuture();
                CompletableFuture existingConsumerFuture = this.consumers.putIfAbsent(consumerId, consumerFuture);
                if (existingConsumerFuture != null) {
                    // Already created successfully: answer with success again.
                    if (existingConsumerFuture.isDone() && !existingConsumerFuture.isCompletedExceptionally()) {
                        Consumer consumer2 = existingConsumerFuture.getNow(null);
                        log.info("[{}] Consumer with the same id is already created: consumerId={}, consumer={}", new Object[]{this.remoteAddress, consumerId, consumer2});
                        this.commandSender.sendSuccessResponse(requestId);
                        return null;
                    }
                    log.warn("[{}][{}][{}] Consumer with id is already present on the connection, consumerId={}", new Object[]{this.remoteAddress, topicName, subscriptionName, consumerId});
                    // Still being created -> ServiceNotReady. Previously failed -> report that
                    // failure's error code and drop the stale entry.
                    ServerError error = null;
                    if (!existingConsumerFuture.isDone()) {
                        error = ServerError.ServiceNotReady;
                    } else {
                        error = this.getErrorCode(existingConsumerFuture);
                        this.consumers.remove(consumerId, existingConsumerFuture);
                    }
                    this.commandSender.sendErrorResponse(requestId, error, "Consumer is already present on the connection");
                    return null;
                }
                // The topic is only auto-created when the client asked for it AND the broker
                // allows auto-creation for this topic.
                boolean createTopicIfDoesNotExist = forceTopicCreation && this.service.isAllowAutoTopicCreation(topicName.toString());
                ((CompletableFuture)((CompletableFuture)this.service.getTopic(topicName.toString(), createTopicIfDoesNotExist).thenCompose(optTopic -> {
                    boolean rejectSubscriptionIfDoesNotExist;
                    if (!optTopic.isPresent()) {
                        return FutureUtil.failedFuture(new BrokerServiceException.TopicNotFoundException("Topic " + topicName + " does not exist"));
                    }
                    Topic topic = (Topic)optTopic.get();
                    // Durable subscriptions that do not exist yet are rejected when automatic
                    // subscription creation is disabled for the topic.
                    boolean bl = rejectSubscriptionIfDoesNotExist = isDurable && !this.service.isAllowAutoSubscriptionCreation(topicName.toString()) && !topic.getSubscriptions().containsKey(subscriptionName);
                    if (rejectSubscriptionIfDoesNotExist) {
                        return FutureUtil.failedFuture(new BrokerServiceException.SubscriptionNotFoundException("Subscription does not exist"));
                    }
                    // With a schema, the schema step must complete before subscribing.
                    if (schema != null) {
                        return topic.addSchemaIfIdleOrCheckCompatible(schema).thenCompose(v -> topic.subscribe(this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta));
                    }
                    return topic.subscribe(this, subscriptionName, consumerId, subType, priorityLevel, consumerName, isDurable, startMessageId, metadata, readCompacted, initialPosition, startMessageRollbackDurationSec, isReplicated, keySharedMeta);
                })).thenAccept(consumer -> {
                    if (consumerFuture.complete(consumer)) {
                        log.info("[{}] Created subscription on topic {} / {}", new Object[]{this.remoteAddress, topicName, subscriptionName});
                        this.commandSender.sendSuccessResponse(requestId);
                    } else {
                        // The pending future was already completed elsewhere (the log message
                        // attributes this to a client-side timeout): discard the new consumer.
                        try {
                            consumer.close();
                            log.info("[{}] Cleared consumer created after timeout on client side {}", (Object)this.remoteAddress, consumer);
                        }
                        catch (BrokerServiceException e) {
                            log.warn("[{}] Error closing consumer created after timeout on client side {}: {}", new Object[]{this.remoteAddress, consumer, e.getMessage()});
                        }
                        this.consumers.remove(consumerId, consumerFuture);
                    }
                })).exceptionally(exception -> {
                    // Log verbosity depends on the cause: busy-consumer at debug, other broker
                    // exceptions as a warning without stack trace, anything else with it.
                    if (exception.getCause() instanceof BrokerServiceException.ConsumerBusyException) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}", new Object[]{this.remoteAddress, topicName, subscriptionName, exception.getCause().getMessage()});
                        }
                    } else if (exception.getCause() instanceof BrokerServiceException) {
                        log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", new Object[]{this.remoteAddress, topicName, subscriptionName, consumerId, exception.getCause().getMessage()});
                    } else {
                        log.warn("[{}][{}][{}] Failed to create consumer: consumerId={}, {}", new Object[]{this.remoteAddress, topicName, subscriptionName, consumerId, exception.getCause().getMessage(), exception});
                    }
                    // Only answer the client if this handler is the one that failed the future.
                    if (consumerFuture.completeExceptionally((Throwable)exception)) {
                        this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), exception.getCause().getMessage());
                    }
                    this.consumers.remove(consumerId, consumerFuture);
                    return null;
                });
            } else {
                String msg = "Client is not authorized to subscribe";
                log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, msg, this.getPrincipal()});
                this.ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
            }
            return null;
        })).exceptionally(ex -> {
            // Reached when the authorization stage above completes exceptionally.
            ServerCnx.logAuthException(this.remoteAddress, "subscribe", this.getPrincipal(), Optional.of(topicName), ex);
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
            return null;
        });
    }

    /**
     * Converts a wire-protocol schema into a {@code SchemaData} instance.
     *
     * <p>The result is marked as not deleted, stamped with the current time, attributed to the
     * original principal (empty string when there is none) and carries the schema's properties
     * as a key/value map.
     *
     * @param protocolSchema schema as received in a protocol command
     * @return the equivalent {@code SchemaData}
     */
    private SchemaData getSchema(Schema protocolSchema) {
        String schemaOwner = Strings.nullToEmpty(this.originalPrincipal);
        return SchemaData.builder()
                .data(protocolSchema.getSchemaData())
                .isDeleted(false)
                .timestamp(System.currentTimeMillis())
                .user(schemaOwner)
                .type(Commands.getSchemaType(protocolSchema.getType()))
                .props(protocolSchema.getPropertiesList().stream()
                        .collect(Collectors.toMap(property -> property.getKey(), property -> property.getValue())))
                .build();
    }

    /**
     * Handles the PRODUCER command, which is only accepted in {@code State.Connected}.
     *
     * <p>After validating the topic name and the original principal, the method checks that the
     * caller may {@code PRODUCE} to the topic and then, asynchronously:
     * <ul>
     *   <li>registers a pending future for the producer id on this connection, answering
     *       directly when an entry for that id already exists;</li>
     *   <li>gets or creates the topic and rejects the producer when a backlog quota is exceeded
     *       or when encryption is required but the producer is not encrypted;</li>
     *   <li>adds the schema, waits for the transaction buffer check and finally builds the
     *       producer and attaches it to the topic.</li>
     * </ul>
     * Failures are reported to the client and the pending future is removed from the
     * connection's producer map.
     *
     * @param cmdProducer the PRODUCER command received from the client
     */
    @Override
    protected void handleProducer(CommandProducer cmdProducer) {
        Preconditions.checkArgument(this.state == State.Connected);
        long producerId = cmdProducer.getProducerId();
        long requestId = cmdProducer.getRequestId();
        // Without a client-supplied name the broker generates a unique one.
        String producerName = cmdProducer.hasProducerName() ? cmdProducer.getProducerName() : this.service.generateUniqueProducerName();
        long epoch = cmdProducer.getEpoch();
        boolean userProvidedProducerName = cmdProducer.isUserProvidedProducerName();
        boolean isEncrypted = cmdProducer.isEncrypted();
        Map<String, String> metadata = CommandUtils.metadataFromCommand(cmdProducer);
        SchemaData schema = cmdProducer.hasSchema() ? this.getSchema(cmdProducer.getSchema()) : null;
        ProducerAccessMode producerAccessMode = cmdProducer.getProducerAccessMode();
        Optional topicEpoch = cmdProducer.hasTopicEpoch() ? Optional.of(cmdProducer.getTopicEpoch()) : Optional.empty();
        boolean isTxnEnabled = cmdProducer.isTxnEnabled();
        TopicName topicName = this.validateTopicName(cmdProducer.getTopic(), requestId, cmdProducer);
        // A null result ends the handling here (presumably the helper already answered the
        // client - confirm in validateTopicName).
        if (topicName == null) {
            return;
        }
        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            String msg = "Valid Proxy Client role should be provided while creating producer ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, msg, this.authRole, this.originalPrincipal, topicName});
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, msg);
            return;
        }
        CompletableFuture<Boolean> isAuthorizedFuture = this.isTopicOperationAllowed(topicName, TopicOperation.PRODUCE);
        ((CompletableFuture)isAuthorizedFuture.thenApply(isAuthorized -> {
            if (isAuthorized.booleanValue()) {
                CompletableFuture producerFuture;
                CompletableFuture existingProducerFuture;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Client is authorized to Produce with role {}", (Object)this.remoteAddress, (Object)this.getPrincipal());
                }
                // Register a pending future for this producer id; an existing entry means the
                // id is already in use on this connection.
                if ((existingProducerFuture = this.producers.putIfAbsent(producerId, producerFuture = new CompletableFuture())) != null) {
                    // Already created successfully: answer with success again.
                    if (existingProducerFuture.isDone() && !existingProducerFuture.isCompletedExceptionally()) {
                        Producer producer = existingProducerFuture.getNow(null);
                        log.info("[{}] Producer with the same id is already created: producerId={}, producer={}", new Object[]{this.remoteAddress, producerId, producer});
                        this.commandSender.sendProducerSuccessResponse(requestId, producer.getProducerName(), producer.getSchemaVersion());
                        return null;
                    }
                    // Still being created -> ServiceNotReady. Previously failed -> report that
                    // failure's error code and drop the stale entry.
                    ServerError error = null;
                    if (!existingProducerFuture.isDone()) {
                        error = ServerError.ServiceNotReady;
                    } else {
                        error = this.getErrorCode(existingProducerFuture);
                        this.producers.remove(producerId, existingProducerFuture);
                    }
                    log.warn("[{}][{}] Producer with id is already present on the connection, producerId={}", new Object[]{this.remoteAddress, topicName, producerId});
                    this.commandSender.sendErrorResponse(requestId, error, "Producer is already present on the connection");
                    return null;
                }
                log.info("[{}][{}] Creating producer. producerId={}", new Object[]{this.remoteAddress, topicName, producerId});
                ((CompletableFuture)this.service.getOrCreateTopic(topicName.toString()).thenAccept(topic -> {
                    // Reject the producer as soon as any backlog quota type is exceeded; the
                    // error code sent depends on the quota's retention policy.
                    for (BacklogQuota.BacklogQuotaType backlogQuotaType : BacklogQuota.BacklogQuotaType.values()) {
                        if (!topic.isBacklogQuotaExceeded(producerName, backlogQuotaType)) continue;
                        IllegalStateException illegalStateException = new IllegalStateException("Cannot create producer on topic with backlog quota exceeded");
                        BacklogQuota.RetentionPolicy retentionPolicy = topic.getBacklogQuota(backlogQuotaType).getPolicy();
                        if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold) {
                            this.commandSender.sendErrorResponse(requestId, ServerError.ProducerBlockedQuotaExceededError, illegalStateException.getMessage());
                        } else if (retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception) {
                            this.commandSender.sendErrorResponse(requestId, ServerError.ProducerBlockedQuotaExceededException, illegalStateException.getMessage());
                        }
                        producerFuture.completeExceptionally(illegalStateException);
                        this.producers.remove(producerId, producerFuture);
                        return;
                    }
                    if ((topic.isEncryptionRequired() || this.encryptionRequireOnProducer) && !isEncrypted) {
                        String msg = String.format("Encryption is required in %s", topicName);
                        log.warn("[{}] {}", (Object)this.remoteAddress, (Object)msg);
                        this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, msg);
                        this.producers.remove(producerId, producerFuture);
                        return;
                    }
                    this.disableTcpNoDelayIfNeeded(topicName.toString(), producerName);
                    CompletableFuture<SchemaVersion> schemaVersionFuture = this.tryAddSchema((Topic)topic, schema);
                    schemaVersionFuture.exceptionally(exception -> {
                        String message = exception.getMessage();
                        if (exception.getCause() != null) {
                            message = message + " caused by " + exception.getCause();
                        }
                        this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), message);
                        this.producers.remove(producerId, producerFuture);
                        return null;
                    });
                    schemaVersionFuture.thenAccept(schemaVersion -> ((CompletableFuture)topic.checkIfTransactionBufferRecoverCompletely(isTxnEnabled).thenAccept(future -> this.buildProducerAndAddTopic((Topic)topic, producerId, producerName, requestId, isEncrypted, metadata, (SchemaVersion)schemaVersion, epoch, userProvidedProducerName, topicName, producerAccessMode, topicEpoch, producerFuture))).exceptionally(exception -> {
                        Throwable cause = exception.getCause();
                        log.error("producerId {}, requestId {} : TransactionBuffer recover failed", new Object[]{producerId, requestId, exception});
                        // Fail and unregister the pending future. Otherwise the producer id
                        // stays registered as "not ready" on this connection and every later
                        // attempt to create it is rejected with ServiceNotReady.
                        if (producerFuture.completeExceptionally((Throwable)exception)) {
                            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.ServiceUnitNotReadyException.getClientErrorCode(cause), cause.getMessage());
                        }
                        this.producers.remove(producerId, producerFuture);
                        return null;
                    }));
                })).exceptionally(exception -> {
                    Throwable cause = exception.getCause();
                    if (cause instanceof NoSuchElementException) {
                        cause = new BrokerServiceException.TopicNotFoundException("Topic Not Found.");
                    }
                    // Service-unit-not-ready and managed-ledger failures are not logged as errors.
                    if (!Exceptions.areExceptionsPresentInChain(cause, BrokerServiceException.ServiceUnitNotReadyException.class, ManagedLedgerException.class)) {
                        log.error("[{}] Failed to create topic {}, producerId={}", new Object[]{this.remoteAddress, topicName, producerId, exception});
                    }
                    // Only answer the client if this handler is the one that failed the future.
                    if (producerFuture.completeExceptionally((Throwable)exception)) {
                        this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(cause), cause.getMessage());
                    }
                    this.producers.remove(producerId, producerFuture);
                    return null;
                });
            } else {
                String msg = "Client is not authorized to Produce";
                log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, msg, this.getPrincipal()});
                this.ctx.writeAndFlush(Commands.newError(requestId, ServerError.AuthorizationError, msg));
            }
            return null;
        })).exceptionally(ex -> {
            // Reached when the authorization stage above completes exceptionally.
            ServerCnx.logAuthException(this.remoteAddress, "producer", this.getPrincipal(), Optional.of(topicName), ex);
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, ex.getMessage());
            return null;
        });
    }

    /**
     * Creates the {@link Producer} object, adds it to the topic and answers the client.
     *
     * <p>When adding succeeds and the connection is still active, {@code producerFuture} is
     * completed with the producer and a success response is sent. If the future was already
     * completed, or the connection is no longer active, the producer is closed right away and
     * the pending entry is removed from the connection's producer map. When adding fails, the
     * producer is closed and an error response is sent if this call is the one that failed the
     * future.
     *
     * <p>Independently, when the queued-signal future passed to the topic completes while the
     * connection is active, a producer success response with an empty topic epoch and a
     * {@code false} final flag is sent.
     *
     * @param producerFuture pending future registered for {@code producerId} on this connection
     */
    private void buildProducerAndAddTopic(Topic topic, long producerId, String producerName, long requestId, boolean isEncrypted, Map<String, String> metadata, SchemaVersion schemaVersion, long epoch, boolean userProvidedProducerName, TopicName topicName, ProducerAccessMode producerAccessMode, Optional<Long> topicEpoch, CompletableFuture<Producer> producerFuture) {
        CompletableFuture<Void> queuedSignal = new CompletableFuture<>();
        Producer producer = new Producer(topic, this, producerId, producerName, this.getPrincipal(), isEncrypted, metadata, schemaVersion, epoch, userProvidedProducerName, producerAccessMode, topicEpoch);

        ((CompletableFuture)topic.addProducer(producer, queuedSignal).thenAccept(newTopicEpoch -> {
            // Connection went away while the producer was being added.
            if (!this.isActive()) {
                producer.closeNow(true);
                log.info("[{}] Cleared producer created after connection was closed: {}", this.remoteAddress, producer);
                producerFuture.completeExceptionally(new IllegalStateException("Producer created after connection was closed"));
                this.producers.remove(producerId, producerFuture);
                return;
            }
            // Someone else already completed the pending future.
            if (!producerFuture.complete(producer)) {
                producer.closeNow(true);
                log.info("[{}] Cleared producer created after timeout on client side {}", this.remoteAddress, producer);
                this.producers.remove(producerId, producerFuture);
                return;
            }
            log.info("[{}] Created new producer: {}", this.remoteAddress, producer);
            this.commandSender.sendProducerSuccessResponse(requestId, producerName, producer.getLastSequenceId(), producer.getSchemaVersion(), (Optional<Long>)newTopicEpoch, true);
        })).exceptionally(failure -> {
            log.error("[{}] Failed to add producer to topic {}: producerId={}, {}", this.remoteAddress, topicName, producerId, failure.getMessage());
            producer.closeNow(true);
            boolean failedByUs = producerFuture.completeExceptionally((Throwable)failure);
            if (failedByUs) {
                this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(failure), failure.getMessage());
            }
            return null;
        });

        queuedSignal.thenRun(() -> {
            if (!this.isActive()) {
                return;
            }
            log.info("[{}] Producer is waiting in queue: {}", this.remoteAddress, producer);
            this.commandSender.sendProducerSuccessResponse(requestId, producerName, producer.getLastSequenceId(), producer.getSchemaVersion(), Optional.empty(), false);
        });
    }

    /**
     * Handles a {@code CommandSend}: resolves the producer registered under the command's producer id and
     * passes the payload to it for publishing.
     *
     * <p>The command is ignored (with a warning) when the producer future is missing, still pending, or
     * completed exceptionally. For a producer on a non-persistent topic, once
     * {@code nonPersistentPendingMessages} exceeds {@code maxNonPersistentPendingMessages} the message is
     * not published; a send receipt whose last two arguments are {@code -1L} is scheduled on the topic
     * ordered executor and the drop is recorded on the producer.
     *
     * @param send the send command
     * @param headersAndPayload buffer with the message headers and payload
     */
    @Override
    protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
        Preconditions.checkArgument(this.state == State.Connected);

        CompletableFuture<Producer> future = this.producers.get(send.getProducerId());
        boolean producerReady = future != null && future.isDone() && !future.isCompletedExceptionally();
        if (!producerReady) {
            log.warn("[{}] Producer had already been closed: {}", this.remoteAddress, send.getProducerId());
            return;
        }
        Producer producer = future.getNow(null);

        if (log.isDebugEnabled()) {
            this.printSendCommandDebug(send, headersAndPayload);
        }

        if (producer.isNonPersistentTopic()) {
            boolean overLimit = this.nonPersistentPendingMessages > this.maxNonPersistentPendingMessages;
            if (overLimit) {
                // Capture the ids in locals; the receipt is written later from the ordered executor.
                final long producerId = send.getProducerId();
                final long sequenceId = send.getSequenceId();
                final long highestSequenceId = send.getHighestSequenceId();
                this.service.getTopicOrderedExecutor().executeOrdered(
                        producer.getTopic().getName(),
                        (SafeRunnable) SafeRun.safeRun(() -> this.commandSender.sendSendReceiptResponse(
                                producerId, sequenceId, highestSequenceId, -1L, -1L)));
                producer.recordMessageDrop(send.getNumMessages());
                return;
            }
            this.nonPersistentPendingMessages++;
        }

        this.startSendOperation(producer, headersAndPayload.readableBytes(), send.getNumMessages());

        // Transactional publish: both halves of the transaction id must be present.
        boolean transactional = send.hasTxnidMostBits() && send.hasTxnidLeastBits();
        if (transactional) {
            TxnID txnID = new TxnID(send.getTxnidMostBits(), send.getTxnidLeastBits());
            producer.publishTxnMessage(txnID, producer.getProducerId(), send.getSequenceId(),
                    send.getHighestSequenceId(), headersAndPayload, send.getNumMessages(),
                    send.isIsChunk(), send.isMarker());
            return;
        }

        if (!send.hasHighestSequenceId() || send.getSequenceId() > send.getHighestSequenceId()) {
            // No usable highest sequence id: publish with the single sequence id only.
            producer.publishMessage(send.getProducerId(), send.getSequenceId(), headersAndPayload,
                    send.getNumMessages(), send.isIsChunk(), send.isMarker());
        } else {
            producer.publishMessage(send.getProducerId(), send.getSequenceId(), send.getHighestSequenceId(),
                    headersAndPayload, send.getNumMessages(), send.isIsChunk(), send.isMarker());
        }
    }

    /**
     * Logs, at debug level, the details of a received send command.
     *
     * <p>The message metadata is parsed from {@code headersAndPayload}; the reader index is marked before
     * parsing and reset afterwards.
     *
     * @param send the send command being logged
     * @param headersAndPayload buffer holding the message metadata and payload
     */
    private void printSendCommandDebug(CommandSend send, ByteBuf headersAndPayload) {
        headersAndPayload.markReaderIndex();
        MessageMetadata metadata = Commands.parseMessageMetadata(headersAndPayload);
        headersAndPayload.resetReaderIndex();

        if (!log.isDebugEnabled()) {
            return;
        }
        Object partitionKey = metadata.hasPartitionKey() ? metadata.getPartitionKey() : null;
        Object orderingKey = metadata.hasOrderingKey() ? metadata.getOrderingKey() : null;
        log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}, partition key is: {}, ordering key is {}",
                this.remoteAddress, send.getProducerId(), send.getSequenceId(), metadata.getProducerName(),
                metadata.getSequenceId(), headersAndPayload.readableBytes(), partitionKey, orderingKey);
    }

    /**
     * Handles a {@code CommandAck} by forwarding it to the consumer registered under the command's
     * consumer id.
     *
     * <p>An ack response is written only when the command carries a request id. Nothing is done when the
     * consumer future is missing, pending, or failed.
     *
     * @param ack the acknowledgement command
     */
    @Override
    protected void handleAck(CommandAck ack) {
        Preconditions.checkArgument(this.state == State.Connected);

        final long consumerId = ack.getConsumerId();
        CompletableFuture<Consumer> future = this.consumers.get(consumerId);
        final boolean hasRequestId = ack.hasRequestId();
        final long requestId = hasRequestId ? ack.getRequestId() : 0L;

        if (future == null || !future.isDone() || future.isCompletedExceptionally()) {
            return;
        }

        Consumer consumer = future.getNow(null);
        consumer.messageAcked(ack)
                .thenRun(() -> {
                    if (hasRequestId) {
                        this.ctx.writeAndFlush(Commands.newAckResponse(requestId, null, null, consumerId));
                    }
                })
                .exceptionally(e -> {
                    if (hasRequestId) {
                        this.ctx.writeAndFlush(Commands.newAckResponse(requestId,
                                BrokerServiceException.getClientErrorCode(e), e.getMessage(), consumerId));
                    }
                    return null;
                });
    }

    /**
     * Handles a {@code CommandFlow} by granting the requested message permits to the consumer registered
     * under the command's consumer id.
     *
     * <p>Nothing is done when the consumer future is missing, pending, or failed.
     *
     * @param flow the flow command
     */
    @Override
    protected void handleFlow(CommandFlow flow) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received flow from consumer {} permits: {}",
                    this.remoteAddress, flow.getConsumerId(), flow.getMessagePermits());
        }

        CompletableFuture<Consumer> future = this.consumers.get(flow.getConsumerId());
        if (future == null || !future.isDone() || future.isCompletedExceptionally()) {
            return;
        }

        Consumer consumer = future.getNow(null);
        if (consumer == null) {
            log.info("[{}] Couldn't find consumer {}", this.remoteAddress, flow.getConsumerId());
            return;
        }
        consumer.flowPermits(flow.getMessagePermits());
    }

    /**
     * Handles a redeliver-unacknowledged-messages command for the consumer registered under the command's
     * consumer id.
     *
     * <p>When the command lists message ids and the consumer's subscription type is an individual-ack
     * mode, only the listed messages are redelivered; otherwise all unacknowledged messages are.
     * Nothing is done when the consumer future is missing, pending, or failed.
     *
     * @param redeliver the redeliver command
     */
    @Override
    protected void handleRedeliverUnacknowledged(CommandRedeliverUnacknowledgedMessages redeliver) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Resend Command from consumer {} ", this.remoteAddress, redeliver.getConsumerId());
        }

        CompletableFuture<Consumer> future = this.consumers.get(redeliver.getConsumerId());
        if (future == null || !future.isDone() || future.isCompletedExceptionally()) {
            return;
        }

        Consumer consumer = future.getNow(null);
        boolean selective = redeliver.getMessageIdsCount() > 0
                && Subscription.isIndividualAckMode(consumer.subType());
        if (!selective) {
            consumer.redeliverUnacknowledgedMessages();
            return;
        }
        consumer.redeliverUnacknowledgedMessages(redeliver.getMessageIdsList());
    }

    /**
     * Handles a {@code CommandUnsubscribe} by delegating to the consumer registered under the command's
     * consumer id, or replying with a {@code MetadataError} when no usable consumer exists.
     *
     * @param unsubscribe the unsubscribe command
     */
    @Override
    protected void handleUnsubscribe(CommandUnsubscribe unsubscribe) {
        Preconditions.checkArgument(this.state == State.Connected);

        CompletableFuture<Consumer> future = this.consumers.get(unsubscribe.getConsumerId());
        boolean consumerMissing = future == null || !future.isDone() || future.isCompletedExceptionally();
        if (consumerMissing) {
            this.commandSender.sendErrorResponse(unsubscribe.getRequestId(), ServerError.MetadataError, "Consumer not found");
            return;
        }

        Consumer consumer = future.getNow(null);
        consumer.doUnsubscribe(unsubscribe.getRequestId());
    }

    /**
     * Handles a {@code CommandSeek} by resetting the consumer's subscription cursor either to a message id
     * or to a publish time.
     *
     * <p>The message id takes precedence when both are present. A {@code MetadataError} is sent when the
     * command carries neither target, or when no usable consumer is registered under the consumer id.
     *
     * @param seek the seek command
     */
    @Override
    protected void handleSeek(CommandSeek seek) {
        Preconditions.checkArgument(this.state == State.Connected);
        final long requestId = seek.getRequestId();
        CompletableFuture<Consumer> future = this.consumers.get(seek.getConsumerId());

        if (!seek.hasMessageId() && !seek.hasMessagePublishTime()) {
            this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Message id and message publish time were not present");
            return;
        }

        boolean consumerCreated = future != null && future.isDone() && !future.isCompletedExceptionally();
        if (!consumerCreated) {
            this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
            return;
        }

        if (seek.hasMessageId()) {
            Subscription subscription = future.getNow(null).getSubscription();
            MessageIdData msgIdData = seek.getMessageId();

            // Copy the ack set (if any) into a plain array for the position.
            long[] ackSet = null;
            int ackSetCount = msgIdData.getAckSetsCount();
            if (ackSetCount > 0) {
                ackSet = new long[ackSetCount];
                for (int idx = 0; idx < ackSet.length; idx++) {
                    ackSet[idx] = msgIdData.getAckSetAt(idx);
                }
            }

            PositionImpl position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId(), ackSet);
            subscription.resetCursor(position)
                    .thenRun(() -> {
                        log.info("[{}] [{}][{}] Reset subscription to message id {}",
                                this.remoteAddress, subscription.getTopic().getName(), subscription.getName(), position);
                        this.commandSender.sendSuccessResponse(requestId);
                    })
                    .exceptionally(ex -> {
                        log.warn("[{}][{}] Failed to reset subscription: {}",
                                this.remoteAddress, subscription, ex.getMessage(), ex);
                        this.commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
                                "Error when resetting subscription: " + ex.getCause().getMessage());
                        return null;
                    });
        } else if (seek.hasMessagePublishTime()) {
            Subscription subscription = future.getNow(null).getSubscription();
            final long timestamp = seek.getMessagePublishTime();
            subscription.resetCursor(timestamp)
                    .thenRun(() -> {
                        log.info("[{}] [{}][{}] Reset subscription to publish time {}",
                                this.remoteAddress, subscription.getTopic().getName(), subscription.getName(), timestamp);
                        this.commandSender.sendSuccessResponse(requestId);
                    })
                    .exceptionally(ex -> {
                        log.warn("[{}][{}] Failed to reset subscription: {}",
                                this.remoteAddress, subscription, ex.getMessage(), ex);
                        this.commandSender.sendErrorResponse(requestId, ServerError.UnknownError,
                                "Reset subscription to publish time error: " + ex.getCause().getMessage());
                        return null;
                    });
        } else {
            this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
        }
    }

    /**
     * Two connections are equal when they are of the same class and their channel ids are equal.
     *
     * @param o the object to compare against
     * @return {@code true} if {@code o} is this instance or a same-class instance with an equal channel id
     */
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (o != null && o.getClass() == this.getClass()) {
            ServerCnx that = (ServerCnx) o;
            return Objects.equals(this.ctx().channel().id(), that.ctx().channel().id());
        }
        return false;
    }

    /**
     * Hash code derived solely from the channel id.
     *
     * @return the hash of this connection's channel id
     */
    public int hashCode() {
        Object channelId = this.ctx().channel().id();
        return Objects.hash(channelId);
    }

    /**
     * Handles a {@code CommandCloseProducer}.
     *
     * <ul>
     *   <li>Unknown producer id: replies with {@code UnknownError}.</li>
     *   <li>Producer still being created: fails the pending future, replies with success and removes the
     *       entry from {@code producers}.</li>
     *   <li>Producer creation already failed: replies with success and removes the entry.</li>
     *   <li>Otherwise: closes the producer and, once that completes, replies with success and removes the
     *       entry.</li>
     * </ul>
     *
     * @param closeProducer the close-producer command
     */
    @Override
    protected void handleCloseProducer(CommandCloseProducer closeProducer) {
        Preconditions.checkArgument(this.state == State.Connected);

        final long producerId = closeProducer.getProducerId();
        final long requestId = closeProducer.getRequestId();

        final CompletableFuture<Producer> future = this.producers.get(producerId);
        if (future == null) {
            log.warn("[{}] Producer was not registered on the connection. producerId={}", this.remoteAddress, producerId);
            this.commandSender.sendErrorResponse(requestId, ServerError.UnknownError, "Producer was not registered on the connection");
            return;
        }

        // Creation still in flight: try to fail the future ourselves. If the attempt loses, fall through.
        if (!future.isDone()
                && future.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
            log.info("[{}] Closed producer before its creation was completed. producerId={}", this.remoteAddress, producerId);
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, future);
            return;
        }

        if (future.isCompletedExceptionally()) {
            log.info("[{}] Closed producer that already failed to be created. producerId={}", this.remoteAddress, producerId);
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, future);
            return;
        }

        final Producer producer = future.getNow(null);
        log.info("[{}][{}] Closing producer on cnx {}. producerId={}",
                producer.getTopic(), producer.getProducerName(), this.remoteAddress, producerId);
        producer.close(true).thenAccept(ignored -> {
            log.info("[{}][{}] Closed producer on cnx {}. producerId={}",
                    producer.getTopic(), producer.getProducerName(), this.remoteAddress, producerId);
            this.commandSender.sendSuccessResponse(requestId);
            this.producers.remove(producerId, future);
        });
    }

    /**
     * Handles a {@code CommandCloseConsumer}.
     *
     * <ul>
     *   <li>Unknown consumer id: replies with {@code MetadataError}.</li>
     *   <li>Consumer still being created: fails the pending future and replies with success.</li>
     *   <li>Consumer creation already failed: replies with success.</li>
     *   <li>Otherwise: closes the consumer, removes it from {@code consumers} and replies with success;
     *       a {@link BrokerServiceException} raised by the close is reported back to the client.</li>
     * </ul>
     *
     * @param closeConsumer the close-consumer command
     */
    @Override
    protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
        Preconditions.checkArgument(this.state == State.Connected);
        log.info("[{}] Closing consumer: consumerId={}", (Object)this.remoteAddress, (Object)closeConsumer.getConsumerId());
        long requestId = closeConsumer.getRequestId();
        long consumerId = closeConsumer.getConsumerId();
        CompletableFuture<Consumer> consumerFuture = this.consumers.get(consumerId);
        if (consumerFuture == null) {
            log.warn("[{}] Consumer was not registered on the connection: consumerId={}", (Object)this.remoteAddress, (Object)consumerId);
            this.commandSender.sendErrorResponse(requestId, ServerError.MetadataError, "Consumer not found");
            return;
        }
        // Creation still in flight: try to fail the future ourselves. If the attempt loses, fall through.
        if (!consumerFuture.isDone() && consumerFuture.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
            log.info("[{}] Closed consumer before its creation was completed. consumerId={}", (Object)this.remoteAddress, (Object)consumerId);
            this.commandSender.sendSuccessResponse(requestId);
            return;
        }
        if (consumerFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed consumer that already failed to be created. consumerId={}", (Object)this.remoteAddress, (Object)consumerId);
            this.commandSender.sendSuccessResponse(requestId);
            return;
        }
        Consumer consumer = consumerFuture.getNow(null);
        try {
            consumer.close();
            this.consumers.remove(consumerId, consumerFuture);
            this.commandSender.sendSuccessResponse(requestId);
            log.info("[{}] Closed consumer, consumerId={}", (Object)this.remoteAddress, (Object)consumerId);
        }
        catch (BrokerServiceException e) {
            // The first placeholder used to be the malformed "[{]]", which shifted every argument by one
            // slot. The message fills the third placeholder and the exception is passed last so its
            // stack trace is still logged.
            log.warn("[{}] Error closing consumer {} : {}", new Object[]{this.remoteAddress, consumer, e.getMessage(), e});
            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage());
        }
    }

    /**
     * Handles a {@code CommandGetLastMessageId} for the consumer registered under the command's consumer
     * id.
     *
     * <p>Collects the topic's last position, the partition index derived from the topic name and, for a
     * {@link PersistentSubscription}, the cursor's mark-deleted position, then delegates to
     * {@code getLargestBatchIndexWhenPossible}. Replies with {@code MetadataError} when no usable consumer
     * exists.
     *
     * @param getLastMessageId the get-last-message-id command
     */
    @Override
    protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId) {
        Preconditions.checkArgument(this.state == State.Connected);

        CompletableFuture<Consumer> future = this.consumers.get(getLastMessageId.getConsumerId());
        boolean consumerReady = future != null && future.isDone() && !future.isCompletedExceptionally();
        if (!consumerReady) {
            this.ctx.writeAndFlush(Commands.newError(getLastMessageId.getRequestId(), ServerError.MetadataError, "Consumer not found"));
            return;
        }

        Consumer consumer = future.getNow(null);
        long requestId = getLastMessageId.getRequestId();
        Topic topic = consumer.getSubscription().getTopic();
        Position lastPosition = topic.getLastPosition();
        int partitionIndex = TopicName.getPartitionIndex(topic.getName());

        // Only persistent subscriptions expose a cursor to read the mark-deleted position from.
        Position markDeletePosition = consumer.getSubscription() instanceof PersistentSubscription
                ? ((PersistentSubscription) consumer.getSubscription()).getCursor().getMarkDeletedPosition()
                : null;

        this.getLargestBatchIndexWhenPossible(topic, (PositionImpl) lastPosition, (PositionImpl) markDeletePosition,
                partitionIndex, requestId, consumer.getSubscription().getName());
    }

    /**
     * Reads the entry at {@code lastPosition} to determine the largest batch index and writes the
     * get-last-message-id response.
     *
     * <p>When {@code lastPosition} has entry id {@code -1}, or the read fails with a
     * {@code NonRecoverableLedgerException} as cause, the answer is taken from the compacted ledger via
     * {@code handleLastMessageIdFromCompactedLedger}. Any other failure results in a {@code MetadataError}.
     *
     * @param topic the topic; cast to {@link PersistentTopic}
     * @param lastPosition last position of the topic
     * @param markDeletePosition mark-deleted position of the subscription, may be {@code null}
     * @param partitionIndex partition index reported in the response
     * @param requestId id of the request being answered
     * @param subscriptionName subscription name, used for debug logging only
     */
    private void getLargestBatchIndexWhenPossible(Topic topic, PositionImpl lastPosition, PositionImpl markDeletePosition, int partitionIndex, long requestId, String subscriptionName) {
        PersistentTopic persistentTopic = (PersistentTopic)topic;
        ManagedLedgerImpl ml = (ManagedLedgerImpl)persistentTopic.getManagedLedger();
        if (lastPosition.getEntryId() == -1L) {
            this.handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex, markDeletePosition);
            return;
        }
        final CompletableFuture<Entry> entryFuture = new CompletableFuture<>();
        ml.asyncReadEntry(lastPosition, new AsyncCallbacks.ReadEntryCallback(){

            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                entryFuture.complete(entry);
            }

            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                entryFuture.completeExceptionally(exception);
            }
        }, null);
        CompletableFuture<Integer> batchSizeFuture = entryFuture.thenApply(entry -> {
            // Release in finally so the entry is not leaked when metadata parsing throws.
            try {
                MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
                int batchSize = metadata.getNumMessagesInBatch();
                return metadata.hasNumMessagesInBatch() ? batchSize : -1;
            } finally {
                entry.release();
            }
        });
        batchSizeFuture.whenComplete((batchSize, e) -> {
            if (e != null) {
                if (e.getCause() instanceof ManagedLedgerException.NonRecoverableLedgerException) {
                    this.handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, partitionIndex, markDeletePosition);
                } else {
                    this.ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Failed to get batch size for entry " + e.getMessage()));
                }
            } else {
                // A batch of N messages has largest index N - 1; -1 when there is no batch size.
                int largestBatchIndex = batchSize > 0 ? batchSize - 1 : -1;
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", new Object[]{this.remoteAddress, topic.getName(), subscriptionName, lastPosition, partitionIndex});
                }
                this.ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, lastPosition.getLedgerId(), lastPosition.getEntryId(), partitionIndex, largestBatchIndex, markDeletePosition != null ? markDeletePosition.getLedgerId() : -1L, markDeletePosition != null ? markDeletePosition.getEntryId() : -1L));
            }
        });
    }

    /**
     * Answers a get-last-message-id request from the last entry of the topic's compacted ledger.
     *
     * <p>When an entry is returned, its ledger id, entry id and largest batch index are sent; when the
     * result is {@code null}, a response with {@code -1} for those values is sent. A failure of the read
     * (or of building the response) is reported as a {@code MetadataError}.
     *
     * @param persistentTopic topic whose compacted ledger is read
     * @param requestId id of the request being answered
     * @param partitionIndex partition index reported in the response
     * @param markDeletePosition mark-deleted position of the subscription, may be {@code null}
     */
    private void handleLastMessageIdFromCompactedLedger(PersistentTopic persistentTopic, long requestId, int partitionIndex, PositionImpl markDeletePosition) {
        persistentTopic.getCompactedTopic().readLastEntryOfCompactedLedger().thenAccept(entry -> {
            if (entry != null) {
                // Release in finally so the entry is not leaked when parsing or response building throws.
                try {
                    MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
                    int bs = metadata.getNumMessagesInBatch();
                    int largestBatchIndex = bs > 0 ? bs - 1 : -1;
                    this.ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, entry.getLedgerId(), entry.getEntryId(), partitionIndex, largestBatchIndex, markDeletePosition != null ? markDeletePosition.getLedgerId() : -1L, markDeletePosition != null ? markDeletePosition.getEntryId() : -1L));
                } finally {
                    entry.release();
                }
            } else {
                this.ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, -1L, -1L, partitionIndex, -1, markDeletePosition != null ? markDeletePosition.getLedgerId() : -1L, markDeletePosition != null ? markDeletePosition.getEntryId() : -1L));
            }
        }).exceptionally(ex -> {
            this.ctx.writeAndFlush(Commands.newError(requestId, ServerError.MetadataError, "Failed to read last entry of the compacted Ledger " + ex.getCause().getMessage()));
            return null;
        });
    }

    /**
     * Checks whether this connection may perform {@code operation} on {@code namespaceName}.
     *
     * <p>Completes with {@code true} immediately when authorization is disabled. Otherwise the
     * connection's {@code authRole} is checked and, when an {@code originalPrincipal} is set, that
     * principal is checked as well; both checks must pass. Each failed check is logged at warn level.
     *
     * @param namespaceName namespace the operation targets
     * @param operation namespace operation to authorize
     * @return future completing with {@code true} only when every applicable check passed
     */
    private CompletableFuture<Boolean> isNamespaceOperationAllowed(NamespaceName namespaceName, NamespaceOperation operation) {
        if (!this.service.isAuthorizationEnabled()) {
            return CompletableFuture.completedFuture(true);
        }

        CompletableFuture<Boolean> proxyCheck;
        if (this.originalPrincipal != null) {
            proxyCheck = this.service.getAuthorizationService().allowNamespaceOperationAsync(
                    namespaceName, operation, this.originalPrincipal, this.getAuthenticationData());
        } else {
            // No original principal to verify: treat the proxy side as authorized.
            proxyCheck = CompletableFuture.completedFuture(true);
        }
        CompletableFuture<Boolean> roleCheck = this.service.getAuthorizationService().allowNamespaceOperationAsync(
                namespaceName, operation, this.authRole, this.authenticationData);

        return proxyCheck.thenCombine(roleCheck, (proxyAllowed, roleAllowed) -> {
            if (!proxyAllowed.booleanValue()) {
                log.warn("OriginalRole {} is not authorized to perform operation {} on namespace {}",
                        this.originalPrincipal, operation, namespaceName);
            }
            if (!roleAllowed.booleanValue()) {
                log.warn("Role {} is not authorized to perform operation {} on namespace {}",
                        this.authRole, operation, namespaceName);
            }
            return proxyAllowed && roleAllowed;
        });
    }

    /**
     * Handles a {@code CommandGetTopicsOfNamespace}.
     *
     * <p>The request needs a permit from the lookup-request semaphore; without one it is rejected with
     * {@code TooManyRequests}. With a permit, the original principal is validated, the
     * {@code GET_TOPICS} namespace operation is authorized, and the namespace's topic list is sent back.
     * The permit is released on every path that finishes the request.
     *
     * @param commandGetTopicsOfNamespace the get-topics-of-namespace command
     */
    @Override
    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        final long requestId = commandGetTopicsOfNamespace.getRequestId();
        final String namespace = commandGetTopicsOfNamespace.getNamespace();
        final CommandGetTopicsOfNamespace.Mode mode = commandGetTopicsOfNamespace.getMode();
        final NamespaceName namespaceName = NamespaceName.get(namespace);
        final Semaphore lookupSemaphore = this.service.getLookupRequestSemaphore();

        if (!lookupSemaphore.tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed GetTopicsOfNamespace lookup due to too many lookup-requests {}", this.remoteAddress, namespaceName);
            }
            this.commandSender.sendErrorResponse(requestId, ServerError.TooManyRequests, "Failed due to too many pending lookup requests");
            return;
        }

        if (this.invalidOriginalPrincipal(this.originalPrincipal)) {
            final String proxyRoleMsg = "Valid Proxy Client role should be provided for getTopicsOfNamespaceRequest ";
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on namespace {}",
                    this.remoteAddress, proxyRoleMsg, this.authRole, this.originalPrincipal, namespaceName);
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, proxyRoleMsg);
            lookupSemaphore.release();
            return;
        }

        this.isNamespaceOperationAllowed(namespaceName, NamespaceOperation.GET_TOPICS).thenApply(isAuthorized -> {
            if (!isAuthorized.booleanValue()) {
                final String notAuthorizedMsg = "Proxy Client is not authorized to GetTopicsOfNamespace";
                log.warn("[{}] {} with role {} on namespace {}",
                        this.remoteAddress, notAuthorizedMsg, this.getPrincipal(), namespaceName);
                this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, notAuthorizedMsg);
                lookupSemaphore.release();
                return null;
            }
            this.getBrokerService().pulsar().getNamespaceService().getListOfTopics(namespaceName, mode)
                    .thenAccept(topics -> {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
                                    this.remoteAddress, namespace, requestId, topics.size());
                        }
                        this.commandSender.sendGetTopicsOfNamespaceResponse(topics, requestId);
                        lookupSemaphore.release();
                    })
                    .exceptionally(ex -> {
                        log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}] by {}",
                                this.remoteAddress, namespace, requestId);
                        this.commandSender.sendErrorResponse(requestId,
                                BrokerServiceException.getClientErrorCode(new BrokerServiceException.ServerMetadataException(ex)),
                                ex.getMessage());
                        lookupSemaphore.release();
                        return null;
                    });
            return null;
        }).exceptionally(ex -> {
            ServerCnx.logNamespaceNameAuthException(this.remoteAddress, "GetTopicsOfNamespace", this.getPrincipal(), Optional.of(namespaceName), ex);
            final String authFailureMsg = "Exception occurred while trying to authorize GetTopicsOfNamespace";
            this.commandSender.sendErrorResponse(requestId, ServerError.AuthorizationError, authFailureMsg);
            lookupSemaphore.release();
            return null;
        });
    }

    /**
     * Handles a {@code CommandGetSchema}: resolves the schema name from the topic and looks up either the
     * requested schema version or {@code SchemaVersion.Latest} when none is given.
     *
     * <p>Replies with {@code InvalidTopicName} when the topic name cannot be resolved, with
     * {@code TopicNotFound} when the lookup yields {@code null}, and with {@code UnknownError} when the
     * lookup fails.
     *
     * @param commandGetSchema the get-schema command
     */
    @Override
    protected void handleGetSchema(CommandGetSchema commandGetSchema) {
        if (log.isDebugEnabled()) {
            Object versionForLog = commandGetSchema.hasSchemaVersion()
                    ? new String(commandGetSchema.getSchemaVersion())
                    : null;
            log.debug("Received CommandGetSchema call from {}, schemaVersion: {}, topic: {}, requestId: {}",
                    this.remoteAddress, versionForLog, commandGetSchema.getTopic(), commandGetSchema.getRequestId());
        }

        final long requestId = commandGetSchema.getRequestId();
        SchemaVersion schemaVersion = commandGetSchema.hasSchemaVersion()
                ? this.schemaService.versionFromBytes(commandGetSchema.getSchemaVersion())
                : SchemaVersion.Latest;

        String schemaName;
        try {
            schemaName = TopicName.get(commandGetSchema.getTopic()).getSchemaName();
        }
        catch (Throwable t) {
            this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.InvalidTopicName, t.getMessage());
            return;
        }

        this.schemaService.getSchema(schemaName, schemaVersion)
                .thenAccept(schemaAndMetadata -> {
                    if (schemaAndMetadata != null) {
                        this.commandSender.sendGetSchemaResponse(requestId,
                                SchemaInfoUtil.newSchemaInfo(schemaName, schemaAndMetadata.schema),
                                schemaAndMetadata.version);
                    } else {
                        this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.TopicNotFound, "Topic not found or no-schema");
                    }
                })
                .exceptionally(ex -> {
                    this.commandSender.sendGetSchemaErrorResponse(requestId, ServerError.UnknownError, ex.getMessage());
                    return null;
                });
    }

    /**
     * Handles a {@code CommandGetOrCreateSchema}: if the topic exists, tries to add the supplied schema
     * (or no schema when its type is {@code SchemaType.NONE}) and replies with the resulting schema
     * version.
     *
     * <p>Exactly one response is sent per outcome: the schema version on success, or an error response
     * when adding the schema fails, the topic does not exist, or the topic lookup fails.
     *
     * @param commandGetOrCreateSchema the get-or-create-schema command
     */
    @Override
    protected void handleGetOrCreateSchema(CommandGetOrCreateSchema commandGetOrCreateSchema) {
        if (log.isDebugEnabled()) {
            log.debug("Received CommandGetOrCreateSchema call from {}", (Object)this.remoteAddress);
        }
        long requestId = commandGetOrCreateSchema.getRequestId();
        String topicName = commandGetOrCreateSchema.getTopic();
        SchemaData schemaData = this.getSchema(commandGetOrCreateSchema.getSchema());
        SchemaData schema = schemaData.getType() == SchemaType.NONE ? null : schemaData;
        this.service.getTopicIfExists(topicName).thenAccept(topicOpt -> {
            if (topicOpt.isPresent()) {
                Topic topic = (Topic)topicOpt.get();
                CompletableFuture<SchemaVersion> schemaVersionFuture = this.tryAddSchema(topic, schema);
                // Handle success and failure in a single callback. The previous exceptionally(...).thenAccept(...)
                // chain recovered to null and then also invoked the success sender with a null version
                // after the error response had already been sent.
                schemaVersionFuture.whenComplete((schemaVersion, ex) -> {
                    if (ex != null) {
                        ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
                        String message = ex.getMessage();
                        if (ex.getCause() != null) {
                            message = message + " caused by " + ex.getCause();
                        }
                        this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, errorCode, message);
                        return;
                    }
                    this.commandSender.sendGetOrCreateSchemaResponse(requestId, schemaVersion);
                });
            } else {
                this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, ServerError.TopicNotFound, "Topic not found");
            }
        }).exceptionally(ex -> {
            ServerError errorCode = BrokerServiceException.getClientErrorCode(ex);
            this.commandSender.sendGetOrCreateSchemaErrorResponse(requestId, errorCode, ex.getMessage());
            return null;
        });
    }

    /**
     * Handles a transaction-coordinator client connect request by delegating to the transaction metadata
     * store service and replying with the outcome.
     *
     * <p>Returns early without contacting the service when
     * {@code checkTransactionEnableAndSenError} returns {@code false}.
     *
     * @param command the tc-client connect request
     */
    @Override
    protected void handleTcClientConnectRequest(CommandTcClientConnectRequest command) {
        final long requestId = command.getRequestId();
        final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
        if (log.isDebugEnabled()) {
            log.debug("Receive tc client connect request {} to transaction meta store {} from {}.",
                    requestId, tcId, this.remoteAddress);
        }

        boolean transactionsEnabled = this.checkTransactionEnableAndSenError(requestId);
        if (!transactionsEnabled) {
            return;
        }

        TransactionMetadataStoreService metadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
        metadataStoreService.handleTcClientConnect(tcId)
                .thenAccept(connection -> {
                    if (log.isDebugEnabled()) {
                        log.debug("Handle tc client connect request {} to transaction meta store {} from {} success.",
                                requestId, tcId, this.remoteAddress);
                    }
                    this.commandSender.sendTcClientConnectResponse(requestId);
                })
                .exceptionally(e -> {
                    log.error("Handle tc client connect request {} to transaction meta store {} from {} fail.",
                            requestId, tcId, this.remoteAddress, e.getCause());
                    this.commandSender.sendTcClientConnectResponse(requestId,
                            BrokerServiceException.getClientErrorCode(e), e.getMessage());
                    return null;
                });
    }

    /**
     * Verifies that the transaction coordinator is enabled in the broker configuration.
     *
     * <p>When it is disabled, an error response derived from a
     * {@code BrokerServiceException.NotAllowedException} is sent for {@code requestId}.
     *
     * @param requestId id of the request to answer when transactions are disabled
     * @return {@code true} when the transaction coordinator is enabled, {@code false} after an error
     *         response has been sent
     */
    private boolean checkTransactionEnableAndSenError(long requestId) {
        if (!this.service.getPulsar().getConfig().isTransactionCoordinatorEnabled()) {
            // Message previously read "is not not enabled".
            BrokerServiceException.NotAllowedException ex = new BrokerServiceException.NotAllowedException("Transaction manager is not enabled");
            this.commandSender.sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(ex), ex.getMessage());
            return false;
        }
        return true;
    }

    /**
     * Handles a {@code CommandNewTxn}: asks the transaction metadata store service for a new transaction
     * under the coordinator id from the command and writes the result back to the client.
     *
     * <p>On failure an error response is written and the failure is passed to the service's
     * {@code handleOpFail}. Returns early when {@code checkTransactionEnableAndSenError} returns
     * {@code false}.
     *
     * @param command the new-transaction command
     */
    @Override
    protected void handleNewTxn(CommandNewTxn command) {
        final long requestId = command.getRequestId();
        final TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTcId());
        if (log.isDebugEnabled()) {
            log.debug("Receive new txn request {} to transaction meta store {} from {}.",
                    requestId, tcId, this.remoteAddress);
        }

        boolean transactionsEnabled = this.checkTransactionEnableAndSenError(requestId);
        if (!transactionsEnabled) {
            return;
        }

        final TransactionMetadataStoreService metadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
        metadataStoreService.newTransaction(tcId, command.getTxnTtlSeconds()).whenComplete((txnID, ex) -> {
            if (ex != null) {
                if (log.isDebugEnabled()) {
                    log.debug("Send response error for new txn request {}", requestId, ex);
                }
                this.ctx.writeAndFlush(Commands.newTxnResponse(requestId, tcId.getId(),
                        BrokerServiceException.getClientErrorCode(ex), ex.getMessage()));
                metadataStoreService.handleOpFail(ex, tcId);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Send response {} for new txn request {}", tcId.getId(), requestId);
            }
            this.ctx.writeAndFlush(Commands.newTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
        });
    }

    /**
     * Handles a request to register produced partitions with an ongoing transaction.
     *
     * <p>Returns early when transactions are disabled (an error response has already been
     * sent). On completion a success or error response is written to the client; on failure
     * the transaction metadata store service is also notified through {@code handleOpFail}.
     *
     * @param command the add-partition-to-txn command received from the client
     */
    @Override
    protected void handleAddPartitionToTxn(CommandAddPartitionToTxn command) {
        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());
        long requestId = command.getRequestId();
        if (log.isDebugEnabled()) {
            command.getPartitionsList().forEach(partion -> log.debug("Receive add published partition to txn request {} from {} with txnId {}, topic: [{}]", new Object[]{requestId, this.remoteAddress, txnID, partion}));
        }
        if (!this.checkTransactionEnableAndSenError(requestId)) {
            return;
        }
        TransactionMetadataStoreService transactionMetadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
        transactionMetadataStoreService.addProducedPartitionToTxn(txnID, command.getPartitionsList()).whenComplete((v, ex) -> {
            if (ex == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Send response success for add published partition to txn request {}", (Object)requestId);
                }
                this.ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Send response error for add published partition to txn request {}", (Object)requestId, ex);
            }
            // Report the wrapped cause when there is one. A failure without a cause used to
            // throw a NullPointerException here, which left the client without any response.
            Throwable reported = (ex instanceof CoordinatorException.CoordinatorNotFoundException || ex.getCause() == null) ? ex : ex.getCause();
            this.ctx.writeAndFlush(Commands.newAddPartitionToTxnResponse(requestId, txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(reported), reported.getMessage()));
            transactionMetadataStoreService.handleOpFail(ex, tcId);
        });
    }

    /**
     * Handles a request to end (commit or abort, per the command's action) a transaction.
     *
     * <p>Returns early when transactions are disabled (an error response has already been
     * sent). On completion a success or error response is written to the client; on failure
     * the transaction metadata store service is also notified through {@code handleOpFail}.
     *
     * @param command the end-txn command received from the client
     */
    @Override
    protected void handleEndTxn(CommandEndTxn command) {
        long requestId = command.getRequestId();
        int txnAction = command.getTxnAction().getValue();
        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());
        if (!this.checkTransactionEnableAndSenError(requestId)) {
            return;
        }
        TransactionMetadataStoreService transactionMetadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
        transactionMetadataStoreService.endTransaction(txnID, txnAction, false).whenComplete((v, ex) -> {
            if (ex == null) {
                this.ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                return;
            }
            log.error("Send response error for end txn request.", ex);
            // Report the wrapped cause when there is one. A failure without a cause used to
            // throw a NullPointerException here, which left the client without any response.
            Throwable reported = (ex instanceof CoordinatorException.CoordinatorNotFoundException || ex.getCause() == null) ? ex : ex.getCause();
            this.ctx.writeAndFlush(Commands.newEndTxnResponse(requestId, txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(reported), reported.getMessage()));
            transactionMetadataStoreService.handleOpFail(ex, tcId);
        });
    }

    /**
     * Handles a request to end a transaction on a single topic partition.
     *
     * <p>If the topic is loaded, the end-txn operation is delegated to it. If it is not
     * loaded, the managed ledger factory is asked whether the topic's ledger exists: an
     * existing ledger yields a {@code ServiceNotReady} error, a missing one is answered with
     * a success response. Any failure along the way yields a {@code ServiceNotReady} error.
     *
     * @param command the end-txn-on-partition command received from the client
     */
    @Override
    protected void handleEndTxnOnPartition(CommandEndTxnOnPartition command) {
        long requestId = command.getRequestId();
        String topic = command.getTopic();
        int txnAction = command.getTxnAction().getValue();
        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
        if (log.isDebugEnabled()) {
            log.debug("[{}] handleEndTxnOnPartition txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, txnAction});
        }
        CompletableFuture<Optional<Topic>> topicFuture = this.service.getTopicIfExists(TopicName.get(topic).toString());
        topicFuture.thenAccept(optionalTopic -> {
            if (optionalTopic.isPresent()) {
                optionalTopic.get().endTxn(txnID, txnAction, lowWaterMark).whenComplete((ignored, throwable) -> {
                    if (throwable != null) {
                        log.error("handleEndTxnOnPartition fail!, topic {}, txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, TxnAction.valueOf(txnAction), throwable});
                        this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, BrokerServiceException.getClientErrorCode(throwable), throwable.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
                        return;
                    }
                    this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                });
            } else {
                this.getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()).thenAccept(b -> {
                    if (b.booleanValue()) {
                        log.error("handleEndTxnOnPartition fail ! The topic {} does not exist in broker, txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, TxnAction.valueOf(txnAction)});
                        // Bits are passed as (least, most), matching every other error response in
                        // this method; this call previously passed them swapped.
                        this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, ServerError.ServiceNotReady, "The topic " + topic + " does not exist in broker.", txnID.getLeastSigBits(), txnID.getMostSigBits()));
                    } else {
                        log.warn("handleEndTxnOnPartition fail ! The topic {} has not been created, txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, TxnAction.valueOf(txnAction)});
                        this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                    }
                }).exceptionally(e -> {
                    log.error("handleEndTxnOnPartition fail ! topic {} , txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, TxnAction.valueOf(txnAction), e.getCause()});
                    this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, ServerError.ServiceNotReady, e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
                    return null;
                });
            }
        }).exceptionally(e -> {
            log.error("handleEndTxnOnPartition fail ! topic {} , txnId: [{}], txnAction: [{}]", new Object[]{topic, txnID, TxnAction.valueOf(txnAction), e.getCause()});
            this.ctx.writeAndFlush(Commands.newEndTxnOnPartitionResponse(requestId, ServerError.ServiceNotReady, e.getMessage(), txnID.getLeastSigBits(), txnID.getMostSigBits()));
            return null;
        });
    }

    /**
     * Handles a request to end a transaction on a subscription of a topic.
     *
     * <p>If the topic is loaded and the subscription exists, the end-txn operation is
     * delegated to the subscription; a missing subscription is answered with a success
     * response. If the topic is not loaded, the managed ledger factory is asked whether the
     * topic's ledger exists: an existing ledger yields a {@code ServiceNotReady} error, a
     * missing one a success response.
     *
     * @param command the end-txn-on-subscription command received from the client
     */
    @Override
    protected void handleEndTxnOnSubscription(CommandEndTxnOnSubscription command) {
        final long requestId = command.getRequestId();
        final long txnidMostBits = command.getTxnidMostBits();
        final long txnidLeastBits = command.getTxnidLeastBits();
        final String topic = command.getSubscription().getTopic();
        final String subName = command.getSubscription().getSubscription();
        final int txnAction = command.getTxnAction().getValue();
        final TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits);
        final long lowWaterMark = command.getTxnidLeastBitsOfLowWatermark();
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] handleEndTxnOnSubscription txnId: [{}], txnAction: [{}]", new Object[]{topic, subName, new TxnID(txnidMostBits, txnidLeastBits), txnAction});
        }
        final CompletableFuture<Optional<Topic>> lookup = this.service.getTopicIfExists(TopicName.get(topic).toString());
        lookup.thenAccept(maybeTopic -> {
            if (!maybeTopic.isPresent()) {
                // Topic is not loaded on this broker: tell "ledger exists" apart from "never created".
                this.getBrokerService().getManagedLedgerFactory().asyncExists(TopicName.get(topic).getPersistenceNamingEncoding()).thenAccept(ledgerExists -> {
                    if (!ledgerExists.booleanValue()) {
                        log.warn("handleEndTxnOnSubscription fail ! The topic {} has not been created, subscription: {} txnId: [{}], txnAction: [{}]", new Object[]{topic, subName, txnID, TxnAction.valueOf(txnAction)});
                        this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                        return;
                    }
                    log.error("handleEndTxnOnSubscription fail! The topic {} does not exist in broker, subscription: {} ,txnId: [{}], txnAction: [{}]", new Object[]{topic, subName, new TxnID(txnidMostBits, txnidLeastBits), TxnAction.valueOf(txnAction)});
                    this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), ServerError.ServiceNotReady, "The topic " + topic + " does not exist in broker."));
                }).exceptionally(existsFailure -> {
                    log.error("handleEndTxnOnSubscription fail ! topic {} , subscription: {}txnId: [{}], txnAction: [{}]", new Object[]{topic, subName, txnID, TxnAction.valueOf(txnAction), existsFailure.getCause()});
                    this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits(), ServerError.ServiceNotReady, existsFailure.getMessage()));
                    return null;
                });
                return;
            }
            final Subscription subscription = maybeTopic.get().getSubscription(subName);
            if (subscription == null) {
                log.warn("handleEndTxnOnSubscription fail! topic {} subscription {} does not exist. txnId: [{}], txnAction: [{}]", new Object[]{maybeTopic.get().getName(), subName, txnID, TxnAction.valueOf(txnAction)});
                this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
                return;
            }
            subscription.endTxn(txnidMostBits, txnidLeastBits, txnAction, lowWaterMark).whenComplete((ignored, endFailure) -> {
                if (endFailure == null) {
                    this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits));
                    return;
                }
                log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}txnId: [{}], txnAction: [{}]", new Object[]{topic, subName, txnID, TxnAction.valueOf(txnAction), endFailure.getCause()});
                this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits, BrokerServiceException.getClientErrorCode(endFailure), "Handle end txn on subscription failed."));
            });
        }).exceptionally(lookupFailure -> {
            log.error("handleEndTxnOnSubscription fail ! topic: {} , subscription: {}txnId: [{}], txnAction: [{}]", new Object[]{topic, subName, txnID, TxnAction.valueOf(txnAction), lookupFailure.getCause()});
            this.ctx.writeAndFlush(Commands.newEndTxnOnSubscriptionResponse(requestId, txnidLeastBits, txnidMostBits, ServerError.ServiceNotReady, "Handle end txn on subscription failed."));
            return null;
        });
    }

    /**
     * Registers the given schema on the topic, or validates that connecting without a schema
     * is allowed.
     *
     * @param topic  topic the schema applies to
     * @param schema schema to add, or {@code null} when none was supplied
     * @return a future with the resulting schema version; {@code SchemaVersion.Empty} when no
     *         schema was supplied and that is allowed; failed with
     *         {@code IncompatibleSchemaException} when the topic has a schema and schema
     *         validation is enforced by this connection or by the topic
     */
    private CompletableFuture<SchemaVersion> tryAddSchema(Topic topic, SchemaData schema) {
        if (schema == null) {
            return topic.hasSchema().thenCompose(topicHasSchema -> {
                log.info("[{}] {} configured with schema {}", new Object[]{this.remoteAddress, topic.getName(), topicHasSchema});
                final boolean mustReject = topicHasSchema.booleanValue() && (this.schemaValidationEnforced || topic.getSchemaValidationEnforced());
                final CompletableFuture<SchemaVersion> outcome = new CompletableFuture<SchemaVersion>();
                if (!mustReject) {
                    outcome.complete(SchemaVersion.Empty);
                    return outcome;
                }
                outcome.completeExceptionally(new IncompatibleSchemaException("Producers cannot connect or send message without a schema to topics with a schema"));
                return outcome;
            });
        }
        return topic.addSchema(schema);
    }

    /**
     * Handles a request to register acknowledged subscriptions with an ongoing transaction.
     *
     * <p>Returns early when transactions are disabled (an error response has already been
     * sent). On completion a success or error response is written to the client; on failure
     * the transaction metadata store service is also notified through {@code handleOpFail}.
     *
     * @param command the add-subscription-to-txn command received from the client
     */
    @Override
    protected void handleAddSubscriptionToTxn(CommandAddSubscriptionToTxn command) {
        TxnID txnID = new TxnID(command.getTxnidMostBits(), command.getTxnidLeastBits());
        long requestId = command.getRequestId();
        if (log.isDebugEnabled()) {
            log.debug("Receive add published partition to txn request {} from {} with txnId {}", new Object[]{requestId, this.remoteAddress, txnID});
        }
        TransactionCoordinatorID tcId = TransactionCoordinatorID.get(command.getTxnidMostBits());
        if (!this.checkTransactionEnableAndSenError(requestId)) {
            return;
        }
        TransactionMetadataStoreService transactionMetadataStoreService = this.service.pulsar().getTransactionMetadataStoreService();
        transactionMetadataStoreService.addAckedPartitionToTxn(txnID, MLTransactionMetadataStore.subscriptionToTxnSubscription(command.getSubscriptionsList())).whenComplete((v, ex) -> {
            if (ex == null) {
                if (log.isDebugEnabled()) {
                    log.debug("Send response success for add published partition to txn request {}", (Object)requestId);
                }
                this.ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getLeastSigBits(), txnID.getMostSigBits()));
                log.info("handle add partition to txn finish.");
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("Send response error for add published partition to txn request {}", (Object)requestId, ex);
            }
            // Report the wrapped cause when there is one. A failure without a cause used to
            // throw a NullPointerException here, which left the client without any response.
            Throwable reported = (ex instanceof CoordinatorException.CoordinatorNotFoundException || ex.getCause() == null) ? ex : ex.getCause();
            this.ctx.writeAndFlush(Commands.newAddSubscriptionToTxnResponse(requestId, txnID.getMostSigBits(), BrokerServiceException.getClientErrorCode(reported), reported.getMessage()));
            transactionMetadataStoreService.handleOpFail(ex, tcId);
        });
    }

    /**
     * Reports whether the connection handshake has finished.
     *
     * @return {@code true} when the connection state is {@code State.Connected}
     */
    @Override
    protected boolean isHandshakeCompleted() {
        return this.state == State.Connected;
    }

    /**
     * @return the channel handler context held by this connection
     */
    public ChannelHandlerContext ctx() {
        return this.ctx;
    }

    /**
     * Passes the command to the broker interceptor, when one is configured.
     *
     * @param command the command to hand to the interceptor
     * @throws InterceptException declared by this method; presumably raised by the
     *         interceptor call
     */
    @Override
    protected void interceptCommand(BaseCommand command) throws InterceptException {
        if (this.getBrokerService().getInterceptor() == null) {
            return;
        }
        this.getBrokerService().getInterceptor().onPulsarCommand(command, this);
    }

    /**
     * Removes the producer from this connection and tells the client it has been closed.
     *
     * <p>Peers whose protocol version is below v5 get the whole connection closed instead of
     * a close-producer command.
     *
     * @param producer the producer being closed
     */
    @Override
    public void closeProducer(Producer producer) {
        this.safelyRemoveProducer(producer);
        final boolean peerAcceptsCloseCommand = this.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v5.getValue();
        if (!peerAcceptsCloseCommand) {
            this.close();
            return;
        }
        this.ctx.writeAndFlush(Commands.newCloseProducer(producer.getProducerId(), -1L));
    }

    /**
     * Removes the consumer from this connection and tells the client it has been closed.
     *
     * <p>Peers whose protocol version is below v5 get the whole connection closed instead of
     * a close-consumer command.
     *
     * @param consumer the consumer being closed
     */
    @Override
    public void closeConsumer(Consumer consumer) {
        this.safelyRemoveConsumer(consumer);
        final boolean peerAcceptsCloseCommand = this.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v5.getValue();
        if (!peerAcceptsCloseCommand) {
            this.close();
            return;
        }
        this.ctx.writeAndFlush(Commands.newCloseConsumer(consumer.consumerId(), -1L));
    }

    /**
     * Closes the channel handler context of this connection.
     */
    protected void close() {
        this.ctx.close();
    }

    /**
     * @return the remote socket address stored for this connection
     */
    @Override
    public SocketAddress clientAddress() {
        return this.remoteAddress;
    }

    /**
     * Drops the given consumer from this connection's consumer map via
     * {@code safelyRemoveConsumer}.
     *
     * @param consumer the consumer that was removed
     */
    @Override
    public void removedConsumer(Consumer consumer) {
        this.safelyRemoveConsumer(consumer);
    }

    /**
     * Drops the given producer from this connection's producer map via
     * {@code safelyRemoveProducer}.
     *
     * @param producer the producer that was removed
     */
    @Override
    public void removedProducer(Producer producer) {
        this.safelyRemoveProducer(producer);
    }

    /**
     * Removes the map entry for the producer's id, but only if that entry still belongs to
     * this producer.
     *
     * <p>The entry is removed once its future completes, when the future failed or resolved
     * to the very same producer instance. The conditional {@code remove(key, value)} ensures
     * that a different future stored under the same id is left untouched.
     *
     * @param producer the producer to remove
     */
    private void safelyRemoveProducer(Producer producer) {
        final long producerId = producer.getProducerId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed producer: producerId={}, producer={}", this.remoteAddress, producerId, producer);
        }
        final CompletableFuture<Producer> registered = this.producers.get(producerId);
        if (registered == null) {
            return;
        }
        registered.whenComplete((resolved, failure) -> {
            if (failure != null || resolved == producer) {
                this.producers.remove(producerId, registered);
            }
        });
    }

    /**
     * Removes the map entry for the consumer's id, but only if that entry still belongs to
     * this consumer.
     *
     * <p>The entry is removed once its future completes, when the future failed or resolved
     * to the very same consumer instance. The conditional {@code remove(key, value)} ensures
     * that a different future stored under the same id is left untouched.
     *
     * @param consumer the consumer to remove
     */
    private void safelyRemoveConsumer(Consumer consumer) {
        final long consumerId = consumer.consumerId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed consumer: consumerId={}, consumer={}", this.remoteAddress, consumerId, consumer);
        }
        final CompletableFuture<Consumer> registered = this.consumers.get(consumerId);
        if (registered == null) {
            return;
        }
        registered.whenComplete((resolved, failure) -> {
            if (failure != null || resolved == consumer) {
                this.consumers.remove(consumerId, registered);
            }
        });
    }

    /**
     * @return the current value of this connection's {@code isActive} flag
     */
    @Override
    public boolean isActive() {
        return this.isActive;
    }

    /**
     * @return whether the underlying channel currently reports itself as writable
     */
    @Override
    public boolean isWritable() {
        return this.ctx.channel().isWritable();
    }

    /**
     * Accounts for a publish that is about to start and applies read throttling when a limit
     * is reached.
     *
     * @param producer    producer issuing the publish; its topic supplies the rate-limit checks
     * @param msgSize     size of the publish, added to the per-thread pending-bytes counter
     * @param numMessages number of messages in the publish, passed to the rate-limit checks
     */
    public void startSendOperation(Producer producer, int msgSize, int numMessages) {
        boolean isPublishRateExceeded = false;
        if (this.preciseTopicPublishRateLimitingEnable) {
            boolean isPreciseTopicPublishRateExceeded = producer.getTopic().isTopicPublishRateExceeded(numMessages, msgSize);
            if (isPreciseTopicPublishRateExceeded) {
                // Topic rate exceeded: the topic disables auto-read and this method returns
                // before the pending-request and pending-bytes counters are touched.
                producer.getTopic().disableCnxAutoRead();
                return;
            }
            isPublishRateExceeded = producer.getTopic().isBrokerPublishRateExceeded();
        } else {
            boolean resourceGroupPublishRateExceeded;
            // The resource-group check runs only when resource-group limiting is enabled;
            // exceeding it also returns before any counter is updated.
            if (producer.getTopic().isResourceGroupRateLimitingEnabled() && (resourceGroupPublishRateExceeded = producer.getTopic().isResourceGroupPublishRateExceeded(numMessages, msgSize))) {
                producer.getTopic().disableCnxAutoRead();
                return;
            }
            isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
        }
        // Count this request; stop reading from the channel when the pending-request limit is
        // hit exactly or the publish rate is exceeded.
        if (++this.pendingSendRequest == this.maxPendingSendRequests || isPublishRateExceeded) {
            this.ctx.channel().config().setAutoRead(false);
            this.autoReadDisabledRateLimiting = isPublishRateExceeded;
            throttledConnections.inc();
        }
        // The pending-bytes counter is always incremented (first operand of the condition).
        // A limit of 0 or less disables this check.
        if (pendingBytesPerThread.get().addAndGet(msgSize) >= this.maxPendingBytesPerThread && !this.autoReadDisabledPublishBufferLimiting && this.maxPendingBytesPerThread > 0L) {
            MutableInt pausedConnections = new MutableInt();
            // Pause every connection in cnxsPerThread that has producers and is not already paused.
            cnxsPerThread.get().forEach(cnx -> {
                if (cnx.hasProducers() && !cnx.autoReadDisabledPublishBufferLimiting) {
                    cnx.disableCnxAutoRead();
                    cnx.autoReadDisabledPublishBufferLimiting = true;
                    pausedConnections.increment();
                }
            });
            this.getBrokerService().pausedConnections(pausedConnections.intValue());
        }
    }

    /**
     * Accounts for a finished publish and resumes reading when the counters drop back to
     * their resume thresholds.
     *
     * @param isNonPersistentTopic whether the publish targeted a non-persistent topic; when
     *                             {@code true} the non-persistent pending counter is decremented
     * @param msgSize              size of the publish, subtracted from the per-thread
     *                             pending-bytes counter
     */
    @Override
    public void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
        // The pending-bytes counter is always decremented (first operand of the condition).
        if (pendingBytesPerThread.get().addAndGet(-msgSize) < this.resumeThresholdPendingBytesPerThread && this.autoReadDisabledPublishBufferLimiting) {
            MutableInt resumedConnections = new MutableInt();
            // Resume every connection in cnxsPerThread that was paused by the buffer limit.
            cnxsPerThread.get().forEach(cnx -> {
                if (cnx.autoReadDisabledPublishBufferLimiting) {
                    cnx.autoReadDisabledPublishBufferLimiting = false;
                    cnx.enableCnxAutoRead();
                    resumedConnections.increment();
                }
            });
            this.getBrokerService().resumedConnections(resumedConnections.intValue());
        }
        // Re-enable reads only when the pending count lands exactly on the resume threshold.
        if (--this.pendingSendRequest == this.resumeReadsThreshold) {
            this.enableCnxAutoRead();
        }
        if (isNonPersistentTopic) {
            --this.nonPersistentPendingMessages;
        }
    }

    /**
     * Turns channel auto-read back on, unless it is already on or a throttle is still active.
     *
     * <p>Does nothing when there is no context, when auto-read is already enabled, or while
     * either the rate-limiting or the publish-buffer-limiting flag is set.
     */
    @Override
    public void enableCnxAutoRead() {
        if (this.ctx == null) {
            return;
        }
        if (this.ctx.channel().config().isAutoRead()) {
            return;
        }
        if (this.autoReadDisabledRateLimiting || this.autoReadDisabledPublishBufferLimiting) {
            return;
        }
        this.ctx.channel().config().setAutoRead(true);
        // Trigger a read right away.
        this.ctx.read();
        throttledConnections.dec();
    }

    /**
     * Turns channel auto-read off when a context exists and auto-read is currently on.
     */
    @Override
    public void disableCnxAutoRead() {
        if (this.ctx == null || !this.ctx.channel().config().isAutoRead()) {
            return;
        }
        this.ctx.channel().config().setAutoRead(false);
    }

    /**
     * Clears the rate-limiting throttle flag when it is set.
     */
    @Override
    public void cancelPublishRateLimiting() {
        if (!this.autoReadDisabledRateLimiting) {
            return;
        }
        this.autoReadDisabledRateLimiting = false;
    }

    /**
     * Clears the publish-buffer throttle flag when it is set and decrements the global
     * throttled-connections gauge.
     */
    @Override
    public void cancelPublishBufferLimiting() {
        if (!this.autoReadDisabledPublishBufferLimiting) {
            return;
        }
        this.autoReadDisabledPublishBufferLimiting = false;
        throttledConnectionsGlobal.dec();
    }

    /**
     * Maps the failure of a future to a client-facing error code.
     *
     * @param future the future to inspect; it is read without blocking
     * @param <T>    result type of the future
     * @return the error code derived from the failure's cause when that cause is a
     *         {@code BrokerServiceException}; {@code ServerError.UnknownError} otherwise,
     *         including when the future has not failed
     */
    private <T> ServerError getErrorCode(CompletableFuture<T> future) {
        try {
            // getNow never blocks; it only throws when the future completed exceptionally.
            future.getNow(null);
        }
        catch (Exception failure) {
            if (failure.getCause() instanceof BrokerServiceException) {
                return BrokerServiceException.getClientErrorCode(failure.getCause());
            }
        }
        return ServerError.UnknownError;
    }

    /**
     * Turns off {@code TCP_NODELAY} on the channel when the producer name starts with the
     * replicator prefix.
     *
     * <p>This is best effort: any failure is logged at warn level and otherwise ignored.
     *
     * @param topic        topic name, used for logging only
     * @param producerName name of the producer; {@code null} is a no-op
     */
    private void disableTcpNoDelayIfNeeded(String topic, String producerName) {
        if (producerName == null || !producerName.startsWith(this.replicatorPrefix)) {
            return;
        }
        try {
            if (this.ctx.channel().config().getOption(ChannelOption.TCP_NODELAY).booleanValue()) {
                this.ctx.channel().config().setOption(ChannelOption.TCP_NODELAY, false);
            }
        }
        catch (Throwable t) {
            // Pass the throwable as the last argument so the failure reason is logged.
            log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", new Object[]{topic, producerName, this.ctx.channel(), t});
        }
    }

    /**
     * Parses a topic name and, on failure, answers the request with an
     * {@code InvalidTopicName} error.
     *
     * <p>The error response type depends on the request: lookup requests get a lookup error,
     * partitioned-metadata requests a partition-metadata error, anything else a generic error.
     *
     * @param topic          raw topic name from the client
     * @param requestId      id of the request, echoed in the error response
     * @param requestCommand the originating command, used only to pick the response type
     * @return the parsed topic name, or {@code null} when parsing failed and an error
     *         response was written
     */
    private TopicName validateTopicName(String topic, long requestId, Object requestCommand) {
        try {
            return TopicName.get(topic);
        }
        catch (Throwable parseFailure) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to parse topic name '{}'", this.remoteAddress, topic, parseFailure);
            }
            final String errorMessage = "Invalid topic name: " + parseFailure.getMessage();
            final Object errorResponse;
            if (requestCommand instanceof CommandLookupTopic) {
                errorResponse = Commands.newLookupErrorResponse(ServerError.InvalidTopicName, errorMessage, requestId);
            } else if (requestCommand instanceof CommandPartitionedTopicMetadata) {
                errorResponse = Commands.newPartitionMetadataResponse(ServerError.InvalidTopicName, errorMessage, requestId);
            } else {
                errorResponse = Commands.newError(requestId, ServerError.InvalidTopicName, errorMessage);
            }
            this.ctx.writeAndFlush(errorResponse);
            return null;
        }
    }

    /**
     * Builds and serializes a message command, then passes the command to the broker
     * interceptor when one is configured.
     *
     * <p>Exceptions thrown by the interceptor are logged and do not affect the returned
     * buffers.
     *
     * @param consumerId         id of the consumer the message is for
     * @param ledgerId           ledger id of the message
     * @param entryId            entry id of the message
     * @param partition          partition index of the message
     * @param redeliveryCount    redelivery count of the message
     * @param metadataAndPayload buffer holding the message metadata and payload
     * @param ackSet             ack set passed through to the message command
     * @param topic              topic name; not used by this method
     * @return the serialized command together with the payload
     */
    public ByteBufPair newMessageAndIntercept(long consumerId, long ledgerId, long entryId, int partition, int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet, String topic) {
        final BaseCommand messageCommand = Commands.newMessageCommand(consumerId, ledgerId, entryId, partition, redeliveryCount, ackSet);
        final ByteBufPair serialized = Commands.serializeCommandMessageWithSize(messageCommand, metadataAndPayload);
        try {
            final BrokerInterceptor interceptor = this.getBrokerService().getInterceptor();
            if (interceptor == null) {
                log.debug("BrokerInterceptor is not set in newMessageAndIntercept");
            } else {
                interceptor.onPulsarCommand(messageCommand, this);
            }
        }
        catch (Exception interceptFailure) {
            log.error("Exception occur when intercept messages.", (Throwable)interceptFailure);
        }
        return serialized;
    }

    /**
     * @return the current connection state
     */
    public State getState() {
        return this.state;
    }

    /**
     * @return the remote socket address stored for this connection
     */
    public SocketAddress getRemoteAddress() {
        return this.remoteAddress;
    }

    /**
     * @return the broker service this connection belongs to
     */
    @Override
    public BrokerService getBrokerService() {
        return this.service;
    }

    /**
     * @return the value of the {@code authRole} field (same value as {@code getAuthRole()})
     */
    public String getRole() {
        return this.authRole;
    }

    /**
     * @return a new promise created by the channel handler context
     */
    @Override
    public Promise<Void> newPromise() {
        return this.ctx.newPromise();
    }

    /**
     * @return the HAProxy message stored for this connection, or {@code null} when none is set
     */
    @Override
    public HAProxyMessage getHAProxyMessage() {
        return this.proxyMessage;
    }

    /**
     * @return {@code true} when an HAProxy message is stored for this connection
     */
    @Override
    public boolean hasHAProxyMessage() {
        return this.proxyMessage != null;
    }

    /**
     * @param consumerId consumer id to look up
     * @return {@code true} when the consumer map contains an entry for the id
     */
    boolean hasConsumer(long consumerId) {
        return this.consumers.containsKey(consumerId);
    }

    /**
     * @return {@code true} when the remote endpoint's protocol version is at least v4
     */
    @Override
    public boolean isBatchMessageCompatibleVersion() {
        return this.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v4.getValue();
    }

    /**
     * @return {@code true} when feature flags are present and report support for
     *         authentication refresh
     */
    boolean supportsAuthenticationRefresh() {
        return this.features != null && this.features.isSupportsAuthRefresh();
    }

    /**
     * @return {@code true} when feature flags are present and report support for broker
     *         entry metadata
     */
    boolean supportBrokerMetadata() {
        return this.features != null && this.features.isSupportsBrokerEntryMetadata();
    }

    /**
     * @return the client version string stored for this connection
     */
    @Override
    public String getClientVersion() {
        return this.clientVersion;
    }

    /**
     * Sets the rate-limiting throttle flag directly; exposed for tests.
     *
     * @param isLimiting new value of the flag
     */
    @VisibleForTesting
    void setAutoReadDisabledRateLimiting(boolean isLimiting) {
        this.autoReadDisabledRateLimiting = isLimiting;
    }

    /**
     * @return the value of the {@code preciseDispatcherFlowControl} setting held by this
     *         connection
     */
    @Override
    public boolean isPreciseDispatcherFlowControl() {
        return this.preciseDispatcherFlowControl;
    }

    /**
     * @return the authentication state held by this connection
     */
    public AuthenticationState getAuthState() {
        return this.authState;
    }

    /**
     * @return the original authentication data when present, otherwise this connection's
     *         own authentication data
     */
    @Override
    public AuthenticationDataSource getAuthenticationData() {
        if (this.originalAuthData != null) {
            return this.originalAuthData;
        }
        return this.authenticationData;
    }

    /**
     * @return the original principal when present, otherwise the authenticated role
     */
    public String getPrincipal() {
        if (this.originalPrincipal != null) {
            return this.originalPrincipal;
        }
        return this.authRole;
    }

    /**
     * @return the authentication provider held by this connection
     */
    public AuthenticationProvider getAuthenticationProvider() {
        return this.authenticationProvider;
    }

    /**
     * @return the value of the {@code authRole} field
     */
    @Override
    public String getAuthRole() {
        return this.authRole;
    }

    /**
     * @return the value of the {@code authMethod} field
     */
    public String getAuthMethod() {
        return this.authMethod;
    }

    /**
     * @return the live map from consumer id to consumer future (not a copy)
     */
    public ConcurrentLongHashMap<CompletableFuture<Consumer>> getConsumers() {
        return this.consumers;
    }

    /**
     * @return the live map from producer id to producer future (not a copy)
     */
    public ConcurrentLongHashMap<CompletableFuture<Producer>> getProducers() {
        return this.producers;
    }

    /**
     * @return the command sender used by this connection
     */
    @Override
    public PulsarCommandSender getCommandSender() {
        return this.commandSender;
    }

    /**
     * Submits the task to the event loop of this connection's channel.
     *
     * @param runnable task to run
     */
    @Override
    public void execute(Runnable runnable) {
        this.ctx().channel().eventLoop().execute(runnable);
    }

    /**
     * Resolves the client's source address as a string.
     *
     * @return the source address from the HAProxy message when one is present; otherwise the
     *         host address of the remote address when it is an {@code InetSocketAddress};
     *         otherwise {@code null}
     */
    @Override
    public String clientSourceAddress() {
        if (this.proxyMessage != null) {
            return this.proxyMessage.sourceAddress();
        }
        if (!(this.remoteAddress instanceof InetSocketAddress)) {
            return null;
        }
        return ((InetSocketAddress)this.remoteAddress).getAddress().getHostAddress();
    }

    /**
     * Logs an authentication-related failure for a topic operation.
     *
     * <p>{@code AuthenticationException}s are logged at info level with their message only;
     * every other throwable is logged at error level with the throwable attached.
     *
     * @param remoteAddress address of the client
     * @param operation     name of the operation being authorized
     * @param principal     principal that attempted the operation
     * @param topic         topic involved, if any; appended to the log line when present
     * @param ex            the failure
     */
    private static void logAuthException(SocketAddress remoteAddress, String operation, String principal, Optional<TopicName> topic, Throwable ex) {
        final String topicSuffix = topic.isPresent() ? ", topic=" + topic.get().toString() : "";
        if (!(ex instanceof AuthenticationException)) {
            log.error("[{}] Error trying to authenticate: operation={}, principal={}{}", new Object[]{remoteAddress, operation, principal, topicSuffix, ex});
            return;
        }
        log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}", new Object[]{remoteAddress, operation, principal, topicSuffix, ex.getMessage()});
    }

    /**
     * Logs an authentication-related failure for a namespace operation.
     *
     * <p>{@code AuthenticationException}s are logged at info level with their message only;
     * every other throwable is logged at error level with the throwable attached.
     *
     * @param remoteAddress address of the client
     * @param operation     name of the operation being authorized
     * @param principal     principal that attempted the operation
     * @param namespaceName namespace involved, if any; appended to the log line when present
     * @param ex            the failure
     */
    private static void logNamespaceNameAuthException(SocketAddress remoteAddress, String operation, String principal, Optional<NamespaceName> namespaceName, Throwable ex) {
        final String namespaceSuffix = namespaceName.isPresent() ? ", namespace=" + namespaceName.get().toString() : "";
        if (!(ex instanceof AuthenticationException)) {
            log.error("[{}] Error trying to authenticate: operation={}, principal={}{}", new Object[]{remoteAddress, operation, principal, namespaceSuffix, ex});
            return;
        }
        log.info("[{}] Failed to authenticate: operation={}, principal={}{}, reason={}", new Object[]{remoteAddress, operation, principal, namespaceSuffix, ex.getMessage()});
    }

    /**
     * @return {@code true} when the producer map has at least one entry
     */
    public boolean hasProducers() {
        return !this.producers.isEmpty();
    }

    /**
     * Connection states held in this class's {@code state} field.
     */
    static enum State {
        Start,
        // The state isHandshakeCompleted() compares against.
        Connected,
        Failed,
        Connecting;

    }
}

