/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.pulsar.common.protocol;

import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.KeySharedPolicy;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Strings;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.CompositeByteBuf;
import org.apache.pulsar.shade.io.netty.buffer.Unpooled;
import org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocal;
import org.apache.pulsar.shade.org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.shade.org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.shade.org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.AuthData;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.AuthMethod;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.BaseCommand;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAckResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAddPartitionToTxn;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAddPartitionToTxnResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxn;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAddSubscriptionToTxnResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAuthChallenge;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandConnect;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandConnected;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandEndTxnOnPartitionResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandEndTxnOnSubscriptionResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandEndTxnResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandGetLastMessageIdResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandGetSchema;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandGetSchemaResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespaceResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandLookupTopic;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandMessage;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandNewTxnResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandPartitionedTopicMetadataResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandProducer;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandProducerSuccess;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandRedeliverUnacknowledgedMessages;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSeek;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSend;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandTcClientConnectResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.FeatureFlags;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.IntRange;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.Schema;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ServerError;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.Subscription;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.shade.org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.shade.org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class Commands {
    /** SLF4J logger for this class. */
    private static final Logger log = LoggerFactory.getLogger(Commands.class);
    /** Default maximum message size: 0x500000 bytes (5 MiB). */
    public static final int DEFAULT_MAX_MESSAGE_SIZE = 0x500000;
    /** Frame padding allowance: 10240 bytes (10 KiB). NOTE(review): how it is applied is not visible in this part of the file. */
    public static final int MESSAGE_SIZE_FRAME_PADDING = 10240;
    /** Sentinel for "no max message size"; {@code newConnectedCommand} leaves the field unset when it receives -1. */
    public static final int INVALID_MAX_MESSAGE_SIZE = -1;
    /** Default consumer epoch; {@code newMessageCommand} only sets the epoch when it is greater than -1. */
    public static final long DEFAULT_CONSUMER_EPOCH = -1L;
    /** Two-byte marker (3585 = 0x0e01) that {@code hasChecksum} looks for at the reader index. */
    public static final short magicCrc32c = 3585;
    /** Two-byte marker (3586 = 0x0e02); presumably precedes broker entry metadata -- confirm against the parsing helpers. */
    public static final short magicBrokerEntryMetadata = 3586;
    /** Checksum width in bytes; {@code readChecksum} reads the checksum as a single int. */
    private static final int checksumSize = 4;
    /** Per-thread reusable {@link BaseCommand}; {@code localCmd} clears it before every use. */
    private static final FastThreadLocal<BaseCommand> LOCAL_BASE_COMMAND = new FastThreadLocal<BaseCommand>(){

        @Override
        protected BaseCommand initialValue() throws Exception {
            return new BaseCommand();
        }
    };
    /** Value of the last constant declared in {@link ProtocolVersion}. */
    private static final int CURRENT_PROTOCOL_VERSION = ProtocolVersion.values()[ProtocolVersion.values().length - 1].getValue();
    /** Per-thread reusable {@link SingleMessageMetadata} instance. */
    private static final FastThreadLocal<SingleMessageMetadata> LOCAL_SINGLE_MESSAGE_METADATA = new FastThreadLocal<SingleMessageMetadata>(){

        @Override
        protected SingleMessageMetadata initialValue() throws Exception {
            return new SingleMessageMetadata();
        }
    };
    /** Per-thread reusable {@link MessageMetadata}; returned by the single-argument {@code parseMessageMetadata}. */
    private static final FastThreadLocal<MessageMetadata> LOCAL_MESSAGE_METADATA = new FastThreadLocal<MessageMetadata>(){

        @Override
        protected MessageMetadata initialValue() throws Exception {
            return new MessageMetadata();
        }
    };
    /** Per-thread reusable {@link BrokerEntryMetadata} instance. */
    private static final FastThreadLocal<BrokerEntryMetadata> BROKER_ENTRY_METADATA = new FastThreadLocal<BrokerEntryMetadata>(){

        @Override
        protected BrokerEntryMetadata initialValue() throws Exception {
            return new BrokerEntryMetadata();
        }
    };
    // The three blank finals below are assigned outside the visible part of the file
    // (presumably in a static initializer) -- TODO confirm.
    private static final ByteBuf cmdPing;
    private static final ByteBuf cmdPong;
    private static final byte[] NONE_KEY;

    /**
     * Prepares this thread's reusable {@link BaseCommand} for a new command.
     *
     * <p>The thread-local instance is cleared and tagged with {@code type}. Because the
     * instance is shared per thread, a command obtained here is overwritten by the next
     * call on the same thread.
     *
     * @param type command type to set on the cleared instance
     * @return the command produced by {@code clear().setType(type)}
     */
    private static BaseCommand localCmd(BaseCommand.Type type) {
        BaseCommand reusable = LOCAL_BASE_COMMAND.get();
        return reusable.clear().setType(type);
    }

    /**
     * Builds a CONNECT command at the current protocol version.
     *
     * <p>Delegates to the eight-argument String overload, passing {@code null} for the
     * target broker, original principal, original auth data and original auth method.
     *
     * @param authMethodName name of the authentication method
     * @param authData       authentication data, may be {@code null}
     * @param libVersion     client version string, may be {@code null}
     * @return the serialized command
     */
    public static ByteBuf newConnect(String authMethodName, String authData, String libVersion) {
        int protocolVersion = Commands.getCurrentProtocolVersion();
        return Commands.newConnect(authMethodName, authData, protocolVersion, libVersion, null, null, null, null);
    }

    /**
     * Builds a CONNECT command at the current protocol version, addressed to a target broker.
     *
     * <p>Delegates to the eight-argument String overload, passing {@code null} for the
     * original principal, original auth data and original auth method.
     *
     * @param authMethodName name of the authentication method
     * @param authData       authentication data, may be {@code null}
     * @param libVersion     client version string, may be {@code null}
     * @param targetBroker   value for the proxy-to-broker URL, may be {@code null}
     * @return the serialized command
     */
    public static ByteBuf newConnect(String authMethodName, String authData, String libVersion, String targetBroker) {
        int protocolVersion = Commands.getCurrentProtocolVersion();
        return Commands.newConnect(authMethodName, authData, protocolVersion, libVersion, targetBroker, null, null, null);
    }

    /**
     * Builds a CONNECT command at the current protocol version, carrying original-client details.
     *
     * <p>{@code clientAuthData} and {@code clientAuthMethod} are forwarded as the original
     * auth data and original auth method of the eight-argument String overload.
     *
     * @param authMethodName    name of the authentication method
     * @param authData          authentication data, may be {@code null}
     * @param libVersion        client version string, may be {@code null}
     * @param targetBroker      value for the proxy-to-broker URL, may be {@code null}
     * @param originalPrincipal original principal, may be {@code null}
     * @param clientAuthData    original auth data, may be {@code null}
     * @param clientAuthMethod  original auth method, may be {@code null}
     * @return the serialized command
     */
    public static ByteBuf newConnect(String authMethodName, String authData, String libVersion, String targetBroker, String originalPrincipal, String clientAuthData, String clientAuthMethod) {
        int protocolVersion = Commands.getCurrentProtocolVersion();
        return Commands.newConnect(authMethodName, authData, protocolVersion, libVersion, targetBroker,
                originalPrincipal, clientAuthData, clientAuthMethod);
    }

    /**
     * Switches on the feature flags set by both CONNECT builders: auth refresh,
     * broker entry metadata and partial producer support.
     *
     * @param featureFlags flags object to populate
     */
    private static void setFeatureFlags(FeatureFlags featureFlags) {
        final boolean supported = true;
        featureFlags.setSupportsAuthRefresh(supported);
        featureFlags.setSupportsBrokerEntryMetadata(supported);
        featureFlags.setSupportsPartialProducer(supported);
    }

    /**
     * Builds a CONNECT command from string-based authentication data.
     *
     * <p>Optional arguments are only written when non-null. The client version defaults to
     * {@code "Pulsar Client"} when {@code libVersion} is {@code null}. The auth method enum
     * is additionally set to {@code AuthMethodYcaV1} when the method name is {@code "ycav1"}.
     *
     * @param authMethodName     name of the authentication method
     * @param authData           authentication data, encoded as UTF-8 bytes; may be {@code null}
     * @param protocolVersion    protocol version to write into the command
     * @param libVersion         client version string, may be {@code null}
     * @param targetBroker       value for the proxy-to-broker URL, may be {@code null}
     * @param originalPrincipal  original principal, may be {@code null}
     * @param originalAuthData   original auth data, may be {@code null}
     * @param originalAuthMethod original auth method, may be {@code null}
     * @return the serialized command
     */
    public static ByteBuf newConnect(String authMethodName, String authData, int protocolVersion, String libVersion, String targetBroker, String originalPrincipal, String originalAuthData, String originalAuthMethod) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.CONNECT);
        String clientVersion = libVersion == null ? "Pulsar Client" : libVersion;
        CommandConnect connect = cmd.setConnect()
                .setClientVersion(clientVersion)
                .setAuthMethodName(authMethodName);
        if ("ycav1".equals(authMethodName)) {
            connect.setAuthMethod(AuthMethod.AuthMethodYcaV1);
        }
        if (targetBroker != null) {
            connect.setProxyToBrokerUrl(targetBroker);
        }
        if (authData != null) {
            byte[] authBytes = authData.getBytes(StandardCharsets.UTF_8);
            connect.setAuthData(authBytes);
        }
        if (originalPrincipal != null) {
            connect.setOriginalPrincipal(originalPrincipal);
        }
        if (originalAuthData != null) {
            connect.setOriginalAuthData(originalAuthData);
        }
        if (originalAuthMethod != null) {
            connect.setOriginalAuthMethod(originalAuthMethod);
        }
        connect.setProtocolVersion(protocolVersion);
        FeatureFlags flags = connect.setFeatureFlags();
        Commands.setFeatureFlags(flags);
        return Commands.serializeWithSize(cmd);
    }

    /**
     * Builds a CONNECT command from {@link AuthData}-based authentication data.
     *
     * <p>Optional arguments are only written when non-null. The client version defaults to
     * {@code "Pulsar Client"} when {@code libVersion} is {@code null}. The original auth data
     * bytes are decoded as UTF-8 before being stored as a string.
     *
     * @param authMethodName     name of the authentication method
     * @param authData           authentication data, may be {@code null}
     * @param protocolVersion    protocol version to write into the command
     * @param libVersion         client version string, may be {@code null}
     * @param targetBroker       value for the proxy-to-broker URL, may be {@code null}
     * @param originalPrincipal  original principal, may be {@code null}
     * @param originalAuthData   original auth data, may be {@code null}
     * @param originalAuthMethod original auth method, may be {@code null}
     * @return the serialized command
     */
    public static ByteBuf newConnect(String authMethodName, AuthData authData, int protocolVersion, String libVersion, String targetBroker, String originalPrincipal, AuthData originalAuthData, String originalAuthMethod) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.CONNECT);
        String clientVersion = libVersion == null ? "Pulsar Client" : libVersion;
        CommandConnect connect = cmd.setConnect()
                .setClientVersion(clientVersion)
                .setAuthMethodName(authMethodName);
        if (targetBroker != null) {
            connect.setProxyToBrokerUrl(targetBroker);
        }
        if (authData != null) {
            connect.setAuthData(authData.getBytes());
        }
        if (originalPrincipal != null) {
            connect.setOriginalPrincipal(originalPrincipal);
        }
        if (originalAuthData != null) {
            String originalAuthText = new String(originalAuthData.getBytes(), StandardCharsets.UTF_8);
            connect.setOriginalAuthData(originalAuthText);
        }
        if (originalAuthMethod != null) {
            connect.setOriginalAuthMethod(originalAuthMethod);
        }
        connect.setProtocolVersion(protocolVersion);
        FeatureFlags flags = connect.setFeatureFlags();
        Commands.setFeatureFlags(flags);
        return Commands.serializeWithSize(cmd);
    }

    /**
     * Builds a serialized CONNECTED command without a max message size.
     *
     * <p>Passes -1 as the max message size, which {@code newConnectedCommand} treats as
     * "do not set".
     *
     * @param clientProtocoVersion protocol version announced by the client
     * @return the serialized command
     */
    public static ByteBuf newConnected(int clientProtocoVersion) {
        final int noMaxMessageSize = -1;
        return Commands.newConnected(clientProtocoVersion, noMaxMessageSize);
    }

    /**
     * Fills the thread-local command with a CONNECTED response.
     *
     * <p>The server version is {@code "Pulsar Server"} followed directly by the Pulsar version.
     * The advertised protocol version is the smaller of the current protocol version and
     * {@code clientProtocolVersion}.
     *
     * @param clientProtocolVersion protocol version announced by the client
     * @param maxMessageSize        max message size to advertise, or -1 to leave it unset
     * @return the populated command (not serialized)
     */
    public static BaseCommand newConnectedCommand(int clientProtocolVersion, int maxMessageSize) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.CONNECTED);
        String serverVersion = "Pulsar Server" + PulsarVersion.getVersion();
        CommandConnected connected = cmd.setConnected().setServerVersion(serverVersion);
        if (maxMessageSize != -1) {
            connected.setMaxMessageSize(maxMessageSize);
        }
        // Never advertise a version newer than what either side supports.
        connected.setProtocolVersion(Math.min(Commands.getCurrentProtocolVersion(), clientProtocolVersion));
        return cmd;
    }

    /**
     * Builds and serializes a CONNECTED command.
     *
     * @param clientProtocolVersion protocol version announced by the client
     * @param maxMessageSize        max message size to advertise, or -1 to leave it unset
     * @return the command from {@code newConnectedCommand}, serialized via {@code serializeWithSize}
     */
    public static ByteBuf newConnected(int clientProtocolVersion, int maxMessageSize) {
        BaseCommand connectedCmd = Commands.newConnectedCommand(clientProtocolVersion, maxMessageSize);
        return Commands.serializeWithSize(connectedCmd);
    }

    /**
     * Builds a serialized AUTH_CHALLENGE command.
     *
     * <p>The advertised protocol version is the smaller of the current protocol version and
     * {@code clientProtocolVersion}. A {@code null} {@code brokerData} is sent as an empty
     * byte array.
     *
     * @param authMethod            auth method name written into the challenge
     * @param brokerData            challenge data, may be {@code null}
     * @param clientProtocolVersion protocol version announced by the client
     * @return the serialized command
     */
    public static ByteBuf newAuthChallenge(String authMethod, AuthData brokerData, int clientProtocolVersion) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.AUTH_CHALLENGE);
        CommandAuthChallenge challenge = cmd.setAuthChallenge();
        int versionToAdvertise = Math.min(Commands.getCurrentProtocolVersion(), clientProtocolVersion);
        byte[] challengeBytes;
        if (brokerData == null) {
            challengeBytes = new byte[]{};
        } else {
            challengeBytes = brokerData.getBytes();
        }
        challenge.setProtocolVersion(versionToAdvertise)
                .setChallenge()
                .setAuthData(challengeBytes)
                .setAuthMethodName(authMethod);
        return Commands.serializeWithSize(cmd);
    }

    /**
     * Builds a serialized AUTH_RESPONSE command.
     *
     * <p>The client version defaults to {@code "Pulsar Client"} when {@code clientVersion}
     * is {@code null}. {@code clientData} is dereferenced unconditionally and must not be
     * {@code null}.
     *
     * @param authMethod            auth method name written into the response
     * @param clientData            response data; must not be {@code null}
     * @param clientProtocolVersion protocol version written into the response
     * @param clientVersion         client version string, may be {@code null}
     * @return the serialized command
     */
    public static ByteBuf newAuthResponse(String authMethod, AuthData clientData, int clientProtocolVersion, String clientVersion) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.AUTH_RESPONSE);
        String effectiveClientVersion = clientVersion == null ? "Pulsar Client" : clientVersion;
        cmd.setAuthResponse()
                .setClientVersion(effectiveClientVersion)
                .setProtocolVersion(clientProtocolVersion)
                .setResponse()
                .setAuthData(clientData.getBytes())
                .setAuthMethodName(authMethod);
        return Commands.serializeWithSize(cmd);
    }

    /**
     * Fills the thread-local command with a SUCCESS response.
     *
     * @param requestId id of the request being acknowledged
     * @return the populated command (not serialized)
     */
    public static BaseCommand newSuccessCommand(long requestId) {
        BaseCommand successCmd = Commands.localCmd(BaseCommand.Type.SUCCESS);
        successCmd.setSuccess().setRequestId(requestId);
        return successCmd;
    }

    /**
     * Builds and serializes a SUCCESS response.
     *
     * @param requestId id of the request being acknowledged
     * @return the command from {@code newSuccessCommand}, serialized via {@code serializeWithSize}
     */
    public static ByteBuf newSuccess(long requestId) {
        BaseCommand successCmd = Commands.newSuccessCommand(requestId);
        return Commands.serializeWithSize(successCmd);
    }

    /**
     * Fills the thread-local command with a PRODUCER_SUCCESS response using defaults.
     *
     * <p>Uses last sequence id -1, no topic epoch, and marks the producer as ready.
     *
     * @param requestId     id of the request being answered
     * @param producerName  producer name to report
     * @param schemaVersion schema version to report
     * @return the populated command (not serialized)
     */
    public static BaseCommand newProducerSuccessCommand(long requestId, String producerName, SchemaVersion schemaVersion) {
        Optional<Long> noTopicEpoch = Optional.empty();
        return Commands.newProducerSuccessCommand(requestId, producerName, -1L, schemaVersion, noTopicEpoch, true);
    }

    /**
     * Builds a serialized PRODUCER_SUCCESS response using defaults.
     *
     * <p>Uses last sequence id -1, no topic epoch, and marks the producer as ready.
     *
     * @param requestId     id of the request being answered
     * @param producerName  producer name to report
     * @param schemaVersion schema version to report
     * @return the serialized command
     */
    public static ByteBuf newProducerSuccess(long requestId, String producerName, SchemaVersion schemaVersion) {
        Optional<Long> noTopicEpoch = Optional.empty();
        return Commands.newProducerSuccess(requestId, producerName, -1L, schemaVersion, noTopicEpoch, true);
    }

    /**
     * Fills the thread-local command with a PRODUCER_SUCCESS response.
     *
     * @param requestId       id of the request being answered
     * @param producerName    producer name to report
     * @param lastSequenceId  last sequence id to report
     * @param schemaVersion   schema version whose bytes are written; must not be {@code null}
     * @param topicEpoch      topic epoch, written only when present
     * @param isProducerReady value of the producer-ready flag
     * @return the populated command (not serialized)
     */
    public static BaseCommand newProducerSuccessCommand(long requestId, String producerName, long lastSequenceId, SchemaVersion schemaVersion, Optional<Long> topicEpoch, boolean isProducerReady) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.PRODUCER_SUCCESS);
        CommandProducerSuccess success = cmd.setProducerSuccess()
                .setRequestId(requestId)
                .setProducerName(producerName)
                .setLastSequenceId(lastSequenceId)
                .setSchemaVersion(schemaVersion.bytes())
                .setProducerReady(isProducerReady);
        if (topicEpoch.isPresent()) {
            success.setTopicEpoch(topicEpoch.get());
        }
        return cmd;
    }

    /**
     * Builds and serializes a PRODUCER_SUCCESS response.
     *
     * @param requestId       id of the request being answered
     * @param producerName    producer name to report
     * @param lastSequenceId  last sequence id to report
     * @param schemaVersion   schema version to report
     * @param topicEpoch      topic epoch, written only when present
     * @param isProducerReady value of the producer-ready flag
     * @return the command from {@code newProducerSuccessCommand}, serialized via {@code serializeWithSize}
     */
    public static ByteBuf newProducerSuccess(long requestId, String producerName, long lastSequenceId, SchemaVersion schemaVersion, Optional<Long> topicEpoch, boolean isProducerReady) {
        BaseCommand successCmd = Commands.newProducerSuccessCommand(requestId, producerName, lastSequenceId,
                schemaVersion, topicEpoch, isProducerReady);
        return Commands.serializeWithSize(successCmd);
    }

    /**
     * Fills the thread-local command with an ERROR response.
     *
     * @param requestId   id of the failed request
     * @param serverError error code to report
     * @param message     error text; {@code null} is sent as an empty string
     * @return the populated command (not serialized)
     */
    public static BaseCommand newErrorCommand(long requestId, ServerError serverError, String message) {
        BaseCommand errorCmd = Commands.localCmd(BaseCommand.Type.ERROR);
        String text = message == null ? "" : message;
        errorCmd.setError()
                .setRequestId(requestId)
                .setError(serverError)
                .setMessage(text);
        return errorCmd;
    }

    /**
     * Builds and serializes an ERROR response.
     *
     * @param requestId   id of the failed request
     * @param serverError error code to report
     * @param message     error text, may be {@code null}
     * @return the command from {@code newErrorCommand}, serialized via {@code serializeWithSize}
     */
    public static ByteBuf newError(long requestId, ServerError serverError, String message) {
        BaseCommand errorCmd = Commands.newErrorCommand(requestId, serverError, message);
        return Commands.serializeWithSize(errorCmd);
    }

    /**
     * Fills the thread-local command with a SEND_RECEIPT.
     *
     * @param producerId id of the producer that sent the message
     * @param sequenceId sequence id being acknowledged
     * @param highestId  highest sequence id being acknowledged
     * @param ledgerId   ledger id of the stored message id
     * @param entryId    entry id of the stored message id
     * @return the populated command (not serialized)
     */
    public static BaseCommand newSendReceiptCommand(long producerId, long sequenceId, long highestId, long ledgerId, long entryId) {
        BaseCommand receiptCmd = Commands.localCmd(BaseCommand.Type.SEND_RECEIPT);
        receiptCmd.setSendReceipt()
                .setProducerId(producerId)
                .setSequenceId(sequenceId)
                .setHighestSequenceId(highestId)
                .setMessageId()
                .setLedgerId(ledgerId)
                .setEntryId(entryId);
        return receiptCmd;
    }

    /**
     * Builds and serializes a SEND_RECEIPT.
     *
     * @param producerId id of the producer that sent the message
     * @param sequenceId sequence id being acknowledged
     * @param highestId  highest sequence id being acknowledged
     * @param ledgerId   ledger id of the stored message id
     * @param entryId    entry id of the stored message id
     * @return the command from {@code newSendReceiptCommand}, serialized via {@code serializeWithSize}
     */
    public static ByteBuf newSendReceipt(long producerId, long sequenceId, long highestId, long ledgerId, long entryId) {
        BaseCommand receiptCmd = Commands.newSendReceiptCommand(producerId, sequenceId, highestId, ledgerId, entryId);
        return Commands.serializeWithSize(receiptCmd);
    }

    /**
     * Fills the thread-local command with a SEND_ERROR.
     *
     * @param producerId id of the producer whose send failed
     * @param sequenceId sequence id of the failed message
     * @param error      error code to report
     * @param errorMsg   error text; {@code null} is sent as an empty string
     * @return the populated command (not serialized)
     */
    public static BaseCommand newSendErrorCommand(long producerId, long sequenceId, ServerError error, String errorMsg) {
        BaseCommand sendErrorCmd = Commands.localCmd(BaseCommand.Type.SEND_ERROR);
        String text = errorMsg == null ? "" : errorMsg;
        sendErrorCmd.setSendError()
                .setProducerId(producerId)
                .setSequenceId(sequenceId)
                .setError(error)
                .setMessage(text);
        return sendErrorCmd;
    }

    /**
     * Builds and serializes a SEND_ERROR.
     *
     * @param producerId id of the producer whose send failed
     * @param sequenceId sequence id of the failed message
     * @param error      error code to report
     * @param errorMsg   error text, may be {@code null}
     * @return the command from {@code newSendErrorCommand}, serialized via {@code serializeWithSize}
     */
    public static ByteBuf newSendError(long producerId, long sequenceId, ServerError error, String errorMsg) {
        BaseCommand sendErrorCmd = Commands.newSendErrorCommand(producerId, sequenceId, error, errorMsg);
        return Commands.serializeWithSize(sendErrorCmd);
    }

    /**
     * Tells whether the two bytes at the buffer's reader index are the CRC32C magic number.
     *
     * <p>The reader index is not moved.
     *
     * @param buffer buffer to inspect
     * @return {@code true} when the short at the reader index equals 3585
     */
    public static boolean hasChecksum(ByteBuf buffer) {
        // 3585 (0x0e01) is the value of the magicCrc32c constant.
        short leadingShort = buffer.getShort(buffer.readerIndex());
        return leadingShort == 3585;
    }

    /**
     * Skips two bytes and then reads the next int as the checksum.
     *
     * <p>Does not verify that the skipped bytes are the magic number; callers check that
     * with {@code hasChecksum} first.
     *
     * @param buffer buffer positioned at the magic number
     * @return the int that follows the two skipped bytes
     */
    public static int readChecksum(ByteBuf buffer) {
        buffer.skipBytes(2);
        int checksum = buffer.readInt();
        return checksum;
    }

    /**
     * Advances the buffer past the magic number and checksum when {@code hasChecksum} reports one.
     *
     * @param buffer buffer to advance; left untouched when no checksum is present
     */
    public static void skipChecksumIfPresent(ByteBuf buffer) {
        if (!Commands.hasChecksum(buffer)) {
            return;
        }
        // The checksum value itself is not needed here; reading it just moves the reader index.
        Commands.readChecksum(buffer);
    }

    /**
     * Parses message metadata from the buffer into this thread's reusable {@link MessageMetadata}.
     *
     * <p>The returned object is the thread-local instance, so a later call on the same
     * thread parses into the same object.
     *
     * @param buffer buffer to read the metadata from
     * @return the thread-local metadata instance, populated from {@code buffer}
     */
    public static MessageMetadata parseMessageMetadata(ByteBuf buffer) {
        MessageMetadata reusable = LOCAL_MESSAGE_METADATA.get();
        Commands.parseMessageMetadata(buffer, reusable);
        return reusable;
    }

    /**
     * Parses message metadata from the buffer into {@code msgMetadata}.
     *
     * <p>Skips broker entry metadata and the checksum when present, reads an unsigned
     * 32-bit metadata size, then parses that many bytes.
     *
     * @param buffer      buffer to read from; its reader index is advanced
     * @param msgMetadata target object that receives the parsed metadata
     */
    public static void parseMessageMetadata(ByteBuf buffer, MessageMetadata msgMetadata) {
        Commands.skipBrokerEntryMetadataIfExist(buffer);
        Commands.skipChecksumIfPresent(buffer);
        long declaredSize = buffer.readUnsignedInt();
        // The size is narrowed to int, exactly as the parser's signature requires.
        msgMetadata.parseFrom(buffer, (int) declaredSize);
    }

    /**
     * Advances the buffer past the message metadata without parsing it.
     *
     * <p>Skips broker entry metadata and the checksum when present, reads an unsigned
     * 32-bit metadata size, then skips that many bytes.
     *
     * @param buffer buffer to advance
     */
    public static void skipMessageMetadata(ByteBuf buffer) {
        Commands.skipBrokerEntryMetadataIfExist(buffer);
        Commands.skipChecksumIfPresent(buffer);
        long declaredSize = buffer.readUnsignedInt();
        buffer.skipBytes((int) declaredSize);
    }

    /**
     * Returns the timestamp of an entry.
     *
     * <p>Uses the broker timestamp when broker entry metadata exists and carries one;
     * otherwise falls back to the publish time from the message metadata.
     *
     * <p>NOTE(review): both parsing helpers read from the buffer; whether the reader index
     * is restored afterwards is not visible here.
     *
     * @param headersAndPayloadWithBrokerEntryMetadata entry buffer, optionally starting with broker entry metadata
     * @return the broker timestamp, or the message publish time
     * @throws IOException declared by the signature; no visible statement in this method throws it directly
     */
    public static long getEntryTimestamp(ByteBuf headersAndPayloadWithBrokerEntryMetadata) throws IOException {
        BrokerEntryMetadata entryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayloadWithBrokerEntryMetadata);
        boolean hasBrokerTimestamp = entryMetadata != null && entryMetadata.hasBrokerTimestamp();
        if (!hasBrokerTimestamp) {
            MessageMetadata messageMetadata = Commands.parseMessageMetadata(headersAndPayloadWithBrokerEntryMetadata);
            return messageMetadata.getPublishTime();
        }
        return entryMetadata.getBrokerTimestamp();
    }

    /**
     * Fills the thread-local command with a MESSAGE header.
     *
     * <p>The consumer epoch is written only when greater than -1, the redelivery count only
     * when positive, and the ack set only when non-null.
     *
     * @param consumerId      id of the receiving consumer
     * @param ledgerId        ledger id of the message
     * @param entryId         entry id of the message
     * @param partition       partition index of the message
     * @param redeliveryCount number of redeliveries so far
     * @param ackSet          ack-set words, may be {@code null}
     * @param consumerEpoch   consumer epoch, or -1 for none
     * @return the populated command (not serialized)
     */
    public static BaseCommand newMessageCommand(long consumerId, long ledgerId, long entryId, int partition, int redeliveryCount, long[] ackSet, long consumerEpoch) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.MESSAGE);
        CommandMessage message = cmd.setMessage().setConsumerId(consumerId);
        message.setMessageId()
                .setLedgerId(ledgerId)
                .setEntryId(entryId)
                .setPartition(partition);
        if (consumerEpoch > -1L) {
            message.setConsumerEpoch(consumerEpoch);
        }
        if (redeliveryCount > 0) {
            message.setRedeliveryCount(redeliveryCount);
        }
        if (ackSet != null) {
            for (long word : ackSet) {
                message.addAckSet(word);
            }
        }
        return cmd;
    }

    /**
     * Builds a MESSAGE frame from a header and the already-serialized metadata and payload.
     *
     * <p>The header is built with consumer epoch -1, so no epoch is written.
     *
     * @param consumerId         id of the receiving consumer
     * @param ledgerId           ledger id of the message
     * @param entryId            entry id of the message
     * @param partition          partition index of the message
     * @param redeliveryCount    number of redeliveries so far
     * @param metadataAndPayload buffer holding the message metadata and payload
     * @param ackSet             ack-set words, may be {@code null}
     * @return the result of {@code serializeCommandMessageWithSize}
     */
    public static ByteBufPair newMessage(long consumerId, long ledgerId, long entryId, int partition, int redeliveryCount, ByteBuf metadataAndPayload, long[] ackSet) {
        BaseCommand header = Commands.newMessageCommand(consumerId, ledgerId, entryId, partition, redeliveryCount, ackSet, -1L);
        return Commands.serializeCommandMessageWithSize(header, metadataAndPayload);
    }

    /**
     * Builds a SEND frame without a highest sequence id.
     *
     * <p>Transaction id bits are taken from {@code messageMetadata} when set, otherwise -1.
     * The highest sequence id is passed as -1.
     *
     * @param producerId      id of the sending producer
     * @param sequenceId      sequence id of the message
     * @param numMessaegs     number of messages in the payload
     * @param checksumType    checksum type to use when serializing
     * @param messageMetadata metadata of the message
     * @param payload         message payload
     * @return the serialized frame
     */
    public static ByteBufPair newSend(long producerId, long sequenceId, int numMessaegs, ChecksumType checksumType, MessageMetadata messageMetadata, ByteBuf payload) {
        long txnLeastBits = messageMetadata.hasTxnidLeastBits() ? messageMetadata.getTxnidLeastBits() : -1L;
        long txnMostBits = messageMetadata.hasTxnidMostBits() ? messageMetadata.getTxnidMostBits() : -1L;
        return Commands.newSend(producerId, sequenceId, -1L, numMessaegs, txnLeastBits, txnMostBits,
                checksumType, messageMetadata, payload);
    }

    /**
     * Builds a SEND frame covering a range of sequence ids.
     *
     * <p>Transaction id bits are taken from {@code messageMetadata} when set, otherwise -1.
     *
     * @param producerId        id of the sending producer
     * @param lowestSequenceId  lowest sequence id in the payload
     * @param highestSequenceId highest sequence id in the payload
     * @param numMessaegs       number of messages in the payload
     * @param checksumType      checksum type to use when serializing
     * @param messageMetadata   metadata of the message
     * @param payload           message payload
     * @return the serialized frame
     */
    public static ByteBufPair newSend(long producerId, long lowestSequenceId, long highestSequenceId, int numMessaegs, ChecksumType checksumType, MessageMetadata messageMetadata, ByteBuf payload) {
        long txnLeastBits = messageMetadata.hasTxnidLeastBits() ? messageMetadata.getTxnidLeastBits() : -1L;
        long txnMostBits = messageMetadata.hasTxnidMostBits() ? messageMetadata.getTxnidMostBits() : -1L;
        return Commands.newSend(producerId, lowestSequenceId, highestSequenceId, numMessaegs, txnLeastBits,
                txnMostBits, checksumType, messageMetadata, payload);
    }

    /**
     * Builds a SEND frame with explicit sequence-id and transaction-id fields.
     *
     * <p>Optional fields are written only when meaningful: the highest sequence id and the
     * transaction id bits when non-negative, the message count when greater than one. The
     * chunk flag is set when the metadata reports a total chunk size above one, and the
     * marker flag when the metadata has a marker type.
     *
     * @param producerId        id of the sending producer
     * @param sequenceId        sequence id of the message
     * @param highestSequenceId highest sequence id, or a negative value for none
     * @param numMessages       number of messages in the payload
     * @param txnIdLeastBits    least significant transaction id bits, or a negative value for none
     * @param txnIdMostBits     most significant transaction id bits, or a negative value for none
     * @param checksumType      checksum type to use when serializing
     * @param messageData       metadata of the message
     * @param payload           message payload
     * @return the result of {@code serializeCommandSendWithSize}
     */
    public static ByteBufPair newSend(long producerId, long sequenceId, long highestSequenceId, int numMessages, long txnIdLeastBits, long txnIdMostBits, ChecksumType checksumType, MessageMetadata messageData, ByteBuf payload) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.SEND);
        CommandSend sendCmd = cmd.setSend()
                .setProducerId(producerId)
                .setSequenceId(sequenceId);
        if (highestSequenceId >= 0L) {
            sendCmd.setHighestSequenceId(highestSequenceId);
        }
        if (numMessages > 1) {
            sendCmd.setNumMessages(numMessages);
        }
        if (txnIdLeastBits >= 0L) {
            sendCmd.setTxnidLeastBits(txnIdLeastBits);
        }
        if (txnIdMostBits >= 0L) {
            sendCmd.setTxnidMostBits(txnIdMostBits);
        }
        if (messageData.hasTotalChunkMsgSize()) {
            if (messageData.getTotalChunkMsgSize() > 1) {
                sendCmd.setIsChunk(true);
            }
        }
        if (messageData.hasMarkerType()) {
            sendCmd.setMarker(true);
        }
        return Commands.serializeCommandSendWithSize(cmd, checksumType, messageData, payload);
    }

    /**
     * Builds a SUBSCRIBE command with default options.
     *
     * <p>Defaults: durable subscription, no start message id, empty metadata, no compacted
     * read, no replication, initial position {@code Earliest}, no schema, and topic
     * creation allowed.
     *
     * @param topic                          topic to subscribe to
     * @param subscription                   subscription name
     * @param consumerId                     id of the consumer
     * @param requestId                      id of this request
     * @param subType                        subscription type
     * @param priorityLevel                  consumer priority level
     * @param consumerName                   consumer name
     * @param resetStartMessageBackInSeconds rollback duration in seconds for the start position
     * @return the serialized command
     */
    public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, CommandSubscribe.SubType subType, int priorityLevel, String consumerName, long resetStartMessageBackInSeconds) {
        Map<String, String> noMetadata = Collections.emptyMap();
        return Commands.newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel,
                consumerName, true, null, noMetadata, false, false,
                CommandSubscribe.InitialPosition.Earliest, resetStartMessageBackInSeconds, null, true);
    }

    /**
     * Builds a SUBSCRIBE command without a key-shared policy or subscription properties.
     *
     * <p>Delegates to the full overload with a {@code null} key-shared policy, an empty
     * subscription-properties map and consumer epoch -1.
     *
     * @param topic                             topic to subscribe to
     * @param subscription                      subscription name
     * @param consumerId                        id of the consumer
     * @param requestId                         id of this request
     * @param subType                           subscription type
     * @param priorityLevel                     consumer priority level
     * @param consumerName                      consumer name
     * @param isDurable                         whether the subscription is durable
     * @param startMessageId                    start message id, may be {@code null}
     * @param metadata                          consumer metadata
     * @param readCompacted                     whether to read from the compacted topic
     * @param isReplicated                      whether subscription state is replicated
     * @param subscriptionInitialPosition       initial position of the subscription
     * @param startMessageRollbackDurationInSec rollback duration in seconds for the start position
     * @param schemaInfo                        schema to attach, may be {@code null}
     * @param createTopicIfDoesNotExist         whether the topic may be created
     * @return the serialized command
     */
    public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, CommandSubscribe.SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, Map<String, String> metadata, boolean readCompacted, boolean isReplicated, CommandSubscribe.InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist) {
        Map<String, String> noSubscriptionProperties = Collections.emptyMap();
        return Commands.newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel,
                consumerName, isDurable, startMessageId, metadata, readCompacted, isReplicated,
                subscriptionInitialPosition, startMessageRollbackDurationInSec, schemaInfo,
                createTopicIfDoesNotExist, null, noSubscriptionProperties, -1L);
    }

    /**
     * Builds a SUBSCRIBE command with every supported option.
     *
     * <p>Optional parts are written only when supplied: subscription properties and consumer
     * metadata when non-null and non-empty, the key-shared policy, start message id and
     * schema when non-null, and the rollback duration when positive. For a sticky key-shared
     * policy the configured hash ranges are copied as well.
     *
     * <p>A {@code null} {@code metadata} map is treated like an empty one, matching the
     * handling of {@code subscriptionProperties}.
     *
     * @param topic                             topic to subscribe to
     * @param subscription                      subscription name
     * @param consumerId                        id of the consumer
     * @param requestId                         id of this request
     * @param subType                           subscription type
     * @param priorityLevel                     consumer priority level
     * @param consumerName                      consumer name
     * @param isDurable                         whether the subscription is durable
     * @param startMessageId                    start message id, may be {@code null}
     * @param metadata                          consumer metadata, may be {@code null} or empty
     * @param readCompacted                     whether to read from the compacted topic
     * @param isReplicated                      whether subscription state is replicated
     * @param subscriptionInitialPosition       initial position of the subscription
     * @param startMessageRollbackDurationInSec rollback duration in seconds; ignored unless positive
     * @param schemaInfo                        schema to attach, may be {@code null}
     * @param createTopicIfDoesNotExist         whether the topic may be created
     * @param keySharedPolicy                   key-shared policy, may be {@code null}
     * @param subscriptionProperties            subscription properties, may be {@code null} or empty
     * @param consumerEpoch                     consumer epoch, always written
     * @return the serialized command
     * @throws IllegalStateException if a schema is supplied but the freshly built command
     *                               already has a schema or schema properties
     */
    public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, CommandSubscribe.SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, Map<String, String> metadata, boolean readCompacted, boolean isReplicated, CommandSubscribe.InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, Map<String, String> subscriptionProperties, long consumerEpoch) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.SUBSCRIBE);
        CommandSubscribe subscribe = cmd.setSubscribe()
                .setTopic(topic)
                .setSubscription(subscription)
                .setSubType(subType)
                .setConsumerId(consumerId)
                .setConsumerName(consumerName)
                .setRequestId(requestId)
                .setPriorityLevel(priorityLevel)
                .setDurable(isDurable)
                .setReadCompacted(readCompacted)
                .setInitialPosition(subscriptionInitialPosition)
                .setReplicateSubscriptionState(isReplicated)
                .setForceTopicCreation(createTopicIfDoesNotExist)
                .setConsumerEpoch(consumerEpoch);
        if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) {
            List<KeyValue> keyValues = new ArrayList<>(subscriptionProperties.size());
            subscriptionProperties.forEach((key, value) -> {
                KeyValue keyValue = new KeyValue();
                keyValue.setKey(key);
                keyValue.setValue(value);
                keyValues.add(keyValue);
            });
            subscribe.addAllSubscriptionProperties(keyValues);
        }
        if (keySharedPolicy != null) {
            KeySharedMeta keySharedMeta = subscribe.setKeySharedMeta();
            keySharedMeta.setAllowOutOfOrderDelivery(keySharedPolicy.isAllowOutOfOrderDelivery());
            keySharedMeta.setKeySharedMode(Commands.convertKeySharedMode(keySharedPolicy.getKeySharedMode()));
            if (keySharedPolicy instanceof KeySharedPolicy.KeySharedPolicySticky) {
                List<Range> ranges = ((KeySharedPolicy.KeySharedPolicySticky) keySharedPolicy).getRanges();
                for (Range range : ranges) {
                    IntRange hashRange = keySharedMeta.addHashRange();
                    hashRange.setStart(range.getStart());
                    hashRange.setEnd(range.getEnd());
                }
            }
        }
        if (startMessageId != null) {
            subscribe.setStartMessageId().copyFrom(startMessageId);
        }
        if (startMessageRollbackDurationInSec > 0L) {
            subscribe.setStartMessageRollbackDurationSec(startMessageRollbackDurationInSec);
        }
        // Null-guarded so that an absent metadata map behaves like an empty one.
        if (metadata != null && !metadata.isEmpty()) {
            metadata.forEach((key, value) -> subscribe.addMetadata().setKey(key).setValue(value));
        }
        if (schemaInfo != null) {
            // Sanity checks on the freshly cleared command; statement order is kept as-is
            // because setSchema() itself marks the schema field as present.
            if (subscribe.hasSchema()) {
                throw new IllegalStateException();
            }
            if (subscribe.setSchema().getPropertiesCount() > 0) {
                throw new IllegalStateException();
            }
            Commands.convertSchema(schemaInfo, subscribe.setSchema());
        }
        return Commands.serializeWithSize(cmd);
    }

    /**
     * Builds a serialized TC_CLIENT_CONNECT_REQUEST command.
     *
     * @param tcId      transaction coordinator id
     * @param requestId id of this request
     * @return the serialized command
     */
    public static ByteBuf newTcClientConnectRequest(long tcId, long requestId) {
        BaseCommand requestCmd = Commands.localCmd(BaseCommand.Type.TC_CLIENT_CONNECT_REQUEST);
        requestCmd.setTcClientConnectRequest()
                .setTcId(tcId)
                .setRequestId(requestId);
        return Commands.serializeWithSize(requestCmd);
    }

    /**
     * Fills the thread-local command with a TC_CLIENT_CONNECT_RESPONSE.
     *
     * @param requestId id of the request being answered
     * @param error     error code, written only when non-null
     * @param message   error text, written only when non-null
     * @return the populated command (not serialized)
     */
    public static BaseCommand newTcClientConnectResponse(long requestId, ServerError error, String message) {
        BaseCommand responseCmd = Commands.localCmd(BaseCommand.Type.TC_CLIENT_CONNECT_RESPONSE);
        CommandTcClientConnectResponse connectResponse = responseCmd.setTcClientConnectResponse().setRequestId(requestId);
        if (null != error) {
            connectResponse.setError(error);
        }
        if (null != message) {
            connectResponse.setMessage(message);
        }
        return responseCmd;
    }

    /**
     * Maps the client API key-shared mode onto the wire-protocol enum.
     *
     * @param mode client API mode; must not be {@code null} (the switch dereferences it)
     * @return the matching protocol {@link KeySharedMode}
     * @throws IllegalArgumentException if {@code mode} is neither {@code AUTO_SPLIT} nor {@code STICKY}
     */
    private static KeySharedMode convertKeySharedMode(org.apache.pulsar.client.api.KeySharedMode mode) {
        switch (mode) {
            case AUTO_SPLIT:
                return KeySharedMode.AUTO_SPLIT;
            case STICKY:
                return KeySharedMode.STICKY;
            default:
                throw new IllegalArgumentException("Unexpected key shared mode: " + mode);
        }
    }

    /**
     * Builds a serialized UNSUBSCRIBE command.
     *
     * @param consumerId id of the consumer to unsubscribe
     * @param requestId  id of this request
     * @return the serialized command
     */
    public static ByteBuf newUnsubscribe(long consumerId, long requestId) {
        BaseCommand unsubscribeCmd = Commands.localCmd(BaseCommand.Type.UNSUBSCRIBE);
        unsubscribeCmd.setUnsubscribe()
                .setConsumerId(consumerId)
                .setRequestId(requestId);
        return Commands.serializeWithSize(unsubscribeCmd);
    }

    /**
     * Builds a serialized ACTIVE_CONSUMER_CHANGE command.
     *
     * @param consumerId id of the consumer the notification is for
     * @param isActive   value of the is-active flag
     * @return the serialized command
     */
    public static ByteBuf newActiveConsumerChange(long consumerId, boolean isActive) {
        BaseCommand changeCmd = Commands.localCmd(BaseCommand.Type.ACTIVE_CONSUMER_CHANGE);
        changeCmd.setActiveConsumerChange()
                .setConsumerId(consumerId)
                .setIsActive(isActive);
        return Commands.serializeWithSize(changeCmd);
    }

    /**
     * Builds a serialized SEEK command that targets a message id.
     *
     * <p>A {@code null} {@code ackSet} is treated as "no ack set", consistent with
     * {@code newMessageCommand}; previously it caused a {@link NullPointerException}.
     *
     * @param consumerId id of the consumer to reposition
     * @param requestId  id of this request
     * @param ledgerId   ledger id of the target message
     * @param entryId    entry id of the target message
     * @param ackSet     ack-set words to attach to the message id, may be {@code null} or empty
     * @return the serialized command
     */
    public static ByteBuf newSeek(long consumerId, long requestId, long ledgerId, long entryId, long[] ackSet) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.SEEK);
        CommandSeek seek = cmd.setSeek().setConsumerId(consumerId).setRequestId(requestId);
        MessageIdData messageId = seek.setMessageId().setLedgerId(ledgerId).setEntryId(entryId);
        if (ackSet != null) {
            for (long word : ackSet) {
                messageId.addAckSet(word);
            }
        }
        return Commands.serializeWithSize(cmd);
    }

    /**
     * Builds a serialized SEEK command that targets a publish time.
     *
     * @param consumerId id of the consumer to reposition
     * @param requestId  id of this request
     * @param timestamp  value written as the message publish time
     * @return the serialized command
     */
    public static ByteBuf newSeek(long consumerId, long requestId, long timestamp) {
        BaseCommand seekCmd = Commands.localCmd(BaseCommand.Type.SEEK);
        seekCmd.setSeek()
                .setConsumerId(consumerId)
                .setRequestId(requestId)
                .setMessagePublishTime(timestamp);
        return Commands.serializeWithSize(seekCmd);
    }

    /**
     * Builds a serialized CLOSE_CONSUMER command.
     *
     * @param consumerId id of the consumer to close
     * @param requestId  id of this request
     * @return the serialized command
     */
    public static ByteBuf newCloseConsumer(long consumerId, long requestId) {
        BaseCommand closeCmd = Commands.localCmd(BaseCommand.Type.CLOSE_CONSUMER);
        closeCmd.setCloseConsumer()
                .setConsumerId(consumerId)
                .setRequestId(requestId);
        return Commands.serializeWithSize(closeCmd);
    }

    /**
     * Builds a serialized REACHED_END_OF_TOPIC command.
     *
     * @param consumerId id of the consumer being notified
     * @return the serialized command
     */
    public static ByteBuf newReachedEndOfTopic(long consumerId) {
        BaseCommand endOfTopicCmd = Commands.localCmd(BaseCommand.Type.REACHED_END_OF_TOPIC);
        endOfTopicCmd.setReachedEndOfTopic().setConsumerId(consumerId);
        return Commands.serializeWithSize(endOfTopicCmd);
    }

    /**
     * Builds a serialized CLOSE_PRODUCER command.
     *
     * @param producerId id of the producer to close
     * @param requestId  id of this request
     * @return the serialized command
     */
    public static ByteBuf newCloseProducer(long producerId, long requestId) {
        BaseCommand closeCmd = Commands.localCmd(BaseCommand.Type.CLOSE_PRODUCER);
        closeCmd.setCloseProducer()
                .setProducerId(producerId)
                .setRequestId(requestId);
        return Commands.serializeWithSize(closeCmd);
    }

    /**
     * Builds a PRODUCER command for an unencrypted producer.
     *
     * <p>Delegates to the seven-argument overload with {@code encrypted} set to {@code false}.
     *
     * @param topic        topic to produce to
     * @param producerId   id of the producer
     * @param requestId    id of this request
     * @param producerName producer name, may be {@code null}
     * @param metadata     producer metadata
     * @param isTxnEnabled whether transactions are enabled
     * @return the serialized command
     */
    @VisibleForTesting
    public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, Map<String, String> metadata, boolean isTxnEnabled) {
        final boolean encrypted = false;
        return Commands.newProducer(topic, producerId, requestId, producerName, encrypted, metadata, isTxnEnabled);
    }

    /**
     * Builds a PRODUCER command with default schema, epoch and access settings.
     *
     * <p>Defaults: no schema, epoch 0, producer name not user-provided, access mode
     * {@code Shared}, no topic epoch.
     *
     * @param topic        topic to produce to
     * @param producerId   id of the producer
     * @param requestId    id of this request
     * @param producerName producer name, may be {@code null}
     * @param encrypted    whether the producer encrypts messages
     * @param metadata     producer metadata
     * @param isTxnEnabled whether transactions are enabled
     * @return the serialized command
     */
    public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, boolean encrypted, Map<String, String> metadata, boolean isTxnEnabled) {
        Optional<Long> noTopicEpoch = Optional.empty();
        return Commands.newProducer(topic, producerId, requestId, producerName, encrypted, metadata,
                null, 0L, false, ProducerAccessMode.Shared, noTopicEpoch, isTxnEnabled);
    }

    /**
     * Converts a {@link SchemaType} to the wire-protocol {@link Schema.Type}.
     *
     * @param type schema type to convert
     * @return {@code Schema.Type.None} for negative values, otherwise
     *         {@code Schema.Type.valueOf} of the numeric value
     */
    private static Schema.Type getSchemaType(SchemaType type) {
        int numericValue = type.getValue();
        return numericValue < 0 ? Schema.Type.None : Schema.Type.valueOf(numericValue);
    }

    /**
     * Converts a wire-protocol {@link Schema.Type} to a {@link SchemaType}.
     *
     * @param type protocol schema type to convert
     * @return {@code SchemaType.NONE} for negative values, otherwise
     *         {@code SchemaType.valueOf} of the numeric value
     */
    public static SchemaType getSchemaType(Schema.Type type) {
        int numericValue = type.getValue();
        return numericValue < 0 ? SchemaType.NONE : SchemaType.valueOf(numericValue);
    }

    /**
     * Copies name, schema data, type and properties from {@code schemaInfo} into {@code schema}.
     *
     * <p>Properties with a {@code null} key or a {@code null} value are skipped.
     *
     * @param schemaInfo source schema description
     * @param schema     protocol schema object to populate
     */
    private static void convertSchema(SchemaInfo schemaInfo, Schema schema) {
        schema.setName(schemaInfo.getName())
                .setSchemaData(schemaInfo.getSchema())
                .setType(Commands.getSchemaType(schemaInfo.getType()));
        Map<String, String> properties = schemaInfo.getProperties();
        for (Map.Entry<String, String> property : properties.entrySet()) {
            if (property.getKey() == null || property.getValue() == null) {
                continue;
            }
            schema.addProperty().setKey(property.getKey()).setValue(property.getValue());
        }
    }

    /**
     * Builds a PRODUCER command without an initial subscription name.
     *
     * <p>Delegates to the thirteen-argument overload with a {@code null} initial subscription name.
     *
     * @param topic                    topic to produce to
     * @param producerId               id of the producer
     * @param requestId                id of this request
     * @param producerName             producer name, may be {@code null}
     * @param encrypted                whether the producer encrypts messages
     * @param metadata                 producer metadata
     * @param schemaInfo               schema to attach, may be {@code null}
     * @param epoch                    producer epoch
     * @param userProvidedProducerName whether the producer name was supplied by the user
     * @param accessMode               producer access mode
     * @param topicEpoch               topic epoch, written only when present
     * @param isTxnEnabled             whether transactions are enabled
     * @return the serialized command
     */
    public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, boolean encrypted, Map<String, String> metadata, SchemaInfo schemaInfo, long epoch, boolean userProvidedProducerName, ProducerAccessMode accessMode, Optional<Long> topicEpoch, boolean isTxnEnabled) {
        final String noInitialSubscription = null;
        return Commands.newProducer(topic, producerId, requestId, producerName, encrypted, metadata,
                schemaInfo, epoch, userProvidedProducerName, accessMode, topicEpoch, isTxnEnabled,
                noInitialSubscription);
    }

    /**
     * Builds and serializes a {@code PRODUCER} command.
     *
     * <p>Optional parts are only written when present: {@code producerName}
     * when non-null, {@code metadata} entries when the map is non-null,
     * {@code schemaInfo} when non-null, {@code topicEpoch} when the optional
     * holds a value, and {@code initialSubscriptionName} when it is neither
     * null nor empty.
     *
     * @param metadata producer metadata key/value pairs; {@code null} is
     *                 treated like an empty map
     * @return the size-prefixed frame produced by {@code serializeWithSize}
     */
    public static ByteBuf newProducer(String topic, long producerId, long requestId, String producerName, boolean encrypted, Map<String, String> metadata, SchemaInfo schemaInfo, long epoch, boolean userProvidedProducerName, ProducerAccessMode accessMode, Optional<Long> topicEpoch, boolean isTxnEnabled, String initialSubscriptionName) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.PRODUCER);
        CommandProducer producer = cmd.setProducer().setTopic(topic).setProducerId(producerId).setRequestId(requestId).setEpoch(epoch).setUserProvidedProducerName(userProvidedProducerName).setEncrypted(encrypted).setTxnEnabled(isTxnEnabled).setProducerAccessMode(Commands.convertProducerAccessMode(accessMode));
        if (producerName != null) {
            producer.setProducerName(producerName);
        }
        // Guard against a null map: previously this dereferenced metadata unconditionally.
        if (metadata != null) {
            metadata.forEach((key, value) -> producer.addMetadata().setKey(key).setValue(value));
        }
        if (schemaInfo != null) {
            Commands.convertSchema(schemaInfo, producer.setSchema());
        }
        topicEpoch.ifPresent(producer::setTopicEpoch);
        if (!Strings.isNullOrEmpty(initialSubscriptionName)) {
            producer.setInitialSubscriptionName(initialSubscriptionName);
        }
        return Commands.serializeWithSize(cmd);
    }

    /**
     * Builds a failed {@code PARTITIONED_METADATA_RESPONSE} command carrying
     * the given error; the message is only set when {@code errorMsg} is non-null.
     */
    public static BaseCommand newPartitionMetadataResponseCommand(ServerError error, String errorMsg, long requestId) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.PARTITIONED_METADATA_RESPONSE);
        CommandPartitionedTopicMetadataResponse failure = command.setPartitionMetadataResponse();
        failure.setRequestId(requestId);
        failure.setError(error);
        failure.setResponse(CommandPartitionedTopicMetadataResponse.LookupType.Failed);
        if (errorMsg != null) {
            failure.setMessage(errorMsg);
        }
        return command;
    }

    /**
     * Serializes the failed partition-metadata response built by
     * {@code newPartitionMetadataResponseCommand(error, errorMsg, requestId)}.
     */
    public static ByteBuf newPartitionMetadataResponse(ServerError error, String errorMsg, long requestId) {
        BaseCommand command = Commands.newPartitionMetadataResponseCommand(error, errorMsg, requestId);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes a {@code PARTITIONED_METADATA} request for the
     * given topic and request id.
     */
    public static ByteBuf newPartitionMetadataRequest(String topic, long requestId) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.PARTITIONED_METADATA);
        command.setPartitionMetadata()
                .setTopic(topic)
                .setRequestId(requestId);
        ByteBuf frame = Commands.serializeWithSize(command);
        return frame;
    }

    /**
     * Builds a successful {@code PARTITIONED_METADATA_RESPONSE} command that
     * reports the given partition count.
     */
    public static BaseCommand newPartitionMetadataResponseCommand(int partitions, long requestId) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.PARTITIONED_METADATA_RESPONSE);
        CommandPartitionedTopicMetadataResponse success = command.setPartitionMetadataResponse();
        success.setPartitions(partitions);
        success.setResponse(CommandPartitionedTopicMetadataResponse.LookupType.Success);
        success.setRequestId(requestId);
        return command;
    }

    /**
     * Serializes the successful partition-metadata response built by
     * {@code newPartitionMetadataResponseCommand(partitions, requestId)}.
     */
    public static ByteBuf newPartitionMetadataResponse(int partitions, long requestId) {
        BaseCommand command = Commands.newPartitionMetadataResponseCommand(partitions, requestId);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds a lookup request without an advertised listener name by
     * delegating to the four-argument overload with a {@code null} listener.
     */
    public static ByteBuf newLookup(String topic, boolean authoritative, long requestId) {
        final String noListenerName = null;
        return Commands.newLookup(topic, noListenerName, authoritative, requestId);
    }

    /**
     * Builds and serializes a {@code LOOKUP} request. The advertised listener
     * name is only set when {@code listenerName} is not blank.
     */
    public static ByteBuf newLookup(String topic, String listenerName, boolean authoritative, long requestId) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.LOOKUP);
        CommandLookupTopic request = command.setLookupTopic();
        request.setTopic(topic);
        request.setRequestId(requestId);
        request.setAuthoritative(authoritative);
        if (StringUtils.isNotBlank(listenerName)) {
            request.setAdvertisedListenerName(listenerName);
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds a {@code LOOKUP_RESPONSE} command with the given lookup type and
     * flags. Each broker service URL is only set when it is non-null.
     */
    public static BaseCommand newLookupResponseCommand(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative, CommandLookupTopicResponse.LookupType lookupType, long requestId, boolean proxyThroughServiceUrl) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.LOOKUP_RESPONSE);
        CommandLookupTopicResponse lookupResponse = command.setLookupTopicResponse();
        lookupResponse.setResponse(lookupType);
        lookupResponse.setRequestId(requestId);
        lookupResponse.setAuthoritative(authoritative);
        lookupResponse.setProxyThroughServiceUrl(proxyThroughServiceUrl);
        if (brokerServiceUrl != null) {
            lookupResponse.setBrokerServiceUrl(brokerServiceUrl);
        }
        if (brokerServiceUrlTls != null) {
            lookupResponse.setBrokerServiceUrlTls(brokerServiceUrlTls);
        }
        return command;
    }

    /**
     * Serializes the lookup response built by {@code newLookupResponseCommand}
     * from the same arguments.
     */
    public static ByteBuf newLookupResponse(String brokerServiceUrl, String brokerServiceUrlTls, boolean authoritative, CommandLookupTopicResponse.LookupType lookupType, long requestId, boolean proxyThroughServiceUrl) {
        BaseCommand command = Commands.newLookupResponseCommand(brokerServiceUrl, brokerServiceUrlTls, authoritative, lookupType, requestId, proxyThroughServiceUrl);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds a failed {@code LOOKUP_RESPONSE} command carrying the given
     * error; the message is only set when {@code errorMsg} is non-null.
     */
    public static BaseCommand newLookupErrorResponseCommand(ServerError error, String errorMsg, long requestId) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.LOOKUP_RESPONSE);
        CommandLookupTopicResponse failure = command.setLookupTopicResponse();
        failure.setRequestId(requestId);
        failure.setError(error);
        failure.setResponse(CommandLookupTopicResponse.LookupType.Failed);
        if (errorMsg != null) {
            failure.setMessage(errorMsg);
        }
        return command;
    }

    /**
     * Serializes the failed lookup response built by
     * {@code newLookupErrorResponseCommand}.
     */
    public static ByteBuf newLookupErrorResponse(ServerError error, String errorMsg, long requestId) {
        BaseCommand command = Commands.newLookupErrorResponseCommand(error, errorMsg, requestId);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes an individual-type {@code ACK} command for several
     * entries, tagged with the least/most significant bits of {@code txnID}.
     */
    public static ByteBuf newMultiTransactionMessageAck(long consumerId, TxnID txnID, List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
        BaseCommand command = Commands.newMultiMessageAckCommon(entries);
        CommandAck ack = command.getAck();
        ack.setConsumerId(consumerId);
        ack.setAckType(CommandAck.AckType.Individual);
        ack.setTxnidLeastBits(txnID.getLeastSigBits());
        ack.setTxnidMostBits(txnID.getMostSigBits());
        return Commands.serializeWithSize(command);
    }

    /**
     * Creates an {@code ACK} command containing one message id per entry.
     * Each triple supplies ledger id (left), entry id (middle) and an optional
     * bit set (right). When the bit set is non-null its words are added as the
     * ack set and the bit set is recycled afterwards.
     */
    private static BaseCommand newMultiMessageAckCommon(List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.ACK);
        CommandAck ack = command.setAck();
        final int total = entries.size();
        for (int index = 0; index < total; index++) {
            Triple<Long, Long, ConcurrentBitSetRecyclable> entry = entries.get(index);
            long ledgerId = entry.getLeft();
            long entryId = entry.getMiddle();
            ConcurrentBitSetRecyclable bits = entry.getRight();
            MessageIdData messageId = ack.addMessageId().setLedgerId(ledgerId).setEntryId(entryId);
            if (bits != null) {
                for (long word : bits.toLongArray()) {
                    messageId.addAckSet(word);
                }
                bits.recycle();
            }
        }
        return command;
    }

    /**
     * Builds and serializes an individual-type {@code ACK} command for several
     * entries. The request id is only set when {@code requestId} is non-negative.
     */
    public static ByteBuf newMultiMessageAck(long consumerId, List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entries, long requestId) {
        BaseCommand command = Commands.newMultiMessageAckCommon(entries);
        CommandAck ack = command.getAck();
        ack.setConsumerId(consumerId);
        ack.setAckType(CommandAck.AckType.Individual);
        if (requestId >= 0L) {
            ack.setRequestId(requestId);
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds an ack without transaction ids or batch size by delegating to the
     * full overload with {@code -1} for those arguments.
     */
    public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, CommandAck.AckType ackType, CommandAck.ValidationError validationError, Map<String, Long> properties, long requestId) {
        final long noTxnLeastBits = -1L;
        final long noTxnMostBits = -1L;
        final int noBatchSize = -1;
        return Commands.newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, properties, noTxnLeastBits, noTxnMostBits, requestId, noBatchSize);
    }

    /**
     * Builds and serializes an {@code ACK} command for a single message id.
     *
     * <p>Optional parts are only written when present: the ack set when
     * {@code ackSet} is non-null, the batch size / transaction id halves /
     * request id when the respective value is non-negative, the validation
     * error when non-null, and the properties when the map is non-null.
     *
     * @param properties ack properties; {@code null} is treated like an empty map
     * @return the size-prefixed frame produced by {@code serializeWithSize}
     */
    public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, CommandAck.AckType ackType, CommandAck.ValidationError validationError, Map<String, Long> properties, long txnIdLeastBits, long txnIdMostBits, long requestId, int batchSize) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.ACK);
        CommandAck ack = cmd.setAck().setConsumerId(consumerId).setAckType(ackType);
        MessageIdData messageIdData = ack.addMessageId().setLedgerId(ledgerId).setEntryId(entryId);
        if (ackSet != null) {
            for (long word : ackSet.toLongArray()) {
                messageIdData.addAckSet(word);
            }
        }
        if (batchSize >= 0) {
            messageIdData.setBatchSize(batchSize);
        }
        if (validationError != null) {
            ack.setValidationError(validationError);
        }
        if (txnIdMostBits >= 0L) {
            ack.setTxnidMostBits(txnIdMostBits);
        }
        if (txnIdLeastBits >= 0L) {
            ack.setTxnidLeastBits(txnIdLeastBits);
        }
        if (requestId >= 0L) {
            ack.setRequestId(requestId);
        }
        // Guard against a null map: previously this dereferenced properties unconditionally.
        if (properties != null) {
            properties.forEach((key, value) -> ack.addProperty().setKey(key).setValue((long) value));
        }
        return Commands.serializeWithSize(cmd);
    }

    /**
     * Builds an ack without a batch size by delegating to the full overload
     * with {@code -1} for {@code batchSize}.
     */
    public static ByteBuf newAck(long consumerId, long ledgerId, long entryId, BitSetRecyclable ackSet, CommandAck.AckType ackType, CommandAck.ValidationError validationError, Map<String, Long> properties, long txnIdLeastBits, long txnIdMostBits, long requestId) {
        final int noBatchSize = -1;
        return Commands.newAck(consumerId, ledgerId, entryId, ackSet, ackType, validationError, properties, txnIdLeastBits, txnIdMostBits, requestId, noBatchSize);
    }

    /**
     * Builds and serializes an {@code ACK_RESPONSE} command. The error and the
     * message are each only set when the corresponding argument is non-null.
     */
    public static ByteBuf newAckResponse(long requestId, ServerError error, String errorMsg, long consumerId) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.ACK_RESPONSE);
        CommandAckResponse ackResponse = command.setAckResponse();
        ackResponse.setConsumerId(consumerId);
        ackResponse.setRequestId(requestId);
        if (error != null) {
            ackResponse.setError(error);
        }
        if (errorMsg != null) {
            ackResponse.setMessage(errorMsg);
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes a {@code FLOW} command granting
     * {@code messagePermits} to the given consumer.
     */
    public static ByteBuf newFlow(long consumerId, int messagePermits) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.FLOW);
        command.setFlow()
                .setConsumerId(consumerId)
                .setMessagePermits(messagePermits);
        ByteBuf frame = Commands.serializeWithSize(command);
        return frame;
    }

    /**
     * Builds and serializes a {@code REDELIVER_UNACKNOWLEDGED_MESSAGES}
     * command carrying the consumer id and consumer epoch, with no message ids.
     */
    public static ByteBuf newRedeliverUnacknowledgedMessages(long consumerId, long consumerEpoch) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.REDELIVER_UNACKNOWLEDGED_MESSAGES);
        command.setRedeliverUnacknowledgedMessages()
                .setConsumerId(consumerId)
                .setConsumerEpoch(consumerEpoch);
        ByteBuf frame = Commands.serializeWithSize(command);
        return frame;
    }

    /**
     * Builds and serializes a {@code REDELIVER_UNACKNOWLEDGED_MESSAGES}
     * command listing specific message ids. For each id the ledger id and
     * entry id are copied; the batch index is copied only when the source has one.
     */
    public static ByteBuf newRedeliverUnacknowledgedMessages(long consumerId, List<MessageIdData> messageIds) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.REDELIVER_UNACKNOWLEDGED_MESSAGES);
        CommandRedeliverUnacknowledgedMessages request = command.setRedeliverUnacknowledgedMessages().setConsumerId(consumerId);
        for (MessageIdData source : messageIds) {
            MessageIdData copy = request.addMessageId();
            copy.setLedgerId(source.getLedgerId());
            copy.setEntryId(source.getEntryId());
            if (source.hasBatchIndex()) {
                copy.setBatchIndex(source.getBatchIndex());
            }
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Serializes the consumer-stats error response built by
     * {@code newConsumerStatsResponseCommand}.
     */
    public static ByteBuf newConsumerStatsResponse(ServerError serverError, String errMsg, long requestId) {
        BaseCommand command = Commands.newConsumerStatsResponseCommand(serverError, errMsg, requestId);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds a {@code CONSUMER_STATS_RESPONSE} command with the given error
     * code; the error message is only set when {@code errMsg} is non-null.
     */
    public static BaseCommand newConsumerStatsResponseCommand(ServerError serverError, String errMsg, long requestId) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.CONSUMER_STATS_RESPONSE);
        command.setConsumerStatsResponse()
                .setRequestId(requestId)
                .setErrorCode(serverError);
        if (errMsg == null) {
            return command;
        }
        command.getConsumerStatsResponse().setErrorMessage(errMsg);
        return command;
    }

    /**
     * Builds and serializes a {@code GET_TOPICS_OF_NAMESPACE} request with the
     * given namespace, request id and mode.
     */
    public static ByteBuf newGetTopicsOfNamespaceRequest(String namespace, long requestId, CommandGetTopicsOfNamespace.Mode mode) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.GET_TOPICS_OF_NAMESPACE);
        CommandGetTopicsOfNamespace request = command.setGetTopicsOfNamespace();
        request.setNamespace(namespace);
        request.setRequestId(requestId);
        request.setMode(mode);
        ByteBuf frame = Commands.serializeWithSize(command);
        return frame;
    }

    /**
     * Builds a {@code GET_TOPICS_OF_NAMESPACE_RESPONSE} command containing
     * every topic of {@code topics} in list order.
     */
    public static BaseCommand newGetTopicsOfNamespaceResponseCommand(List<String> topics, long requestId) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.GET_TOPICS_OF_NAMESPACE_RESPONSE);
        CommandGetTopicsOfNamespaceResponse response = command.setGetTopicsOfNamespaceResponse();
        response.setRequestId(requestId);
        for (String topic : topics) {
            response.addTopic(topic);
        }
        return command;
    }

    /**
     * Serializes the topic-list response built by
     * {@code newGetTopicsOfNamespaceResponseCommand}.
     */
    public static ByteBuf newGetTopicsOfNamespaceResponse(List<String> topics, long requestId) {
        BaseCommand command = Commands.newGetTopicsOfNamespaceResponseCommand(topics, requestId);
        return Commands.serializeWithSize(command);
    }

    /**
     * Returns a retained duplicate of the shared {@code cmdPing} buffer.
     * NOTE(review): the caller presumably owns one reference and must release it - confirm.
     */
    static ByteBuf newPing() {
        ByteBuf ping = cmdPing.retainedDuplicate();
        return ping;
    }

    /**
     * Returns a retained duplicate of the shared {@code cmdPong} buffer.
     * NOTE(review): the caller presumably owns one reference and must release it - confirm.
     */
    public static ByteBuf newPong() {
        ByteBuf pong = cmdPong.retainedDuplicate();
        return pong;
    }

    /**
     * Builds and serializes a {@code GET_LAST_MESSAGE_ID} request for the
     * given consumer.
     */
    public static ByteBuf newGetLastMessageId(long consumerId, long requestId) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.GET_LAST_MESSAGE_ID);
        command.setGetLastMessageId()
                .setRequestId(requestId)
                .setConsumerId(consumerId);
        ByteBuf frame = Commands.serializeWithSize(command);
        return frame;
    }

    /**
     * Builds and serializes a {@code GET_LAST_MESSAGE_ID_RESPONSE}. The last
     * message id is always written; the consumer mark-delete position is only
     * written when {@code markDeletePositionLedgerId} is non-negative.
     */
    public static ByteBuf newGetLastMessageIdResponse(long requestId, long lastMessageLedgerId, long lastMessageEntryId, int lastMessagePartitionIdx, int lastMessageBatchIndex, long markDeletePositionLedgerId, long markDeletePositionEntryId) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.GET_LAST_MESSAGE_ID_RESPONSE);
        CommandGetLastMessageIdResponse lastIdResponse = command.setGetLastMessageIdResponse();
        lastIdResponse.setRequestId(requestId);
        lastIdResponse.setLastMessageId()
                .setLedgerId(lastMessageLedgerId)
                .setEntryId(lastMessageEntryId)
                .setPartition(lastMessagePartitionIdx)
                .setBatchIndex(lastMessageBatchIndex);
        boolean hasMarkDeletePosition = markDeletePositionLedgerId >= 0L;
        if (hasMarkDeletePosition) {
            lastIdResponse.setConsumerMarkDeletePosition()
                    .setLedgerId(markDeletePositionLedgerId)
                    .setEntryId(markDeletePositionEntryId);
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes a {@code GET_SCHEMA} request. The schema version
     * bytes are only set when {@code version} holds a value.
     */
    public static ByteBuf newGetSchema(long requestId, String topic, Optional<SchemaVersion> version) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.GET_SCHEMA);
        CommandGetSchema request = command.setGetSchema();
        request.setRequestId(requestId);
        request.setTopic(topic);
        version.ifPresent(requested -> request.setSchemaVersion(requested.bytes()));
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes a {@code GET_SCHEMA_RESPONSE} by copying
     * {@code response} and then overwriting its request id with {@code requestId}.
     */
    public static ByteBuf newGetSchemaResponse(long requestId, CommandGetSchemaResponse response) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.GET_SCHEMA_RESPONSE);
        CommandGetSchemaResponse copy = command.setGetSchemaResponse();
        copy.copyFrom(response);
        // Must come after copyFrom so the supplied request id wins.
        copy.setRequestId(requestId);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds a {@code GET_SCHEMA_RESPONSE} command carrying the schema version
     * bytes and the schema converted from {@code schemaInfo}.
     */
    public static BaseCommand newGetSchemaResponseCommand(long requestId, SchemaInfo schemaInfo, SchemaVersion version) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.GET_SCHEMA_RESPONSE);
        Schema target = command.setGetSchemaResponse()
                .setRequestId(requestId)
                .setSchemaVersion(version.bytes())
                .setSchema();
        Commands.convertSchema(schemaInfo, target);
        return command;
    }

    /**
     * Serializes the schema response built by
     * {@code newGetSchemaResponseCommand}.
     */
    public static ByteBuf newGetSchemaResponse(long requestId, SchemaInfo schemaInfo, SchemaVersion version) {
        BaseCommand command = Commands.newGetSchemaResponseCommand(requestId, schemaInfo, version);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds a {@code GET_SCHEMA_RESPONSE} command reporting an error.
     *
     * @param requestId    id of the request being answered
     * @param error        error code to report
     * @param errorMessage optional detail; only set when non-null, matching
     *                     the other error-response builders in this class
     * @return the populated command (not serialized)
     */
    public static BaseCommand newGetSchemaResponseErrorCommand(long requestId, ServerError error, String errorMessage) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.GET_SCHEMA_RESPONSE);
        CommandGetSchemaResponse response = cmd.setGetSchemaResponse().setRequestId(requestId).setErrorCode(error);
        if (errorMessage != null) {
            response.setErrorMessage(errorMessage);
        }
        return cmd;
    }

    /**
     * Serializes the schema error response built by
     * {@code newGetSchemaResponseErrorCommand}.
     */
    public static ByteBuf newGetSchemaResponseError(long requestId, ServerError error, String errorMessage) {
        BaseCommand command = Commands.newGetSchemaResponseErrorCommand(requestId, error, errorMessage);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes a {@code GET_OR_CREATE_SCHEMA} request whose
     * schema is converted from {@code schemaInfo}.
     */
    public static ByteBuf newGetOrCreateSchema(long requestId, String topic, SchemaInfo schemaInfo) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.GET_OR_CREATE_SCHEMA);
        Schema target = command.setGetOrCreateSchema()
                .setRequestId(requestId)
                .setTopic(topic)
                .setSchema();
        Commands.convertSchema(schemaInfo, target);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds a {@code GET_OR_CREATE_SCHEMA_RESPONSE} command carrying the
     * bytes of {@code schemaVersion}.
     */
    public static BaseCommand newGetOrCreateSchemaResponseCommand(long requestId, SchemaVersion schemaVersion) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.GET_OR_CREATE_SCHEMA_RESPONSE);
        command.setGetOrCreateSchemaResponse()
                .setRequestId(requestId)
                .setSchemaVersion(schemaVersion.bytes());
        return command;
    }

    /**
     * Serializes the response built by
     * {@code newGetOrCreateSchemaResponseCommand}.
     */
    public static ByteBuf newGetOrCreateSchemaResponse(long requestId, SchemaVersion schemaVersion) {
        BaseCommand command = Commands.newGetOrCreateSchemaResponseCommand(requestId, schemaVersion);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds a {@code GET_OR_CREATE_SCHEMA_RESPONSE} command reporting an error.
     *
     * @param requestId    id of the request being answered
     * @param error        error code to report
     * @param errorMessage optional detail; only set when non-null, matching
     *                     the other error-response builders in this class
     * @return the populated command (not serialized)
     */
    public static BaseCommand newGetOrCreateSchemaResponseErrorCommand(long requestId, ServerError error, String errorMessage) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.GET_OR_CREATE_SCHEMA_RESPONSE);
        cmd.setGetOrCreateSchemaResponse().setRequestId(requestId).setErrorCode(error);
        if (errorMessage != null) {
            cmd.getGetOrCreateSchemaResponse().setErrorMessage(errorMessage);
        }
        return cmd;
    }

    /**
     * Serializes the error response built by
     * {@code newGetOrCreateSchemaResponseErrorCommand}, instead of duplicating
     * that builder's body, so the two cannot drift apart.
     *
     * @return the size-prefixed frame produced by {@code serializeWithSize}
     */
    public static ByteBuf newGetOrCreateSchemaResponseError(long requestId, ServerError error, String errorMessage) {
        return Commands.serializeWithSize(Commands.newGetOrCreateSchemaResponseErrorCommand(requestId, error, errorMessage));
    }

    /**
     * Builds and serializes a {@code NEW_TXN} request with the given
     * coordinator id, request id and TTL in seconds.
     */
    public static ByteBuf newTxn(long tcId, long requestId, long ttlSeconds) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.NEW_TXN);
        command.setNewTxn()
                .setTcId(tcId)
                .setRequestId(requestId)
                .setTxnTtlSeconds(ttlSeconds);
        ByteBuf frame = Commands.serializeWithSize(command);
        return frame;
    }

    /**
     * Builds and serializes a successful {@code NEW_TXN_RESPONSE} carrying
     * both halves of the transaction id.
     */
    public static ByteBuf newTxnResponse(long requestId, long txnIdLeastBits, long txnIdMostBits) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.NEW_TXN_RESPONSE);
        CommandNewTxnResponse created = command.setNewTxnResponse();
        created.setRequestId(requestId);
        created.setTxnidMostBits(txnIdMostBits);
        created.setTxnidLeastBits(txnIdLeastBits);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes a {@code NEW_TXN_RESPONSE} reporting an error.
     * Only the most-significant transaction id bits are written; the message
     * is only set when {@code errorMsg} is non-null.
     */
    public static ByteBuf newTxnResponse(long requestId, long txnIdMostBits, ServerError error, String errorMsg) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.NEW_TXN_RESPONSE);
        CommandNewTxnResponse failure = command.setNewTxnResponse();
        failure.setRequestId(requestId);
        failure.setTxnidMostBits(txnIdMostBits);
        failure.setError(error);
        if (errorMsg != null) {
            failure.setMessage(errorMsg);
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes an {@code ADD_PARTITION_TO_TXN} request. A null
     * {@code partitions} list results in a request without partitions.
     */
    public static ByteBuf newAddPartitionToTxn(long requestId, long txnIdLeastBits, long txnIdMostBits, List<String> partitions) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.ADD_PARTITION_TO_TXN);
        CommandAddPartitionToTxn request = command.setAddPartitionToTxn();
        request.setRequestId(requestId);
        request.setTxnidLeastBits(txnIdLeastBits);
        request.setTxnidMostBits(txnIdMostBits);
        if (partitions != null) {
            for (String partition : partitions) {
                request.addPartition(partition);
            }
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes a successful {@code ADD_PARTITION_TO_TXN_RESPONSE}
     * carrying both halves of the transaction id.
     */
    public static ByteBuf newAddPartitionToTxnResponse(long requestId, long txnIdLeastBits, long txnIdMostBits) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.ADD_PARTITION_TO_TXN_RESPONSE);
        CommandAddPartitionToTxnResponse added = command.setAddPartitionToTxnResponse();
        added.setRequestId(requestId);
        added.setTxnidLeastBits(txnIdLeastBits);
        added.setTxnidMostBits(txnIdMostBits);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes an {@code ADD_PARTITION_TO_TXN_RESPONSE} reporting
     * an error. Only the most-significant transaction id bits are written; the
     * message is only set when {@code errorMsg} is non-null.
     */
    public static ByteBuf newAddPartitionToTxnResponse(long requestId, long txnIdMostBits, ServerError error, String errorMsg) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.ADD_PARTITION_TO_TXN_RESPONSE);
        CommandAddPartitionToTxnResponse failure = command.setAddPartitionToTxnResponse();
        failure.setRequestId(requestId);
        failure.setError(error);
        failure.setTxnidMostBits(txnIdMostBits);
        if (errorMsg != null) {
            failure.setMessage(errorMsg);
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes an {@code ADD_SUBSCRIPTION_TO_TXN} request.
     *
     * @param requestId      id of this request
     * @param txnIdLeastBits least-significant bits of the transaction id
     * @param txnIdMostBits  most-significant bits of the transaction id
     * @param subscriptions  subscriptions to copy into the request;
     *                       {@code null} is treated like an empty list, the
     *                       same way {@code newAddPartitionToTxn} treats a
     *                       null partition list
     * @return the size-prefixed frame produced by {@code serializeWithSize}
     */
    public static ByteBuf newAddSubscriptionToTxn(long requestId, long txnIdLeastBits, long txnIdMostBits, List<Subscription> subscriptions) {
        BaseCommand cmd = Commands.localCmd(BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN);
        CommandAddSubscriptionToTxn add = cmd.setAddSubscriptionToTxn().setRequestId(requestId).setTxnidLeastBits(txnIdLeastBits).setTxnidMostBits(txnIdMostBits);
        if (subscriptions != null) {
            for (Subscription subscription : subscriptions) {
                add.addSubscription().copyFrom(subscription);
            }
        }
        return Commands.serializeWithSize(cmd);
    }

    /**
     * Builds and serializes a successful
     * {@code ADD_SUBSCRIPTION_TO_TXN_RESPONSE} carrying both halves of the
     * transaction id.
     */
    public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long txnIdLeastBits, long txnIdMostBits) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN_RESPONSE);
        CommandAddSubscriptionToTxnResponse added = command.setAddSubscriptionToTxnResponse();
        added.setRequestId(requestId);
        added.setTxnidLeastBits(txnIdLeastBits);
        added.setTxnidMostBits(txnIdMostBits);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes an {@code ADD_SUBSCRIPTION_TO_TXN_RESPONSE}
     * reporting an error. Only the most-significant transaction id bits are
     * written; the message is only set when {@code errorMsg} is non-null.
     */
    public static ByteBuf newAddSubscriptionToTxnResponse(long requestId, long txnIdMostBits, ServerError error, String errorMsg) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.ADD_SUBSCRIPTION_TO_TXN_RESPONSE);
        CommandAddSubscriptionToTxnResponse failure = command.setAddSubscriptionToTxnResponse();
        failure.setRequestId(requestId);
        failure.setTxnidMostBits(txnIdMostBits);
        failure.setError(error);
        if (errorMsg != null) {
            failure.setMessage(errorMsg);
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds an {@code END_TXN} command with the transaction id and action.
     * Unlike most builders here, the command is returned unserialized.
     */
    public static BaseCommand newEndTxn(long requestId, long txnIdLeastBits, long txnIdMostBits, TxnAction txnAction) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.END_TXN);
        command.setEndTxn()
                .setRequestId(requestId)
                .setTxnidLeastBits(txnIdLeastBits)
                .setTxnidMostBits(txnIdMostBits)
                .setTxnAction(txnAction);
        return command;
    }

    /**
     * Builds and serializes a successful {@code END_TXN_RESPONSE} carrying
     * both halves of the transaction id.
     */
    public static ByteBuf newEndTxnResponse(long requestId, long txnIdLeastBits, long txnIdMostBits) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.END_TXN_RESPONSE);
        CommandEndTxnResponse ended = command.setEndTxnResponse();
        ended.setRequestId(requestId);
        ended.setTxnidLeastBits(txnIdLeastBits);
        ended.setTxnidMostBits(txnIdMostBits);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes an {@code END_TXN_RESPONSE} reporting an error.
     * Only the most-significant transaction id bits are written; the message
     * is only set when {@code errorMsg} is non-null.
     */
    public static ByteBuf newEndTxnResponse(long requestId, long txnIdMostBits, ServerError error, String errorMsg) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.END_TXN_RESPONSE);
        CommandEndTxnResponse failure = command.setEndTxnResponse();
        failure.setRequestId(requestId);
        failure.setTxnidMostBits(txnIdMostBits);
        failure.setError(error);
        if (errorMsg != null) {
            failure.setMessage(errorMsg);
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes an {@code END_TXN_ON_PARTITION} request for the
     * given topic, action and low-watermark value.
     */
    public static ByteBuf newEndTxnOnPartition(long requestId, long txnIdLeastBits, long txnIdMostBits, String topic, TxnAction txnAction, long lowWaterMark) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.END_TXN_ON_PARTITION);
        command.setEndTxnOnPartition()
                .setRequestId(requestId)
                .setTxnidLeastBits(txnIdLeastBits)
                .setTxnidMostBits(txnIdMostBits)
                .setTopic(topic)
                .setTxnAction(txnAction)
                .setTxnidLeastBitsOfLowWatermark(lowWaterMark);
        ByteBuf frame = Commands.serializeWithSize(command);
        return frame;
    }

    /**
     * Builds and serializes a successful {@code END_TXN_ON_PARTITION_RESPONSE}
     * carrying both halves of the transaction id.
     */
    public static ByteBuf newEndTxnOnPartitionResponse(long requestId, long txnIdLeastBits, long txnIdMostBits) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.END_TXN_ON_PARTITION_RESPONSE);
        CommandEndTxnOnPartitionResponse ended = command.setEndTxnOnPartitionResponse();
        ended.setRequestId(requestId);
        ended.setTxnidLeastBits(txnIdLeastBits);
        ended.setTxnidMostBits(txnIdMostBits);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes an {@code END_TXN_ON_PARTITION_RESPONSE} reporting
     * an error together with both halves of the transaction id; the message is
     * only set when {@code errorMsg} is non-null.
     */
    public static ByteBuf newEndTxnOnPartitionResponse(long requestId, ServerError error, String errorMsg, long txnIdLeastBits, long txnIdMostBits) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.END_TXN_ON_PARTITION_RESPONSE);
        CommandEndTxnOnPartitionResponse failure = command.setEndTxnOnPartitionResponse();
        failure.setRequestId(requestId);
        failure.setTxnidMostBits(txnIdMostBits);
        failure.setTxnidLeastBits(txnIdLeastBits);
        failure.setError(error);
        if (errorMsg != null) {
            failure.setMessage(errorMsg);
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes an {@code END_TXN_ON_SUBSCRIPTION} request. The
     * topic and subscription name are written into the nested subscription
     * message created by the no-argument {@code setSubscription()}.
     */
    public static ByteBuf newEndTxnOnSubscription(long requestId, long txnIdLeastBits, long txnIdMostBits, String topic, String subscription, TxnAction txnAction, long lowWaterMark) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.END_TXN_ON_SUBSCRIPTION);
        command.setEndTxnOnSubscription()
                .setRequestId(requestId)
                .setTxnidLeastBits(txnIdLeastBits)
                .setTxnidMostBits(txnIdMostBits)
                .setTxnAction(txnAction)
                .setTxnidLeastBitsOfLowWatermark(lowWaterMark)
                .setSubscription()
                .setTopic(topic)
                .setSubscription(subscription);
        ByteBuf frame = Commands.serializeWithSize(command);
        return frame;
    }

    /**
     * Builds and serializes a successful
     * {@code END_TXN_ON_SUBSCRIPTION_RESPONSE} carrying both halves of the
     * transaction id.
     */
    public static ByteBuf newEndTxnOnSubscriptionResponse(long requestId, long txnIdLeastBits, long txnIdMostBits) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.END_TXN_ON_SUBSCRIPTION_RESPONSE);
        CommandEndTxnOnSubscriptionResponse ended = command.setEndTxnOnSubscriptionResponse();
        ended.setRequestId(requestId);
        ended.setTxnidLeastBits(txnIdLeastBits);
        ended.setTxnidMostBits(txnIdMostBits);
        return Commands.serializeWithSize(command);
    }

    /**
     * Builds and serializes an {@code END_TXN_ON_SUBSCRIPTION_RESPONSE}
     * reporting an error together with both halves of the transaction id; the
     * message is only set when {@code errorMsg} is non-null.
     */
    public static ByteBuf newEndTxnOnSubscriptionResponse(long requestId, long txnIdLeastBits, long txnIdMostBits, ServerError error, String errorMsg) {
        BaseCommand command = Commands.localCmd(BaseCommand.Type.END_TXN_ON_SUBSCRIPTION_RESPONSE);
        CommandEndTxnOnSubscriptionResponse failure = command.setEndTxnOnSubscriptionResponse();
        failure.setRequestId(requestId);
        failure.setTxnidLeastBits(txnIdLeastBits);
        failure.setTxnidMostBits(txnIdMostBits);
        failure.setError(error);
        if (errorMsg != null) {
            failure.setMessage(errorMsg);
        }
        return Commands.serializeWithSize(command);
    }

    /**
     * Serializes {@code cmd} into a newly allocated buffer laid out as
     * {@code [totalSize:int][cmdSize:int][cmd]}, where {@code totalSize} is
     * {@code cmdSize + 4} (everything after the first length field).
     *
     * @param cmd command to serialize
     * @return a buffer of exactly {@code cmdSize + 8} bytes capacity holding the frame
     */
    public static ByteBuf serializeWithSize(BaseCommand cmd) {
        final int lengthFieldBytes = 4;
        int commandBytes = cmd.getSerializedSize();
        int bytesAfterTotalField = commandBytes + lengthFieldBytes;
        int frameBytes = bytesAfterTotalField + lengthFieldBytes;
        ByteBuf frame = PulsarByteBufAllocator.DEFAULT.buffer(frameBytes, frameBytes);
        frame.writeInt(bytesAfterTotalField);
        frame.writeInt(commandBytes);
        cmd.writeTo(frame);
        return frame;
    }

    /**
     * Serializes a command plus message metadata into a header buffer and
     * pairs that buffer with {@code payload}.
     *
     * <p>Header layout, in write order:
     * {@code [totalSize:int][cmdSize:int][cmd][magic:short][checksum:int][metadataSize:int][metadata]},
     * where the magic/checksum section (6 bytes) is present only for
     * {@code ChecksumType.Crc32c}. {@code totalSize} counts every byte after
     * its own 4-byte field, including the payload's readable bytes.
     *
     * @param cmd          command written at the front of the header
     * @param checksumType selects whether the magic + checksum section is written
     * @param msgMetadata  message metadata written after the optional checksum section
     * @param payload      message payload; it is paired with the header, not copied into it
     * @return the pair obtained from {@code ByteBufPair.get(headers, payload)}
     */
    private static ByteBufPair serializeCommandSendWithSize(BaseCommand cmd, ChecksumType checksumType, MessageMetadata msgMetadata, ByteBuf payload) {
        int cmdSize = cmd.getSerializedSize();
        int msgMetadataSize = msgMetadata.getSerializedSize();
        int payloadSize = payload.readableBytes();
        // 2-byte magic + 4-byte checksum when Crc32c is requested, otherwise nothing.
        int magicAndChecksumLength = ChecksumType.Crc32c.equals((Object)checksumType) ? 6 : 0;
        boolean includeChecksum = magicAndChecksumLength > 0;
        // cmdSize field (4) + cmd + optional magic/checksum + metadataSize field (4) + metadata.
        int headerContentSize = 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize;
        int totalSize = headerContentSize + payloadSize;
        // The header buffer additionally holds the leading totalSize field itself.
        int headersSize = 4 + headerContentSize;
        int checksumReaderIndex = -1;
        ByteBuf headers = PulsarByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
        headers.writeInt(totalSize);
        headers.writeInt(cmdSize);
        cmd.writeTo(headers);
        if (includeChecksum) {
            // Magic number 3585 (0x0E01) precedes the checksum slot.
            headers.writeShort(3585);
            // Remember where the checksum goes and skip over the 4-byte slot; it is filled in below.
            checksumReaderIndex = headers.writerIndex();
            headers.writerIndex(headers.writerIndex() + 4);
        }
        headers.writeInt(msgMetadataSize);
        msgMetadata.writeTo(headers);
        ByteBufPair command = ByteBufPair.get(headers, payload);
        if (includeChecksum) {
            headers.markReaderIndex();
            // Position the reader just after the checksum slot.
            // NOTE(review): computeChecksum presumably covers the readable bytes from here
            // (metadata size + metadata) and resumeChecksum continues over the payload - confirm.
            headers.readerIndex(checksumReaderIndex + 4);
            int metadataChecksum = Crc32cIntChecksum.computeChecksum(headers);
            int computedChecksum = Crc32cIntChecksum.resumeChecksum(metadataChecksum, payload);
            // Absolute write: fills the reserved slot without moving reader/writer indices.
            headers.setInt(checksumReaderIndex, computedChecksum);
            headers.resetReaderIndex();
        }
        return command;
    }

    /**
     * Prepends broker entry metadata without a message count by delegating to
     * the three-argument overload with {@code numberOfMessages = -1}.
     */
    public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload, Set<BrokerEntryMetadataInterceptor> interceptors) {
        final int unknownNumberOfMessages = -1;
        return Commands.addBrokerEntryMetadata(headerAndPayload, interceptors, unknownNumberOfMessages);
    }

    /**
     * Runs every interceptor over the {@code BROKER_ENTRY_METADATA} instance
     * and returns a composite buffer made of
     * {@code [marker 3586:short][metadataSize:int][metadata]} followed by
     * {@code headerAndPayload}.
     *
     * <p>{@code interceptWithNumberOfMessages} is only invoked when
     * {@code numberOfMessages} is non-negative.
     */
    public static ByteBuf addBrokerEntryMetadata(ByteBuf headerAndPayload, Set<BrokerEntryMetadataInterceptor> brokerInterceptors, int numberOfMessages) {
        BrokerEntryMetadata entryMetadata = BROKER_ENTRY_METADATA.get();
        for (BrokerEntryMetadataInterceptor interceptor : brokerInterceptors) {
            interceptor.intercept(entryMetadata);
            if (numberOfMessages >= 0) {
                interceptor.interceptWithNumberOfMessages(entryMetadata, numberOfMessages);
            }
        }
        int metadataBytes = entryMetadata.getSerializedSize();
        // 2-byte marker + 4-byte size field in front of the serialized metadata.
        int prefixBytes = metadataBytes + 6;
        ByteBuf prefix = PulsarByteBufAllocator.DEFAULT.buffer(prefixBytes, prefixBytes);
        prefix.writeShort(3586);
        prefix.writeInt(metadataBytes);
        entryMetadata.writeTo(prefix);
        CompositeByteBuf combined = PulsarByteBufAllocator.DEFAULT.compositeBuffer();
        combined.addComponents(true, prefix, headerAndPayload);
        return combined;
    }

    /**
     * Advances the reader index past the broker entry metadata section when
     * the buffer starts with the marker short 3586; otherwise the reader index
     * is put back where it was.
     *
     * @return the same buffer instance that was passed in
     */
    public static ByteBuf skipBrokerEntryMetadataIfExist(ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
        ByteBuf buffer = headerAndPayloadWithBrokerEntryMetadata;
        int startIndex = buffer.readerIndex();
        short marker = buffer.readShort();
        if (marker != 3586) {
            // Not broker entry metadata: undo the marker read.
            buffer.readerIndex(startIndex);
            return buffer;
        }
        int metadataBytes = buffer.readInt();
        buffer.readerIndex(buffer.readerIndex() + metadataBytes);
        return buffer;
    }

    /**
     * Parses the broker entry metadata section when the buffer starts with the
     * marker short 3586. The marker is checked without moving the reader
     * index; on a match the marker and size field are consumed before parsing.
     *
     * @return a new {@code BrokerEntryMetadata}, or {@code null} when the marker is absent
     */
    public static BrokerEntryMetadata parseBrokerEntryMetadataIfExist(ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
        ByteBuf buffer = headerAndPayloadWithBrokerEntryMetadata;
        int startIndex = buffer.readerIndex();
        boolean markerPresent = buffer.getShort(startIndex) == 3586;
        if (!markerPresent) {
            return null;
        }
        buffer.skipBytes(2);
        int metadataBytes = buffer.readInt();
        BrokerEntryMetadata parsed = new BrokerEntryMetadata();
        parsed.parseFrom(buffer, metadataBytes);
        return parsed;
    }

    /**
     * Parses the broker entry metadata, if present, without consuming it: the
     * buffer's reader index is put back to its original position afterwards.
     *
     * <p>The reader index is restored in a {@code finally} block so that a
     * failure inside {@code parseBrokerEntryMetadataIfExist} does not leave
     * the buffer partially consumed; the exception still propagates.
     *
     * @param headerAndPayloadWithBrokerEntryMetadata buffer to inspect
     * @return the parsed metadata, or {@code null} when the marker is absent
     */
    public static BrokerEntryMetadata peekBrokerEntryMetadataIfExist(ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
        int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex();
        try {
            return Commands.parseBrokerEntryMetadataIfExist(headerAndPayloadWithBrokerEntryMetadata);
        } finally {
            headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
        }
    }

    /**
     * Serializes message metadata and payload into one newly allocated buffer.
     *
     * <p>Layout, in write order:
     * {@code [magic:short][checksum:int][metadataSize:int][metadata][payload]},
     * where the magic/checksum section (6 bytes) is present only for
     * {@code ChecksumType.Crc32c}.
     *
     * @param checksumType selects whether the magic + checksum section is written
     * @param msgMetadata  message metadata to serialize
     * @param payload      payload appended via {@code writeBytes(payload)} after the metadata
     * @return the buffer containing metadata and payload
     */
    public static ByteBuf serializeMetadataAndPayload(ChecksumType checksumType, MessageMetadata msgMetadata, ByteBuf payload) {
        int msgMetadataSize = msgMetadata.getSerializedSize();
        int payloadSize = payload.readableBytes();
        // 2-byte magic + 4-byte checksum when Crc32c is requested, otherwise nothing.
        int magicAndChecksumLength = ChecksumType.Crc32c.equals((Object)checksumType) ? 6 : 0;
        boolean includeChecksum = magicAndChecksumLength > 0;
        // Optional magic/checksum + metadataSize field (4) + metadata.
        int headerContentSize = magicAndChecksumLength + 4 + msgMetadataSize;
        int checksumReaderIndex = -1;
        int totalSize = headerContentSize + payloadSize;
        ByteBuf metadataAndPayload = PulsarByteBufAllocator.DEFAULT.buffer(totalSize, totalSize);
        if (includeChecksum) {
            // Magic number 3585 (0x0E01) precedes the checksum slot.
            metadataAndPayload.writeShort(3585);
            // Remember where the checksum goes and skip over the 4-byte slot; it is filled in below.
            checksumReaderIndex = metadataAndPayload.writerIndex();
            metadataAndPayload.writerIndex(metadataAndPayload.writerIndex() + 4);
        }
        metadataAndPayload.writeInt(msgMetadataSize);
        msgMetadata.writeTo(metadataAndPayload);
        if (includeChecksum) {
            metadataAndPayload.markReaderIndex();
            // Position the reader just after the checksum slot.
            // NOTE(review): computeChecksum presumably covers the readable bytes from here
            // (metadata size + metadata) and resumeChecksum continues over the payload - confirm.
            metadataAndPayload.readerIndex(checksumReaderIndex + 4);
            int metadataChecksum = Crc32cIntChecksum.computeChecksum(metadataAndPayload);
            int computedChecksum = Crc32cIntChecksum.resumeChecksum(metadataChecksum, payload);
            // Absolute write: fills the reserved slot without moving reader/writer indices.
            metadataAndPayload.setInt(checksumReaderIndex, computedChecksum);
            metadataAndPayload.resetReaderIndex();
        }
        // The payload is appended only after the checksum has been computed over it.
        metadataAndPayload.writeBytes(payload);
        return metadataAndPayload;
    }

    /**
     * Copies the batch-level fields from {@code builder} into {@code messageMetadata}.
     *
     * <p>Publish time, producer name and sequence id are always copied. Partition key (together
     * with its base64 flag), ordering key, replicated-from and schema version are copied only
     * when set on {@code builder}. Every replicate-to entry of {@code builder} is appended.
     *
     * @param messageMetadata destination metadata that receives the values
     * @param builder         source metadata the values are read from
     * @return the sequence id of {@code builder}
     */
    public static long initBatchMessageMetadata(MessageMetadata messageMetadata, MessageMetadata builder) {
        final MessageMetadata target = messageMetadata;
        final MessageMetadata source = builder;

        target.setPublishTime(source.getPublishTime());
        target.setProducerName(source.getProducerName());
        target.setSequenceId(source.getSequenceId());

        if (source.hasPartitionKey()) {
            target.setPartitionKey(source.getPartitionKey());
            target.setPartitionKeyB64Encoded(source.isPartitionKeyB64Encoded());
        }
        if (source.hasOrderingKey()) {
            target.setOrderingKey(source.getOrderingKey());
        }
        if (source.hasReplicatedFrom()) {
            target.setReplicatedFrom(source.getReplicatedFrom());
        }
        // The loop body never runs for an empty list, so no separate emptiness check is needed.
        for (int idx = 0; idx < source.getReplicateTosCount(); ++idx) {
            target.addReplicateTo(source.getReplicateToAt(idx));
        }
        if (source.hasSchemaVersion()) {
            target.setSchemaVersion(source.getSchemaVersion());
        }
        return source.getSequenceId();
    }

    /**
     * Appends one batch entry to {@code batchBuffer}: a 4-byte metadata size, the serialized
     * single-message metadata, then the readable bytes of {@code payload}.
     *
     * <p>The payload size field of {@code singleMessageMetadata} is overwritten with
     * {@code payload.readableBytes()} before serialization.
     *
     * @param singleMessageMetadata per-message metadata; mutated to carry the payload size
     * @param payload               message payload to append
     * @param batchBuffer           destination buffer the entry is written to
     * @return the result of {@code batchBuffer.writeBytes(payload)}
     */
    public static ByteBuf serializeSingleMessageInBatchWithPayload(SingleMessageMetadata singleMessageMetadata, ByteBuf payload, ByteBuf batchBuffer) {
        final int payloadLength = payload.readableBytes();
        singleMessageMetadata.setPayloadSize(payloadLength);

        // The size must be computed after the payload size is set, since that field is serialized too.
        final int metadataLength = singleMessageMetadata.getSerializedSize();
        batchBuffer.writeInt(metadataLength);
        singleMessageMetadata.writeTo(batchBuffer);

        return batchBuffer.writeBytes(payload);
    }

    /**
     * Builds per-message batch metadata from {@code msg} and appends the entry to
     * {@code batchBuffer}.
     *
     * <p>A {@link SingleMessageMetadata} instance is obtained from
     * {@code LOCAL_SINGLE_MESSAGE_METADATA} and cleared before use. Partition key (with its
     * base64 flag), ordering key, event time, sequence id, null-value and null-partition-key
     * flags are copied only when set on {@code msg}; all properties are copied. Serialization
     * is delegated to the {@code SingleMessageMetadata} overload of this method.
     *
     * @param msg         message metadata to derive the per-message metadata from
     * @param payload     message payload to append
     * @param batchBuffer destination buffer the entry is written to
     * @return whatever the delegated overload returns
     */
    public static ByteBuf serializeSingleMessageInBatchWithPayload(MessageMetadata msg, ByteBuf payload, ByteBuf batchBuffer) {
        final SingleMessageMetadata single = LOCAL_SINGLE_MESSAGE_METADATA.get();
        single.clear();

        if (msg.hasPartitionKey()) {
            single.setPartitionKey(msg.getPartitionKey());
            single.setPartitionKeyB64Encoded(msg.isPartitionKeyB64Encoded());
        }
        if (msg.hasOrderingKey()) {
            single.setOrderingKey(msg.getOrderingKey());
        }

        final int propertyCount = msg.getPropertiesCount();
        for (int idx = 0; idx < propertyCount; ++idx) {
            single.addProperty()
                    .setKey(msg.getPropertyAt(idx).getKey())
                    .setValue(msg.getPropertyAt(idx).getValue());
        }

        if (msg.hasEventTime()) {
            single.setEventTime(msg.getEventTime());
        }
        if (msg.hasSequenceId()) {
            single.setSequenceId(msg.getSequenceId());
        }
        if (msg.hasNullValue()) {
            single.setNullValue(msg.isNullValue());
        }
        if (msg.hasNullPartitionKey()) {
            single.setNullPartitionKey(msg.isNullPartitionKey());
        }

        return serializeSingleMessageInBatchWithPayload(single, payload, batchBuffer);
    }

    /**
     * Reads one batch entry from {@code uncompressedPayload}.
     *
     * <p>A 4-byte metadata size is read, {@code singleMessageMetadata} is populated from that many
     * bytes, and a retained slice covering the payload (as given by the metadata's payload size)
     * is returned. The reader index is moved past the payload only when {@code index < batchSize}.
     *
     * @param uncompressedPayload   buffer positioned at the start of a batch entry
     * @param singleMessageMetadata metadata object that receives the parsed fields
     * @param index                 position of this entry, compared against {@code batchSize}
     * @param batchSize             bound used to decide whether to advance past the payload
     * @return a retained slice of the entry payload; the caller is responsible for releasing it
     * @throws IOException declared by the signature
     */
    public static ByteBuf deSerializeSingleMessageInBatch(ByteBuf uncompressedPayload, SingleMessageMetadata singleMessageMetadata, int index, int batchSize) throws IOException {
        final int metadataLength = (int) uncompressedPayload.readUnsignedInt();
        singleMessageMetadata.parseFrom(uncompressedPayload, metadataLength);

        final int payloadLength = singleMessageMetadata.getPayloadSize();
        final int payloadStart = uncompressedPayload.readerIndex();
        final ByteBuf payloadSlice = uncompressedPayload.retainedSlice(payloadStart, payloadLength);

        final boolean advancePastPayload = index < batchSize;
        if (advancePastPayload) {
            // Slicing by absolute index does not move the reader, so step over the payload explicitly.
            uncompressedPayload.readerIndex(payloadStart + payloadLength);
        }
        return payloadSlice;
    }

    /**
     * Builds the header buffer for a command that carries a message and pairs it with the
     * already-serialized metadata and payload.
     *
     * <p>The header buffer contains, in order: a 4-byte total size
     * ({@code 4 + cmdSize + metadataAndPayload.readableBytes()}), a 4-byte command size, and the
     * serialized command.
     *
     * @param cmd                command to serialize into the header buffer
     * @param metadataAndPayload buffer holding the metadata and payload that follow the command
     * @return the pair produced by {@code ByteBufPair.get(headers, metadataAndPayload)}
     */
    public static ByteBufPair serializeCommandMessageWithSize(BaseCommand cmd, ByteBuf metadataAndPayload) {
        final int commandLength = cmd.getSerializedSize();
        // Total size counts everything after the total-size field itself.
        final int frameLength = 4 + commandLength + metadataAndPayload.readableBytes();
        // Two 4-byte length fields plus the command bytes.
        final int headerCapacity = 8 + commandLength;

        final ByteBuf headerBuf = PulsarByteBufAllocator.DEFAULT.buffer(headerCapacity);
        headerBuf.writeInt(frameLength);
        headerBuf.writeInt(commandLength);
        cmd.writeTo(headerBuf);

        return ByteBufPair.get(headerBuf, metadataAndPayload);
    }

    /**
     * Returns the batch message count recorded in the message metadata, without consuming
     * the buffer.
     *
     * @param metadataAndPayload buffer positioned at the message metadata
     * @param subscription       subscription name, passed through to {@code peekMessageMetadata}
     * @param consumerId         consumer id, passed through to {@code peekMessageMetadata}
     * @return the metadata's number of messages in batch, or {@code -1} when
     *         {@code peekMessageMetadata} returns {@code null}
     */
    public static int getNumberOfMessagesInBatch(ByteBuf metadataAndPayload, String subscription, long consumerId) {
        final MessageMetadata peeked = Commands.peekMessageMetadata(metadataAndPayload, subscription, consumerId);
        return peeked == null ? -1 : peeked.getNumMessagesInBatch();
    }

    /**
     * Parses the message metadata at the current reader index without consuming any bytes.
     *
     * <p>The reader index observed on entry is restored whether parsing succeeds or fails, so a
     * caller that receives {@code null} still holds a buffer at its original position. Any
     * {@link Throwable} raised along the way is logged together with {@code subscription} and
     * {@code consumerId} and converted into a {@code null} return.
     *
     * @param metadataAndPayload buffer positioned at the message metadata
     * @param subscription       subscription name, used only in the error log
     * @param consumerId         consumer id, used only in the error log
     * @return the parsed metadata, or {@code null} if anything went wrong
     */
    public static MessageMetadata peekMessageMetadata(ByteBuf metadataAndPayload, String subscription, long consumerId) {
        try {
            int readerIdx = metadataAndPayload.readerIndex();
            try {
                return Commands.parseMessageMetadata(metadataAndPayload);
            } finally {
                // Restore the position on the failure path too; a peek must not move the reader index.
                metadataAndPayload.readerIndex(readerIdx);
            }
        }
        catch (Throwable t) {
            log.error("[{}] [{}] Failed to parse message metadata", new Object[]{subscription, consumerId, t});
            return null;
        }
    }

    /**
     * Extracts the sticky key from the message metadata without consuming any bytes.
     *
     * <p>Selection order:
     * <ol>
     *   <li>the ordering key, when set;</li>
     *   <li>otherwise the partition key, when set: base64-decoded if the metadata marks it as
     *       base64 encoded, else its UTF-8 bytes;</li>
     *   <li>otherwise {@code NONE_KEY}.</li>
     * </ol>
     * The reader index observed on entry is restored whether parsing succeeds or fails. Any
     * {@link Throwable} (including a malformed base64 key) is logged with {@code topic} and
     * {@code subscription} and results in {@code NONE_KEY}.
     *
     * @param metadataAndPayload buffer positioned at the message metadata
     * @param topic              topic name, used only in the error log
     * @param subscription       subscription name, used only in the error log
     * @return the sticky key bytes, or {@code NONE_KEY} when no key is set or an error occurred
     */
    public static byte[] peekStickyKey(ByteBuf metadataAndPayload, String topic, String subscription) {
        try {
            int readerIdx = metadataAndPayload.readerIndex();
            MessageMetadata metadata;
            try {
                metadata = Commands.parseMessageMetadata(metadataAndPayload);
            } finally {
                // Restore the position on the failure path too; a peek must not move the reader index.
                metadataAndPayload.readerIndex(readerIdx);
            }
            if (metadata.hasOrderingKey()) {
                return metadata.getOrderingKey();
            }
            if (metadata.hasPartitionKey()) {
                if (metadata.isPartitionKeyB64Encoded()) {
                    return Base64.getDecoder().decode(metadata.getPartitionKey());
                }
                return metadata.getPartitionKey().getBytes(StandardCharsets.UTF_8);
            }
        }
        catch (Throwable t) {
            log.error("[{}] [{}] Failed to peek sticky key from the message metadata", new Object[]{topic, subscription, t});
        }
        return NONE_KEY;
    }

    /**
     * Returns the value of the {@code CURRENT_PROTOCOL_VERSION} field declared elsewhere in
     * this class.
     *
     * @return the current protocol version number
     */
    public static int getCurrentProtocolVersion() {
        return CURRENT_PROTOCOL_VERSION;
    }

    /**
     * Reports whether the peer's protocol version is at least {@code ProtocolVersion.v12},
     * the threshold this class uses for the get-last-message-id feature.
     *
     * @param peerVersion protocol version number announced by the peer
     * @return {@code true} when {@code peerVersion} is v12 or newer
     */
    public static boolean peerSupportsGetLastMessageId(int peerVersion) {
        final int requiredVersion = ProtocolVersion.v12.getValue();
        return peerVersion >= requiredVersion;
    }

    /**
     * Reports whether the peer's protocol version is at least {@code ProtocolVersion.v12},
     * the threshold this class uses for the active-consumer-listener feature.
     *
     * @param peerVersion protocol version number announced by the peer
     * @return {@code true} when {@code peerVersion} is v12 or newer
     */
    public static boolean peerSupportsActiveConsumerListener(int peerVersion) {
        final int requiredVersion = ProtocolVersion.v12.getValue();
        return peerVersion >= requiredVersion;
    }

    /**
     * Reports whether the peer's protocol version is at least {@code ProtocolVersion.v12},
     * the threshold this class uses for multi-message acknowledgment.
     *
     * @param peerVersion protocol version number announced by the peer
     * @return {@code true} when {@code peerVersion} is v12 or newer
     */
    public static boolean peerSupportsMultiMessageAcknowledgment(int peerVersion) {
        final int requiredVersion = ProtocolVersion.v12.getValue();
        return peerVersion >= requiredVersion;
    }

    /**
     * Reports whether the peer's protocol version is at least {@code ProtocolVersion.v13},
     * the threshold this class uses for the JSON-schema Avro format.
     *
     * @param peerVersion protocol version number announced by the peer
     * @return {@code true} when {@code peerVersion} is v13 or newer
     */
    public static boolean peerSupportJsonSchemaAvroFormat(int peerVersion) {
        final int requiredVersion = ProtocolVersion.v13.getValue();
        return peerVersion >= requiredVersion;
    }

    /**
     * Reports whether the peer's protocol version is at least {@code ProtocolVersion.v15},
     * the threshold this class uses for the get-or-create-schema feature.
     *
     * @param peerVersion protocol version number announced by the peer
     * @return {@code true} when {@code peerVersion} is v15 or newer
     */
    public static boolean peerSupportsGetOrCreateSchema(int peerVersion) {
        final int requiredVersion = ProtocolVersion.v15.getValue();
        return peerVersion >= requiredVersion;
    }

    /**
     * Reports whether the peer's protocol version is at least {@code ProtocolVersion.v17},
     * the threshold this class uses for acknowledgment receipts.
     *
     * @param peerVersion protocol version number announced by the peer
     * @return {@code true} when {@code peerVersion} is v17 or newer
     */
    public static boolean peerSupportsAckReceipt(int peerVersion) {
        final int requiredVersion = ProtocolVersion.v17.getValue();
        return peerVersion >= requiredVersion;
    }

    /**
     * Maps a client API {@link ProducerAccessMode} to the wire-protocol enum of the same name.
     *
     * @param accessMode client-side access mode; must not be {@code null} (the {@code switch}
     *                   throws {@link NullPointerException} on {@code null})
     * @return the protocol enum constant with the matching name
     * @throws IllegalArgumentException if {@code accessMode} is not one of
     *         {@code Exclusive}, {@code Shared} or {@code WaitForExclusive}
     */
    private static org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ProducerAccessMode convertProducerAccessMode(ProducerAccessMode accessMode) {
        switch (accessMode) {
            case Exclusive:
                return org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ProducerAccessMode.Exclusive;
            case Shared:
                return org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ProducerAccessMode.Shared;
            case WaitForExclusive:
                return org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ProducerAccessMode.WaitForExclusive;
            default:
                // Message previously read "Unknonw"; spelling corrected so it can be searched for in logs.
                throw new IllegalArgumentException("Unknown access mode: " + accessMode);
        }
    }

    /**
     * Maps a wire-protocol producer access mode to the client API {@link ProducerAccessMode}
     * of the same name.
     *
     * @param accessMode protocol-side access mode; must not be {@code null} (the {@code switch}
     *                   throws {@link NullPointerException} on {@code null})
     * @return the client API enum constant with the matching name
     * @throws IllegalArgumentException if {@code accessMode} is not one of
     *         {@code Exclusive}, {@code Shared} or {@code WaitForExclusive}
     */
    public static ProducerAccessMode convertProducerAccessMode(org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ProducerAccessMode accessMode) {
        switch (accessMode) {
            case Exclusive:
                return ProducerAccessMode.Exclusive;
            case Shared:
                return ProducerAccessMode.Shared;
            case WaitForExclusive:
                return ProducerAccessMode.WaitForExclusive;
            default:
                // Message previously read "Unknonw"; spelling corrected so it can be searched for in logs.
                throw new IllegalArgumentException("Unknown access mode: " + accessMode);
        }
    }

    /**
     * Reports whether the peer's protocol version is at least {@code ProtocolVersion.v16},
     * the threshold this class uses for broker metadata support.
     *
     * @param peerVersion protocol version number announced by the peer
     * @return {@code true} when {@code peerVersion} is v16 or newer
     */
    public static boolean peerSupportsBrokerMetadata(int peerVersion) {
        final int requiredVersion = ProtocolVersion.v16.getValue();
        return peerVersion >= requiredVersion;
    }

    /**
     * Private constructor that always throws, so the class cannot be instantiated even from
     * inside the class itself or through reflection.
     *
     * @throws UnsupportedOperationException always
     */
    private Commands() {
        throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
    }

    // Pre-serializes the PING and PONG commands once at class load. Each command is serialized
    // with serializeWithSize, copied into an Unpooled buffer that is kept in cmdPing / cmdPong,
    // and the intermediate serialized buffer is released. NONE_KEY is set last.
    static {
        BaseCommand pingCommand = new BaseCommand().setType(BaseCommand.Type.PING);
        pingCommand.setPing();
        ByteBuf pingSerialized = Commands.serializeWithSize(pingCommand);
        cmdPing = Unpooled.copiedBuffer(pingSerialized);
        pingSerialized.release();

        BaseCommand pongCommand = new BaseCommand().setType(BaseCommand.Type.PONG);
        pongCommand.setPong();
        ByteBuf pongSerialized = Commands.serializeWithSize(pongCommand);
        cmdPong = Unpooled.copiedBuffer(pongSerialized);
        pongSerialized.release();

        NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
    }

    /**
     * Selects whether {@code serializeMetadataAndPayload} prepends a marker and checksum to the
     * serialized frame.
     */
    public static enum ChecksumType {
        // serializeMetadataAndPayload writes a 2-byte marker and a 4-byte CRC32C checksum for this value.
        Crc32c,
        // serializeMetadataAndPayload writes no marker or checksum for any value other than Crc32c.
        None;

    }
}

