package org.apache.pulsar.client.impl;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.netty.util.concurrent.ScheduledFuture;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.ConnectionHandler;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.PropertyAccessor;

/* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.6.3.jar:org/apache/pulsar/client/impl/ProducerImpl.class */
public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, ConnectionHandler.Connection {
    protected final long producerId;
    private volatile long msgIdGenerator;
    private final BlockingQueue<OpSendMsg> pendingMessages;
    private final BlockingQueue<OpSendMsg> pendingCallbacks;
    private final Semaphore semaphore;
    private volatile Timeout sendTimeout;
    private volatile Timeout batchMessageAndSendTimeout;
    private long createProducerTimeout;
    private final BatchMessageContainerBase batchMessageContainer;
    private CompletableFuture<MessageId> lastSendFuture;
    private String producerName;
    private boolean userProvidedProducerName;
    private String connectionId;
    private String connectedSince;
    private final int partitionIndex;
    private final ProducerStatsRecorder stats;
    private final CompressionCodec compressor;
    private volatile long lastSequenceIdPublished;
    protected volatile long lastSequenceIdPushed;
    private volatile boolean isLastSequenceIdPotentialDuplicated;
    private final MessageCrypto msgCrypto;
    private ScheduledFuture<?> keyGeneratorTask;
    private final Map<String, String> metadata;
    private Optional<byte[]> schemaVersion;
    private final ConnectionHandler connectionHandler;
    TimerTask batchMessageAndSendTask;
    static final AtomicLongFieldUpdater<ProducerImpl> LAST_SEQ_ID_PUBLISHED_UPDATER = AtomicLongFieldUpdater.newUpdater(ProducerImpl.class, "lastSequenceIdPublished");
    static final AtomicLongFieldUpdater<ProducerImpl> LAST_SEQ_ID_PUSHED_UPDATER = AtomicLongFieldUpdater.newUpdater(ProducerImpl.class, "lastSequenceIdPushed");
    private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater.newUpdater(ProducerImpl.class, "msgIdGenerator");
    private static final Logger log = LoggerFactory.getLogger((Class<?>) ProducerImpl.class);

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.6.3.jar:org/apache/pulsar/client/impl/ProducerImpl$OpSendMsg.class */
    public static final class OpSendMsg {
        MessageImpl<?> msg;
        List<MessageImpl<?>> msgs;
        ByteBufPair cmd;
        SendCallback callback;
        Runnable rePopulate;
        long sequenceId;
        long createdAt;
        long firstSentAt;
        long lastSentAt;
        int retryCount;
        long batchSizeByte;
        int numMessagesInBatch;
        long highestSequenceId;
        int totalChunks;
        int chunkId;
        private final Recycler.Handle<OpSendMsg> recyclerHandle;
        private static final Recycler<OpSendMsg> RECYCLER = new Recycler<OpSendMsg>() { // from class: org.apache.pulsar.client.impl.ProducerImpl.OpSendMsg.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // io.netty.util.Recycler
            /* renamed from: newObject */
            public OpSendMsg newObject2(Recycler.Handle<OpSendMsg> handle) {
                return new OpSendMsg(handle);
            }
        };

        static OpSendMsg create(MessageImpl<?> messageImpl, ByteBufPair byteBufPair, long j, SendCallback sendCallback) {
            OpSendMsg opSendMsg = RECYCLER.get();
            opSendMsg.msg = messageImpl;
            opSendMsg.cmd = byteBufPair;
            opSendMsg.callback = sendCallback;
            opSendMsg.sequenceId = j;
            opSendMsg.createdAt = System.nanoTime();
            return opSendMsg;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static OpSendMsg create(List<MessageImpl<?>> list, ByteBufPair byteBufPair, long j, SendCallback sendCallback) {
            OpSendMsg opSendMsg = RECYCLER.get();
            opSendMsg.msgs = list;
            opSendMsg.cmd = byteBufPair;
            opSendMsg.callback = sendCallback;
            opSendMsg.sequenceId = j;
            opSendMsg.createdAt = System.nanoTime();
            return opSendMsg;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static OpSendMsg create(List<MessageImpl<?>> list, ByteBufPair byteBufPair, long j, long j2, SendCallback sendCallback) {
            OpSendMsg opSendMsg = RECYCLER.get();
            opSendMsg.msgs = list;
            opSendMsg.cmd = byteBufPair;
            opSendMsg.callback = sendCallback;
            opSendMsg.sequenceId = j;
            opSendMsg.highestSequenceId = j2;
            opSendMsg.createdAt = System.nanoTime();
            return opSendMsg;
        }

        void updateSentTimestamp() {
            this.lastSentAt = System.nanoTime();
            if (this.firstSentAt == -1) {
                this.firstSentAt = this.lastSentAt;
            }
            this.retryCount++;
        }

        void sendComplete(Exception exc) {
            SendCallback sendCallback = this.callback;
            if (null != sendCallback) {
                Exception exc2 = exc;
                if (exc2 != null && (exc2 instanceof PulsarClientException.TimeoutException)) {
                    PulsarClientException.TimeoutException timeoutException = (PulsarClientException.TimeoutException) exc;
                    long sequenceId = timeoutException.getSequenceId();
                    long nanoTime = System.nanoTime();
                    exc2 = new PulsarClientException.TimeoutException(String.format("%s : createdAt %s ns ago, firstSentAt %s ns ago, lastSentAt %s ns ago, retryCount %s", timeoutException.getMessage(), Long.valueOf(nanoTime - this.createdAt), Long.valueOf(nanoTime - this.firstSentAt), Long.valueOf(nanoTime - this.lastSentAt), Integer.valueOf(this.retryCount)), sequenceId);
                }
                sendCallback.sendComplete(exc2);
            }
        }

        void recycle() {
            this.msg = null;
            this.msgs = null;
            this.cmd = null;
            this.callback = null;
            this.rePopulate = null;
            this.sequenceId = -1L;
            this.createdAt = -1L;
            this.firstSentAt = -1L;
            this.lastSentAt = -1L;
            this.highestSequenceId = -1L;
            this.totalChunks = 0;
            this.chunkId = -1;
            this.recyclerHandle.recycle(this);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void setNumMessagesInBatch(int i) {
            this.numMessagesInBatch = i;
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public void setBatchSizeByte(long j) {
            this.batchSizeByte = j;
        }

        void setMessageId(long j, long j2, int i) {
            if (this.msg != null) {
                this.msg.setMessageId(new MessageIdImpl(j, j2, i));
                return;
            }
            for (int i2 = 0; i2 < this.msgs.size(); i2++) {
                this.msgs.get(i2).setMessageId(new BatchMessageIdImpl(j, j2, i, i2));
            }
        }

        private OpSendMsg(Recycler.Handle<OpSendMsg> handle) {
            this.batchSizeByte = 0L;
            this.numMessagesInBatch = 1;
            this.totalChunks = 0;
            this.chunkId = -1;
            this.recyclerHandle = handle;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:META-INF/bundled-dependencies/pulsar-client-original-2.6.3.jar:org/apache/pulsar/client/impl/ProducerImpl$WriteInEventLoopCallback.class */
    public static final class WriteInEventLoopCallback implements Runnable {
        private ProducerImpl<?> producer;
        private ByteBufPair cmd;
        private long sequenceId;
        private ClientCnx cnx;
        private OpSendMsg op;
        private final Recycler.Handle<WriteInEventLoopCallback> recyclerHandle;
        private static final Recycler<WriteInEventLoopCallback> RECYCLER = new Recycler<WriteInEventLoopCallback>() { // from class: org.apache.pulsar.client.impl.ProducerImpl.WriteInEventLoopCallback.1
            /* JADX INFO: Access modifiers changed from: protected */
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // io.netty.util.Recycler
            /* renamed from: newObject */
            public WriteInEventLoopCallback newObject2(Recycler.Handle<WriteInEventLoopCallback> handle) {
                return new WriteInEventLoopCallback(handle);
            }
        };

        static WriteInEventLoopCallback create(ProducerImpl<?> producerImpl, ClientCnx clientCnx, OpSendMsg opSendMsg) {
            WriteInEventLoopCallback writeInEventLoopCallback = RECYCLER.get();
            writeInEventLoopCallback.producer = producerImpl;
            writeInEventLoopCallback.cnx = clientCnx;
            writeInEventLoopCallback.sequenceId = opSendMsg.sequenceId;
            writeInEventLoopCallback.cmd = opSendMsg.cmd;
            writeInEventLoopCallback.op = opSendMsg;
            return writeInEventLoopCallback;
        }

        @Override // java.lang.Runnable
        public void run() {
            if (ProducerImpl.log.isDebugEnabled()) {
                ProducerImpl.log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", this.producer.topic, ((ProducerImpl) this.producer).producerName, this.cnx, Long.valueOf(this.sequenceId));
            }
            try {
                this.cnx.ctx().writeAndFlush(this.cmd, this.cnx.ctx().voidPromise());
                this.op.updateSentTimestamp();
            } finally {
                recycle();
            }
        }

        private void recycle() {
            this.producer = null;
            this.cnx = null;
            this.cmd = null;
            this.sequenceId = -1L;
            this.op = null;
            this.recyclerHandle.recycle(this);
        }

        private WriteInEventLoopCallback(Recycler.Handle<WriteInEventLoopCallback> handle) {
            this.recyclerHandle = handle;
        }
    }

    public ProducerImpl(PulsarClientImpl pulsarClientImpl, String str, ProducerConfigurationData producerConfigurationData, CompletableFuture<Producer<T>> completableFuture, int i, Schema<T> schema, ProducerInterceptors producerInterceptors) {
        super(pulsarClientImpl, str, producerConfigurationData, completableFuture, schema, producerInterceptors);
        MessageCryptoBc messageCryptoBc;
        this.sendTimeout = null;
        this.batchMessageAndSendTimeout = null;
        this.lastSendFuture = CompletableFuture.completedFuture(null);
        this.userProvidedProducerName = false;
        this.keyGeneratorTask = null;
        this.schemaVersion = Optional.empty();
        this.batchMessageAndSendTask = new TimerTask() { // from class: org.apache.pulsar.client.impl.ProducerImpl.2
            @Override // io.netty.util.TimerTask
            public void run(Timeout timeout) throws Exception {
                if (timeout.isCancelled()) {
                    return;
                }
                if (ProducerImpl.log.isTraceEnabled()) {
                    ProducerImpl.log.trace("[{}] [{}] Batching the messages from the batch container from timer thread", ProducerImpl.this.topic, ProducerImpl.this.producerName);
                }
                synchronized (ProducerImpl.this) {
                    if (ProducerImpl.this.getState() == HandlerState.State.Closing || ProducerImpl.this.getState() == HandlerState.State.Closed) {
                        return;
                    }
                    ProducerImpl.this.batchMessageAndSend();
                    ProducerImpl.this.batchMessageAndSendTimeout = ProducerImpl.this.client.timer().newTimeout(this, ProducerImpl.this.conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
                }
            }
        };
        this.producerId = pulsarClientImpl.newProducerId();
        this.producerName = producerConfigurationData.getProducerName();
        if (StringUtils.isNotBlank(this.producerName)) {
            this.userProvidedProducerName = true;
        }
        this.partitionIndex = i;
        this.pendingMessages = Queues.newArrayBlockingQueue(producerConfigurationData.getMaxPendingMessages());
        this.pendingCallbacks = Queues.newArrayBlockingQueue(producerConfigurationData.getMaxPendingMessages());
        this.semaphore = new Semaphore(producerConfigurationData.getMaxPendingMessages(), true);
        this.compressor = CompressionCodecProvider.getCompressionCodec(producerConfigurationData.getCompressionType());
        if (producerConfigurationData.getInitialSequenceId() != null) {
            long longValue = producerConfigurationData.getInitialSequenceId().longValue();
            this.lastSequenceIdPublished = longValue;
            this.lastSequenceIdPushed = longValue;
            this.msgIdGenerator = longValue + 1;
        } else {
            this.lastSequenceIdPublished = -1L;
            this.lastSequenceIdPushed = -1L;
            this.msgIdGenerator = 0L;
        }
        if (producerConfigurationData.isEncryptionEnabled()) {
            String str2 = PropertyAccessor.PROPERTY_KEY_PREFIX + str + "] [" + this.producerName + "] [" + this.producerId + "]";
            if (producerConfigurationData.getMessageCrypto() != null) {
                this.msgCrypto = producerConfigurationData.getMessageCrypto();
            } else {
                try {
                    messageCryptoBc = new MessageCryptoBc(str2, true);
                } catch (Exception e) {
                    log.error("MessageCryptoBc may not included in the jar in Producer. e:", (Throwable) e);
                    messageCryptoBc = null;
                }
                this.msgCrypto = messageCryptoBc;
            }
        } else {
            this.msgCrypto = null;
        }
        if (this.msgCrypto != null) {
            this.keyGeneratorTask = pulsarClientImpl.eventLoopGroup().scheduleWithFixedDelay(() -> {
                try {
                    this.msgCrypto.addPublicKeyCipher(producerConfigurationData.getEncryptionKeys(), producerConfigurationData.getCryptoKeyReader());
                } catch (PulsarClientException.CryptoException e2) {
                    if (completableFuture.isDone()) {
                        return;
                    }
                    log.warn("[{}] [{}] [{}] Failed to add public key cipher.", str, this.producerName, Long.valueOf(this.producerId));
                    completableFuture.completeExceptionally(PulsarClientException.wrap(e2, String.format("The producer %s of the topic %s adds the public key cipher was failed", this.producerName, str)));
                }
            }, 0L, 4L, TimeUnit.HOURS);
        }
        if (producerConfigurationData.getSendTimeoutMs() > 0) {
            this.sendTimeout = pulsarClientImpl.timer().newTimeout(this, producerConfigurationData.getSendTimeoutMs(), TimeUnit.MILLISECONDS);
        }
        this.createProducerTimeout = System.currentTimeMillis() + pulsarClientImpl.getConfiguration().getOperationTimeoutMs();
        if (producerConfigurationData.isBatchingEnabled()) {
            BatcherBuilder batcherBuilder = producerConfigurationData.getBatcherBuilder();
            this.batchMessageContainer = (BatchMessageContainerBase) (batcherBuilder == null ? BatcherBuilder.DEFAULT : batcherBuilder).build();
            this.batchMessageContainer.setProducer(this);
        } else {
            this.batchMessageContainer = null;
        }
        if (pulsarClientImpl.getConfiguration().getStatsIntervalSeconds() > 0) {
            this.stats = new ProducerStatsRecorderImpl(pulsarClientImpl, producerConfigurationData, this);
        } else {
            this.stats = ProducerStatsDisabled.INSTANCE;
        }
        if (producerConfigurationData.getProperties().isEmpty()) {
            this.metadata = Collections.emptyMap();
        } else {
            this.metadata = Collections.unmodifiableMap(new HashMap(producerConfigurationData.getProperties()));
        }
        this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder().setInitialTime(pulsarClientImpl.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMax(pulsarClientImpl.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMandatoryStop(Math.max(100L, producerConfigurationData.getSendTimeoutMs() - 100), TimeUnit.MILLISECONDS).create(), this);
        grabCnx();
    }

    public ConnectionHandler getConnectionHandler() {
        return this.connectionHandler;
    }

    private boolean isBatchMessagingEnabled() {
        return this.conf.isBatchingEnabled();
    }

    private boolean isMultiSchemaEnabled(boolean z) {
        if (this.multiSchemaMode != ProducerBase.MultiSchemaMode.Auto) {
            return this.multiSchemaMode == ProducerBase.MultiSchemaMode.Enabled;
        }
        if (!z) {
            return false;
        }
        this.multiSchemaMode = ProducerBase.MultiSchemaMode.Enabled;
        return true;
    }

    @Override // org.apache.pulsar.client.api.Producer
    public long getLastSequenceId() {
        return this.lastSequenceIdPublished;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.pulsar.client.impl.ProducerBase
    public CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
        final CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
        final MessageImpl messageImpl = (MessageImpl) beforeSend(message);
        messageImpl.getDataBuffer().retain();
        if (this.interceptors != null) {
            messageImpl.getProperties();
        }
        sendAsync(messageImpl, new SendCallback() { // from class: org.apache.pulsar.client.impl.ProducerImpl.1
            SendCallback nextCallback = null;
            MessageImpl<?> nextMsg = null;
            long createdAt = System.nanoTime();

            @Override // org.apache.pulsar.client.impl.SendCallback
            public CompletableFuture<MessageId> getFuture() {
                return completableFuture;
            }

            @Override // org.apache.pulsar.client.impl.SendCallback
            public SendCallback getNextSendCallback() {
                return this.nextCallback;
            }

            @Override // org.apache.pulsar.client.impl.SendCallback
            public MessageImpl<?> getNextMessage() {
                return this.nextMsg;
            }

            @Override // org.apache.pulsar.client.impl.SendCallback
            public void sendComplete(Exception exc) {
                try {
                    if (exc != null) {
                        ProducerImpl.this.stats.incrementSendFailed();
                        ProducerImpl.this.onSendAcknowledgement(messageImpl, null, exc);
                        completableFuture.completeExceptionally(exc);
                    } else {
                        ProducerImpl.this.onSendAcknowledgement(messageImpl, messageImpl.getMessageId(), null);
                        completableFuture.complete(messageImpl.getMessageId());
                        ProducerImpl.this.stats.incrementNumAcksReceived(System.nanoTime() - this.createdAt);
                    }
                    while (this.nextCallback != null) {
                        SendCallback sendCallback = this.nextCallback;
                        MessageImpl<?> messageImpl2 = this.nextMsg;
                        try {
                            messageImpl2.getDataBuffer().retain();
                            if (exc != null) {
                                ProducerImpl.this.stats.incrementSendFailed();
                                ProducerImpl.this.onSendAcknowledgement(messageImpl2, null, exc);
                                sendCallback.getFuture().completeExceptionally(exc);
                            } else {
                                ProducerImpl.this.onSendAcknowledgement(messageImpl2, messageImpl2.getMessageId(), null);
                                sendCallback.getFuture().complete(messageImpl2.getMessageId());
                                ProducerImpl.this.stats.incrementNumAcksReceived(System.nanoTime() - this.createdAt);
                            }
                            this.nextMsg = this.nextCallback.getNextMessage();
                            this.nextCallback = this.nextCallback.getNextSendCallback();
                            messageImpl2.getDataBuffer().release();
                        } catch (Throwable th) {
                            messageImpl2.getDataBuffer().release();
                            throw th;
                        }
                    }
                } finally {
                    messageImpl.getDataBuffer().release();
                }
            }

            @Override // org.apache.pulsar.client.impl.SendCallback
            public void addCallback(MessageImpl<?> messageImpl2, SendCallback sendCallback) {
                this.nextMsg = messageImpl2;
                this.nextCallback = sendCallback;
            }
        });
        return completableFuture;
    }

    public void sendAsync(Message<?> message, SendCallback sendCallback) {
        Preconditions.checkArgument(message instanceof MessageImpl);
        if (isValidProducerState(sendCallback, message.getSequenceId()) && canEnqueueRequest(sendCallback, message.getSequenceId())) {
            MessageImpl<?> messageImpl = (MessageImpl) message;
            PulsarApi.MessageMetadata.Builder messageBuilder = messageImpl.getMessageBuilder();
            ByteBuf dataBuffer = messageImpl.getDataBuffer();
            int readableBytes = dataBuffer.readableBytes();
            ByteBuf byteBuf = dataBuffer;
            if (!isBatchMessagingEnabled() || messageBuilder.hasDeliverAtTime()) {
                byteBuf = this.compressor.encode(dataBuffer);
                dataBuffer.release();
                int readableBytes2 = byteBuf.readableBytes();
                if (readableBytes2 > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
                    byteBuf.release();
                    completeCallbackAndReleaseSemaphore(sendCallback, new PulsarClientException.InvalidMessageException(String.format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds %d bytes", this.producerName, this.topic, (isBatchMessagingEnabled() || this.conf.getCompressionType() == CompressionType.NONE) ? "" : "Compressed", Integer.valueOf(readableBytes2), Integer.valueOf(ClientCnx.getMaxMessageSize()))));
                    return;
                }
            }
            if (!messageImpl.isReplicated() && messageBuilder.hasProducerName()) {
                completeCallbackAndReleaseSemaphore(sendCallback, new PulsarClientException.InvalidMessageException(String.format("The producer %s of the topic %s can not reuse the same message", this.producerName, this.topic), messageImpl.getSequenceId()));
                byteBuf.release();
                return;
            }
            if (!populateMessageSchema(messageImpl, sendCallback)) {
                byteBuf.release();
                return;
            }
            int max = canAddToBatch(messageImpl) ? 1 : (Math.max(1, byteBuf.readableBytes()) / ClientCnx.getMaxMessageSize()) + (Math.max(1, byteBuf.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
            for (int i = 0; i < max - 1; i++) {
                if (!canEnqueueRequest(sendCallback, message.getSequenceId())) {
                    return;
                }
            }
            try {
                synchronized (this) {
                    int i2 = 0;
                    String uuid = UUID.randomUUID().toString();
                    for (int i3 = 0; i3 < max; i3++) {
                        serializeAndSendMessage(messageImpl, messageBuilder, dataBuffer, uuid, i3, max, i2, ClientCnx.getMaxMessageSize(), byteBuf, byteBuf.readableBytes(), readableBytes, sendCallback);
                        i2 = (i3 + 1) * ClientCnx.getMaxMessageSize();
                    }
                }
            } catch (PulsarClientException e) {
                e.setSequenceId(messageImpl.getSequenceId());
                completeCallbackAndReleaseSemaphore(sendCallback, e);
            } catch (Throwable th) {
                completeCallbackAndReleaseSemaphore(sendCallback, new PulsarClientException(th, messageImpl.getSequenceId()));
            }
        }
    }

    private void serializeAndSendMessage(MessageImpl<?> messageImpl, PulsarApi.MessageMetadata.Builder builder, ByteBuf byteBuf, String str, int i, int i2, int i3, int i4, ByteBuf byteBuf2, int i5, int i6, SendCallback sendCallback) throws IOException, InterruptedException {
        long sequenceId;
        OpSendMsg create;
        ByteBuf byteBuf3 = byteBuf2;
        PulsarApi.MessageMetadata.Builder builder2 = builder;
        if (i2 > 1 && TopicName.get(this.topic).isPersistent()) {
            byteBuf3 = byteBuf2.slice(i3, Math.min(i4, byteBuf3.readableBytes() - i3));
            if (i != i2 - 1) {
                byteBuf3.retain();
                builder2 = builder.mo2934clone();
            }
            builder2.setUuid(str);
            builder2.setChunkId(i);
            builder2.setNumChunksFromMsg(i2);
            builder2.setTotalChunkMsgSize(i5);
        }
        if (builder2.hasSequenceId()) {
            sequenceId = builder2.getSequenceId();
        } else {
            sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
            builder2.setSequenceId(sequenceId);
        }
        if (!builder2.hasPublishTime()) {
            builder2.setPublishTime(this.client.getClientClock().millis());
            Preconditions.checkArgument(!builder2.hasProducerName());
            builder2.setProducerName(this.producerName);
            if (this.conf.getCompressionType() != CompressionType.NONE) {
                builder2.setCompression(CompressionCodecProvider.convertToWireProtocol(this.conf.getCompressionType()));
            }
            builder2.setUncompressedSize(i6);
        }
        if (!canAddToBatch(messageImpl) || i2 > 1) {
            ByteBuf encryptMessage = encryptMessage(builder2, byteBuf3);
            PulsarApi.MessageMetadata build = builder2.build();
            int numMessagesInBatch = messageImpl.getMessageBuilder().hasNumMessagesInBatch() ? messageImpl.getMessageBuilder().getNumMessagesInBatch() : 1;
            if (messageImpl.getSchemaState() == MessageImpl.SchemaState.Ready) {
                create = OpSendMsg.create(messageImpl, sendMessage(this.producerId, sequenceId, numMessagesInBatch, build, encryptMessage), sequenceId, sendCallback);
                builder2.recycle();
                build.recycle();
            } else {
                create = OpSendMsg.create(messageImpl, (ByteBufPair) null, sequenceId, sendCallback);
                PulsarApi.MessageMetadata.Builder builder3 = builder2;
                long j = sequenceId;
                create.rePopulate = () -> {
                    create.cmd = sendMessage(this.producerId, j, numMessagesInBatch, builder.build(), encryptMessage);
                    builder3.recycle();
                    build.recycle();
                };
            }
            create.setNumMessagesInBatch(numMessagesInBatch);
            create.setBatchSizeByte(encryptMessage.readableBytes());
            if (i2 > 1) {
                create.totalChunks = i2;
                create.chunkId = i;
            }
            this.lastSendFuture = sendCallback.getFuture();
            processOpSendMsg(create);
            return;
        }
        if (!canAddToCurrentBatch(messageImpl)) {
            doBatchSendAndAdd(messageImpl, sendCallback, byteBuf);
            return;
        }
        if (sequenceId <= this.lastSequenceIdPushed) {
            this.isLastSequenceIdPotentialDuplicated = true;
            if (sequenceId <= this.lastSequenceIdPublished) {
                log.warn("Message with sequence id {} is definitely a duplicate", Long.valueOf(sequenceId));
            } else {
                log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.", Long.valueOf(sequenceId));
            }
            doBatchSendAndAdd(messageImpl, sendCallback, byteBuf);
            return;
        }
        if (this.isLastSequenceIdPotentialDuplicated) {
            doBatchSendAndAdd(messageImpl, sendCallback, byteBuf);
        } else {
            boolean add = this.batchMessageContainer.add(messageImpl, sendCallback);
            this.lastSendFuture = sendCallback.getFuture();
            byteBuf.release();
            if (add) {
                batchMessageAndSend();
            }
        }
        this.isLastSequenceIdPotentialDuplicated = false;
    }

    private boolean populateMessageSchema(MessageImpl messageImpl, SendCallback sendCallback) {
        PulsarApi.MessageMetadata.Builder messageBuilder = messageImpl.getMessageBuilder();
        if (messageImpl.getSchema() == this.schema) {
            this.schemaVersion.ifPresent(bArr -> {
                messageBuilder.setSchemaVersion(ByteString.copyFrom(bArr));
            });
            messageImpl.setSchemaState(MessageImpl.SchemaState.Ready);
            return true;
        }
        if (!isMultiSchemaEnabled(true)) {
            completeCallbackAndReleaseSemaphore(sendCallback, new PulsarClientException.InvalidMessageException(String.format("The producer %s of the topic %s is disabled the `MultiSchema`", this.producerName, this.topic), messageImpl.getSequenceId()));
            return false;
        }
        byte[] bArr2 = this.schemaCache.get(SchemaHash.of(messageImpl.getSchema()));
        if (bArr2 == null) {
            return true;
        }
        messageBuilder.setSchemaVersion(ByteString.copyFrom(bArr2));
        messageImpl.setSchemaState(MessageImpl.SchemaState.Ready);
        return true;
    }

    private boolean rePopulateMessageSchema(MessageImpl messageImpl) {
        byte[] bArr = this.schemaCache.get(SchemaHash.of(messageImpl.getSchema()));
        if (bArr == null) {
            return false;
        }
        messageImpl.getMessageBuilder().setSchemaVersion(ByteString.copyFrom(bArr));
        messageImpl.setSchemaState(MessageImpl.SchemaState.Ready);
        return true;
    }

    private void tryRegisterSchema(ClientCnx clientCnx, MessageImpl messageImpl, SendCallback sendCallback) {
        if (changeToRegisteringSchemaState()) {
            getOrCreateSchemaAsync(clientCnx, (SchemaInfo) Optional.ofNullable(messageImpl.getSchema()).map((v0) -> {
                return v0.getSchemaInfo();
            }).filter(schemaInfo -> {
                return schemaInfo.getType().getValue() > 0;
            }).orElse(Schema.BYTES.getSchemaInfo())).handle((bArr, th) -> {
                if (th != null) {
                    Throwable unwrapCompletionException = FutureUtil.unwrapCompletionException(th);
                    log.warn("[{}] [{}] GetOrCreateSchema error", this.topic, this.producerName, unwrapCompletionException);
                    if (unwrapCompletionException instanceof PulsarClientException.IncompatibleSchemaException) {
                        messageImpl.setSchemaState(MessageImpl.SchemaState.Broken);
                        sendCallback.sendComplete((PulsarClientException.IncompatibleSchemaException) unwrapCompletionException);
                    }
                } else {
                    log.warn("[{}] [{}] GetOrCreateSchema succeed", this.topic, this.producerName);
                    this.schemaCache.putIfAbsent(SchemaHash.of(messageImpl.getSchema()), bArr);
                    messageImpl.getMessageBuilder().setSchemaVersion(ByteString.copyFrom(bArr));
                    messageImpl.setSchemaState(MessageImpl.SchemaState.Ready);
                }
                clientCnx.ctx().channel().eventLoop().execute(() -> {
                    synchronized (this) {
                        recoverProcessOpSendMsgFrom(clientCnx, messageImpl);
                    }
                });
                return null;
            });
        }
    }

    private CompletableFuture<byte[]> getOrCreateSchemaAsync(ClientCnx clientCnx, SchemaInfo schemaInfo) {
        if (!Commands.peerSupportsGetOrCreateSchema(clientCnx.getRemoteEndpointProtocolVersion())) {
            return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException(String.format("The command `GetOrCreateSchema` is not supported for the protocol version %d. The producer is %s, topic is %s", Integer.valueOf(clientCnx.getRemoteEndpointProtocolVersion()), this.producerName, this.topic)));
        }
        long newRequestId = this.client.newRequestId();
        ByteBuf newGetOrCreateSchema = Commands.newGetOrCreateSchema(newRequestId, this.topic, schemaInfo);
        log.info("[{}] [{}] GetOrCreateSchema request", this.topic, this.producerName);
        return clientCnx.sendGetOrCreateSchema(newGetOrCreateSchema, newRequestId);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ByteBuf encryptMessage(PulsarApi.MessageMetadata.Builder builder, ByteBuf byteBuf) throws PulsarClientException {
        if (!this.conf.isEncryptionEnabled() || this.msgCrypto == null) {
            return byteBuf;
        }
        try {
            return this.msgCrypto.encrypt(this.conf.getEncryptionKeys(), this.conf.getCryptoKeyReader(), () -> {
                return builder;
            }, byteBuf);
        } catch (PulsarClientException e) {
            if (this.conf.getCryptoFailureAction() != ProducerCryptoFailureAction.SEND) {
                throw e;
            }
            log.warn("[{}] [{}] Failed to encrypt message {}. Proceeding with publishing unencrypted message", this.topic, this.producerName, e.getMessage());
            return byteBuf;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ByteBufPair sendMessage(long j, long j2, int i, PulsarApi.MessageMetadata messageMetadata, ByteBuf byteBuf) {
        return Commands.newSend(j, j2, i, getChecksumType(), messageMetadata, byteBuf);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ByteBufPair sendMessage(long j, long j2, long j3, int i, PulsarApi.MessageMetadata messageMetadata, ByteBuf byteBuf) {
        return Commands.newSend(j, j2, j3, i, getChecksumType(), messageMetadata, byteBuf);
    }

    private Commands.ChecksumType getChecksumType() {
        return (this.connectionHandler.cnx() == null || this.connectionHandler.cnx().getRemoteEndpointProtocolVersion() >= brokerChecksumSupportedVersion()) ? Commands.ChecksumType.Crc32c : Commands.ChecksumType.None;
    }

    private boolean canAddToBatch(MessageImpl<?> messageImpl) {
        return messageImpl.getSchemaState() == MessageImpl.SchemaState.Ready && isBatchMessagingEnabled() && !messageImpl.getMessageBuilder().hasDeliverAtTime();
    }

    private boolean canAddToCurrentBatch(MessageImpl<?> messageImpl) {
        return this.batchMessageContainer.haveEnoughSpace(messageImpl) && (!isMultiSchemaEnabled(false) || this.batchMessageContainer.hasSameSchema(messageImpl));
    }

    private void doBatchSendAndAdd(MessageImpl<?> messageImpl, SendCallback sendCallback, ByteBuf byteBuf) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Closing out batch to accommodate large message with size {}", this.topic, this.producerName, Integer.valueOf(messageImpl.getDataBuffer().readableBytes()));
        }
        try {
            batchMessageAndSend();
            this.batchMessageContainer.add(messageImpl, sendCallback);
            this.lastSendFuture = sendCallback.getFuture();
            byteBuf.release();
        } catch (Throwable th) {
            byteBuf.release();
            throw th;
        }
    }

    private boolean isValidProducerState(SendCallback sendCallback, long j) {
        switch (getState()) {
            case Ready:
            case Connecting:
            case RegisteringSchema:
                return true;
            case Closing:
            case Closed:
                sendCallback.sendComplete(new PulsarClientException.AlreadyClosedException("Producer already closed", j));
                return false;
            case Terminated:
                sendCallback.sendComplete(new PulsarClientException.TopicTerminatedException("Topic was terminated", j));
                return false;
            case Failed:
            case Uninitialized:
            default:
                sendCallback.sendComplete(new PulsarClientException.NotConnectedException(j));
                return false;
        }
    }

    private boolean canEnqueueRequest(SendCallback sendCallback, long j) {
        try {
            if (this.conf.isBlockIfQueueFull()) {
                this.semaphore.acquire();
                return true;
            }
            if (this.semaphore.tryAcquire()) {
                return true;
            }
            sendCallback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", j));
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            sendCallback.sendComplete(new PulsarClientException(e, j));
            return false;
        }
    }

    @Override // org.apache.pulsar.client.impl.ProducerBase, org.apache.pulsar.client.api.Producer
    public CompletableFuture<Void> closeAsync() {
        HandlerState.State andUpdateState = getAndUpdateState(state -> {
            return state == HandlerState.State.Closed ? state : HandlerState.State.Closing;
        });
        if (andUpdateState == HandlerState.State.Closed || andUpdateState == HandlerState.State.Closing) {
            return CompletableFuture.completedFuture(null);
        }
        Timeout timeout = this.sendTimeout;
        if (timeout != null) {
            timeout.cancel();
            this.sendTimeout = null;
        }
        Timeout timeout2 = this.batchMessageAndSendTimeout;
        if (timeout2 != null) {
            timeout2.cancel();
            this.batchMessageAndSendTimeout = null;
        }
        if (this.keyGeneratorTask != null && !this.keyGeneratorTask.isCancelled()) {
            this.keyGeneratorTask.cancel(false);
        }
        this.stats.cancelStatsTimeout();
        ClientCnx cnx = cnx();
        if (cnx != null && andUpdateState == HandlerState.State.Ready) {
            long newRequestId = this.client.newRequestId();
            ByteBuf newCloseProducer = Commands.newCloseProducer(this.producerId, newRequestId);
            CompletableFuture<Void> completableFuture = new CompletableFuture<>();
            cnx.sendRequestWithId(newCloseProducer, newRequestId).handle((producerResponse, th) -> {
                cnx.removeProducer(this.producerId);
                if (th != null && cnx.ctx().channel().isActive()) {
                    completableFuture.completeExceptionally(th);
                    return null;
                }
                synchronized (this) {
                    log.info("[{}] [{}] Closed Producer", this.topic, this.producerName);
                    setState(HandlerState.State.Closed);
                    this.pendingMessages.forEach(opSendMsg -> {
                        opSendMsg.cmd.release();
                        opSendMsg.recycle();
                    });
                    this.pendingMessages.clear();
                }
                completableFuture.complete(null);
                this.client.cleanupProducer(this);
                return null;
            });
            return completableFuture;
        }
        log.info("[{}] [{}] Closed Producer (not connected)", this.topic, this.producerName);
        synchronized (this) {
            setState(HandlerState.State.Closed);
            this.client.cleanupProducer(this);
            PulsarClientException.AlreadyClosedException alreadyClosedException = new PulsarClientException.AlreadyClosedException(String.format("The producer %s of the topic %s was already closed when closing the producers", this.producerName, this.topic));
            this.pendingMessages.forEach(opSendMsg -> {
                opSendMsg.sendComplete(alreadyClosedException);
                opSendMsg.cmd.release();
                opSendMsg.recycle();
            });
            this.pendingMessages.clear();
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override // org.apache.pulsar.client.api.Producer
    public boolean isConnected() {
        return this.connectionHandler.cnx() != null && getState() == HandlerState.State.Ready;
    }

    @Override // org.apache.pulsar.client.api.Producer
    public long getLastDisconnectedTimestamp() {
        return this.connectionHandler.lastConnectionClosedTimestamp;
    }

    public boolean isWritable() {
        ClientCnx cnx = this.connectionHandler.cnx();
        return cnx != null && cnx.channel().isWritable();
    }

    public void terminated(ClientCnx clientCnx) {
        HandlerState.State andUpdateState = getAndUpdateState(state -> {
            return state == HandlerState.State.Closed ? HandlerState.State.Closed : HandlerState.State.Terminated;
        });
        if (andUpdateState == HandlerState.State.Terminated || andUpdateState == HandlerState.State.Closed) {
            return;
        }
        log.info("[{}] [{}] The topic has been terminated", this.topic, this.producerName);
        setClientCnx(null);
        failPendingMessages(clientCnx, new PulsarClientException.TopicTerminatedException(String.format("The topic %s that the producer %s produces to has been terminated", this.topic, this.producerName)));
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void ackReceived(ClientCnx clientCnx, long j, long j2, long j3, long j4) {
        OpSendMsg poll;
        boolean z = false;
        synchronized (this) {
            OpSendMsg peek = this.pendingMessages.peek();
            if (peek == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Got ack for timed out msg {} - {}", this.topic, this.producerName, Long.valueOf(j), Long.valueOf(j2));
                }
                return;
            }
            if (j > peek.sequenceId) {
                log.warn("[{}] [{}] Got ack for msg. expecting: {} - {} - got: {} - {} - queue-size: {}", this.topic, this.producerName, Long.valueOf(peek.sequenceId), Long.valueOf(peek.highestSequenceId), Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(this.pendingMessages.size()));
                clientCnx.channel().close();
            } else if (j < peek.sequenceId) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Got ack for timed out msg. expecting: {} - {} - got: {} - {}", this.topic, this.producerName, Long.valueOf(peek.sequenceId), Long.valueOf(peek.highestSequenceId), Long.valueOf(j), Long.valueOf(j2));
                }
            } else if (j >= j2 || j2 == peek.highestSequenceId) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Received ack for msg {} ", this.topic, this.producerName, Long.valueOf(j));
                }
                this.pendingMessages.remove();
                releaseSemaphoreForSendOp(peek);
                z = true;
                this.pendingCallbacks.add(peek);
            } else {
                log.warn("[{}] [{}] Got ack for batch msg error. expecting: {} - {} - got: {} - {} - queue-size: {}", this.topic, this.producerName, Long.valueOf(peek.sequenceId), Long.valueOf(peek.highestSequenceId), Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(this.pendingMessages.size()));
                clientCnx.channel().close();
            }
            if (!z || (poll = this.pendingCallbacks.poll()) == null) {
                return;
            }
            LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, j5 -> {
                return Math.max(j5, getHighestSequenceId(poll));
            });
            poll.setMessageId(j3, j4, this.partitionIndex);
            try {
                if (poll.totalChunks <= 1 || poll.chunkId == poll.totalChunks - 1) {
                    try {
                        poll.sendComplete(null);
                    } catch (Throwable th) {
                        log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", this.topic, this.producerName, Long.valueOf(j), th);
                    }
                }
            } catch (Throwable th2) {
                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", this.topic, this.producerName, Long.valueOf(j), th2);
            }
            ReferenceCountUtil.safeRelease(poll.cmd);
            poll.recycle();
        }
    }

    private long getHighestSequenceId(OpSendMsg opSendMsg) {
        return Math.max(opSendMsg.highestSequenceId, opSendMsg.sequenceId);
    }

    private void releaseSemaphoreForSendOp(OpSendMsg opSendMsg) {
        this.semaphore.release(isBatchMessagingEnabled() ? opSendMsg.numMessagesInBatch : 1);
    }

    private void completeCallbackAndReleaseSemaphore(SendCallback sendCallback, Exception exc) {
        this.semaphore.release();
        sendCallback.sendComplete(exc);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public synchronized void recoverChecksumError(ClientCnx clientCnx, long j) {
        OpSendMsg peek = this.pendingMessages.peek();
        if (peek == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Got send failure for timed out msg {}", this.topic, this.producerName, Long.valueOf(j));
            }
        } else if (j == getHighestSequenceId(peek)) {
            if (!verifyLocalBufferIsNotCorrupted(peek)) {
                this.pendingMessages.remove();
                releaseSemaphoreForSendOp(peek);
                try {
                    peek.sendComplete(new PulsarClientException.ChecksumException(String.format("The checksum of the message which is produced by producer %s to the topic %s is corrupted", this.producerName, this.topic)));
                } catch (Throwable th) {
                    log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", this.topic, this.producerName, Long.valueOf(j), th);
                }
                ReferenceCountUtil.safeRelease(peek.cmd);
                peek.recycle();
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Message is not corrupted, retry send-message with sequenceId {}", this.topic, this.producerName, Long.valueOf(j));
            }
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Corrupt message is already timed out {}", this.topic, this.producerName, Long.valueOf(j));
        }
        resendMessages(clientCnx);
    }

    protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg opSendMsg) {
        ByteBufPair byteBufPair = opSendMsg.cmd;
        if (byteBufPair == null) {
            log.warn("[{}] Failed while casting {} into ByteBufPair", this.producerName, opSendMsg.cmd == null ? null : opSendMsg.cmd.getClass().getName());
            return false;
        }
        ByteBuf first = byteBufPair.getFirst();
        first.markReaderIndex();
        try {
            first.skipBytes(4);
            first.skipBytes((int) first.readUnsignedInt());
            if (Commands.hasChecksum(first)) {
                return ((long) Commands.readChecksum(first)) == ((long) Crc32cIntChecksum.resumeChecksum(Crc32cIntChecksum.computeChecksum(first), byteBufPair.getSecond()));
            }
            log.warn("[{}] [{}] checksum is not present into message with id {}", this.topic, this.producerName, Long.valueOf(opSendMsg.sequenceId));
            first.resetReaderIndex();
            return true;
        } finally {
            first.resetReaderIndex();
        }
    }

    @Override // org.apache.pulsar.client.impl.ConnectionHandler.Connection
    public void connectionOpened(ClientCnx clientCnx) {
        this.connectionHandler.setClientCnx(clientCnx);
        clientCnx.registerProducer(this.producerId, this);
        log.info("[{}] [{}] Creating producer on cnx {}", this.topic, this.producerName, clientCnx.ctx().channel());
        long newRequestId = this.client.newRequestId();
        SchemaInfo schemaInfo = null;
        if (this.schema != null && this.schema.getSchemaInfo() != null) {
            schemaInfo = this.schema.getSchemaInfo().getType() == SchemaType.JSON ? Commands.peerSupportJsonSchemaAvroFormat(clientCnx.getRemoteEndpointProtocolVersion()) ? this.schema.getSchemaInfo() : this.schema instanceof JSONSchema ? ((JSONSchema) this.schema).getBackwardsCompatibleJsonSchemaInfo() : this.schema.getSchemaInfo() : (this.schema.getSchemaInfo().getType() == SchemaType.BYTES || this.schema.getSchemaInfo().getType() == SchemaType.NONE) ? null : this.schema.getSchemaInfo();
        }
        clientCnx.sendRequestWithId(Commands.newProducer(this.topic, this.producerId, newRequestId, this.producerName, this.conf.isEncryptionEnabled(), this.metadata, schemaInfo, this.connectionHandler.epoch, this.userProvidedProducerName), newRequestId).thenAccept(producerResponse -> {
            String producerName = producerResponse.getProducerName();
            long lastSequenceId = producerResponse.getLastSequenceId();
            this.schemaVersion = Optional.ofNullable(producerResponse.getSchemaVersion());
            this.schemaVersion.ifPresent(bArr -> {
                this.schemaCache.put(SchemaHash.of(this.schema), bArr);
            });
            synchronized (this) {
                if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
                    clientCnx.removeProducer(this.producerId);
                    clientCnx.channel().close();
                    return;
                }
                resetBackoff();
                log.info("[{}] [{}] Created producer on cnx {}", this.topic, producerName, clientCnx.ctx().channel());
                this.connectionId = clientCnx.ctx().channel().toString();
                this.connectedSince = DateFormatter.now();
                if (this.producerName == null) {
                    this.producerName = producerName;
                }
                if (this.msgIdGenerator == 0 && this.conf.getInitialSequenceId() == null) {
                    this.lastSequenceIdPublished = lastSequenceId;
                    this.msgIdGenerator = lastSequenceId + 1;
                }
                if (!this.producerCreatedFuture.isDone() && isBatchMessagingEnabled()) {
                    this.client.timer().newTimeout(this.batchMessageAndSendTask, this.conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
                }
                resendMessages(clientCnx);
            }
        }).exceptionally(th -> {
            Throwable cause = th.getCause();
            clientCnx.removeProducer(this.producerId);
            if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
                clientCnx.channel().close();
                return null;
            }
            log.error("[{}] [{}] Failed to create producer: {}", this.topic, this.producerName, cause.getMessage());
            if (cause instanceof PulsarClientException.TopicDoesNotExistException) {
                closeAsync().whenComplete((r6, th) -> {
                    if (th != null) {
                        log.error("Failed to close producer on TopicDoesNotExistException.", th);
                    }
                    this.producerCreatedFuture.completeExceptionally(cause);
                });
                return null;
            }
            if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
                synchronized (this) {
                    log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", this.topic, this.producerName);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Pending messages: {}", this.topic, this.producerName, Integer.valueOf(this.pendingMessages.size()));
                    }
                    failPendingMessages(cnx(), new PulsarClientException.ProducerBlockedQuotaExceededException(String.format("The backlog quota of the topic %s that the producer %s produces to is exceeded", this.topic, this.producerName)));
                }
            } else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) {
                log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.", this.producerName, this.topic);
            }
            if (cause instanceof PulsarClientException.TopicTerminatedException) {
                setState(HandlerState.State.Terminated);
                failPendingMessages(cnx(), (PulsarClientException) cause);
                this.producerCreatedFuture.completeExceptionally(cause);
                this.client.cleanupProducer(this);
                return null;
            }
            if (this.producerCreatedFuture.isDone() || ((cause instanceof PulsarClientException) && PulsarClientException.isRetriableError(cause) && System.currentTimeMillis() < this.createProducerTimeout)) {
                reconnectLater(cause);
                return null;
            }
            setState(HandlerState.State.Failed);
            this.producerCreatedFuture.completeExceptionally(cause);
            this.client.cleanupProducer(this);
            Timeout timeout = this.sendTimeout;
            if (timeout == null) {
                return null;
            }
            timeout.cancel();
            this.sendTimeout = null;
            return null;
        });
    }

    @Override // org.apache.pulsar.client.impl.ConnectionHandler.Connection
    public void connectionFailed(PulsarClientException pulsarClientException) {
        boolean z = !PulsarClientException.isRetriableError(pulsarClientException);
        boolean z2 = System.currentTimeMillis() > this.createProducerTimeout;
        if ((z || z2) && this.producerCreatedFuture.completeExceptionally(pulsarClientException)) {
            if (z) {
                log.info("[{}] Producer creation failed for producer {} with unretriableError = {}", this.topic, Long.valueOf(this.producerId), pulsarClientException);
            } else {
                log.info("[{}] Producer creation failed for producer {} after producerTimeout", this.topic, Long.valueOf(this.producerId));
            }
            setState(HandlerState.State.Failed);
            this.client.cleanupProducer(this);
        }
    }

    private void resendMessages(ClientCnx clientCnx) {
        clientCnx.ctx().channel().eventLoop().execute(() -> {
            synchronized (this) {
                if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
                    clientCnx.channel().close();
                    return;
                }
                int size = this.pendingMessages.size();
                if (size != 0) {
                    log.info("[{}] [{}] Re-Sending {} messages to server", this.topic, this.producerName, Integer.valueOf(size));
                    recoverProcessOpSendMsgFrom(clientCnx, null);
                    return;
                }
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] No pending messages to resend {}", this.topic, this.producerName, Integer.valueOf(size));
                }
                if (changeToReadyState()) {
                    this.producerCreatedFuture.complete(this);
                } else {
                    clientCnx.channel().close();
                }
            }
        });
    }

    private void stripChecksum(OpSendMsg opSendMsg) {
        int readableBytes = opSendMsg.cmd.readableBytes();
        ByteBufPair byteBufPair = opSendMsg.cmd;
        if (byteBufPair == null) {
            log.warn("[{}] Failed while casting {} into ByteBufPair", this.producerName, opSendMsg.cmd == null ? null : opSendMsg.cmd.getClass().getName());
            return;
        }
        ByteBuf first = byteBufPair.getFirst();
        first.markReaderIndex();
        try {
            first.skipBytes(4);
            int readUnsignedInt = (int) first.readUnsignedInt();
            first.skipBytes(readUnsignedInt);
            if (Commands.hasChecksum(first)) {
                int i = 8 + readUnsignedInt;
                int i2 = i + 6;
                int i3 = 4 + readUnsignedInt + (readableBytes - i2);
                first.resetReaderIndex();
                int readableBytes2 = first.readableBytes();
                first.setInt(0, i3);
                ByteBuf slice = first.slice(i2, readableBytes2 - i2);
                first.writerIndex(i);
                slice.readBytes(first, slice.readableBytes());
                first.capacity(readableBytes2 - 6);
                first.resetReaderIndex();
            }
        } finally {
            first.resetReaderIndex();
        }
    }

    public int brokerChecksumSupportedVersion() {
        return PulsarApi.ProtocolVersion.v6.getNumber();
    }

    @Override // org.apache.pulsar.client.impl.HandlerState
    String getHandlerName() {
        return this.producerName;
    }

    @Override // io.netty.util.TimerTask
    public void run(Timeout timeout) throws Exception {
        long j;
        if (timeout.isCancelled()) {
            return;
        }
        synchronized (this) {
            if (getState() == HandlerState.State.Closing || getState() == HandlerState.State.Closed) {
                return;
            }
            OpSendMsg peek = this.pendingMessages.peek();
            if (peek == null) {
                j = this.conf.getSendTimeoutMs();
            } else {
                long sendTimeoutMs = this.conf.getSendTimeoutMs() - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - peek.createdAt);
                if (sendTimeoutMs <= 0) {
                    log.info("[{}] [{}] Message send timed out. Failing {} messages", this.topic, this.producerName, Integer.valueOf(this.pendingMessages.size()));
                    failPendingMessages(cnx(), new PulsarClientException.TimeoutException(String.format("The producer %s can not send message to the topic %s within given timeout", this.producerName, this.topic), peek.sequenceId));
                    this.stats.incrementSendFailed(this.pendingMessages.size());
                    j = this.conf.getSendTimeoutMs();
                } else {
                    j = sendTimeoutMs;
                }
            }
            this.sendTimeout = this.client.timer().newTimeout(this, j, TimeUnit.MILLISECONDS);
        }
    }

    private void failPendingMessages(ClientCnx clientCnx, PulsarClientException pulsarClientException) {
        if (clientCnx != null) {
            clientCnx.ctx().channel().eventLoop().execute(() -> {
                synchronized (this) {
                    failPendingMessages(null, pulsarClientException);
                }
            });
            return;
        }
        AtomicInteger atomicInteger = new AtomicInteger();
        boolean isBatchMessagingEnabled = isBatchMessagingEnabled();
        this.pendingMessages.forEach(opSendMsg -> {
            atomicInteger.addAndGet(isBatchMessagingEnabled ? opSendMsg.numMessagesInBatch : 1);
            try {
                pulsarClientException.setSequenceId(opSendMsg.sequenceId);
                if (opSendMsg.totalChunks <= 1 || opSendMsg.chunkId == opSendMsg.totalChunks - 1) {
                    opSendMsg.sendComplete(pulsarClientException);
                }
            } catch (Throwable th) {
                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", this.topic, this.producerName, Long.valueOf(opSendMsg.sequenceId), th);
            }
            ReferenceCountUtil.safeRelease(opSendMsg.cmd);
            opSendMsg.recycle();
        });
        this.pendingMessages.clear();
        this.pendingCallbacks.clear();
        this.semaphore.release(atomicInteger.get());
        if (isBatchMessagingEnabled) {
            failPendingBatchMessages(pulsarClientException);
        }
    }

    private void failPendingBatchMessages(PulsarClientException pulsarClientException) {
        if (this.batchMessageContainer.isEmpty()) {
            return;
        }
        int numMessagesInBatch = this.batchMessageContainer.getNumMessagesInBatch();
        this.batchMessageContainer.discard(pulsarClientException);
        this.semaphore.release(numMessagesInBatch);
    }

    @Override // org.apache.pulsar.client.api.Producer
    public CompletableFuture<Void> flushAsync() {
        CompletableFuture<MessageId> completableFuture;
        synchronized (this) {
            if (isBatchMessagingEnabled()) {
                batchMessageAndSend();
            }
            completableFuture = this.lastSendFuture;
        }
        return completableFuture.thenApply(messageId -> {
            return null;
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.pulsar.client.impl.ProducerBase
    public void triggerFlush() {
        if (isBatchMessagingEnabled()) {
            synchronized (this) {
                batchMessageAndSend();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void batchMessageAndSend() {
        if (log.isTraceEnabled()) {
            log.trace("[{}] [{}] Batching the messages from the batch container with {} messages", this.topic, this.producerName, Integer.valueOf(this.batchMessageContainer.getNumMessagesInBatch()));
        }
        if (this.batchMessageContainer.isEmpty()) {
            return;
        }
        try {
            List<OpSendMsg> createOpSendMsgs = this.batchMessageContainer.isMultiBatches() ? this.batchMessageContainer.createOpSendMsgs() : Collections.singletonList(this.batchMessageContainer.createOpSendMsg());
            this.batchMessageContainer.clear();
            Iterator<OpSendMsg> it = createOpSendMsgs.iterator();
            while (it.hasNext()) {
                processOpSendMsg(it.next());
            }
        } catch (PulsarClientException e) {
            Thread.currentThread().interrupt();
            this.semaphore.release(this.batchMessageContainer.getNumMessagesInBatch());
        } catch (Throwable th) {
            this.semaphore.release(this.batchMessageContainer.getNumMessagesInBatch());
            log.warn("[{}] [{}] error while create opSendMsg by batch message container", this.topic, this.producerName, th);
        }
    }

    protected void processOpSendMsg(OpSendMsg opSendMsg) {
        if (opSendMsg == null) {
            return;
        }
        try {
            if (opSendMsg.msg != null && isBatchMessagingEnabled()) {
                batchMessageAndSend();
            }
            this.pendingMessages.put(opSendMsg);
            if (opSendMsg.msg != null) {
                LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this, j -> {
                    return Math.max(j, getHighestSequenceId(opSendMsg));
                });
            }
            ClientCnx cnx = cnx();
            if (isConnected()) {
                if (opSendMsg.msg != null && opSendMsg.msg.getSchemaState() == MessageImpl.SchemaState.None) {
                    tryRegisterSchema(cnx, opSendMsg.msg, opSendMsg.callback);
                } else {
                    opSendMsg.cmd.retain();
                    cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, opSendMsg));
                    this.stats.updateNumMsgsSent(opSendMsg.numMessagesInBatch, opSendMsg.batchSizeByte);
                }
            } else if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", this.topic, this.producerName, Long.valueOf(opSendMsg.sequenceId));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            releaseSemaphoreForSendOp(opSendMsg);
            if (opSendMsg != null) {
                opSendMsg.sendComplete(new PulsarClientException(e, opSendMsg.sequenceId));
            }
        } catch (Throwable th) {
            releaseSemaphoreForSendOp(opSendMsg);
            log.warn("[{}] [{}] error while closing out batch -- {}", this.topic, this.producerName, th);
            if (opSendMsg != null) {
                opSendMsg.sendComplete(new PulsarClientException(th, opSendMsg.sequenceId));
            }
        }
    }

    private void recoverProcessOpSendMsgFrom(ClientCnx clientCnx, MessageImpl messageImpl) {
        boolean z = clientCnx.getRemoteEndpointProtocolVersion() < brokerChecksumSupportedVersion();
        Iterator it = this.pendingMessages.iterator();
        OpSendMsg opSendMsg = null;
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            OpSendMsg opSendMsg2 = (OpSendMsg) it.next();
            if (messageImpl != null) {
                if (opSendMsg2.msg == messageImpl) {
                    messageImpl = null;
                } else {
                    continue;
                }
            }
            if (opSendMsg2.msg != null) {
                if (opSendMsg2.msg.getSchemaState() == MessageImpl.SchemaState.None) {
                    if (!rePopulateMessageSchema(opSendMsg2.msg)) {
                        opSendMsg = opSendMsg2;
                        break;
                    }
                } else if (opSendMsg2.msg.getSchemaState() == MessageImpl.SchemaState.Broken) {
                    opSendMsg2.recycle();
                    it.remove();
                }
            }
            if (opSendMsg2.cmd == null) {
                Preconditions.checkState(opSendMsg2.rePopulate != null);
                opSendMsg2.rePopulate.run();
            }
            if (z) {
                stripChecksum(opSendMsg2);
            }
            opSendMsg2.cmd.retain();
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", this.topic, this.producerName, clientCnx.channel(), Long.valueOf(opSendMsg2.sequenceId));
            }
            clientCnx.ctx().write(opSendMsg2.cmd, clientCnx.ctx().voidPromise());
            opSendMsg2.updateSentTimestamp();
            this.stats.updateNumMsgsSent(opSendMsg2.numMessagesInBatch, opSendMsg2.batchSizeByte);
        }
        clientCnx.ctx().flush();
        if (!changeToReadyState()) {
            clientCnx.channel().close();
        } else if (opSendMsg != null) {
            tryRegisterSchema(clientCnx, opSendMsg.msg, opSendMsg.callback);
        }
    }

    public long getDelayInMillis() {
        OpSendMsg peek = this.pendingMessages.peek();
        if (peek != null) {
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - peek.createdAt);
        }
        return 0L;
    }

    public String getConnectionId() {
        if (cnx() != null) {
            return this.connectionId;
        }
        return null;
    }

    public String getConnectedSince() {
        if (cnx() != null) {
            return this.connectedSince;
        }
        return null;
    }

    public int getPendingQueueSize() {
        return this.pendingMessages.size();
    }

    @Override // org.apache.pulsar.client.api.Producer
    public ProducerStatsRecorder getStats() {
        return this.stats;
    }

    @Override // org.apache.pulsar.client.api.Producer
    public String getProducerName() {
        return this.producerName;
    }

    ClientCnx cnx() {
        return this.connectionHandler.cnx();
    }

    void resetBackoff() {
        this.connectionHandler.resetBackoff();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void connectionClosed(ClientCnx clientCnx) {
        this.connectionHandler.connectionClosed(clientCnx);
    }

    public ClientCnx getClientCnx() {
        return this.connectionHandler.cnx();
    }

    void setClientCnx(ClientCnx clientCnx) {
        this.connectionHandler.setClientCnx(clientCnx);
    }

    void reconnectLater(Throwable th) {
        this.connectionHandler.reconnectLater(th);
    }

    void grabCnx() {
        this.connectionHandler.grabCnx();
    }

    @VisibleForTesting
    Semaphore getSemaphore() {
        return this.semaphore;
    }
}
