/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.client.impl;

import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.apache.pulsar.client.api.BatcherBuilder;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.ProducerCryptoFailureAction;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.BackoffBuilder;
import org.apache.pulsar.client.impl.BatchMessageContainerBase;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.ClientCnx;
import org.apache.pulsar.client.impl.ConnectionHandler;
import org.apache.pulsar.client.impl.HandlerState;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.ProducerBase;
import org.apache.pulsar.client.impl.ProducerInterceptors;
import org.apache.pulsar.client.impl.ProducerStatsDisabled;
import org.apache.pulsar.client.impl.ProducerStatsRecorder;
import org.apache.pulsar.client.impl.ProducerStatsRecorderImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.SendCallback;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaHash;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.shade.client.api.v2.Producer;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.Recycler;
import org.apache.pulsar.shade.io.netty.util.ReferenceCountUtil;
import org.apache.pulsar.shade.io.netty.util.Timeout;
import org.apache.pulsar.shade.io.netty.util.TimerTask;
import org.apache.pulsar.shade.io.netty.util.concurrent.ScheduledFuture;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProducerImpl<T>
extends ProducerBase<T>
implements TimerTask,
ConnectionHandler.Connection {
    // Identifier obtained from the client in the constructor (client.newProducerId()).
    protected final long producerId;
    // Next sequence id to hand out; advanced atomically through msgIdGeneratorUpdater.
    private volatile long msgIdGenerator;
    // Send operations queued by this producer; created by createPendingMessagesQueue().
    private final Queue<OpSendMsg> pendingMessages;
    // Present only when conf.getMaxPendingMessages() > 0; one permit is taken per enqueued request.
    private final Optional<Semaphore> semaphore;
    // Set in the constructor only when conf.getSendTimeoutMs() > 0.
    private volatile Timeout sendTimeout = null;
    // Wall-clock deadline (ms): construction time plus the client's lookup timeout.
    private final long lookupDeadline;
    private static final AtomicLongFieldUpdater<ProducerImpl> PRODUCER_DEADLINE_UPDATER = AtomicLongFieldUpdater.newUpdater(ProducerImpl.class, "producerDeadline");
    private volatile long producerDeadline = 0L;
    // Null when batching is disabled in the configuration.
    private final BatchMessageContainerBase batchMessageContainer;
    // Future of the most recently accepted send; starts out already completed.
    private CompletableFuture<MessageId> lastSendFuture = CompletableFuture.completedFuture(null);
    private String producerName;
    // True when the configuration supplied a non-blank producer name.
    private boolean userProvidedProducerName = false;
    private String connectionId;
    private String connectedSince;
    private final int partitionIndex;
    private final ProducerStatsRecorder stats;
    // Codec matching conf.getCompressionType().
    private final CompressionCodec compressor;
    // Sequence-id bookkeeping; all three start from conf.getInitialSequenceId() when it is set,
    // otherwise from -1.
    static final AtomicLongFieldUpdater<ProducerImpl> LAST_SEQ_ID_PUBLISHED_UPDATER = AtomicLongFieldUpdater.newUpdater(ProducerImpl.class, "lastSequenceIdPublished");
    private volatile long lastSequenceIdPublished;
    static final AtomicLongFieldUpdater<ProducerImpl> LAST_SEQ_ID_PUSHED_UPDATER = AtomicLongFieldUpdater.newUpdater(ProducerImpl.class, "lastSequenceIdPushed");
    protected volatile long lastSequenceIdPushed;
    // Set while a message whose sequence id is <= lastSequenceIdPushed is being batched.
    private volatile boolean isLastSequenceIdPotentialDuplicated;
    // Null when encryption is disabled or the default crypto implementation could not be created.
    private final MessageCrypto msgCrypto;
    // Periodic task that refreshes the public key cipher; only scheduled when msgCrypto != null.
    private ScheduledFuture<?> keyGeneratorTask = null;
    // Unmodifiable copy of conf.getProperties().
    private final Map<String, String> metadata;
    // Schema version stamped on messages that use this producer's own schema.
    private Optional<byte[]> schemaVersion = Optional.empty();
    private final ConnectionHandler connectionHandler;
    private ScheduledFuture<?> batchTimerTask;
    private Optional<Long> topicEpoch = Optional.empty();
    private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
    private static final AtomicLongFieldUpdater<ProducerImpl> msgIdGeneratorUpdater = AtomicLongFieldUpdater.newUpdater(ProducerImpl.class, "msgIdGenerator");
    private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);

    /**
     * Builds the producer from its configuration and then calls {@link #grabCnx()}.
     *
     * <p>In order, the constructor sets up: the producer id and name, the pending-message queue
     * and permit semaphore, the compression codec, the sequence-id counters, optional message
     * encryption (plus its periodic key refresh), the send timeout, the lookup deadline, the
     * optional batch container, the stats recorder, the metadata map and the connection handler.
     *
     * @param client owning client; supplies ids, timers, the event loop and client-level settings
     * @param topic topic this producer publishes to
     * @param conf producer configuration
     * @param producerCreatedFuture future failed here if the first public-key load fails before it is done
     * @param partitionIndex partition index stored on the producer
     * @param schema schema passed through to the base class
     * @param interceptors interceptors passed through to the base class
     */
    public ProducerImpl(PulsarClientImpl client, String topic, ProducerConfigurationData conf, CompletableFuture<Producer<T>> producerCreatedFuture, int partitionIndex, Schema<T> schema, ProducerInterceptors interceptors) {
        super(client, topic, conf, producerCreatedFuture, schema, interceptors);
        this.producerId = client.newProducerId();
        this.producerName = conf.getProducerName();
        // A non-blank configured name is remembered as user-provided.
        if (StringUtils.isNotBlank(this.producerName)) {
            this.userProvidedProducerName = true;
        }
        this.partitionIndex = partitionIndex;
        this.pendingMessages = this.createPendingMessagesQueue();
        // Fair semaphore bounding in-flight requests; absent when no positive limit is configured.
        this.semaphore = conf.getMaxPendingMessages() > 0 ? Optional.of(new Semaphore(conf.getMaxPendingMessages(), true)) : Optional.empty();
        this.compressor = CompressionCodecProvider.getCompressionCodec(conf.getCompressionType());
        // Sequence ids continue after the configured initial id, or start at 0 when none is given.
        if (conf.getInitialSequenceId() != null) {
            long initialSequenceId;
            this.lastSequenceIdPublished = initialSequenceId = conf.getInitialSequenceId().longValue();
            this.lastSequenceIdPushed = initialSequenceId;
            this.msgIdGenerator = initialSequenceId + 1L;
        } else {
            this.lastSequenceIdPublished = -1L;
            this.lastSequenceIdPushed = -1L;
            this.msgIdGenerator = 0L;
        }
        // Encryption: prefer the configured MessageCrypto, otherwise try the default implementation.
        if (conf.isEncryptionEnabled()) {
            String logCtx = "[" + topic + "] [" + this.producerName + "] [" + this.producerId + "]";
            if (conf.getMessageCrypto() != null) {
                this.msgCrypto = conf.getMessageCrypto();
            } else {
                MessageCryptoBc msgCryptoBc;
                try {
                    msgCryptoBc = new MessageCryptoBc(logCtx, true);
                }
                catch (Exception e) {
                    // Construction failure is logged and leaves the producer without a crypto instance.
                    log.error("MessageCryptoBc may not included in the jar in Producer. e:", (Throwable)e);
                    msgCryptoBc = null;
                }
                this.msgCrypto = msgCryptoBc;
            }
        } else {
            this.msgCrypto = null;
        }
        // Load the public key cipher immediately and then again after every 4 hours.
        if (this.msgCrypto != null) {
            this.keyGeneratorTask = client.eventLoopGroup().scheduleWithFixedDelay(() -> {
                block2: {
                    try {
                        this.msgCrypto.addPublicKeyCipher(conf.getEncryptionKeys(), conf.getCryptoKeyReader());
                    }
                    catch (PulsarClientException.CryptoException e) {
                        // Only a failure before producer creation has completed is reported to the caller.
                        if (producerCreatedFuture.isDone()) break block2;
                        log.warn("[{}] [{}] [{}] Failed to add public key cipher.", new Object[]{topic, this.producerName, this.producerId});
                        producerCreatedFuture.completeExceptionally(PulsarClientException.wrap(e, String.format("The producer %s of the topic %s adds the public key cipher was failed", this.producerName, topic)));
                    }
                }
            }, 0L, 4L, TimeUnit.HOURS);
        }
        // This producer is itself the TimerTask that runs when the send timeout fires.
        if (conf.getSendTimeoutMs() > 0L) {
            this.sendTimeout = client.timer().newTimeout(this, conf.getSendTimeoutMs(), TimeUnit.MILLISECONDS);
        }
        this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs();
        // Batching: use the configured builder, falling back to the default one.
        if (conf.isBatchingEnabled()) {
            BatcherBuilder containerBuilder = conf.getBatcherBuilder();
            if (containerBuilder == null) {
                containerBuilder = BatcherBuilder.DEFAULT;
            }
            this.batchMessageContainer = (BatchMessageContainerBase)containerBuilder.build();
            this.batchMessageContainer.setProducer(this);
        } else {
            this.batchMessageContainer = null;
        }
        // Stats are recorded only when the client has a positive stats interval.
        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0L ? new ProducerStatsRecorderImpl(client, conf, this) : ProducerStatsDisabled.INSTANCE;
        this.metadata = conf.getProperties().isEmpty() ? Collections.emptyMap() : Collections.unmodifiableMap(new HashMap<String, String>(conf.getProperties()));
        // Backoff bounds come from the client configuration; the mandatory stop is derived from
        // the send timeout (never below 100 ms).
        this.connectionHandler = new ConnectionHandler(this, new BackoffBuilder().setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS).setMandatoryStop(Math.max(100L, conf.getSendTimeoutMs() - 100L), TimeUnit.MILLISECONDS).create(), this);
        this.grabCnx();
    }

    /**
     * Creates the queue that holds pending send operations.
     *
     * <p>Protected so that a subclass can supply a different queue implementation.
     *
     * @return a new, empty {@link ArrayDeque}
     */
    protected Queue<OpSendMsg> createPendingMessagesQueue() {
        return new ArrayDeque<>();
    }

    /**
     * Returns the connection handler created for this producer in the constructor.
     *
     * @return this producer's {@link ConnectionHandler}
     */
    public ConnectionHandler getConnectionHandler() {
        return this.connectionHandler;
    }

    /**
     * Tells whether batching is switched on in the producer configuration.
     *
     * @return the value of {@code conf.isBatchingEnabled()}
     */
    private boolean isBatchMessagingEnabled() {
        return this.conf.isBatchingEnabled();
    }

    /**
     * Reports whether multi-schema mode is enabled, optionally resolving the {@code Auto} mode.
     *
     * @param autoEnable when the mode is still {@code Auto}, {@code true} switches it to
     *                   {@code Enabled} as a side effect; {@code false} leaves it untouched
     * @return {@code true} if the mode is (or has just become) {@code Enabled}
     */
    private boolean isMultiSchemaEnabled(boolean autoEnable) {
        if (this.multiSchemaMode == ProducerBase.MultiSchemaMode.Auto) {
            if (!autoEnable) {
                return false;
            }
            // First caller that asks for auto-enable settles the mode.
            this.multiSchemaMode = ProducerBase.MultiSchemaMode.Enabled;
            return true;
        }
        // The mode was already decided explicitly.
        return this.multiSchemaMode == ProducerBase.MultiSchemaMode.Enabled;
    }

    /**
     * Returns the current value of {@code lastSequenceIdPublished}.
     *
     * @return the last published sequence id; the constructor initialises it to the configured
     *         initial sequence id, or {@code -1} when none is configured
     */
    @Override
    public long getLastSequenceId() {
        return this.lastSequenceIdPublished;
    }

    /**
     * Sends a message asynchronously and returns a future for its message id.
     *
     * <p>The message is first passed through {@code beforeSend}; its data buffer is retained
     * here and released again once the send callback has run. The callback created here can be
     * linked to further callbacks via {@code addCallback}; when it completes it also completes
     * every callback in that chain with the same outcome.
     *
     * @param message message to publish
     * @return future completed with the message id, or exceptionally with the send failure
     */
    @Override
    CompletableFuture<MessageId> internalSendAsync(Message<?> message) {
        final CompletableFuture<MessageId> future = new CompletableFuture<MessageId>();
        final MessageImpl interceptorMessage = (MessageImpl)this.beforeSend(message);
        // Keep the buffer alive until sendComplete has run; released in its finally block.
        interceptorMessage.getDataBuffer().retain();
        if (this.interceptors != null) {
            // Return value is discarded; the call is made only when interceptors are configured.
            // NOTE(review): presumably this materialises the properties for the interceptors — confirm.
            interceptorMessage.getProperties();
        }
        this.sendAsync(interceptorMessage, new SendCallback(){
            // Head of the chain of callbacks/messages linked to this one through addCallback.
            SendCallback nextCallback = null;
            MessageImpl<?> nextMsg = null;
            // Creation time of this callback, used as the start point for the ack-latency stat.
            long createdAt = System.nanoTime();

            @Override
            public CompletableFuture<MessageId> getFuture() {
                return future;
            }

            @Override
            public SendCallback getNextSendCallback() {
                return this.nextCallback;
            }

            @Override
            public MessageImpl<?> getNextMessage() {
                return this.nextMsg;
            }

            /*
             * Completes this send and then every chained send with the same outcome.
             *
             * A non-null exception marks a failure: the failure stat is incremented, the
             * acknowledgement hook is invoked with the error and the future fails. Otherwise
             * the hook and the future receive the message id and the ack latency is recorded.
             *
             * Decompiler note (CFR): WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void sendComplete(Exception e) {
                try {
                    if (e != null) {
                        ProducerImpl.this.stats.incrementSendFailed();
                        ProducerImpl.this.onSendAcknowledgement(interceptorMessage, null, e);
                        future.completeExceptionally(e);
                    } else {
                        ProducerImpl.this.onSendAcknowledgement(interceptorMessage, interceptorMessage.getMessageId(), null);
                        future.complete(interceptorMessage.getMessageId());
                        ProducerImpl.this.stats.incrementNumAcksReceived(System.nanoTime() - this.createdAt);
                    }
                }
                finally {
                    // Balances the retain() taken before sendAsync was called.
                    interceptorMessage.getDataBuffer().release();
                }
                // Walk the chain of linked callbacks, completing each one the same way.
                while (this.nextCallback != null) {
                    SendCallback sendCallback = this.nextCallback;
                    MessageImpl<?> msg = this.nextMsg;
                    try {
                        // Hold the chained message's buffer while its hook and future are completed.
                        msg.getDataBuffer().retain();
                        if (e != null) {
                            ProducerImpl.this.stats.incrementSendFailed();
                            ProducerImpl.this.onSendAcknowledgement(msg, null, e);
                            sendCallback.getFuture().completeExceptionally(e);
                        } else {
                            ProducerImpl.this.onSendAcknowledgement(msg, msg.getMessageId(), null);
                            sendCallback.getFuture().complete(msg.getMessageId());
                            // Latency for chained messages is measured from this callback's createdAt.
                            ProducerImpl.this.stats.incrementNumAcksReceived(System.nanoTime() - this.createdAt);
                        }
                        // Advance to the next link; the message must be read before the callback is replaced.
                        this.nextMsg = this.nextCallback.getNextMessage();
                        this.nextCallback = this.nextCallback.getNextSendCallback();
                    }
                    finally {
                        msg.getDataBuffer().release();
                    }
                }
            }

            @Override
            public void addCallback(MessageImpl<?> msg, SendCallback scb) {
                this.nextMsg = msg;
                this.nextCallback = scb;
            }
        });
        return future;
    }

    /**
     * Sends a message, optionally as part of a transaction.
     *
     * @param message message to publish
     * @param txn transaction to publish under, or {@code null} for a plain send
     * @return future for the message id; with a transaction, the send only starts once the
     *         topic has been registered with that transaction
     */
    @Override
    CompletableFuture<MessageId> internalSendWithTxnAsync(Message<?> message, Transaction txn) {
        if (txn != null) {
            final TransactionImpl transaction = (TransactionImpl)txn;
            return transaction.registerProducedTopic(this.topic).thenCompose(unused -> this.internalSendAsync(message));
        }
        return this.internalSendAsync(message);
    }

    /**
     * Encodes {@code payload} with the producer's compression codec.
     *
     * <p>One reference on {@code payload} is released after encoding; the caller owns the
     * returned buffer.
     *
     * @param payload buffer to encode
     * @return the buffer produced by the codec
     */
    private ByteBuf applyCompression(ByteBuf payload) {
        final ByteBuf encoded = this.compressor.encode(payload);
        payload.release();
        return encoded;
    }

    /**
     * Validates, optionally compresses, and hands a message to the send path.
     *
     * <p>Steps: check the producer state, reserve a permit and memory for the payload, compress
     * up front when the message will not be batched, reject oversized or reused messages, resolve
     * the schema, work out how many chunks are needed, reserve one extra permit per additional
     * chunk, and finally serialize and send each chunk while holding the producer monitor.
     *
     * <p>Every failure is reported through {@code callback}; nothing is thrown to the caller
     * for send failures. On each early exit after the first reservation, the permits, reserved
     * memory and payload buffer held so far are given back.
     *
     * <p>Decompiler note (CFR): WARNING - Removed try catching itself - possible behaviour change.
     *
     * @param message message to send; must be a {@link MessageImpl}
     * @param callback callback completed with the outcome of the send
     */
    public void sendAsync(Message<?> message, SendCallback callback) {
        Preconditions.checkArgument(message instanceof MessageImpl);
        if (!this.isValidProducerState(callback, message.getSequenceId())) {
            return;
        }
        MessageImpl msg = (MessageImpl)message;
        MessageMetadata msgMetadata = msg.getMessageBuilder();
        ByteBuf payload = msg.getDataBuffer();
        int uncompressedSize = payload.readableBytes();
        if (!this.canEnqueueRequest(callback, message.getSequenceId(), uncompressedSize)) {
            return;
        }
        ByteBuf compressedPayload = payload;
        boolean compressed = false;
        // Batched messages are compressed later as a whole batch; everything else is compressed now.
        if (!this.isBatchMessagingEnabled() || msgMetadata.hasDeliverAtTime()) {
            compressedPayload = this.applyCompression(payload);
            compressed = true;
            int compressedSize = compressedPayload.readableBytes();
            if (compressedSize > ClientCnx.getMaxMessageSize() && !this.conf.isChunkingEnabled()) {
                compressedPayload.release();
                String compressedStr = !this.isBatchMessagingEnabled() && this.conf.getCompressionType() != CompressionType.NONE ? "Compressed" : "";
                PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException(String.format("The producer %s of the topic %s sends a %s message with %d bytes that exceeds %d bytes", this.producerName, this.topic, compressedStr, compressedSize, ClientCnx.getMaxMessageSize()));
                this.completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
                return;
            }
        }
        // A non-replicated message that already carries a producer name has been sent before.
        if (!msg.isReplicated() && msgMetadata.hasProducerName()) {
            PulsarClientException.InvalidMessageException invalidMessageException = new PulsarClientException.InvalidMessageException(String.format("The producer %s of the topic %s can not reuse the same message", this.producerName, this.topic), msg.getSequenceId());
            this.completeCallbackAndReleaseSemaphore(uncompressedSize, callback, invalidMessageException);
            compressedPayload.release();
            return;
        }
        if (!this.populateMessageSchema(msg, callback)) {
            compressedPayload.release();
            return;
        }
        // Number of chunks: 1 when batching, otherwise ceil(size / maxMessageSize) with a floor of 1.
        int totalChunks = this.canAddToBatch(msg) ? 1 : Math.max(1, compressedPayload.readableBytes()) / ClientCnx.getMaxMessageSize() + (Math.max(1, compressedPayload.readableBytes()) % ClientCnx.getMaxMessageSize() == 0 ? 0 : 1);
        // Each chunk is sent individually, so every additional chunk needs its own permit.
        // The memory for the whole payload was already reserved above, hence size 0 here.
        for (int i = 0; i < totalChunks - 1; ++i) {
            if (this.canEnqueueRequest(callback, message.getSequenceId(), 0)) {
                continue;
            }
            // canEnqueueRequest has already failed the callback. Give back what this call still
            // holds: the initial permit plus one per chunk acquired so far, the memory reserved
            // for the payload, and the payload buffer (mirrors the other early-exit paths above).
            final int permitsHeld = i + 1;
            this.semaphore.ifPresent(permits -> permits.release(permitsHeld));
            this.client.getMemoryLimitController().releaseMemory(uncompressedSize);
            compressedPayload.release();
            return;
        }
        try {
            // Sequence-id assignment and enqueueing must happen atomically per message.
            synchronized (this) {
                long sequenceId;
                int readStartIndex = 0;
                if (!msgMetadata.hasSequenceId()) {
                    sequenceId = msgIdGeneratorUpdater.getAndIncrement(this);
                    msgMetadata.setSequenceId(sequenceId);
                } else {
                    sequenceId = msgMetadata.getSequenceId();
                }
                // All chunks of one message share a uuid built from producer name and sequence id.
                String uuid = totalChunks > 1 ? String.format("%s-%d", this.producerName, sequenceId) : null;
                for (int chunkId = 0; chunkId < totalChunks; ++chunkId) {
                    this.serializeAndSendMessage(msg, payload, sequenceId, uuid, chunkId, totalChunks, readStartIndex, ClientCnx.getMaxMessageSize(), compressedPayload, compressed, compressedPayload.readableBytes(), uncompressedSize, callback);
                    readStartIndex = (chunkId + 1) * ClientCnx.getMaxMessageSize();
                }
            }
        }
        catch (PulsarClientException e) {
            e.setSequenceId(msg.getSequenceId());
            this.completeCallbackAndReleaseSemaphore(uncompressedSize, callback, e);
        }
        catch (Throwable t) {
            this.completeCallbackAndReleaseSemaphore(uncompressedSize, callback, new PulsarClientException(t, msg.getSequenceId()));
        }
    }

    /**
     * Prepares one message (or one chunk of it) and either adds it to the current batch or
     * queues it as an individual send operation.
     *
     * @param msg message being sent
     * @param payload the message's original data buffer; released on the batch path
     * @param sequenceId sequence id assigned to the message
     * @param uuid identifier shared by all chunks, or {@code null} when not chunked
     * @param chunkId zero-based index of this chunk
     * @param totalChunks number of chunks the message is split into
     * @param readStartIndex offset in {@code compressedPayload} where this chunk starts
     * @param chunkMaxSizeInBytes maximum size of a single chunk
     * @param compressedPayload full payload buffer (already compressed when {@code compressed})
     * @param compressed whether {@code compressedPayload} has already been compressed
     * @param compressedPayloadSize readable size of {@code compressedPayload}
     * @param uncompressedSize size of the payload before compression
     * @param callback callback to complete for this send
     * @throws IOException declared by the signature; no visible statement here throws it directly
     * @throws InterruptedException declared by the signature; no visible statement here throws it directly
     */
    private void serializeAndSendMessage(MessageImpl<?> msg, ByteBuf payload, long sequenceId, String uuid, int chunkId, int totalChunks, int readStartIndex, int chunkMaxSizeInBytes, ByteBuf compressedPayload, boolean compressed, int compressedPayloadSize, int uncompressedSize, SendCallback callback) throws IOException, InterruptedException {
        ByteBuf chunkPayload = compressedPayload;
        MessageMetadata msgMetadata = msg.getMessageBuilder();
        // Chunk metadata and slicing are applied only for multi-chunk messages on persistent topics.
        if (totalChunks > 1 && TopicName.get(this.topic).isPersistent()) {
            chunkPayload = compressedPayload.slice(readStartIndex, Math.min(chunkMaxSizeInBytes, chunkPayload.readableBytes() - readStartIndex));
            // Every chunk except the last one takes an extra reference on its slice.
            if (chunkId != totalChunks - 1) {
                chunkPayload.retain();
            }
            if (uuid != null) {
                msgMetadata.setUuid(uuid);
            }
            msgMetadata.setChunkId(chunkId).setNumChunksFromMsg(totalChunks).setTotalChunkMsgSize(compressedPayloadSize);
        }
        // Metadata without a publish time has not been stamped yet: fill in publish time,
        // producer name, compression type and uncompressed size.
        if (!msgMetadata.hasPublishTime()) {
            msgMetadata.setPublishTime(this.client.getClientClock().millis());
            Preconditions.checkArgument(!msgMetadata.hasProducerName());
            msgMetadata.setProducerName(this.producerName);
            if (this.conf.getCompressionType() != CompressionType.NONE) {
                msgMetadata.setCompression(CompressionCodecProvider.convertToWireProtocol(this.conf.getCompressionType()));
            }
            msgMetadata.setUncompressedSize(uncompressedSize);
        }
        if (this.canAddToBatch(msg) && totalChunks <= 1) {
            // Batch path.
            if (this.canAddToCurrentBatch(msg)) {
                if (sequenceId <= this.lastSequenceIdPushed) {
                    // Sequence id is not newer than what was already pushed: flag it and send it
                    // in a batch of its own.
                    this.isLastSequenceIdPotentialDuplicated = true;
                    if (sequenceId <= this.lastSequenceIdPublished) {
                        log.warn("Message with sequence id {} is definitely a duplicate", (Object)sequenceId);
                    } else {
                        log.info("Message with sequence id {} might be a duplicate but cannot be determined at this time.", (Object)sequenceId);
                    }
                    this.doBatchSendAndAdd(msg, callback, payload);
                } else {
                    if (this.isLastSequenceIdPotentialDuplicated) {
                        // The previous message was a potential duplicate: flush it before adding this one.
                        this.doBatchSendAndAdd(msg, callback, payload);
                    } else {
                        boolean isBatchFull = this.batchMessageContainer.add(msg, callback);
                        this.lastSendFuture = callback.getFuture();
                        payload.release();
                        if (isBatchFull) {
                            this.batchMessageAndSend();
                        }
                    }
                    this.isLastSequenceIdPotentialDuplicated = false;
                }
            } else {
                // Message does not fit the current batch: flush the batch, then start a new one with it.
                this.doBatchSendAndAdd(msg, callback, payload);
            }
        } else {
            // Individual-send path (no batching, delayed delivery, schema not ready, or chunked).
            OpSendMsg op;
            int numMessages;
            if (!compressed) {
                chunkPayload = this.applyCompression(chunkPayload);
            }
            ByteBuf encryptedPayload = this.encryptMessage(msgMetadata, chunkPayload);
            // 'n' is a decompiler artifact and is not used afterwards.
            int n = numMessages = msg.getMessageBuilder().hasNumMessagesInBatch() ? msg.getMessageBuilder().getNumMessagesInBatch() : 1;
            if (msg.getSchemaState() == MessageImpl.SchemaState.Ready) {
                ByteBufPair cmd = this.sendMessage(this.producerId, sequenceId, numMessages, msgMetadata, encryptedPayload);
                op = OpSendMsg.create(msg, cmd, sequenceId, callback);
            } else {
                // Schema not ready yet: create the operation without a command and let
                // rePopulate build the command later.
                op = OpSendMsg.create(msg, null, sequenceId, callback);
                MessageMetadata finalMsgMetadata = msgMetadata;
                op.rePopulate = () -> {
                    op.cmd = this.sendMessage(this.producerId, sequenceId, numMessages, finalMsgMetadata, encryptedPayload);
                };
            }
            op.setNumMessagesInBatch(numMessages);
            op.setBatchSizeByte(encryptedPayload.readableBytes());
            if (totalChunks > 1) {
                op.totalChunks = totalChunks;
                op.chunkId = chunkId;
            }
            this.lastSendFuture = callback.getFuture();
            this.processOpSendMsg(op);
        }
    }

    /**
     * Stamps the schema version on a message before it is sent.
     *
     * <p>A message using the producer's own schema gets the producer's schema version (if one
     * is known) and is marked ready. A message with a different schema requires multi-schema
     * mode; it is marked ready only if its schema version is already cached.
     *
     * @param msg message whose metadata is populated
     * @param callback callback failed when multi-schema mode is disabled
     * @return {@code false} if the message was rejected (the callback has then been completed
     *         and the reserved resources released), {@code true} otherwise
     */
    private boolean populateMessageSchema(MessageImpl msg, SendCallback callback) {
        final MessageMetadata metadataBuilder = msg.getMessageBuilder();
        if (msg.getSchemaInternal() != this.schema) {
            if (!this.isMultiSchemaEnabled(true)) {
                final PulsarClientException.InvalidMessageException rejection = new PulsarClientException.InvalidMessageException(String.format("The producer %s of the topic %s is disabled the `MultiSchema`", this.producerName, this.topic), msg.getSequenceId());
                this.completeCallbackAndReleaseSemaphore(msg.getUncompressedSize(), callback, rejection);
                return false;
            }
            // Foreign schema: usable right away only when its version is already cached.
            final byte[] cachedVersion = (byte[])this.schemaCache.get(SchemaHash.of(msg.getSchemaInternal()));
            if (cachedVersion != null) {
                metadataBuilder.setSchemaVersion(cachedVersion);
                msg.setSchemaState(MessageImpl.SchemaState.Ready);
            }
            return true;
        }
        // Producer's own schema: always ready, version attached when known.
        this.schemaVersion.ifPresent(version -> metadataBuilder.setSchemaVersion(version));
        msg.setSchemaState(MessageImpl.SchemaState.Ready);
        return true;
    }

    /**
     * Retries schema resolution for a message from the schema cache.
     *
     * @param msg message whose schema version should be filled in
     * @return {@code true} if a cached version was found and the message is now marked ready,
     *         {@code false} if the cache has no entry for the message's schema
     */
    private boolean rePopulateMessageSchema(MessageImpl msg) {
        final byte[] cachedVersion = (byte[])this.schemaCache.get(SchemaHash.of(msg.getSchemaInternal()));
        if (cachedVersion != null) {
            msg.getMessageBuilder().setSchemaVersion(cachedVersion);
            msg.setSchemaState(MessageImpl.SchemaState.Ready);
            return true;
        }
        return false;
    }

    /**
     * Registers the schema of {@code msg} with the broker and then resumes processing of
     * pending sends from that message.
     *
     * <p>Does nothing unless the producer could be moved to the registering-schema state. The
     * schema sent is the replicator's schema info for replicated messages, otherwise the
     * message's own; a missing schema or one with a non-positive type value is replaced by the
     * BYTES schema.
     *
     * <p>On success the returned version is cached, stamped on the message and the message is
     * marked ready. On an incompatible-schema failure the message is marked broken and its
     * callback is failed; other failures are only logged. In every case recovery of the pending
     * sends is scheduled on the connection's event loop under the producer monitor.
     *
     * @param cnx connection used to issue the request
     * @param msg message whose schema is being registered
     * @param callback callback failed when the schema is incompatible
     */
    private void tryRegisterSchema(ClientCnx cnx, MessageImpl msg, SendCallback callback) {
        if (!this.changeToRegisteringSchemaState()) {
            return;
        }
        SchemaInfo schemaInfo = msg.hasReplicateFrom() ? msg.getSchemaInfoForReplicator() : msg.getSchemaInfo();
        schemaInfo = Optional.ofNullable(schemaInfo).filter(si -> si.getType().getValue() > 0).orElse(Schema.BYTES.getSchemaInfo());
        this.getOrCreateSchemaAsync(cnx, schemaInfo).handle((v, ex) -> {
            if (ex != null) {
                Throwable t = FutureUtil.unwrapCompletionException(ex);
                // The trailing Throwable is picked up by SLF4J as the exception to log.
                log.warn("[{}] [{}] GetOrCreateSchema error", new Object[]{this.topic, this.producerName, t});
                if (t instanceof PulsarClientException.IncompatibleSchemaException) {
                    msg.setSchemaState(MessageImpl.SchemaState.Broken);
                    callback.sendComplete((PulsarClientException.IncompatibleSchemaException)t);
                }
            } else {
                // Success is routine, so it is logged at INFO like the matching request message.
                log.info("[{}] [{}] GetOrCreateSchema succeed", (Object)this.topic, (Object)this.producerName);
                SchemaHash schemaHash = SchemaHash.of(msg.getSchemaInternal());
                this.schemaCache.putIfAbsent(schemaHash, v);
                msg.getMessageBuilder().setSchemaVersion(v);
                msg.setSchemaState(MessageImpl.SchemaState.Ready);
            }
            // Resume the pending sends on the connection's event loop, holding the producer monitor.
            cnx.ctx().channel().eventLoop().execute(() -> {
                synchronized (this) {
                    this.recoverProcessOpSendMsgFrom(cnx, msg);
                }
            });
            return null;
        });
    }

    /**
     * Issues a {@code GetOrCreateSchema} request on the given connection.
     *
     * @param cnx connection to the broker
     * @param schemaInfo schema to look up or register
     * @return future for the response to the request, or an already-failed future with a
     *         {@code NotSupportedException} when the peer's protocol version does not support
     *         the command
     */
    private CompletableFuture<byte[]> getOrCreateSchemaAsync(ClientCnx cnx, SchemaInfo schemaInfo) {
        if (Commands.peerSupportsGetOrCreateSchema(cnx.getRemoteEndpointProtocolVersion())) {
            final long requestId = this.client.newRequestId();
            final ByteBuf request = Commands.newGetOrCreateSchema(requestId, this.topic, schemaInfo);
            log.info("[{}] [{}] GetOrCreateSchema request", (Object)this.topic, (Object)this.producerName);
            return cnx.sendGetOrCreateSchema(request, requestId);
        }
        // Peer is too old for this command: fail without sending anything.
        final String reason = String.format("The command `GetOrCreateSchema` is not supported for the protocol version %d. The producer is %s, topic is %s", cnx.getRemoteEndpointProtocolVersion(), this.producerName, this.topic);
        return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException(reason));
    }

    /**
     * Encrypts a payload when encryption is enabled and a crypto instance is available.
     *
     * <p>On success the input buffer is released and a newly allocated buffer holding the
     * encrypted bytes is returned. When encryption is not active the input buffer is returned
     * unchanged. If encryption fails, the buffer allocated for the output is released before
     * the method either falls back to the unencrypted payload or rethrows.
     *
     * @param msgMetadata metadata handed to the crypto implementation
     * @param compressedPayload payload to encrypt
     * @return the encrypted payload, or {@code compressedPayload} itself when encryption is off
     *         or failed with the crypto failure action set to {@code SEND}
     * @throws PulsarClientException if encryption fails and the failure action is not {@code SEND}
     */
    protected ByteBuf encryptMessage(MessageMetadata msgMetadata, ByteBuf compressedPayload) throws PulsarClientException {
        if (!this.conf.isEncryptionEnabled() || this.msgCrypto == null) {
            return compressedPayload;
        }
        ByteBuf encryptedPayload = null;
        boolean handedOff = false;
        try {
            int maxSize = this.msgCrypto.getMaxOutputSize(compressedPayload.readableBytes());
            encryptedPayload = PulsarByteBufAllocator.DEFAULT.buffer(maxSize);
            // The crypto API works on NIO buffers; this view covers the new buffer's first maxSize bytes.
            ByteBuffer targetBuffer = encryptedPayload.nioBuffer(0, maxSize);
            this.msgCrypto.encrypt(this.conf.getEncryptionKeys(), this.conf.getCryptoKeyReader(), () -> msgMetadata, compressedPayload.nioBuffer(), targetBuffer);
            encryptedPayload.writerIndex(targetBuffer.remaining());
            compressedPayload.release();
            handedOff = true;
            return encryptedPayload;
        } catch (PulsarClientException e) {
            if (this.conf.getCryptoFailureAction() == ProducerCryptoFailureAction.SEND) {
                log.warn("[{}] [{}] Failed to encrypt message {}. Proceeding with publishing unencrypted message", new Object[]{this.topic, this.producerName, e.getMessage()});
                return compressedPayload;
            }
            throw e;
        } finally {
            // The output buffer is owned by this method until it is returned; on any failure
            // path nobody else will release it.
            if (!handedOff && encryptedPayload != null) {
                encryptedPayload.release();
            }
        }
    }

    /**
     * Builds the send command for a message identified by a single sequence id.
     *
     * @param producerId id of the sending producer
     * @param sequenceId sequence id of the message
     * @param numMessages number of messages carried by the payload
     * @param msgMetadata metadata to serialize with the payload
     * @param compressedPayload payload to send
     * @return the command produced by {@code Commands.newSend}, using {@link #getChecksumType()}
     */
    protected ByteBufPair sendMessage(long producerId, long sequenceId, int numMessages, MessageMetadata msgMetadata, ByteBuf compressedPayload) {
        return Commands.newSend(producerId, sequenceId, numMessages, this.getChecksumType(), msgMetadata, compressedPayload);
    }

    /**
     * Builds the send command for a payload covering a range of sequence ids.
     *
     * @param producerId id of the sending producer
     * @param lowestSequenceId lowest sequence id covered by the payload
     * @param highestSequenceId highest sequence id covered by the payload
     * @param numMessages number of messages carried by the payload
     * @param msgMetadata metadata to serialize with the payload
     * @param compressedPayload payload to send
     * @return the command produced by {@code Commands.newSend}, using {@link #getChecksumType()}
     */
    protected ByteBufPair sendMessage(long producerId, long lowestSequenceId, long highestSequenceId, int numMessages, MessageMetadata msgMetadata, ByteBuf compressedPayload) {
        return Commands.newSend(producerId, lowestSequenceId, highestSequenceId, numMessages, this.getChecksumType(), msgMetadata, compressedPayload);
    }

    /**
     * Chooses the checksum type for outgoing send commands.
     *
     * <p>The connection is read once into a local so that the null check and the protocol
     * version lookup see the same value; reading it twice could dereference {@code null} if
     * the connection handler drops the connection between the two calls.
     *
     * @return {@code Crc32c} when there is no connection or the peer's protocol version is at
     *         least {@code brokerChecksumSupportedVersion()}, otherwise {@code None}
     */
    protected Commands.ChecksumType getChecksumType() {
        final ClientCnx cnx = this.connectionHandler.cnx();
        if (cnx == null || cnx.getRemoteEndpointProtocolVersion() >= this.brokerChecksumSupportedVersion()) {
            return Commands.ChecksumType.Crc32c;
        }
        return Commands.ChecksumType.None;
    }

    /**
     * Decides whether a message is eligible for batching at all.
     *
     * @param msg message to check
     * @return {@code true} only if the message's schema is ready, batching is enabled and the
     *         message has no deliver-at time set
     */
    private boolean canAddToBatch(MessageImpl<?> msg) {
        if (msg.getSchemaState() != MessageImpl.SchemaState.Ready) {
            return false;
        }
        if (!this.isBatchMessagingEnabled()) {
            return false;
        }
        return !msg.getMessageBuilder().hasDeliverAtTime();
    }

    /**
     * Decides whether a message fits into the batch currently being built.
     *
     * @param msg message to check
     * @return {@code true} if the batch has room, the schema matches (checked only when
     *         multi-schema mode is enabled) and the transaction matches
     */
    private boolean canAddToCurrentBatch(MessageImpl<?> msg) {
        if (!this.batchMessageContainer.haveEnoughSpace(msg)) {
            return false;
        }
        // Schema equality only matters once multi-schema mode is on.
        if (this.isMultiSchemaEnabled(false) && !this.batchMessageContainer.hasSameSchema(msg)) {
            return false;
        }
        return this.batchMessageContainer.hasSameTxn(msg);
    }

    /**
     * Flushes the current batch and then adds {@code msg} to the batch container.
     *
     * <p>The payload reference is released whether or not flushing or adding succeeds.
     *
     * <p>Decompiler note (CFR): WARNING - Removed try catching itself - possible behaviour change.
     *
     * @param msg message to add after the flush
     * @param callback callback registered with the message; its future becomes {@code lastSendFuture}
     * @param payload buffer reference released before returning
     */
    private void doBatchSendAndAdd(MessageImpl<?> msg, SendCallback callback, ByteBuf payload) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Closing out batch to accommodate large message with size {}", new Object[]{this.topic, this.producerName, msg.getUncompressedSize()});
        }
        try {
            // Order matters: the existing batch must go out before the new message is added.
            this.batchMessageAndSend();
            this.batchMessageContainer.add(msg, callback);
            this.lastSendFuture = callback.getFuture();
        }
        finally {
            payload.release();
        }
    }

    /**
     * Checks that the producer is in a state that accepts new messages.
     *
     * <p>When it is not, {@code callback} is completed with an exception describing the state.
     *
     * @param callback callback failed when the state does not allow sending
     * @param sequenceId sequence id attached to most of the failure exceptions
     * @return {@code true} for the ready, connecting and registering-schema states,
     *         {@code false} otherwise
     */
    private boolean isValidProducerState(SendCallback callback, long sequenceId) {
        final PulsarClientException rejection;
        switch (this.getState()) {
            case Ready:
            case Connecting:
            case RegisteringSchema: {
                return true;
            }
            case Closing:
            case Closed: {
                rejection = new PulsarClientException.AlreadyClosedException("Producer already closed", sequenceId);
                break;
            }
            case ProducerFenced: {
                rejection = new PulsarClientException.ProducerFencedException("Producer was fenced");
                break;
            }
            case Terminated: {
                rejection = new PulsarClientException.TopicTerminatedException("Topic was terminated", sequenceId);
                break;
            }
            default: {
                // Any remaining state is reported as not connected.
                rejection = new PulsarClientException.NotConnectedException(sequenceId);
                break;
            }
        }
        callback.sendComplete(rejection);
        return false;
    }

    /**
     * Reserves the resources needed to enqueue one message: a send-queue permit (when a semaphore
     * is configured) and {@code payloadSize} bytes from the client memory limit controller.
     *
     * <p>With {@code blockIfQueueFull} enabled the call blocks until both are available. Otherwise
     * it fails fast and completes the callback with {@code ProducerQueueIsFullError} or
     * {@code MemoryBufferIsFullError}. Whenever this method returns {@code false} it holds no
     * permit and no memory reservation on behalf of the message.
     *
     * @param callback    the callback to fail when the request cannot be enqueued
     * @param sequenceId  the sequence id attached to the reported exception
     * @param payloadSize the number of bytes to reserve from the memory limit controller
     * @return {@code true} if both reservations succeeded, {@code false} if the callback was failed
     */
    private boolean canEnqueueRequest(SendCallback callback, long sequenceId, int payloadSize) {
        if (!this.conf.isBlockIfQueueFull()) {
            // Non-blocking mode: neither call can be interrupted, so no try/catch is needed.
            if (!this.semaphore.map(Semaphore::tryAcquire).orElse(true).booleanValue()) {
                callback.sendComplete(new PulsarClientException.ProducerQueueIsFullError("Producer send queue is full", sequenceId));
                return false;
            }
            if (!this.client.getMemoryLimitController().tryReserveMemory(payloadSize)) {
                // Give back the permit taken above; the message is not going to be queued.
                this.semaphore.ifPresent(Semaphore::release);
                callback.sendComplete(new PulsarClientException.MemoryBufferIsFullError("Client memory buffer is full", sequenceId));
                return false;
            }
            return true;
        }

        boolean permitAcquired = false;
        try {
            if (this.semaphore.isPresent()) {
                this.semaphore.get().acquire();
                permitAcquired = true;
            }
            this.client.getMemoryLimitController().reserveMemory(payloadSize);
            return true;
        } catch (InterruptedException e) {
            // If the interrupt hit while waiting for memory, the permit is already held and must be
            // returned, otherwise the send queue permanently loses one slot.
            if (permitAcquired) {
                this.semaphore.get().release();
            }
            Thread.currentThread().interrupt();
            callback.sendComplete(new PulsarClientException(e, sequenceId));
            return false;
        }
    }

    /**
     * Closes this producer asynchronously.
     *
     * <p>The state is moved to {@code Closing} (unless already {@code Closed}); if the producer was
     * already closing or closed, a completed future is returned immediately. Otherwise the send
     * timeout, batch timer, key generator task and stats timer are cancelled.
     *
     * <p>If there is no connection, or the producer was not {@code Ready}, the producer is closed
     * locally and every pending message is failed with {@code AlreadyClosedException}. Otherwise a
     * close-producer command is sent to the broker; the returned future completes once the broker
     * answers, or once the request failed on a channel that is no longer active.
     *
     * <p>NOTE(review): on the broker-acknowledged path the pending messages are released and
     * recycled without their callbacks being completed.
     *
     * @return a future that completes when the producer is closed
     */
    @Override
    public CompletableFuture<Void> closeAsync() {
        HandlerState.State previousState = this.getAndUpdateState(
                state -> state == HandlerState.State.Closed ? state : HandlerState.State.Closing);
        if (previousState == HandlerState.State.Closed || previousState == HandlerState.State.Closing) {
            return CompletableFuture.completedFuture(null);
        }

        // Stop all timers owned by this producer.
        Timeout pendingSendTimeout = this.sendTimeout;
        if (pendingSendTimeout != null) {
            pendingSendTimeout.cancel();
            this.sendTimeout = null;
        }
        ScheduledFuture<?> pendingBatchTimer = this.batchTimerTask;
        if (pendingBatchTimer != null) {
            pendingBatchTimer.cancel(false);
            this.batchTimerTask = null;
        }
        if (this.keyGeneratorTask != null && !this.keyGeneratorTask.isCancelled()) {
            this.keyGeneratorTask.cancel(false);
        }
        this.stats.cancelStatsTimeout();

        ClientCnx cnx = this.cnx();
        if (cnx == null || previousState != HandlerState.State.Ready) {
            // Nothing to tell the broker: close locally and fail whatever is still queued.
            log.info("[{}] [{}] Closed Producer (not connected)", this.topic, this.producerName);
            synchronized (this) {
                this.setState(HandlerState.State.Closed);
                this.client.cleanupProducer(this);
                PulsarClientException.AlreadyClosedException ex = new PulsarClientException.AlreadyClosedException(String.format("The producer %s of the topic %s was already closed when closing the producers", this.producerName, this.topic));
                this.pendingMessages.forEach(pending -> {
                    pending.sendComplete(ex);
                    pending.cmd.release();
                    pending.recycle();
                });
                this.pendingMessages.clear();
            }
            return CompletableFuture.completedFuture(null);
        }

        long requestId = this.client.newRequestId();
        ByteBuf closeCmd = Commands.newCloseProducer(this.producerId, requestId);
        CompletableFuture<Void> closeFuture = new CompletableFuture<Void>();
        cnx.sendRequestWithId(closeCmd, requestId).handle((response, failure) -> {
            cnx.removeProducer(this.producerId);
            if (failure != null && cnx.ctx().channel().isActive()) {
                // The request failed on a live channel: report the failure to the caller.
                closeFuture.completeExceptionally((Throwable) failure);
                return null;
            }
            // Either the broker acknowledged, or the channel is gone anyway: treat as closed.
            synchronized (this) {
                log.info("[{}] [{}] Closed Producer", this.topic, this.producerName);
                this.setState(HandlerState.State.Closed);
                this.pendingMessages.forEach(pending -> {
                    pending.cmd.release();
                    pending.recycle();
                });
                this.pendingMessages.clear();
            }
            closeFuture.complete(null);
            this.client.cleanupProducer(this);
            return null;
        });
        return closeFuture;
    }

    /**
     * Reports whether the producer has a connection and is in the {@code Ready} state.
     *
     * @return {@code true} only if the connection handler holds a connection and the state is
     *         {@code Ready}
     */
    @Override
    public boolean isConnected() {
        if (this.connectionHandler.cnx() == null) {
            return false;
        }
        return this.getState() == HandlerState.State.Ready;
    }

    /**
     * Returns the connection handler's {@code lastConnectionClosedTimestamp} value.
     *
     * @return the timestamp recorded by the connection handler for the last closed connection
     */
    @Override
    public long getLastDisconnectedTimestamp() {
        final long lastClosedTimestamp = this.connectionHandler.lastConnectionClosedTimestamp;
        return lastClosedTimestamp;
    }

    /**
     * Reports whether the producer currently has a connection whose channel is writable.
     *
     * @return {@code false} when there is no connection, otherwise the channel's writability
     */
    public boolean isWritable() {
        ClientCnx cnx = this.connectionHandler.cnx();
        if (cnx == null) {
            return false;
        }
        return cnx.channel().isWritable();
    }

    /**
     * Handles a topic-terminated notification.
     *
     * <p>Moves the producer to {@code Terminated} unless it is already {@code Closed}. If the
     * producer was neither terminated nor closed before, the connection is dropped and the pending
     * messages are failed with {@code TopicTerminatedException}.
     *
     * @param cnx the connection passed on to {@code failPendingMessages}
     */
    public void terminated(ClientCnx cnx) {
        HandlerState.State previousState = this.getAndUpdateState(state -> {
            if (state == HandlerState.State.Closed) {
                return HandlerState.State.Closed;
            }
            return HandlerState.State.Terminated;
        });
        if (previousState == HandlerState.State.Terminated || previousState == HandlerState.State.Closed) {
            // Already handled, or the producer is closed: nothing more to do.
            return;
        }
        log.info("[{}] [{}] The topic has been terminated", this.topic, this.producerName);
        this.setClientCnx(null);
        synchronized (this) {
            this.failPendingMessages(cnx, new PulsarClientException.TopicTerminatedException(String.format("The topic %s that the producer %s produces to has been terminated", this.topic, this.producerName)));
        }
    }

    /**
     * Processes a send acknowledgement from the broker.
     *
     * <p>The ack is matched against the head of the pending queue:
     * <ul>
     *   <li>empty queue, or an ack older than the head: ignored (logged as an ack for a timed-out
     *       message);</li>
     *   <li>an ack newer than the head, or a batch ack whose highest sequence id does not match:
     *       the channel is closed;</li>
     *   <li>otherwise the head is removed, its permits and memory are released, the last published
     *       sequence id is advanced and the callback is completed.</li>
     * </ul>
     * For chunked messages the callback is completed only for the last chunk.
     *
     * @param cnx               the connection the ack arrived on
     * @param sequenceId        the acknowledged sequence id
     * @param highestSequenceId the highest sequence id covered by the ack
     * @param ledgerId          ledger id of the persisted entry
     * @param entryId           entry id of the persisted entry
     */
    void ackReceived(ClientCnx cnx, long sequenceId, long highestSequenceId, long ledgerId, long entryId) {
        final OpSendMsg op;
        synchronized (this) {
            op = this.pendingMessages.peek();
            if (op == null) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Got ack for timed out msg {} - {}", this.topic, this.producerName, sequenceId, highestSequenceId);
                }
                return;
            }
            if (sequenceId > op.sequenceId) {
                // The broker acknowledged something ahead of the queue head: drop the connection.
                log.warn("[{}] [{}] Got ack for msg. expecting: {} - {} - got: {} - {} - queue-size: {}", this.topic, this.producerName, op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, this.pendingMessages.size());
                cnx.channel().close();
                return;
            }
            if (sequenceId < op.sequenceId) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Got ack for timed out msg. expecting: {} - {} - got: {} - {}", this.topic, this.producerName, op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId);
                }
                return;
            }

            // From here on sequenceId equals the head's sequence id.
            boolean highestIdMatches = sequenceId >= highestSequenceId || highestSequenceId == op.highestSequenceId;
            if (!highestIdMatches) {
                log.warn("[{}] [{}] Got ack for batch msg error. expecting: {} - {} - got: {} - {} - queue-size: {}", this.topic, this.producerName, op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, this.pendingMessages.size());
                cnx.channel().close();
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received ack for msg {} ", this.topic, this.producerName, sequenceId);
            }
            this.pendingMessages.remove();
            this.releaseSemaphoreForSendOp(op);
        }

        LAST_SEQ_ID_PUBLISHED_UPDATER.getAndUpdate(this, last -> Math.max(last, this.getHighestSequenceId(op)));
        op.setMessageId(ledgerId, entryId, this.partitionIndex);

        boolean lastOrOnlyChunk = op.totalChunks <= 1 || op.chunkId == op.totalChunks - 1;
        if (lastOrOnlyChunk) {
            try {
                op.sendComplete(null);
            } catch (Throwable t) {
                // Guard against exceptions thrown while completing the callback.
                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", this.topic, this.producerName, sequenceId, t);
            }
        }
        ReferenceCountUtil.safeRelease(op.cmd);
        op.recycle();
    }

    /**
     * Returns the larger of the op's {@code highestSequenceId} and {@code sequenceId}.
     *
     * @param op the send operation to inspect
     * @return the greater of the two sequence ids stored on {@code op}
     */
    private long getHighestSequenceId(OpSendMsg op) {
        return op.highestSequenceId >= op.sequenceId ? op.highestSequenceId : op.sequenceId;
    }

    /**
     * Returns the resources held by a send operation: its semaphore permits (when a semaphore is
     * configured) and its reserved memory.
     *
     * <p>With batching enabled the number of permits equals the number of messages in the op's
     * batch; otherwise a single permit is released.
     *
     * @param op the send operation whose resources are released
     */
    private void releaseSemaphoreForSendOp(OpSendMsg op) {
        this.semaphore.ifPresent(permits -> {
            int held = this.isBatchMessagingEnabled() ? op.numMessagesInBatch : 1;
            permits.release(held);
        });
        this.client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
    }

    /**
     * Releases one semaphore permit (when a semaphore is configured) and {@code payloadSize} bytes
     * of reserved memory, then completes the callback with the given exception.
     *
     * @param payloadSize the number of bytes to hand back to the memory limit controller
     * @param callback    the callback to complete
     * @param exception   the outcome passed to the callback
     */
    private void completeCallbackAndReleaseSemaphore(long payloadSize, SendCallback callback, Exception exception) {
        if (this.semaphore.isPresent()) {
            this.semaphore.get().release();
        }
        this.client.getMemoryLimitController().releaseMemory(payloadSize);
        callback.sendComplete(exception);
    }

    /**
     * Reacts to a checksum error reported for {@code sequenceId}.
     *
     * <p>If the head of the pending queue is the reported message and its local buffer fails
     * verification, the message is removed and failed with {@code ChecksumException}. In every
     * other case (empty queue, a different head, or an intact local buffer) the pending messages
     * are resent on {@code cnx}.
     *
     * @param cnx        the connection used for resending
     * @param sequenceId the sequence id the error was reported for
     */
    protected synchronized void recoverChecksumError(ClientCnx cnx, long sequenceId) {
        OpSendMsg head = this.pendingMessages.peek();
        if (head == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Got send failure for timed out msg {}", this.topic, this.producerName, sequenceId);
            }
        } else if (sequenceId != this.getHighestSequenceId(head)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Corrupt message is already timed out {}", this.topic, this.producerName, sequenceId);
            }
        } else if (this.verifyLocalBufferIsNotCorrupted(head)) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Message is not corrupted, retry send-message with sequenceId {}", this.topic, this.producerName, sequenceId);
            }
        } else {
            // The local copy itself is corrupted: resending would not help, so fail the message.
            this.pendingMessages.remove();
            this.releaseSemaphoreForSendOp(head);
            try {
                head.sendComplete(new PulsarClientException.ChecksumException(String.format("The checksum of the message which is produced by producer %s to the topic %s is corrupted", this.producerName, this.topic)));
            } catch (Throwable t) {
                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", this.topic, this.producerName, sequenceId, t);
            }
            ReferenceCountUtil.safeRelease(head.cmd);
            head.recycle();
            return;
        }
        this.resendMessages(cnx);
    }

    /**
     * Reacts to a "not allowed" error reported for {@code sequenceId}.
     *
     * <p>If the head of the pending queue is the reported message, it is removed, its resources
     * are released and its callback is failed with {@code NotAllowedException}. Otherwise nothing
     * happens.
     *
     * @param sequenceId the sequence id the error was reported for
     */
    protected synchronized void recoverNotAllowedError(long sequenceId) {
        OpSendMsg head = this.pendingMessages.peek();
        if (head == null || sequenceId != this.getHighestSequenceId(head)) {
            return;
        }
        this.pendingMessages.remove();
        this.releaseSemaphoreForSendOp(head);
        try {
            head.sendComplete(new PulsarClientException.NotAllowedException(String.format("The size of the message which is produced by producer %s to the topic %s is not allowed", this.producerName, this.topic)));
        } catch (Throwable t) {
            log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", this.topic, this.producerName, sequenceId, t);
        }
        ReferenceCountUtil.safeRelease(head.cmd);
        head.recycle();
    }

    /**
     * Recomputes the checksum of a pending message's local buffers and compares it with the
     * checksum stored in the header frame.
     *
     * <p>The header frame's reader index is restored before returning, so the buffer can still be
     * (re)sent afterwards.
     *
     * @param op the send operation whose command buffers are verified
     * @return {@code true} if the stored and recomputed checksums match or the frame carries no
     *         checksum; {@code false} if they differ or {@code op.cmd} is {@code null}
     */
    protected boolean verifyLocalBufferIsNotCorrupted(OpSendMsg op) {
        ByteBufPair cmd = op.cmd;
        if (cmd == null) {
            log.warn("[{}] Failed while casting empty ByteBufPair, ", this.producerName);
            return false;
        }

        ByteBuf headerFrame = cmd.getFirst();
        headerFrame.markReaderIndex();
        try {
            // Skip 4 bytes (presumably the total-size field), then the command section whose
            // length is read from the frame.
            headerFrame.skipBytes(4);
            int cmdSize = (int) headerFrame.readUnsignedInt();
            headerFrame.skipBytes(cmdSize);

            if (!Commands.hasChecksum(headerFrame)) {
                log.warn("[{}] [{}] checksum is not present into message with id {}", this.topic, this.producerName, op.sequenceId);
                return true;
            }

            int storedChecksum = Commands.readChecksum(headerFrame);
            // Checksum over the rest of the header frame, continued over the second buffer.
            int metadataChecksum = Crc32cIntChecksum.computeChecksum(headerFrame);
            long computedChecksum = Crc32cIntChecksum.resumeChecksum(metadataChecksum, cmd.getSecond());
            return storedChecksum == computedChecksum;
        } finally {
            headerFrame.resetReaderIndex();
        }
    }

    /**
     * Registers this producer on the given connection and sends the create-producer command.
     *
     * <p>On success the producer adopts the broker's response (schema version, topic epoch,
     * producer name and, where applicable, last sequence id), starts the batch timer on the first
     * successful creation when batching is enabled, and resends pending messages.
     *
     * <p>On failure the producer is removed from the connection and, depending on the cause, is
     * closed, has its pending messages failed, is moved to {@code Terminated} /
     * {@code ProducerFenced} / {@code Failed}, or schedules a reconnect.
     *
     * @param cnx the connection to create the producer on
     */
    @Override
    public void connectionOpened(ClientCnx cnx) {
        this.previousExceptions.clear();
        this.connectionHandler.setClientCnx(cnx);
        cnx.registerProducer(this.producerId, this);
        log.info("[{}] [{}] Creating producer on cnx {}", new Object[]{this.topic, this.producerName, cnx.ctx().channel()});
        long requestId = this.client.newRequestId();
        // Set the creation deadline only if it is still 0 (compare-and-set from 0).
        PRODUCER_DEADLINE_UPDATER.compareAndSet(this, 0L, System.currentTimeMillis() + this.client.getConfiguration().getOperationTimeoutMs());
        // Choose the schema info to send: JSON schemas fall back to the backwards-compatible form
        // when the peer does not support the Avro-format JSON schema; BYTES and NONE send nothing.
        SchemaInfo schemaInfo = null;
        if (this.schema != null && this.schema.getSchemaInfo() != null) {
            if (this.schema.getSchemaInfo().getType() == SchemaType.JSON) {
                if (Commands.peerSupportJsonSchemaAvroFormat(cnx.getRemoteEndpointProtocolVersion())) {
                    schemaInfo = this.schema.getSchemaInfo();
                } else if (this.schema instanceof JSONSchema) {
                    JSONSchema jsonSchema = (JSONSchema)this.schema;
                    schemaInfo = jsonSchema.getBackwardsCompatibleJsonSchemaInfo();
                } else {
                    schemaInfo = this.schema.getSchemaInfo();
                }
            } else {
                schemaInfo = this.schema.getSchemaInfo().getType() == SchemaType.BYTES || this.schema.getSchemaInfo().getType() == SchemaType.NONE ? null : this.schema.getSchemaInfo();
            }
        }
        ((CompletableFuture)cnx.sendRequestWithId(Commands.newProducer(this.topic, this.producerId, requestId, this.producerName, this.conf.isEncryptionEnabled(), this.metadata, schemaInfo, this.connectionHandler.getEpoch(), this.userProvidedProducerName, this.conf.getAccessMode(), this.topicEpoch), requestId).thenAccept(response -> {
            String producerName = response.getProducerName();
            long lastSequenceId = response.getLastSequenceId();
            this.schemaVersion = Optional.ofNullable(response.getSchemaVersion());
            this.schemaVersion.ifPresent(v -> this.schemaCache.put(SchemaHash.of(this.schema), v));
            ProducerImpl producerImpl = this;
            synchronized (producerImpl) {
                // The producer was closed while the creation was in flight: undo the registration
                // and close the channel.
                if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                    cnx.removeProducer(this.producerId);
                    cnx.channel().close();
                    return;
                }
                this.resetBackoff();
                log.info("[{}] [{}] Created producer on cnx {}", new Object[]{this.topic, producerName, cnx.ctx().channel()});
                this.connectionId = cnx.ctx().channel().toString();
                this.connectedSince = DateFormatter.now();
                if (this.conf.getAccessMode() != ProducerAccessMode.Shared && !this.topicEpoch.isPresent()) {
                    log.info("[{}] [{}] Producer epoch is {}", new Object[]{this.topic, producerName, response.getTopicEpoch()});
                }
                this.topicEpoch = response.getTopicEpoch();
                // Adopt the name from the response only when none is set yet.
                if (this.producerName == null) {
                    this.producerName = producerName;
                }
                // Take over the broker's last sequence id only when the generator is still at 0
                // and no initial sequence id was configured.
                if (this.msgIdGenerator == 0L && this.conf.getInitialSequenceId() == null) {
                    this.lastSequenceIdPublished = lastSequenceId;
                    this.msgIdGenerator = lastSequenceId + 1L;
                }
                // Start the periodic batch flush only on the first successful creation (the
                // creation future is not done yet) and only when batching is enabled.
                if (!this.producerCreatedFuture.isDone() && this.isBatchMessagingEnabled()) {
                    this.batchTimerTask = cnx.ctx().executor().scheduleAtFixedRate(() -> {
                        if (log.isTraceEnabled()) {
                            log.trace("[{}] [{}] Batching the messages from the batch container from timer thread", (Object)this.topic, (Object)producerName);
                        }
                        ProducerImpl producerImpl = this;
                        synchronized (producerImpl) {
                            if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                                return;
                            }
                            this.batchMessageAndSend();
                        }
                    }, 0L, this.conf.getBatchingMaxPublishDelayMicros(), TimeUnit.MICROSECONDS);
                }
                this.resendMessages(cnx);
            }
        })).exceptionally(e -> {
            ProducerImpl producerImpl;
            // NOTE(review): assumes the throwable always wraps a cause; a null cause would fail
            // at cause.getMessage() below - confirm.
            Throwable cause = e.getCause();
            cnx.removeProducer(this.producerId);
            if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                cnx.channel().close();
                return null;
            }
            log.error("[{}] [{}] Failed to create producer: {}", new Object[]{this.topic, this.producerName, cause.getMessage()});
            // Topic does not exist: close the producer, then fail the creation future.
            if (cause instanceof PulsarClientException.TopicDoesNotExistException) {
                this.closeAsync().whenComplete((v, ex) -> {
                    if (ex != null) {
                        log.error("Failed to close producer on TopicDoesNotExistException.", ex);
                    }
                    this.producerCreatedFuture.completeExceptionally(cause);
                });
                return null;
            }
            // Backlog quota: the "exception" variant fails pending messages, the "error" variant
            // only logs. Both continue into the decision chain below.
            if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededException) {
                producerImpl = this;
                synchronized (producerImpl) {
                    log.warn("[{}] [{}] Topic backlog quota exceeded. Throwing Exception on producer.", (Object)this.topic, (Object)this.producerName);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Pending messages: {}", new Object[]{this.topic, this.producerName, this.pendingMessages.size()});
                    }
                    PulsarClientException.ProducerBlockedQuotaExceededException bqe = new PulsarClientException.ProducerBlockedQuotaExceededException(String.format("The backlog quota of the topic %s that the producer %s produces to is exceeded", this.topic, this.producerName));
                    this.failPendingMessages(this.cnx(), bqe);
                }
            } else if (cause instanceof PulsarClientException.ProducerBlockedQuotaExceededError) {
                log.warn("[{}] [{}] Producer is blocked on creation because backlog exceeded on topic.", (Object)this.producerName, (Object)this.topic);
            }
            if (cause instanceof PulsarClientException.TopicTerminatedException) {
                this.setState(HandlerState.State.Terminated);
                producerImpl = this;
                synchronized (producerImpl) {
                    this.failPendingMessages(this.cnx(), (PulsarClientException)cause);
                }
                this.producerCreatedFuture.completeExceptionally(cause);
                this.client.cleanupProducer(this);
            } else if (cause instanceof PulsarClientException.ProducerFencedException) {
                this.setState(HandlerState.State.ProducerFenced);
                producerImpl = this;
                synchronized (producerImpl) {
                    this.failPendingMessages(this.cnx(), (PulsarClientException)cause);
                }
                this.producerCreatedFuture.completeExceptionally(cause);
                this.client.cleanupProducer(this);
            // Reconnect if the producer had already been created once, or if the error is
            // retriable and the creation deadline has not passed yet.
            } else if (this.producerCreatedFuture.isDone() || cause instanceof PulsarClientException && PulsarClientException.isRetriableError(cause) && System.currentTimeMillis() < PRODUCER_DEADLINE_UPDATER.get(this)) {
                this.reconnectLater(cause);
            } else {
                // Give up: mark the producer as failed and stop the send timeout timer.
                this.setState(HandlerState.State.Failed);
                this.producerCreatedFuture.completeExceptionally(cause);
                this.client.cleanupProducer(this);
                Timeout timeout = this.sendTimeout;
                if (timeout != null) {
                    timeout.cancel();
                    this.sendTimeout = null;
                }
            }
            return null;
        });
    }

    /**
     * Handles a failed connection attempt.
     *
     * <p>While the error is retriable and the lookup deadline has not passed, the exception is
     * only remembered. Otherwise the accumulated exceptions are attached to it and the creation
     * future is failed; if that call actually completed the future, the producer is moved to
     * {@code Failed} and cleaned up.
     *
     * @param exception the reason the connection attempt failed
     */
    @Override
    public void connectionFailed(PulsarClientException exception) {
        boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
        boolean deadlinePassed = System.currentTimeMillis() > this.lookupDeadline;
        if (!nonRetriableError && !deadlinePassed) {
            this.previousExceptions.add(exception);
            return;
        }

        exception.setPreviousExceptions(this.previousExceptions);
        if (!this.producerCreatedFuture.completeExceptionally(exception)) {
            // The creation future was already completed; nothing else to update.
            return;
        }
        if (nonRetriableError) {
            log.info("[{}] Producer creation failed for producer {} with unretriableError = {}", this.topic, this.producerId, exception);
        } else {
            log.info("[{}] Producer creation failed for producer {} after producerTimeout", this.topic, this.producerId);
        }
        this.setState(HandlerState.State.Failed);
        this.client.cleanupProducer(this);
    }

    /**
     * Schedules, on the connection's event loop, the resending of all pending messages.
     *
     * <p>When run, the task closes the channel if the producer is closing or closed. With an empty
     * pending queue it tries to switch to the ready state and completes the creation future,
     * closing the channel if that state change fails. Otherwise it re-processes the pending
     * messages from the start of the queue.
     *
     * @param cnx the connection to resend on
     */
    private void resendMessages(ClientCnx cnx) {
        cnx.ctx().channel().eventLoop().execute(() -> {
            synchronized (this) {
                if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                    cnx.channel().close();
                    return;
                }

                int messagesToResend = this.pendingMessages.size();
                if (messagesToResend > 0) {
                    log.info("[{}] [{}] Re-Sending {} messages to server", this.topic, this.producerName, messagesToResend);
                    this.recoverProcessOpSendMsgFrom(cnx, null);
                    return;
                }

                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] No pending messages to resend {}", this.topic, this.producerName, messagesToResend);
                }
                if (!this.changeToReadyState()) {
                    // The state change was refused: close the channel.
                    cnx.channel().close();
                    return;
                }
                this.producerCreatedFuture.complete(this);
            }
        });
    }

    /**
     * Removes the checksum section from a pending message's header frame, in place.
     *
     * <p>If the frame carries no checksum it is left untouched. Otherwise the total-size field at
     * offset 0 is rewritten, the bytes following the checksum are moved forward over it and the
     * frame's capacity is reduced by the checksum size. The reader index is reset before returning.
     *
     * @param op the send operation whose command buffer is rewritten
     */
    private void stripChecksum(OpSendMsg op) {
        ByteBufPair cmd = op.cmd;
        if (cmd == null) {
            log.warn("[{}] Failed while casting null into ByteBufPair", this.producerName);
            return;
        }

        int totalMsgBufSize = cmd.readableBytes();
        ByteBuf headerFrame = cmd.getFirst();
        headerFrame.markReaderIndex();
        try {
            headerFrame.skipBytes(4);
            int cmdSize = (int) headerFrame.readUnsignedInt();
            headerFrame.skipBytes(cmdSize);
            if (!Commands.hasChecksum(headerFrame)) {
                return;
            }

            // Presumably 2 bytes of magic number plus a 4-byte checksum - TODO confirm.
            final int checksumSize = 6;
            // 8 = the two 4-byte fields consumed above (4 skipped + 4 read as cmdSize).
            int headerSize = 8 + cmdSize;
            int checksumMark = headerSize + checksumSize;
            int metaPayloadSize = totalMsgBufSize - checksumMark;
            int newTotalFrameSizeLength = 4 + cmdSize + metaPayloadSize;

            headerFrame.resetReaderIndex();
            int headerFrameSize = headerFrame.readableBytes();
            headerFrame.setInt(0, newTotalFrameSizeLength);

            // Copy everything after the checksum back to where the checksum started.
            ByteBuf metadata = headerFrame.slice(checksumMark, headerFrameSize - checksumMark);
            headerFrame.writerIndex(headerSize);
            metadata.readBytes(headerFrame, metadata.readableBytes());
            headerFrame.capacity(headerFrameSize - checksumSize);
        } finally {
            headerFrame.resetReaderIndex();
        }
    }

    /**
     * Returns the numeric value of protocol version {@code v6}, which
     * {@code recoverProcessOpSendMsgFrom} uses as the threshold below which checksums are stripped.
     *
     * @return the value of {@code ProtocolVersion.v6}
     */
    public int brokerChecksumSupportedVersion() {
        ProtocolVersion minimumVersion = ProtocolVersion.v6;
        return minimumVersion.getValue();
    }

    /**
     * Returns the handler name, which for a producer is its producer name.
     *
     * @return the current producer name
     */
    @Override
    String getHandlerName() {
        final String handlerName = this.producerName;
        return handlerName;
    }

    /**
     * Send-timeout timer task.
     *
     * <p>If the oldest pending message has exceeded the configured send timeout, every pending
     * message is failed with {@code TimeoutException} and counted as a send failure. The timer is
     * then re-armed: for the remaining time of the oldest message, or for a full timeout period
     * when the queue is empty or was just failed.
     *
     * <p>Nothing happens if the timeout was cancelled or the producer is closing or closed.
     *
     * @param timeout the timer handle that fired
     * @throws Exception declared by the timer task contract
     */
    @Override
    public void run(Timeout timeout) throws Exception {
        if (timeout.isCancelled()) {
            return;
        }
        synchronized (this) {
            if (this.getState() == HandlerState.State.Closing || this.getState() == HandlerState.State.Closed) {
                return;
            }

            long timeToWaitMs;
            OpSendMsg firstMsg = this.pendingMessages.peek();
            if (firstMsg == null) {
                // Nothing pending: check again after a full timeout period.
                timeToWaitMs = this.conf.getSendTimeoutMs();
            } else {
                long diff = this.conf.getSendTimeoutMs() - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - firstMsg.createdAt);
                if (diff <= 0L) {
                    // Capture the count first: without a connection failPendingMessages clears
                    // the queue synchronously, and reading size() afterwards would record 0.
                    int timedOutMessages = this.pendingMessages.size();
                    log.info("[{}] [{}] Message send timed out. Failing {} messages", this.topic, this.producerName, timedOutMessages);
                    PulsarClientException.TimeoutException te = new PulsarClientException.TimeoutException(String.format("The producer %s can not send message to the topic %s within given timeout", this.producerName, this.topic), firstMsg.sequenceId);
                    this.failPendingMessages(this.cnx(), te);
                    this.stats.incrementSendFailed(timedOutMessages);
                    timeToWaitMs = this.conf.getSendTimeoutMs();
                } else {
                    // The oldest message still has time left: wake up exactly when it expires.
                    timeToWaitMs = diff;
                }
            }
            this.sendTimeout = this.client.timer().newTimeout(this, timeToWaitMs, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Fails every pending message with the given exception and releases the resources they hold.
     *
     * <p>With a {@code null} connection the work is done immediately on the calling thread, which
     * must hold this producer's monitor. With a non-null connection the same work is scheduled on
     * that connection's event loop and runs there under the monitor.
     *
     * <p>For each pending op the callback is completed (for chunked messages only on the last
     * chunk), its reserved memory is returned to the memory limit controller and its command
     * buffer is released. Semaphore permits are released in one call afterwards. When batching is
     * enabled, the messages still in the batch container are failed as well.
     *
     * @param cnx the connection whose event loop should run the failure, or {@code null} to fail
     *            the messages synchronously
     * @param ex  the exception delivered to every callback; its sequence id is overwritten per op
     */
    private void failPendingMessages(ClientCnx cnx, PulsarClientException ex) {
        if (cnx != null) {
            // Defer to the event loop, then take the synchronous path under the monitor.
            cnx.ctx().channel().eventLoop().execute(() -> {
                synchronized (this) {
                    this.failPendingMessages(null, ex);
                }
            });
            return;
        }

        AtomicInteger releaseCount = new AtomicInteger();
        boolean batchMessagingEnabled = this.isBatchMessagingEnabled();
        this.pendingMessages.forEach(op -> {
            releaseCount.addAndGet(batchMessagingEnabled ? op.numMessagesInBatch : 1);
            try {
                ex.setSequenceId(op.sequenceId);
                // For chunked messages only the last chunk completes the callback.
                if (op.totalChunks <= 1 || op.chunkId == op.totalChunks - 1) {
                    op.sendComplete(ex);
                }
            } catch (Throwable t) {
                // Guard against exceptions thrown while completing the callback.
                log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", this.topic, this.producerName, op.sequenceId, t);
            }
            // Return the op's reserved memory, as releaseSemaphoreForSendOp does on the other
            // completion paths. This must happen before recycle() resets the op.
            this.client.getMemoryLimitController().releaseMemory(op.uncompressedSize);
            ReferenceCountUtil.safeRelease(op.cmd);
            op.recycle();
        });
        this.pendingMessages.clear();
        this.semaphore.ifPresent(permits -> permits.release(releaseCount.get()));
        if (batchMessagingEnabled) {
            this.failPendingBatchMessages(ex);
        }
    }

    /**
     * Discards the messages currently held by the batch container, passing {@code ex} to the
     * container's {@code discard}, and releases one semaphore permit per discarded message.
     *
     * @param ex the exception handed to the batch container
     */
    private void failPendingBatchMessages(PulsarClientException ex) {
        if (!this.batchMessageContainer.isEmpty()) {
            // Read the count before discarding the container's contents.
            int discardedMessages = this.batchMessageContainer.getNumMessagesInBatch();
            this.batchMessageContainer.discard(ex);
            if (this.semaphore.isPresent()) {
                this.semaphore.get().release(discardedMessages);
            }
        }
    }

    /**
     * Flushes the batch container (when batching is enabled) and returns a future tied to the
     * most recent send.
     *
     * @return a future derived from the last send future, with its result mapped to {@code null}
     */
    @Override
    public CompletableFuture<Void> flushAsync() {
        final CompletableFuture<MessageId> mostRecentSend;
        synchronized (this) {
            if (this.isBatchMessagingEnabled()) {
                this.batchMessageAndSend();
            }
            // Read under the lock so the future matches the batch that was just sent.
            mostRecentSend = this.lastSendFuture;
        }
        return mostRecentSend.thenApply(ignored -> null);
    }

    /**
     * Sends out the current batch, if batching is enabled; otherwise does nothing.
     */
    @Override
    protected void triggerFlush() {
        if (!this.isBatchMessagingEnabled()) {
            return;
        }
        synchronized (this) {
            this.batchMessageAndSend();
        }
    }

    /**
     * Converts the content of the batch container into one or more send operations and processes
     * them. Does nothing when the container is empty.
     *
     * <p>If building or processing the operations fails, the semaphore permits corresponding to
     * the messages still in the container are released and the failure is logged. The calling
     * thread's interrupt flag is left untouched, because a failure here does not mean the thread
     * was interrupted.
     */
    private void batchMessageAndSend() {
        if (log.isTraceEnabled()) {
            log.trace("[{}] [{}] Batching the messages from the batch container with {} messages", this.topic, this.producerName, this.batchMessageContainer.getNumMessagesInBatch());
        }
        if (this.batchMessageContainer.isEmpty()) {
            return;
        }
        try {
            List<OpSendMsg> opSendMsgs = this.batchMessageContainer.isMultiBatches() ? this.batchMessageContainer.createOpSendMsgs() : Collections.singletonList(this.batchMessageContainer.createOpSendMsg());
            this.batchMessageContainer.clear();
            for (OpSendMsg opSendMsg : opSendMsgs) {
                this.processOpSendMsg(opSendMsg);
            }
        } catch (Throwable t) {
            // PulsarClientException used to be caught separately: it set the interrupt flag, which
            // made later blocking calls on this thread fail spuriously, and it was not logged.
            this.semaphore.ifPresent(permits -> permits.release(this.batchMessageContainer.getNumMessagesInBatch()));
            log.warn("[{}] [{}] error while create opSendMsg by batch message container", this.topic, this.producerName, t);
        }
    }

    /**
     * Queues a send operation and, if the connection is ready, hands it to the event loop for
     * writing.
     *
     * <p>For an op that carries a message ({@code op.msg != null}) with batching enabled, the
     * current batch is sent first. The op is then appended to the pending queue. If the op's
     * message has schema state {@code None}, schema registration is started instead of writing.
     * Any failure releases the op's resources and fails its callback.
     *
     * @param op the operation to process; {@code null} is ignored
     */
    protected void processOpSendMsg(OpSendMsg op) {
        if (op == null) {
            return;
        }
        try {
            if (op.msg != null && this.isBatchMessagingEnabled()) {
                this.batchMessageAndSend();
            }
            this.pendingMessages.add(op);
            if (op.msg != null) {
                LAST_SEQ_ID_PUSHED_UPDATER.getAndUpdate(this, last -> Math.max(last, this.getHighestSequenceId(op)));
            }

            if (!this.shouldWriteOpSendMsg()) {
                // The op stays in the pending queue and is not written now.
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Connection is not ready -- sequenceId {}", this.topic, this.producerName, op.sequenceId);
                }
                return;
            }

            ClientCnx cnx = this.cnx();
            if (op.msg != null && op.msg.getSchemaState() == MessageImpl.SchemaState.None) {
                this.tryRegisterSchema(cnx, op.msg, op.callback);
                return;
            }
            // Take an extra reference before handing the buffer to the event loop.
            op.cmd.retain();
            cnx.ctx().channel().eventLoop().execute(WriteInEventLoopCallback.create(this, cnx, op));
            this.stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
        } catch (Throwable t) {
            this.releaseSemaphoreForSendOp(op);
            log.warn("[{}] [{}] error while closing out batch -- {}", this.topic, this.producerName, t);
            op.sendComplete(new PulsarClientException(t, op.sequenceId));
        }
    }

    /**
     * Tells {@link #processOpSendMsg(OpSendMsg)} whether an operation should be written
     * immediately.
     *
     * @return the result of {@code isConnected()}
     */
    protected boolean shouldWriteOpSendMsg() {
        final boolean connected = isConnected();
        return connected;
    }

    /**
     * Re-sends the operations in the pending queue over the given connection.
     *
     * <p>When {@code from} is non-null, operations are skipped until the one whose message is
     * that same instance is reached; re-sending starts with that operation. For each
     * operation considered:
     * <ul>
     *   <li>schema state {@code None}: the schema is re-populated; if that fails, the
     *       operation is remembered and the loop stops;</li>
     *   <li>schema state {@code Broken}: the operation is recycled and removed from the
     *       queue;</li>
     *   <li>a missing command is rebuilt through the operation's {@code rePopulate}
     *       runnable;</li>
     *   <li>the checksum is stripped when the remote protocol version is below
     *       {@code brokerChecksumSupportedVersion()}.</li>
     * </ul>
     * After the loop the context is flushed. If the producer cannot change to the ready
     * state the channel is closed; otherwise schema registration is attempted for the
     * remembered operation, if any.
     *
     * @param cnx  connection to write the pending operations to
     * @param from message to resume from, or {@code null} to start at the head of the queue
     */
    private void recoverProcessOpSendMsgFrom(ClientCnx cnx, MessageImpl from) {
        boolean stripChecksum = cnx.getRemoteEndpointProtocolVersion() < this.brokerChecksumSupportedVersion();
        // Typed iterator: avoids the raw type and the unchecked cast on every element.
        Iterator<OpSendMsg> msgIterator = this.pendingMessages.iterator();
        OpSendMsg pendingRegisteringOp = null;
        while (msgIterator.hasNext()) {
            OpSendMsg op = msgIterator.next();
            if (from != null) {
                // Identity comparison: skip until the exact message instance is found.
                if (op.msg != from) {
                    continue;
                }
                from = null;
            }
            if (op.msg != null) {
                if (op.msg.getSchemaState() == MessageImpl.SchemaState.None) {
                    if (!this.rePopulateMessageSchema(op.msg)) {
                        pendingRegisteringOp = op;
                        break;
                    }
                } else if (op.msg.getSchemaState() == MessageImpl.SchemaState.Broken) {
                    op.recycle();
                    msgIterator.remove();
                    continue;
                }
            }
            if (op.cmd == null) {
                Preconditions.checkState(op.rePopulate != null);
                op.rePopulate.run();
            }
            if (stripChecksum) {
                this.stripChecksum(op);
            }
            op.cmd.retain();
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Re-Sending message in cnx {}, sequenceId {}", new Object[]{this.topic, this.producerName, cnx.channel(), op.sequenceId});
            }
            // Writes are only queued here; a single flush follows the loop.
            cnx.ctx().write(op.cmd, cnx.ctx().voidPromise());
            op.updateSentTimestamp();
            this.stats.updateNumMsgsSent(op.numMessagesInBatch, op.batchSizeByte);
        }
        cnx.ctx().flush();
        if (!this.changeToReadyState()) {
            cnx.channel().close();
            return;
        }
        if (pendingRegisteringOp != null) {
            this.tryRegisterSchema(cnx, pendingRegisteringOp.msg, pendingRegisteringOp.callback);
        }
    }

    /**
     * Reports how long the operation at the head of the pending queue has existed.
     *
     * @return milliseconds elapsed since that operation's {@code createdAt} timestamp, or
     *         {@code 0} when the pending queue has no head element
     */
    public long getDelayInMillis() {
        OpSendMsg head = this.pendingMessages.peek();
        if (head == null) {
            return 0L;
        }
        long elapsedNanos = System.nanoTime() - head.createdAt;
        return TimeUnit.NANOSECONDS.toMillis(elapsedNanos);
    }

    /**
     * Returns the stored connection id, but only while a connection is present.
     *
     * @return {@code connectionId}, or {@code null} when {@link #cnx()} returns {@code null}
     */
    public String getConnectionId() {
        if (this.cnx() == null) {
            return null;
        }
        return this.connectionId;
    }

    /**
     * Returns the stored "connected since" value, but only while a connection is present.
     *
     * @return {@code connectedSince}, or {@code null} when {@link #cnx()} returns {@code null}
     */
    public String getConnectedSince() {
        if (this.cnx() == null) {
            return null;
        }
        return this.connectedSince;
    }

    /**
     * Returns the current size of the pending-operations queue.
     *
     * @return {@code pendingMessages.size()}
     */
    public int getPendingQueueSize() {
        final int pendingCount = pendingMessages.size();
        return pendingCount;
    }

    /**
     * Returns the statistics recorder held by this producer.
     *
     * @return the {@code stats} field
     */
    @Override
    public ProducerStatsRecorder getStats() {
        return stats;
    }

    /**
     * Returns the name stored for this producer.
     *
     * @return the {@code producerName} field
     */
    @Override
    public String getProducerName() {
        return producerName;
    }

    /**
     * Returns the connection currently held by the connection handler.
     *
     * @return whatever {@code connectionHandler.cnx()} returns
     */
    ClientCnx cnx() {
        return connectionHandler.cnx();
    }

    /**
     * Delegates to the connection handler's {@code resetBackoff()}.
     */
    void resetBackoff() {
        connectionHandler.resetBackoff();
    }

    /**
     * Forwards a connection-closed notification to the connection handler.
     *
     * @param cnx the connection passed on to {@code connectionHandler.connectionClosed}
     */
    void connectionClosed(ClientCnx cnx) {
        connectionHandler.connectionClosed(cnx);
    }

    /**
     * Public accessor for the connection currently held by the connection handler.
     *
     * @return whatever {@code connectionHandler.cnx()} returns
     */
    public ClientCnx getClientCnx() {
        return connectionHandler.cnx();
    }

    /**
     * Hands the given connection to the connection handler.
     *
     * @param clientCnx the connection passed on to {@code connectionHandler.setClientCnx}
     */
    void setClientCnx(ClientCnx clientCnx) {
        connectionHandler.setClientCnx(clientCnx);
    }

    /**
     * Delegates to the connection handler's {@code reconnectLater}.
     *
     * @param exception the throwable passed on to the connection handler
     */
    void reconnectLater(Throwable exception) {
        connectionHandler.reconnectLater(exception);
    }

    /**
     * Delegates to the connection handler's {@code grabCnx()}.
     */
    void grabCnx() {
        connectionHandler.grabCnx();
    }

    /**
     * Exposes the optional semaphore held by this producer; marked as visible for testing.
     *
     * @return the {@code semaphore} field
     */
    @VisibleForTesting
    Optional<Semaphore> getSemaphore() {
        return semaphore;
    }

    /**
     * Pooled, mutable holder for one pending send operation.
     *
     * <p>An instance carries either a single message ({@link #msg}) or a list of messages
     * ({@link #msgs}), together with the serialized command, the completion callback and
     * bookkeeping timestamps taken from {@link System#nanoTime()}. Instances are obtained
     * through the {@code create} factories and must be returned with {@link #recycle()}.
     */
    protected static final class OpSendMsg {
        MessageImpl<?> msg;
        List<MessageImpl<?>> msgs;
        ByteBufPair cmd;
        SendCallback callback;
        Runnable rePopulate;
        long uncompressedSize;
        long sequenceId;
        long createdAt;
        // Both timestamps start at -1 so that a freshly allocated instance is in the same
        // state as a recycled one. updateSentTimestamp() detects the first send through
        // firstSentAt == -1; with the Java default of 0 a new instance never recorded it.
        long firstSentAt = -1L;
        long lastSentAt = -1L;
        int retryCount;
        long batchSizeByte = 0L;
        int numMessagesInBatch = 1;
        // NOTE(review): defaults to 0 on a fresh instance but recycle() resets it to -1;
        // confirm whether the difference matters to readers of this field.
        long highestSequenceId;
        int totalChunks = 0;
        int chunkId = -1;
        private final Recycler.Handle<OpSendMsg> recyclerHandle;
        private static final Recycler<OpSendMsg> RECYCLER = new Recycler<OpSendMsg>(){

            @Override
            protected OpSendMsg newObject(Recycler.Handle<OpSendMsg> handle) {
                return new OpSendMsg(handle);
            }
        };

        /**
         * Obtains an operation for a single message.
         *
         * @param msg        the message; its uncompressed size is copied into the operation
         * @param cmd        the serialized command
         * @param sequenceId the sequence id stored on the operation
         * @param callback   the callback used by {@link #sendComplete(Exception)}
         * @return an operation taken from the recycler and populated with the arguments
         */
        static OpSendMsg create(MessageImpl<?> msg, ByteBufPair cmd, long sequenceId, SendCallback callback) {
            OpSendMsg op = RECYCLER.get();
            op.msg = msg;
            op.cmd = cmd;
            op.callback = callback;
            op.sequenceId = sequenceId;
            op.createdAt = System.nanoTime();
            op.uncompressedSize = msg.getUncompressedSize();
            return op;
        }

        /**
         * Obtains an operation for a list of messages.
         *
         * @param msgs       the messages; their uncompressed sizes are summed
         * @param cmd        the serialized command
         * @param sequenceId the sequence id stored on the operation
         * @param callback   the callback used by {@link #sendComplete(Exception)}
         * @return an operation taken from the recycler and populated with the arguments
         */
        static OpSendMsg create(List<MessageImpl<?>> msgs, ByteBufPair cmd, long sequenceId, SendCallback callback) {
            OpSendMsg op = RECYCLER.get();
            op.msgs = msgs;
            op.cmd = cmd;
            op.callback = callback;
            op.sequenceId = sequenceId;
            op.createdAt = System.nanoTime();
            op.uncompressedSize = totalUncompressedSize(msgs);
            return op;
        }

        /**
         * Obtains an operation for a list of messages with an explicit sequence id range.
         *
         * @param msgs              the messages; their uncompressed sizes are summed
         * @param cmd               the serialized command
         * @param lowestSequenceId  stored as the operation's {@code sequenceId}
         * @param highestSequenceId stored as the operation's {@code highestSequenceId}
         * @param callback          the callback used by {@link #sendComplete(Exception)}
         * @return an operation taken from the recycler and populated with the arguments
         */
        static OpSendMsg create(List<MessageImpl<?>> msgs, ByteBufPair cmd, long lowestSequenceId, long highestSequenceId, SendCallback callback) {
            OpSendMsg op = RECYCLER.get();
            op.msgs = msgs;
            op.cmd = cmd;
            op.callback = callback;
            op.sequenceId = lowestSequenceId;
            op.highestSequenceId = highestSequenceId;
            op.createdAt = System.nanoTime();
            op.uncompressedSize = totalUncompressedSize(msgs);
            return op;
        }

        /** Sums the uncompressed sizes of the given messages. */
        private static long totalUncompressedSize(List<MessageImpl<?>> msgs) {
            long total = 0L;
            for (int i = 0; i < msgs.size(); ++i) {
                total += (long)msgs.get(i).getUncompressedSize();
            }
            return total;
        }

        /**
         * Records a send attempt: updates {@code lastSentAt}, sets {@code firstSentAt} on the
         * first attempt, and increments {@code retryCount}.
         */
        void updateSentTimestamp() {
            this.lastSentAt = System.nanoTime();
            if (this.firstSentAt == -1L) {
                this.firstSentAt = this.lastSentAt;
            }
            ++this.retryCount;
        }

        /**
         * Notifies the callback, if there is one, of the outcome.
         *
         * <p>A {@link PulsarClientException.TimeoutException} is replaced by a new one whose
         * message additionally carries the operation's timing details and retry count.
         *
         * @param e the failure, or {@code null}; passed to the callback (possibly enriched)
         */
        void sendComplete(Exception e) {
            SendCallback callback = this.callback;
            if (callback == null) {
                return;
            }
            Exception finalEx = e;
            // instanceof is already false for null, so no separate null check is needed.
            if (e instanceof PulsarClientException.TimeoutException) {
                PulsarClientException.TimeoutException te = (PulsarClientException.TimeoutException)e;
                long sequenceId = te.getSequenceId();
                long ns = System.nanoTime();
                String errMsg = String.format("%s : createdAt %s ns ago, firstSentAt %s ns ago, lastSentAt %s ns ago, retryCount %s", te.getMessage(), ns - this.createdAt, ns - this.firstSentAt, ns - this.lastSentAt, this.retryCount);
                finalEx = new PulsarClientException.TimeoutException(errMsg, sequenceId);
            }
            callback.sendComplete(finalEx);
        }

        /**
         * Resets every field to its idle value and returns this instance to the recycler.
         * The instance must not be used by the caller afterwards.
         */
        void recycle() {
            this.msg = null;
            this.msgs = null;
            this.cmd = null;
            this.callback = null;
            this.rePopulate = null;
            this.sequenceId = -1L;
            this.createdAt = -1L;
            this.firstSentAt = -1L;
            this.lastSentAt = -1L;
            this.highestSequenceId = -1L;
            this.totalChunks = 0;
            this.chunkId = -1;
            this.uncompressedSize = 0L;
            this.retryCount = 0;
            this.batchSizeByte = 0L;
            this.numMessagesInBatch = 1;
            this.recyclerHandle.recycle(this);
        }

        /** Sets the number of messages represented by this operation. */
        void setNumMessagesInBatch(int numMessagesInBatch) {
            this.numMessagesInBatch = numMessagesInBatch;
        }

        /** Sets the batch size, in bytes, recorded for this operation. */
        void setBatchSizeByte(long batchSizeByte) {
            this.batchSizeByte = batchSizeByte;
        }

        /**
         * Assigns message ids to the carried message(s).
         *
         * <p>A single message receives a {@link MessageIdImpl}; otherwise each message in
         * {@code msgs} receives a {@link BatchMessageIdImpl} whose batch index is its position
         * in the list.
         */
        void setMessageId(long ledgerId, long entryId, int partitionIndex) {
            if (this.msg != null) {
                this.msg.setMessageId(new MessageIdImpl(ledgerId, entryId, partitionIndex));
            } else {
                for (int batchIndex = 0; batchIndex < this.msgs.size(); ++batchIndex) {
                    this.msgs.get(batchIndex).setMessageId(new BatchMessageIdImpl(ledgerId, entryId, partitionIndex, batchIndex));
                }
            }
        }

        private OpSendMsg(Recycler.Handle<OpSendMsg> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }
    }

    /**
     * Pooled {@link Runnable} that writes and flushes one operation's command on a
     * connection, then records the send attempt on the operation.
     *
     * <p>Instances come from {@link #create} and return themselves to the recycler at the end
     * of {@link #run()}.
     */
    private static final class WriteInEventLoopCallback
    implements Runnable {
        private ProducerImpl<?> producer;
        private ByteBufPair cmd;
        private long sequenceId;
        private ClientCnx cnx;
        private OpSendMsg op;
        private final Recycler.Handle<WriteInEventLoopCallback> recyclerHandle;
        private static final Recycler<WriteInEventLoopCallback> RECYCLER = new Recycler<WriteInEventLoopCallback>(){

            @Override
            protected WriteInEventLoopCallback newObject(Recycler.Handle<WriteInEventLoopCallback> handle) {
                return new WriteInEventLoopCallback(handle);
            }
        };

        /**
         * Takes an instance from the recycler and binds it to the given producer, connection
         * and operation. The operation's command and sequence id are captured at this point.
         */
        static WriteInEventLoopCallback create(ProducerImpl<?> producer, ClientCnx cnx, OpSendMsg op) {
            WriteInEventLoopCallback task = RECYCLER.get();
            task.producer = producer;
            task.cnx = cnx;
            task.op = op;
            task.cmd = op.cmd;
            task.sequenceId = op.sequenceId;
            return task;
        }

        /**
         * Writes and flushes the captured command, updates the operation's sent timestamps,
         * and recycles this task whether or not the write raised an exception.
         */
        @Override
        public void run() {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Sending message cnx {}, sequenceId {}", new Object[]{this.producer.topic, ((ProducerImpl)this.producer).producerName, this.cnx, this.sequenceId});
            }
            try {
                this.cnx.ctx().writeAndFlush(this.cmd, this.cnx.ctx().voidPromise());
                this.op.updateSentTimestamp();
            } finally {
                this.recycle();
            }
        }

        /** Clears all references and hands this instance back to the recycler. */
        private void recycle() {
            this.op = null;
            this.cmd = null;
            this.cnx = null;
            this.producer = null;
            this.sequenceId = -1L;
            this.recyclerHandle.recycle(this);
        }

        private WriteInEventLoopCallback(Recycler.Handle<WriteInEventLoopCallback> recyclerHandle) {
            this.recyclerHandle = recyclerHandle;
        }
    }
}

