package org.apache.pulsar.broker.service;

import java.net.SocketAddress;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.naming.AuthenticationException;
import javax.net.ssl.SSLSession;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.impl.PersistentTopicsBase;
import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.broker.lookup.TopicLookupBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandler;
import org.apache.pulsar.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.pulsar.shade.io.netty.channel.ChannelOption;
import org.apache.pulsar.shade.io.netty.handler.ssl.SslHandler;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.CommandUtils;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.Commands;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarHandler;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.Metadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/ServerCnx.class */
public class ServerCnx extends PulsarHandler {
    private final BrokerService service;
    private final ConcurrentLongHashMap<CompletableFuture<Producer>> producers;
    private final ConcurrentLongHashMap<CompletableFuture<Consumer>> consumers;
    private State state;
    private volatile boolean isActive;
    String authRole;
    AuthenticationDataSource authenticationData;
    private static final int MaxPendingSendRequests = 1000;
    private static final int ResumeReadsThreshold = 500;
    private int pendingSendRequest;
    private final String replicatorPrefix;
    private String clientVersion;
    private int nonPersistentPendingMessages;
    private final int MaxNonPersistentPendingMessages;
    private String originalPrincipal;
    private Set<String> proxyRoles;
    private boolean authenticateOriginalAuthData;
    private static final Logger log = LoggerFactory.getLogger(ServerCnx.class);
    private static /* synthetic */ int[] $SWITCH_TABLE$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type;

    /* renamed from: org.apache.pulsar.broker.service.ServerCnx$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/pulsar/broker/service/ServerCnx$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type = new int[PulsarApi.Schema.Type.values().length];

        static {
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type[PulsarApi.Schema.Type.None.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type[PulsarApi.Schema.Type.String.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type[PulsarApi.Schema.Type.Json.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type[PulsarApi.Schema.Type.Protobuf.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type[PulsarApi.Schema.Type.Avro.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
        }
    }

    /* loaded from: input_file:org/apache/pulsar/broker/service/ServerCnx$State.class */
    enum State {
        Start,
        Connected,
        Failed;

        /* renamed from: values, reason: to resolve conflict with enum method */
        public static State[] valuesCustom() {
            State[] valuesCustom = values();
            int length = valuesCustom.length;
            State[] stateArr = new State[length];
            System.arraycopy(valuesCustom, 0, stateArr, 0, length);
            return stateArr;
        }
    }

    public ServerCnx(BrokerService brokerService) {
        super(brokerService.getKeepAliveIntervalSeconds(), TimeUnit.SECONDS);
        this.isActive = true;
        this.authRole = null;
        this.pendingSendRequest = 0;
        this.clientVersion = null;
        this.nonPersistentPendingMessages = 0;
        this.originalPrincipal = null;
        this.service = brokerService;
        this.state = State.Start;
        this.producers = new ConcurrentLongHashMap<>(8, 1);
        this.consumers = new ConcurrentLongHashMap<>(8, 1);
        this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
        this.MaxNonPersistentPendingMessages = brokerService.pulsar().getConfiguration().getMaxConcurrentNonPersistentMessagePerConnection();
        this.proxyRoles = brokerService.pulsar().getConfiguration().getProxyRoles();
        this.authenticateOriginalAuthData = brokerService.pulsar().getConfiguration().authenticateOriginalAuthData();
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarHandler, org.apache.pulsar.shade.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.pulsar.shade.io.netty.channel.ChannelInboundHandler
    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelActive(channelHandlerContext);
        log.info("New connection from {}", this.remoteAddress);
        this.ctx = channelHandlerContext;
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarHandler, org.apache.pulsar.shade.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.pulsar.shade.io.netty.channel.ChannelInboundHandler
    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
        super.channelInactive(channelHandlerContext);
        this.isActive = false;
        log.info("Closed connection from {}", this.remoteAddress);
        this.producers.values().forEach(completableFuture -> {
            if (!completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
                return;
            }
            ((Producer) completableFuture.getNow(null)).closeNow();
        });
        this.consumers.values().forEach(completableFuture2 -> {
            if (!completableFuture2.isDone() || completableFuture2.isCompletedExceptionally()) {
                return;
            }
            Consumer consumer = (Consumer) completableFuture2.getNow(null);
            try {
                consumer.close();
            } catch (BrokerServiceException e) {
                log.warn("Consumer {} was already closed: {}", new Object[]{consumer, e.getMessage(), e});
            }
        });
    }

    @Override // org.apache.pulsar.shade.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.pulsar.shade.io.netty.channel.ChannelInboundHandler
    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
        if (log.isDebugEnabled()) {
            log.debug("Channel writability has changed to: {}", Boolean.valueOf(channelHandlerContext.channel().isWritable()));
        }
    }

    @Override // org.apache.pulsar.shade.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.pulsar.shade.io.netty.channel.ChannelHandlerAdapter, org.apache.pulsar.shade.io.netty.channel.ChannelHandler, org.apache.pulsar.shade.io.netty.channel.ChannelInboundHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) throws Exception {
        if (this.state != State.Failed) {
            Logger logger = log;
            Object[] objArr = new Object[4];
            objArr[0] = this.remoteAddress;
            objArr[1] = th.getClass().getSimpleName();
            objArr[2] = th.getMessage();
            objArr[3] = ClientCnx.isKnownException(th) ? null : th;
            logger.warn("[{}] Got exception {} : {}", objArr);
            this.state = State.Failed;
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Got exception: {}", new Object[]{this.remoteAddress, th.getMessage(), th});
        }
        channelHandlerContext.close();
    }

    private boolean invalidOriginalPrincipal(String str) {
        if (this.service.isAuthenticationEnabled() && this.service.isAuthorizationEnabled() && this.proxyRoles.contains(this.authRole)) {
            return StringUtils.isBlank(str) || this.proxyRoles.contains(str);
        }
        return false;
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleLookup(PulsarApi.CommandLookupTopic commandLookupTopic) {
        long requestId = commandLookupTopic.getRequestId();
        boolean authoritative = commandLookupTopic.getAuthoritative();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Lookup from {} for {}", new Object[]{commandLookupTopic.getTopic(), this.remoteAddress, Long.valueOf(requestId)});
        }
        TopicName validateTopicName = validateTopicName(commandLookupTopic.getTopic(), requestId, commandLookupTopic);
        if (validateTopicName == null) {
            return;
        }
        Semaphore lookupRequestSemaphore = this.service.getLookupRequestSemaphore();
        if (!lookupRequestSemaphore.tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed lookup due to too many lookup-requests {}", this.remoteAddress, validateTopicName);
            }
            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId));
        } else if (invalidOriginalPrincipal(this.originalPrincipal)) {
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, "Valid Proxy Client role should be provided for lookup ", this.authRole, this.originalPrincipal, validateTopicName});
            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.AuthorizationError, "Valid Proxy Client role should be provided for lookup ", requestId));
            lookupRequestSemaphore.release();
        } else {
            CompletableFuture<Boolean> completedFuture = (!this.service.isAuthorizationEnabled() || this.originalPrincipal == null) ? CompletableFuture.completedFuture(true) : this.service.getAuthorizationService().canLookupAsync(validateTopicName, this.authRole, this.authenticationData);
            String str = this.originalPrincipal;
            completedFuture.thenApply((Function<? super Boolean, ? extends U>) bool -> {
                if (bool.booleanValue()) {
                    TopicLookupBase.lookupTopicAsync(getBrokerService().pulsar(), validateTopicName, authoritative, str != null ? str : this.authRole, this.authenticationData, requestId).handle((byteBuf, th) -> {
                        if (th == null) {
                            this.ctx.writeAndFlush(byteBuf);
                        } else {
                            log.warn("[{}] lookup failed with error {}, {}", new Object[]{this.remoteAddress, validateTopicName, th.getMessage(), th});
                            this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.ServiceNotReady, th.getMessage(), requestId));
                        }
                        lookupRequestSemaphore.release();
                        return null;
                    });
                    return null;
                }
                log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Proxy Client is not authorized to Lookup", this.authRole, validateTopicName});
                this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.AuthorizationError, "Proxy Client is not authorized to Lookup", requestId));
                lookupRequestSemaphore.release();
                return null;
            }).exceptionally(th -> {
                log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Exception occured while trying to authorize lookup", this.authRole, validateTopicName, th});
                this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.AuthorizationError, "Exception occured while trying to authorize lookup", requestId));
                lookupRequestSemaphore.release();
                return null;
            });
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handlePartitionMetadataRequest(PulsarApi.CommandPartitionedTopicMetadata commandPartitionedTopicMetadata) {
        long requestId = commandPartitionedTopicMetadata.getRequestId();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received PartitionMetadataLookup from {} for {}", new Object[]{commandPartitionedTopicMetadata.getTopic(), this.remoteAddress, Long.valueOf(requestId)});
        }
        TopicName validateTopicName = validateTopicName(commandPartitionedTopicMetadata.getTopic(), requestId, commandPartitionedTopicMetadata);
        if (validateTopicName == null) {
            return;
        }
        Semaphore lookupRequestSemaphore = this.service.getLookupRequestSemaphore();
        if (!lookupRequestSemaphore.tryAcquire()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed Partition-Metadata lookup due to too many lookup-requests {}", this.remoteAddress, validateTopicName);
            }
            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.TooManyRequests, "Failed due to too many pending lookup requests", requestId));
        } else if (invalidOriginalPrincipal(this.originalPrincipal)) {
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, "Valid Proxy Client role should be provided for getPartitionMetadataRequest ", this.authRole, this.originalPrincipal, validateTopicName});
            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.AuthorizationError, "Valid Proxy Client role should be provided for getPartitionMetadataRequest ", requestId));
            lookupRequestSemaphore.release();
        } else {
            CompletableFuture<Boolean> completedFuture = (!this.service.isAuthorizationEnabled() || this.originalPrincipal == null) ? CompletableFuture.completedFuture(true) : this.service.getAuthorizationService().canLookupAsync(validateTopicName, this.authRole, this.authenticationData);
            String str = this.originalPrincipal;
            completedFuture.thenApply((Function<? super Boolean, ? extends U>) bool -> {
                if (bool.booleanValue()) {
                    PersistentTopicsBase.getPartitionedTopicMetadata(getBrokerService().pulsar(), this.authRole, str, this.authenticationData, validateTopicName).handle((partitionedTopicMetadata, th) -> {
                        if (th == null) {
                            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(partitionedTopicMetadata.partitions, requestId));
                        } else if (th instanceof PulsarClientException) {
                            log.warn("Failed to authorize {} at [{}] on topic {} : {}", new Object[]{getRole(), this.remoteAddress, validateTopicName, th.getMessage()});
                            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.AuthorizationError, th.getMessage(), requestId));
                        } else {
                            log.warn("Failed to get Partitioned Metadata [{}] {}: {}", new Object[]{this.remoteAddress, validateTopicName, th.getMessage(), th});
                            this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse((!(th instanceof RestException) || ((RestException) th).getResponse().getStatus() >= 500) ? PulsarApi.ServerError.ServiceNotReady : PulsarApi.ServerError.MetadataError, th.getMessage(), requestId));
                        }
                        lookupRequestSemaphore.release();
                        return null;
                    });
                    return null;
                }
                log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Proxy Client is not authorized to Get Partition Metadata", this.authRole, validateTopicName});
                this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.AuthorizationError, "Proxy Client is not authorized to Get Partition Metadata", requestId));
                lookupRequestSemaphore.release();
                return null;
            }).exceptionally(th -> {
                log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Exception occured while trying to authorize get Partition Metadata", this.authRole, validateTopicName});
                this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.AuthorizationError, "Exception occured while trying to authorize get Partition Metadata", requestId));
                lookupRequestSemaphore.release();
                return null;
            });
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleConsumerStats(PulsarApi.CommandConsumerStats commandConsumerStats) {
        ByteBuf newConsumerStatsResponse;
        if (log.isDebugEnabled()) {
            log.debug("Received CommandConsumerStats call from {}", this.remoteAddress);
        }
        long requestId = commandConsumerStats.getRequestId();
        long consumerId = commandConsumerStats.getConsumerId();
        Consumer now = this.consumers.get(consumerId).getNow(null);
        if (now == null) {
            log.error("Failed to get consumer-stats response - Consumer not found for CommandConsumerStats[remoteAddress = {}, requestId = {}, consumerId = {}]", new Object[]{this.remoteAddress, Long.valueOf(requestId), Long.valueOf(consumerId)});
            newConsumerStatsResponse = Commands.newConsumerStatsResponse(PulsarApi.ServerError.ConsumerNotFound, "Consumer " + consumerId + " not found", requestId);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("CommandConsumerStats[requestId = {}, consumer = {}]", Long.valueOf(requestId), now);
            }
            newConsumerStatsResponse = Commands.newConsumerStatsResponse(createConsumerStatsResponse(now, requestId));
        }
        this.ctx.writeAndFlush(newConsumerStatsResponse);
    }

    PulsarApi.CommandConsumerStatsResponse.Builder createConsumerStatsResponse(Consumer consumer, long j) {
        PulsarApi.CommandConsumerStatsResponse.Builder newBuilder = PulsarApi.CommandConsumerStatsResponse.newBuilder();
        ConsumerStats stats = consumer.getStats();
        newBuilder.setRequestId(j);
        newBuilder.setMsgRateOut(stats.msgRateOut);
        newBuilder.setMsgThroughputOut(stats.msgThroughputOut);
        newBuilder.setMsgRateRedeliver(stats.msgRateRedeliver);
        newBuilder.setConsumerName(stats.consumerName);
        newBuilder.setAvailablePermits(stats.availablePermits);
        newBuilder.setUnackedMessages(stats.unackedMessages);
        newBuilder.setBlockedConsumerOnUnackedMsgs(stats.blockedConsumerOnUnackedMsgs);
        newBuilder.setAddress(stats.getAddress());
        newBuilder.setConnectedSince(stats.getConnectedSince());
        Subscription subscription = consumer.getSubscription();
        newBuilder.setMsgBacklog(subscription.getNumberOfEntriesInBacklog());
        newBuilder.setMsgRateExpired(subscription.getExpiredMessageRate());
        newBuilder.setType(subscription.getTypeString());
        return newBuilder;
    }

    private String getOriginalPrincipal(String str, String str2, String str3, SSLSession sSLSession) throws AuthenticationException {
        if (this.authenticateOriginalAuthData) {
            str3 = str != null ? getBrokerService().getAuthenticationService().authenticate(new AuthenticationDataCommand(str, this.remoteAddress, sSLSession), str2) : null;
        }
        return str3;
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleConnect(PulsarApi.CommandConnect commandConnect) {
        Preconditions.checkArgument(this.state == State.Start);
        if (this.service.isAuthenticationEnabled()) {
            try {
                String str = "none";
                if (commandConnect.hasAuthMethodName()) {
                    str = commandConnect.getAuthMethodName();
                } else if (commandConnect.hasAuthMethod()) {
                    str = commandConnect.getAuthMethod().name().substring(10).toLowerCase();
                }
                String stringUtf8 = commandConnect.getAuthData().toStringUtf8();
                ChannelHandler channelHandler = this.ctx.channel().pipeline().get("tls");
                SSLSession sSLSession = null;
                if (channelHandler != null) {
                    sSLSession = ((SslHandler) channelHandler).engine().getSession();
                }
                this.originalPrincipal = getOriginalPrincipal(commandConnect.hasOriginalAuthData() ? commandConnect.getOriginalAuthData() : null, commandConnect.hasOriginalAuthMethod() ? commandConnect.getOriginalAuthMethod() : null, commandConnect.hasOriginalPrincipal() ? commandConnect.getOriginalPrincipal() : null, sSLSession);
                this.authenticationData = new AuthenticationDataCommand(stringUtf8, this.remoteAddress, sSLSession);
                this.authRole = getBrokerService().getAuthenticationService().authenticate(this.authenticationData, str);
                log.info("[{}] Client successfully authenticated with {} role {} and originalPrincipal {}", new Object[]{this.remoteAddress, str, this.authRole, this.originalPrincipal});
            } catch (AuthenticationException e) {
                log.warn("[{}] {}: {}", new Object[]{this.remoteAddress, "Unable to authenticate", e.getMessage()});
                this.ctx.writeAndFlush(Commands.newError(-1L, PulsarApi.ServerError.AuthenticationError, "Unable to authenticate"));
                close();
                return;
            }
        }
        if (log.isDebugEnabled()) {
            log.debug("Received CONNECT from {}", this.remoteAddress);
        }
        this.ctx.writeAndFlush(Commands.newConnected(commandConnect.getProtocolVersion()));
        this.state = State.Connected;
        this.remoteEndpointProtocolVersion = commandConnect.getProtocolVersion();
        String clientVersion = commandConnect.hasClientVersion() ? commandConnect.getClientVersion() : null;
        if (!StringUtils.isNotBlank(clientVersion) || clientVersion.contains(" ")) {
            return;
        }
        this.clientVersion = clientVersion.intern();
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleSubscribe(PulsarApi.CommandSubscribe commandSubscribe) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandSubscribe.getRequestId();
        long consumerId = commandSubscribe.getConsumerId();
        TopicName validateTopicName = validateTopicName(commandSubscribe.getTopic(), requestId, commandSubscribe);
        if (validateTopicName == null) {
            return;
        }
        if (invalidOriginalPrincipal(this.originalPrincipal)) {
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, "Valid Proxy Client role should be provided while subscribing ", this.authRole, this.originalPrincipal, validateTopicName});
            this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.AuthorizationError, "Valid Proxy Client role should be provided while subscribing "));
            return;
        }
        String subscription = commandSubscribe.getSubscription();
        PulsarApi.CommandSubscribe.SubType subType = commandSubscribe.getSubType();
        String consumerName = commandSubscribe.getConsumerName();
        boolean durable = commandSubscribe.getDurable();
        BatchMessageIdImpl batchMessageIdImpl = commandSubscribe.hasStartMessageId() ? new BatchMessageIdImpl(commandSubscribe.getStartMessageId().getLedgerId(), commandSubscribe.getStartMessageId().getEntryId(), commandSubscribe.getStartMessageId().getPartition(), commandSubscribe.getStartMessageId().getBatchIndex()) : null;
        String subscription2 = commandSubscribe.getSubscription();
        int priorityLevel = commandSubscribe.hasPriorityLevel() ? commandSubscribe.getPriorityLevel() : 0;
        boolean readCompacted = commandSubscribe.getReadCompacted();
        Map<String, String> metadataFromCommand = CommandUtils.metadataFromCommand(commandSubscribe);
        PulsarApi.CommandSubscribe.InitialPosition initialPosition = commandSubscribe.getInitialPosition();
        SchemaData schema = commandSubscribe.hasSchema() ? getSchema(commandSubscribe.getSchema()) : null;
        ((!this.service.isAuthorizationEnabled() || this.originalPrincipal == null) ? CompletableFuture.completedFuture(true) : this.service.getAuthorizationService().canConsumeAsync(validateTopicName, this.authRole, this.authenticationData, commandSubscribe.getSubscription())).thenApply((Function<? super Boolean, ? extends U>) bool -> {
            CompletableFuture<Boolean> completedFuture;
            if (!bool.booleanValue()) {
                log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Proxy Client is not authorized to subscribe", this.authRole, validateTopicName});
                this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.AuthorizationError, "Proxy Client is not authorized to subscribe"));
                return null;
            }
            if (this.service.isAuthorizationEnabled()) {
                completedFuture = this.service.getAuthorizationService().canConsumeAsync(validateTopicName, this.originalPrincipal != null ? this.originalPrincipal : this.authRole, this.authenticationData, subscription2);
            } else {
                completedFuture = CompletableFuture.completedFuture(true);
            }
            completedFuture.thenApply((Function<? super Boolean, ? extends U>) bool -> {
                if (!bool.booleanValue()) {
                    log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, "Client is not authorized to subscribe", this.authRole});
                    this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.AuthorizationError, "Client is not authorized to subscribe"));
                    return null;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Client is authorized to subscribe with role {}", this.remoteAddress, this.authRole);
                }
                log.info("[{}] Subscribing on topic {} / {}", new Object[]{this.remoteAddress, validateTopicName, subscription});
                try {
                    Metadata.validateMetadata(metadataFromCommand);
                    CompletableFuture<Consumer> completableFuture = new CompletableFuture<>();
                    CompletableFuture<Consumer> putIfAbsent = this.consumers.putIfAbsent(consumerId, completableFuture);
                    if (putIfAbsent == null) {
                        this.service.getOrCreateTopic(validateTopicName.toString(), schema).thenCompose(topic -> {
                            return schema != null ? topic.isSchemaCompatible(schema).thenCompose(bool -> {
                                return bool.booleanValue() ? topic.subscribe(this, subscription, consumerId, subType, priorityLevel, consumerName, durable, batchMessageIdImpl, metadataFromCommand, readCompacted, initialPosition) : FutureUtil.failedFuture(new BrokerServiceException("Trying to subscribe with incompatible schema"));
                            }) : topic.subscribe(this, subscription, consumerId, subType, priorityLevel, consumerName, durable, batchMessageIdImpl, metadataFromCommand, readCompacted, initialPosition);
                        }).thenAccept((java.util.function.Consumer<? super U>) consumer -> {
                            if (completableFuture.complete(consumer)) {
                                log.info("[{}] Created subscription on topic {} / {}", new Object[]{this.remoteAddress, validateTopicName, subscription});
                                this.ctx.writeAndFlush(Commands.newSuccess(requestId), this.ctx.voidPromise());
                                return;
                            }
                            try {
                                consumer.close();
                                log.info("[{}] Cleared consumer created after timeout on client side {}", this.remoteAddress, consumer);
                            } catch (BrokerServiceException e) {
                                log.warn("[{}] Error closing consumer created after timeout on client side {}: {}", new Object[]{this.remoteAddress, consumer, e.getMessage()});
                            }
                            this.consumers.remove(consumerId, completableFuture);
                        }).exceptionally(th -> {
                            if (!(th.getCause() instanceof BrokerServiceException.ConsumerBusyException)) {
                                log.warn("[{}][{}][{}] Failed to create consumer: {}", new Object[]{this.remoteAddress, validateTopicName, subscription, th.getCause().getMessage(), th});
                            } else if (log.isDebugEnabled()) {
                                log.debug("[{}][{}][{}] Failed to create consumer because exclusive consumer is already connected: {}", new Object[]{this.remoteAddress, validateTopicName, subscription, th.getCause().getMessage()});
                            }
                            if (completableFuture.completeExceptionally(th)) {
                                this.ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(th.getCause()), th.getCause().getMessage()));
                            }
                            this.consumers.remove(consumerId, completableFuture);
                            return null;
                        });
                        return null;
                    }
                    if (!putIfAbsent.isDone() || putIfAbsent.isCompletedExceptionally()) {
                        log.warn("[{}][{}][{}] Consumer is already present on the connection", new Object[]{this.remoteAddress, validateTopicName, subscription});
                        this.ctx.writeAndFlush(Commands.newError(requestId, !putIfAbsent.isDone() ? PulsarApi.ServerError.ServiceNotReady : getErrorCode(putIfAbsent), "Consumer is already present on the connection"));
                        return null;
                    }
                    log.info("[{}] Consumer with the same id is already created: {}", this.remoteAddress, putIfAbsent.getNow(null));
                    this.ctx.writeAndFlush(Commands.newSuccess(requestId));
                    return null;
                } catch (IllegalArgumentException e) {
                    this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.MetadataError, e.getMessage()));
                    return null;
                }
            }).exceptionally(th -> {
                log.warn(String.format("[%s] %s with role %s", this.remoteAddress, th.getMessage(), this.authRole));
                this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.AuthorizationError, th.getMessage()));
                return null;
            });
            return null;
        }).exceptionally(th -> {
            String format = String.format("[%s] %s with role %s", this.remoteAddress, th.getMessage(), this.authRole);
            if (th.getCause() instanceof PulsarServerException) {
                log.info(format);
            } else {
                log.warn(format);
            }
            this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.AuthorizationError, th.getMessage()));
            return null;
        });
    }

    private static SchemaType getType(PulsarApi.Schema.Type type) {
        switch ($SWITCH_TABLE$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type()[type.ordinal()]) {
            case 1:
                return SchemaType.NONE;
            case 2:
                return SchemaType.STRING;
            case 3:
                return SchemaType.JSON;
            case 4:
                return SchemaType.PROTOBUF;
            case 5:
                return SchemaType.AVRO;
            default:
                return SchemaType.NONE;
        }
    }

    private SchemaData getSchema(PulsarApi.Schema schema) {
        return SchemaData.builder().data(schema.getSchemaData().toByteArray()).isDeleted(false).timestamp(System.currentTimeMillis()).user(Strings.nullToEmpty(this.originalPrincipal)).type(getType(schema.getType())).props((Map) schema.getPropertiesList().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }))).build();
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleProducer(PulsarApi.CommandProducer commandProducer) {
        Preconditions.checkArgument(this.state == State.Connected);
        long producerId = commandProducer.getProducerId();
        long requestId = commandProducer.getRequestId();
        String producerName = commandProducer.hasProducerName() ? commandProducer.getProducerName() : this.service.generateUniqueProducerName();
        boolean encrypted = commandProducer.getEncrypted();
        Map<String, String> metadataFromCommand = CommandUtils.metadataFromCommand(commandProducer);
        SchemaData schema = commandProducer.hasSchema() ? getSchema(commandProducer.getSchema()) : null;
        TopicName validateTopicName = validateTopicName(commandProducer.getTopic(), requestId, commandProducer);
        if (validateTopicName == null) {
            return;
        }
        if (!invalidOriginalPrincipal(this.originalPrincipal)) {
            ((!this.service.isAuthorizationEnabled() || this.originalPrincipal == null) ? CompletableFuture.completedFuture(true) : this.service.getAuthorizationService().canProduceAsync(validateTopicName, this.authRole, this.authenticationData)).thenApply((Function<? super Boolean, ? extends U>) bool -> {
                CompletableFuture<Boolean> completedFuture;
                if (!bool.booleanValue()) {
                    log.warn("[{}] {} with role {} on topic {}", new Object[]{this.remoteAddress, "Proxy Client is not authorized to Produce", this.authRole, validateTopicName});
                    this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.AuthorizationError, "Proxy Client is not authorized to Produce"));
                    return null;
                }
                if (this.service.isAuthorizationEnabled()) {
                    completedFuture = this.service.getAuthorizationService().canProduceAsync(validateTopicName, this.originalPrincipal != null ? this.originalPrincipal : this.authRole, this.authenticationData);
                } else {
                    completedFuture = CompletableFuture.completedFuture(true);
                }
                completedFuture.thenApply((Function<? super Boolean, ? extends U>) bool -> {
                    if (!bool.booleanValue()) {
                        log.warn("[{}] {} with role {}", new Object[]{this.remoteAddress, "Client is not authorized to Produce", this.authRole});
                        this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.AuthorizationError, "Client is not authorized to Produce"));
                        return null;
                    }
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Client is authorized to Produce with role {}", this.remoteAddress, this.authRole);
                    }
                    CompletableFuture<Producer> completableFuture = new CompletableFuture<>();
                    CompletableFuture<Producer> putIfAbsent = this.producers.putIfAbsent(producerId, completableFuture);
                    if (putIfAbsent == null) {
                        log.info("[{}][{}] Creating producer. producerId={}", new Object[]{this.remoteAddress, validateTopicName, Long.valueOf(producerId)});
                        this.service.getOrCreateTopic(validateTopicName.toString(), schema).thenAccept(topic -> {
                            if (topic.isBacklogQuotaExceeded(producerName)) {
                                IllegalStateException illegalStateException = new IllegalStateException("Cannot create producer on topic with backlog quota exceeded");
                                BacklogQuota.RetentionPolicy policy = topic.getBacklogQuota().getPolicy();
                                if (policy == BacklogQuota.RetentionPolicy.producer_request_hold) {
                                    this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.ProducerBlockedQuotaExceededError, illegalStateException.getMessage()));
                                } else if (policy == BacklogQuota.RetentionPolicy.producer_exception) {
                                    this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.ProducerBlockedQuotaExceededException, illegalStateException.getMessage()));
                                }
                                completableFuture.completeExceptionally(illegalStateException);
                                this.producers.remove(producerId, completableFuture);
                                return;
                            }
                            if (topic.isEncryptionRequired() && !encrypted) {
                                String format = String.format("Encryption is required in %s", validateTopicName);
                                log.warn("[{}] {}", this.remoteAddress, format);
                                this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.MetadataError, format));
                            } else {
                                disableTcpNoDelayIfNeeded(validateTopicName.toString(), producerName);
                                CompletableFuture<SchemaVersion> addSchema = schema != null ? topic.addSchema(schema) : CompletableFuture.completedFuture(SchemaVersion.Empty);
                                addSchema.exceptionally(th -> {
                                    this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.UnknownError, th.getMessage()));
                                    this.producers.remove(producerId, completableFuture);
                                    return null;
                                });
                                addSchema.thenAccept(schemaVersion -> {
                                    Producer producer = new Producer(topic, this, producerId, producerName, this.authRole, encrypted, metadataFromCommand, schemaVersion);
                                    try {
                                        topic.addProducer(producer);
                                        if (!isActive()) {
                                            producer.closeNow();
                                            log.info("[{}] Cleared producer created after connection was closed: {}", this.remoteAddress, producer);
                                            completableFuture.completeExceptionally(new IllegalStateException("Producer created after connection was closed"));
                                        } else if (completableFuture.complete(producer)) {
                                            log.info("[{}] Created new producer: {}", this.remoteAddress, producer);
                                            this.ctx.writeAndFlush(Commands.newProducerSuccess(requestId, producerName, producer.getLastSequenceId(), producer.getSchemaVersion()));
                                            return;
                                        } else {
                                            producer.closeNow();
                                            log.info("[{}] Cleared producer created after timeout on client side {}", this.remoteAddress, producer);
                                        }
                                    } catch (BrokerServiceException e) {
                                        log.error("[{}] Failed to add producer to topic {}: {}", new Object[]{this.remoteAddress, validateTopicName, e.getMessage()});
                                        this.ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage()));
                                        completableFuture.completeExceptionally(e);
                                    }
                                    this.producers.remove(producerId, completableFuture);
                                });
                            }
                        }).exceptionally(th -> {
                            Throwable cause = th.getCause();
                            if (!(cause instanceof BrokerServiceException.ServiceUnitNotReadyException)) {
                                log.error("[{}] Failed to create topic {}", new Object[]{this.remoteAddress, validateTopicName, th});
                            }
                            if (completableFuture.completeExceptionally(th)) {
                                this.ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(cause), cause.getMessage()));
                            }
                            this.producers.remove(producerId, completableFuture);
                            return null;
                        });
                        return null;
                    }
                    if (!putIfAbsent.isDone() || putIfAbsent.isCompletedExceptionally()) {
                        PulsarApi.ServerError errorCode = !putIfAbsent.isDone() ? PulsarApi.ServerError.ServiceNotReady : getErrorCode(putIfAbsent);
                        log.warn("[{}][{}] Producer is already present on the connection", this.remoteAddress, validateTopicName);
                        this.ctx.writeAndFlush(Commands.newError(requestId, errorCode, "Producer is already present on the connection"));
                        return null;
                    }
                    Producer now = putIfAbsent.getNow(null);
                    log.info("[{}] Producer with the same id is already created: {}", this.remoteAddress, now);
                    this.ctx.writeAndFlush(Commands.newProducerSuccess(requestId, now.getProducerName(), now.getSchemaVersion()));
                    return null;
                }).exceptionally(th -> {
                    log.warn(String.format("[%s] %s with role %s", this.remoteAddress, th.getMessage(), this.authRole));
                    this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.AuthorizationError, th.getMessage()));
                    return null;
                });
                return null;
            }).exceptionally(th -> {
                log.warn(String.format("[%s] %s with role %s", this.remoteAddress, th.getMessage(), this.authRole));
                this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.AuthorizationError, th.getMessage()));
                return null;
            });
        } else {
            log.warn("[{}] {} with role {} and proxyClientAuthRole {} on topic {}", new Object[]{this.remoteAddress, "Valid Proxy Client role should be provided while creating producer ", this.authRole, this.originalPrincipal, validateTopicName});
            this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.AuthorizationError, "Valid Proxy Client role should be provided while creating producer "));
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleSend(PulsarApi.CommandSend commandSend, ByteBuf byteBuf) {
        Preconditions.checkArgument(this.state == State.Connected);
        CompletableFuture<Producer> completableFuture = this.producers.get(commandSend.getProducerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            log.warn("[{}] Producer had already been closed: {}", this.remoteAddress, Long.valueOf(commandSend.getProducerId()));
            return;
        }
        Producer now = completableFuture.getNow(null);
        if (log.isDebugEnabled()) {
            printSendCommandDebug(commandSend, byteBuf);
        }
        if (now.isNonPersistentTopic()) {
            if (this.nonPersistentPendingMessages > this.MaxNonPersistentPendingMessages) {
                long producerId = commandSend.getProducerId();
                long sequenceId = commandSend.getSequenceId();
                this.service.getTopicOrderedExecutor().executeOrdered(now.getTopic().getName(), SafeRun.safeRun(() -> {
                    this.ctx.writeAndFlush(Commands.newSendReceipt(producerId, sequenceId, -1L, -1L), this.ctx.voidPromise());
                }));
                now.recordMessageDrop(commandSend.getNumMessages());
                return;
            }
            this.nonPersistentPendingMessages++;
        }
        startSendOperation();
        now.publishMessage(commandSend.getProducerId(), commandSend.getSequenceId(), byteBuf, commandSend.getNumMessages());
    }

    private void printSendCommandDebug(PulsarApi.CommandSend commandSend, ByteBuf byteBuf) {
        byteBuf.markReaderIndex();
        PulsarApi.MessageMetadata parseMessageMetadata = Commands.parseMessageMetadata(byteBuf);
        byteBuf.resetReaderIndex();
        log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {}", new Object[]{this.remoteAddress, Long.valueOf(commandSend.getProducerId()), Long.valueOf(commandSend.getSequenceId()), parseMessageMetadata.getProducerName(), Long.valueOf(parseMessageMetadata.getSequenceId()), Integer.valueOf(byteBuf.readableBytes())});
        parseMessageMetadata.recycle();
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleAck(PulsarApi.CommandAck commandAck) {
        Preconditions.checkArgument(this.state == State.Connected);
        CompletableFuture<Consumer> completableFuture = this.consumers.get(commandAck.getConsumerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            return;
        }
        completableFuture.getNow(null).messageAcked(commandAck);
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleFlow(PulsarApi.CommandFlow commandFlow) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received flow from consumer {} permits: {}", new Object[]{this.remoteAddress, Long.valueOf(commandFlow.getConsumerId()), Integer.valueOf(commandFlow.getMessagePermits())});
        }
        CompletableFuture<Consumer> completableFuture = this.consumers.get(commandFlow.getConsumerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            return;
        }
        Consumer now = completableFuture.getNow(null);
        if (now != null) {
            now.flowPermits(commandFlow.getMessagePermits());
        } else {
            log.info("[{}] Couldn't find consumer {}", this.remoteAddress, Long.valueOf(commandFlow.getConsumerId()));
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleRedeliverUnacknowledged(PulsarApi.CommandRedeliverUnacknowledgedMessages commandRedeliverUnacknowledgedMessages) {
        Preconditions.checkArgument(this.state == State.Connected);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received Resend Command from consumer {} ", this.remoteAddress, Long.valueOf(commandRedeliverUnacknowledgedMessages.getConsumerId()));
        }
        CompletableFuture<Consumer> completableFuture = this.consumers.get(commandRedeliverUnacknowledgedMessages.getConsumerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            return;
        }
        Consumer now = completableFuture.getNow(null);
        if (commandRedeliverUnacknowledgedMessages.getMessageIdsCount() <= 0 || now.subType() != PulsarApi.CommandSubscribe.SubType.Shared) {
            now.redeliverUnacknowledgedMessages();
        } else {
            now.redeliverUnacknowledgedMessages(commandRedeliverUnacknowledgedMessages.getMessageIdsList());
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleUnsubscribe(PulsarApi.CommandUnsubscribe commandUnsubscribe) {
        Preconditions.checkArgument(this.state == State.Connected);
        CompletableFuture<Consumer> completableFuture = this.consumers.get(commandUnsubscribe.getConsumerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            this.ctx.writeAndFlush(Commands.newError(commandUnsubscribe.getRequestId(), PulsarApi.ServerError.MetadataError, "Consumer not found"));
        } else {
            completableFuture.getNow(null).doUnsubscribe(commandUnsubscribe.getRequestId());
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleSeek(PulsarApi.CommandSeek commandSeek) {
        Preconditions.checkArgument(this.state == State.Connected);
        long requestId = commandSeek.getRequestId();
        CompletableFuture<Consumer> completableFuture = this.consumers.get(commandSeek.getConsumerId());
        if (!commandSeek.hasMessageId()) {
            this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.MetadataError, "Message id was not present"));
            return;
        }
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.MetadataError, "Consumer not found"));
            return;
        }
        Subscription subscription = completableFuture.getNow(null).getSubscription();
        PulsarApi.MessageIdData messageId = commandSeek.getMessageId();
        PositionImpl positionImpl = new PositionImpl(messageId.getLedgerId(), messageId.getEntryId());
        subscription.resetCursor(positionImpl).thenRun(() -> {
            log.info("[{}] [{}][{}] Reset subscription to message id {}", new Object[]{this.remoteAddress, subscription.getTopic().getName(), subscription.getName(), positionImpl});
            this.ctx.writeAndFlush(Commands.newSuccess(requestId));
        }).exceptionally(th -> {
            log.warn("[{}][{}] Failed to reset subscription: {}", new Object[]{this.remoteAddress, subscription, th.getMessage(), th});
            this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.UnknownError, "Error when resetting subscription: " + th.getCause().getMessage()));
            return null;
        });
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleCloseProducer(PulsarApi.CommandCloseProducer commandCloseProducer) {
        Preconditions.checkArgument(this.state == State.Connected);
        long producerId = commandCloseProducer.getProducerId();
        long requestId = commandCloseProducer.getRequestId();
        CompletableFuture<Producer> completableFuture = this.producers.get(producerId);
        if (completableFuture == null) {
            log.warn("[{}] Producer {} was not registered on the connection", this.remoteAddress, Long.valueOf(producerId));
            this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.UnknownError, "Producer was not registered on the connection"));
            return;
        }
        if (!completableFuture.isDone() && completableFuture.completeExceptionally(new IllegalStateException("Closed producer before creation was complete"))) {
            log.info("[{}] Closed producer {} before its creation was completed", this.remoteAddress, Long.valueOf(producerId));
            this.ctx.writeAndFlush(Commands.newSuccess(requestId));
        } else if (completableFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed producer {} that already failed to be created", this.remoteAddress, Long.valueOf(producerId));
            this.ctx.writeAndFlush(Commands.newSuccess(requestId));
        } else {
            Producer now = completableFuture.getNow(null);
            log.info("[{}][{}] Closing producer on cnx {}", new Object[]{now.getTopic(), now.getProducerName(), this.remoteAddress});
            now.close().thenAccept(r14 -> {
                log.info("[{}][{}] Closed producer on cnx {}", new Object[]{now.getTopic(), now.getProducerName(), this.remoteAddress});
                this.ctx.writeAndFlush(Commands.newSuccess(requestId));
                this.producers.remove(producerId, completableFuture);
            });
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleCloseConsumer(PulsarApi.CommandCloseConsumer commandCloseConsumer) {
        Preconditions.checkArgument(this.state == State.Connected);
        log.info("[{}] Closing consumer: {}", this.remoteAddress, Long.valueOf(commandCloseConsumer.getConsumerId()));
        long requestId = commandCloseConsumer.getRequestId();
        long consumerId = commandCloseConsumer.getConsumerId();
        CompletableFuture<Consumer> completableFuture = this.consumers.get(consumerId);
        if (completableFuture == null) {
            log.warn("[{}] Consumer was not registered on the connection: {}", Long.valueOf(consumerId), this.remoteAddress);
            this.ctx.writeAndFlush(Commands.newError(requestId, PulsarApi.ServerError.MetadataError, "Consumer not found"));
            return;
        }
        if (!completableFuture.isDone() && completableFuture.completeExceptionally(new IllegalStateException("Closed consumer before creation was complete"))) {
            log.info("[{}] Closed consumer {} before its creation was completed", this.remoteAddress, Long.valueOf(consumerId));
            this.ctx.writeAndFlush(Commands.newSuccess(requestId));
            return;
        }
        if (completableFuture.isCompletedExceptionally()) {
            log.info("[{}] Closed consumer {} that already failed to be created", this.remoteAddress, Long.valueOf(consumerId));
            this.ctx.writeAndFlush(Commands.newSuccess(requestId));
            return;
        }
        Consumer now = completableFuture.getNow(null);
        try {
            now.close();
            this.consumers.remove(consumerId, completableFuture);
            this.ctx.writeAndFlush(Commands.newSuccess(requestId));
            log.info("[{}] Closed consumer {}", this.remoteAddress, now);
        } catch (BrokerServiceException e) {
            log.warn("[{]] Error closing consumer: ", new Object[]{this.remoteAddress, now, e});
            this.ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(e), e.getMessage()));
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleGetLastMessageId(PulsarApi.CommandGetLastMessageId commandGetLastMessageId) {
        Preconditions.checkArgument(this.state == State.Connected);
        CompletableFuture<Consumer> completableFuture = this.consumers.get(commandGetLastMessageId.getConsumerId());
        if (completableFuture == null || !completableFuture.isDone() || completableFuture.isCompletedExceptionally()) {
            this.ctx.writeAndFlush(Commands.newError(commandGetLastMessageId.getRequestId(), PulsarApi.ServerError.MetadataError, "Consumer not found"));
            return;
        }
        Consumer now = completableFuture.getNow(null);
        long requestId = commandGetLastMessageId.getRequestId();
        Topic topic = now.getSubscription().getTopic();
        Position lastMessageId = topic.getLastMessageId();
        int partitionIndex = TopicName.getPartitionIndex(topic.getName());
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}][{}] Get LastMessageId {} partitionIndex {}", new Object[]{this.remoteAddress, topic.getName(), now.getSubscription().getName(), lastMessageId, Integer.valueOf(partitionIndex)});
        }
        this.ctx.writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, PulsarApi.MessageIdData.newBuilder().setLedgerId(((PositionImpl) lastMessageId).getLedgerId()).setEntryId(((PositionImpl) lastMessageId).getEntryId()).setPartition(partitionIndex).build()));
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarDecoder
    protected void handleGetTopicsOfNamespace(PulsarApi.CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
        long requestId = commandGetTopicsOfNamespace.getRequestId();
        String namespace = commandGetTopicsOfNamespace.getNamespace();
        try {
            List<String> listOfTopics = getBrokerService().pulsar().getNamespaceService().getListOfTopics(NamespaceName.get(namespace));
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}", new Object[]{this.remoteAddress, namespace, Long.valueOf(requestId), Integer.valueOf(listOfTopics.size())});
            }
            this.ctx.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(listOfTopics, requestId));
        } catch (Exception e) {
            log.warn("[{]] Error GetTopicsOfNamespace for namespace [//{}] by {}", new Object[]{this.remoteAddress, namespace, Long.valueOf(requestId)});
            this.ctx.writeAndFlush(Commands.newError(requestId, BrokerServiceException.getClientErrorCode(new BrokerServiceException.ServerMetadataException(e)), e.getMessage()));
        }
    }

    @Override // org.apache.pulsar.shade.org.apache.pulsar.common.api.PulsarHandler
    protected boolean isHandshakeCompleted() {
        return this.state == State.Connected;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ChannelHandlerContext ctx() {
        return this.ctx;
    }

    public void closeProducer(Producer producer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed producer: {}", this.remoteAddress, producer);
        }
        long producerId = producer.getProducerId();
        this.producers.remove(producerId);
        if (this.remoteEndpointProtocolVersion >= PulsarApi.ProtocolVersion.v5.getNumber()) {
            this.ctx.writeAndFlush(Commands.newCloseProducer(producerId, -1L));
        } else {
            close();
        }
    }

    public void closeConsumer(Consumer consumer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed consumer: {}", this.remoteAddress, consumer);
        }
        long consumerId = consumer.consumerId();
        this.consumers.remove(consumerId);
        if (this.remoteEndpointProtocolVersion >= PulsarApi.ProtocolVersion.v5.getNumber()) {
            this.ctx.writeAndFlush(Commands.newCloseConsumer(consumerId, -1L));
        } else {
            close();
        }
    }

    protected void close() {
        this.ctx.close();
    }

    public SocketAddress clientAddress() {
        return this.remoteAddress;
    }

    public void removedConsumer(Consumer consumer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed consumer: {}", this.remoteAddress, consumer);
        }
        this.consumers.remove(consumer.consumerId());
    }

    public void removedProducer(Producer producer) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Removed producer: {}", this.remoteAddress, producer);
        }
        this.producers.remove(producer.getProducerId());
    }

    public boolean isActive() {
        return this.isActive;
    }

    public boolean isWritable() {
        return this.ctx.channel().isWritable();
    }

    public void startSendOperation() {
        int i = this.pendingSendRequest + 1;
        this.pendingSendRequest = i;
        if (i == 1000) {
            this.ctx.channel().config().setAutoRead(false);
        }
    }

    public void completedSendOperation(boolean z) {
        int i = this.pendingSendRequest - 1;
        this.pendingSendRequest = i;
        if (i == 500) {
            this.ctx.channel().config().setAutoRead(true);
        }
        if (z) {
            this.nonPersistentPendingMessages--;
        }
    }

    private <T> PulsarApi.ServerError getErrorCode(CompletableFuture<T> completableFuture) {
        PulsarApi.ServerError serverError = PulsarApi.ServerError.UnknownError;
        try {
            completableFuture.getNow(null);
        } catch (Exception e) {
            if (e.getCause() instanceof BrokerServiceException) {
                serverError = BrokerServiceException.getClientErrorCode((BrokerServiceException) e.getCause());
            }
        }
        return serverError;
    }

    private final void disableTcpNoDelayIfNeeded(String str, String str2) {
        if (str2 == null || !str2.startsWith(this.replicatorPrefix)) {
            return;
        }
        try {
            if (((Boolean) this.ctx.channel().config().getOption(ChannelOption.TCP_NODELAY)).booleanValue()) {
                this.ctx.channel().config().setOption(ChannelOption.TCP_NODELAY, false);
            }
        } catch (Throwable unused) {
            log.warn("[{}] [{}] Failed to remove TCP no-delay property on client cnx {}", new Object[]{str, str2, this.ctx.channel()});
        }
    }

    private TopicName validateTopicName(String str, long j, GeneratedMessageLite generatedMessageLite) {
        try {
            return TopicName.get(str);
        } catch (Throwable th) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed to parse topic name '{}'", new Object[]{this.remoteAddress, str, th});
            }
            if (generatedMessageLite instanceof PulsarApi.CommandLookupTopic) {
                this.ctx.writeAndFlush(Commands.newLookupErrorResponse(PulsarApi.ServerError.InvalidTopicName, "Invalid topic name: " + th.getMessage(), j));
                return null;
            }
            if (generatedMessageLite instanceof PulsarApi.CommandPartitionedTopicMetadata) {
                this.ctx.writeAndFlush(Commands.newPartitionMetadataResponse(PulsarApi.ServerError.InvalidTopicName, "Invalid topic name: " + th.getMessage(), j));
                return null;
            }
            this.ctx.writeAndFlush(Commands.newError(j, PulsarApi.ServerError.InvalidTopicName, "Invalid topic name: " + th.getMessage()));
            return null;
        }
    }

    public State getState() {
        return this.state;
    }

    public BrokerService getBrokerService() {
        return this.service;
    }

    public String getRole() {
        return this.authRole;
    }

    boolean hasConsumer(long j) {
        return this.consumers.containsKey(j);
    }

    public boolean isBatchMessageCompatibleVersion() {
        return this.remoteEndpointProtocolVersion >= PulsarApi.ProtocolVersion.v4.getNumber();
    }

    public String getClientVersion() {
        return this.clientVersion;
    }

    static /* synthetic */ int[] $SWITCH_TABLE$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type() {
        int[] iArr = $SWITCH_TABLE$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type;
        if (iArr != null) {
            return iArr;
        }
        int[] iArr2 = new int[PulsarApi.Schema.Type.values().length];
        try {
            iArr2[PulsarApi.Schema.Type.Avro.ordinal()] = 5;
        } catch (NoSuchFieldError unused) {
        }
        try {
            iArr2[PulsarApi.Schema.Type.Json.ordinal()] = 3;
        } catch (NoSuchFieldError unused2) {
        }
        try {
            iArr2[PulsarApi.Schema.Type.None.ordinal()] = 1;
        } catch (NoSuchFieldError unused3) {
        }
        try {
            iArr2[PulsarApi.Schema.Type.Protobuf.ordinal()] = 4;
        } catch (NoSuchFieldError unused4) {
        }
        try {
            iArr2[PulsarApi.Schema.Type.String.ordinal()] = 2;
        } catch (NoSuchFieldError unused5) {
        }
        $SWITCH_TABLE$org$apache$pulsar$common$api$proto$PulsarApi$Schema$Type = iArr2;
        return iArr2;
    }
}
