package org.apache.tubemq.server.broker;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.binary.StringUtils;
import org.apache.tubemq.corebase.Message;
import org.apache.tubemq.corebase.cluster.Partition;
import org.apache.tubemq.corebase.config.TLSConfig;
import org.apache.tubemq.corebase.protobuf.generated.ClientBroker;
import org.apache.tubemq.corebase.utils.AddressUtils;
import org.apache.tubemq.corebase.utils.CheckSum;
import org.apache.tubemq.corebase.utils.DataConverterUtil;
import org.apache.tubemq.corebase.utils.ServiceStatusHolder;
import org.apache.tubemq.corebase.utils.TStringUtils;
import org.apache.tubemq.corerpc.RpcConfig;
import org.apache.tubemq.corerpc.service.BrokerReadService;
import org.apache.tubemq.corerpc.service.BrokerWriteService;
import org.apache.tubemq.server.Server;
import org.apache.tubemq.server.broker.metadata.MetadataManager;
import org.apache.tubemq.server.broker.msgstore.MessageStore;
import org.apache.tubemq.server.broker.msgstore.MessageStoreManager;
import org.apache.tubemq.server.broker.msgstore.disk.GetMessageResult;
import org.apache.tubemq.server.broker.nodeinfo.ConsumerNodeInfo;
import org.apache.tubemq.server.broker.offset.OffsetService;
import org.apache.tubemq.server.broker.stats.CountService;
import org.apache.tubemq.server.broker.stats.GroupCountService;
import org.apache.tubemq.server.common.TServerConstants;
import org.apache.tubemq.server.common.aaaserver.CertificateBrokerHandler;
import org.apache.tubemq.server.common.aaaserver.CertifiedResult;
import org.apache.tubemq.server.common.exception.HeartbeatException;
import org.apache.tubemq.server.common.heartbeat.HeartbeatManager;
import org.apache.tubemq.server.common.heartbeat.TimeoutInfo;
import org.apache.tubemq.server.common.heartbeat.TimeoutListener;
import org.apache.tubemq.server.common.offsetstorage.OffsetStorageInfo;
import org.apache.tubemq.server.common.paramcheck.PBParameterUtils;
import org.apache.tubemq.server.common.paramcheck.ParamCheckResult;
import org.apache.tubemq.server.common.utils.AppendResult;
import org.apache.tubemq.server.common.utils.RowLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/tubemq/server/broker/BrokerServiceServer.class */
public class BrokerServiceServer implements BrokerReadService, BrokerWriteService, Server {
    private static final Logger logger = LoggerFactory.getLogger(BrokerServiceServer.class);
    // Owning broker: the constructor pulls the managers and the auth handler from it,
    // and start() uses its RPC service factory to publish this service.
    private final TubeBroker tubeBroker;
    // Broker configuration: ports, socket buffers, worker-thread counts, TLS settings,
    // row-lock wait duration and consumer registration timeout are all read from here.
    private final BrokerConfig tubeConfig;
    // Used for topic-name validation, closed-topic status and the flow-control rule handler.
    private final MetadataManager metadataManager;
    // Reads, books and commits consume offsets per group/topic/partition.
    private final OffsetService offsetManager;
    // Resolves (or creates) the message store for a topic/partition.
    private final MessageStoreManager storeManager;
    // Tracks consumer registrations; consumerListener is called back on timeout.
    private final HeartbeatManager heartbeatManager;
    // Row lock keyed by the UTF-8 bytes of a consumer id or of a partition string.
    private final RowLock brokerRowLock;
    // Statistics for produced messages (see sendMessageP2B).
    private final CountService putCounterGroup;
    // Statistics for consumed messages (see getMessagesC2B).
    private final CountService getCounterGroup;
    // Validates caller identity and produce/consume authorization.
    private final CertificateBrokerHandler serverAuthHandler;
    // Registered consumer per partition; the key is the partition string produced by getPartStr(...).
    private final ConcurrentHashMap<String, ConsumerNodeInfo> consumerRegisterMap = new ConcurrentHashMap<>();
    // Listener registered with heartbeatManager in the constructor.
    private final ConsumerTimeoutListener consumerListener = new ConsumerTimeoutListener();
    // Set to true at the end of start() and cleared by stop(); request handlers reject calls while false.
    private AtomicBoolean started = new AtomicBoolean(false);

    /* loaded from: input_file:org/apache/tubemq/server/broker/BrokerServiceServer$ConsumerTimeoutListener.class */
    /**
     * Heartbeat callback that removes a timed-out consumer's partition registration.
     *
     * <p>The handler takes the row lock for the consumer id ({@code getSecondKey()}) and then
     * the row lock for the partition string ({@code getThirdKey()}). Each lock is released in a
     * {@code finally} block, in reverse acquisition order, so neither an early return nor an
     * unexpected runtime failure can leave a row locked.
     */
    public class ConsumerTimeoutListener implements TimeoutListener {
        public ConsumerTimeoutListener() {
        }

        /**
         * Drops the registration stored under {@code timeoutInfo.getThirdKey()} when it still
         * belongs to the consumer named by {@code timeoutInfo.getSecondKey()}, then commits the
         * offset of that group/topic/partition and logs the result.
         *
         * @param str         heartbeat node id of the timed-out entry (used only in the log line)
         * @param timeoutInfo timeout record carrying the consumer id and the partition string
         */
        @Override // org.apache.tubemq.server.common.heartbeat.TimeoutListener
        public void onTimeout(String str, TimeoutInfo timeoutInfo) {
            Integer consumerLockId = null;
            try {
                consumerLockId = BrokerServiceServer.this.brokerRowLock.getLock((Integer) null, StringUtils.getBytesUtf8(timeoutInfo.getSecondKey()), true);
                Integer partitionLockId = null;
                try {
                    partitionLockId = BrokerServiceServer.this.brokerRowLock.getLock((Integer) null, StringUtils.getBytesUtf8(timeoutInfo.getThirdKey()), true);
                    ConsumerNodeInfo consumerNodeInfo = BrokerServiceServer.this.consumerRegisterMap.get(timeoutInfo.getThirdKey());
                    if (consumerNodeInfo == null) {
                        // Nothing registered for this partition any more; the finally blocks release both locks.
                        return;
                    }
                    // Only clean up when the partition is still owned by the consumer that timed out.
                    if (consumerNodeInfo.getConsumerId().equalsIgnoreCase(timeoutInfo.getSecondKey())) {
                        BrokerServiceServer.this.consumerRegisterMap.remove(timeoutInfo.getThirdKey());
                        // The partition string has the form group:topic:partitionId.
                        String[] split = consumerNodeInfo.getPartStr().split(":");
                        StringBuilder sb = new StringBuilder(512);
                        BrokerServiceServer.logger.info(sb.append("[Consumer-Partition Timeout]").append(str).append(",updatedOffset=").append(BrokerServiceServer.this.offsetManager.commitOffset(split[0], split[1], Integer.parseInt(split[2]), false)).toString());
                    }
                } catch (IOException e) {
                    BrokerServiceServer.logger.warn("Failed to lock.", e);
                } finally {
                    if (partitionLockId != null) {
                        BrokerServiceServer.this.brokerRowLock.releaseRowLock(partitionLockId);
                    }
                }
            } catch (IOException e2) {
                BrokerServiceServer.logger.warn("Failed to lock.", e2);
            } finally {
                if (consumerLockId != null) {
                    BrokerServiceServer.this.brokerRowLock.releaseRowLock(consumerLockId);
                }
            }
        }
    }

    /**
     * Builds the broker service on top of the managers owned by {@code tubeBroker}.
     *
     * <p>Besides wiring the collaborators, the constructor publishes the IO-exception
     * statistics thresholds to {@link ServiceStatusHolder}, creates the put/get counter
     * groups, the row lock and the heartbeat manager, and registers the consumer timeout
     * listener with the configured registration timeout.
     *
     * @param tubeBroker   owning broker that supplies the managers and the auth handler
     * @param brokerConfig broker configuration
     */
    public BrokerServiceServer(TubeBroker tubeBroker, BrokerConfig brokerConfig) {
        this.tubeBroker = tubeBroker;
        this.tubeConfig = brokerConfig;

        // Collaborators owned by the broker.
        this.metadataManager = tubeBroker.getMetadataManager();
        this.storeManager = tubeBroker.getStoreManager();
        this.offsetManager = tubeBroker.getOffsetManager();
        this.serverAuthHandler = tubeBroker.getServerAuthHandler();

        // Thresholds used to decide when the read/write service is considered stopped.
        ServiceStatusHolder.setStatisParameters(
                brokerConfig.getAllowedReadIOExcptCnt(),
                brokerConfig.getAllowedWriteIOExcptCnt(),
                brokerConfig.getIoExcptStatsDurationMs());

        // Per-direction statistics.
        this.putCounterGroup = new GroupCountService("PutCounterGroup", "Producer", 60000L);
        this.getCounterGroup = new GroupCountService("GetCounterGroup", "Consumer", 60000L);

        // Consumer liveness tracking and per-row locking.
        this.heartbeatManager = new HeartbeatManager();
        this.brokerRowLock = new RowLock("Broker-RowLock", brokerConfig.getRowLockWaitDurMs());
        this.heartbeatManager.regConsumerCheckBusiness(brokerConfig.getConsumerRegTimeoutMs(), this.consumerListener);
    }

    /**
     * Publishes the write and read services on the plain TCP port and, when TLS is enabled,
     * on the TLS port as well, then marks the server as started.
     *
     * @throws Exception if publishing any of the services fails
     */
    @Override // org.apache.tubemq.server.Server
    public void start() throws Exception {
        this.tubeBroker.getRpcServiceFactory().publishService(BrokerWriteService.class, this, this.tubeConfig.getPort(), newRpcConfig(this.tubeConfig.getTcpWriteServiceThread()));
        this.tubeBroker.getRpcServiceFactory().publishService(BrokerReadService.class, this, this.tubeConfig.getPort(), newRpcConfig(this.tubeConfig.getTcpReadServiceThread()));
        if (this.tubeConfig.isTlsEnable()) {
            TLSConfig tlsConfig = this.tubeConfig.getTlsConfig();
            this.tubeBroker.getRpcServiceFactory().publishService(BrokerWriteService.class, this, this.tubeConfig.getTlsPort(), newTlsRpcConfig(this.tubeConfig.getTlsWriteServiceThread(), tlsConfig));
            this.tubeBroker.getRpcServiceFactory().publishService(BrokerReadService.class, this, this.tubeConfig.getTlsPort(), newTlsRpcConfig(this.tubeConfig.getTlsReadServiceThread(), tlsConfig));
        }
        // Only flip the flag once every endpoint has been published successfully.
        this.started.set(true);
    }

    /** Creates the RPC settings shared by every endpoint: socket buffers and worker-thread count. */
    private RpcConfig newRpcConfig(int workerCount) {
        RpcConfig rpcConfig = new RpcConfig();
        rpcConfig.put("rpc.netty.send.buffer", Long.valueOf(this.tubeConfig.getSocketSendBuffer()));
        rpcConfig.put("rpc.netty.receive.buffer", Long.valueOf(this.tubeConfig.getSocketRecvBuffer()));
        rpcConfig.put("rpc.netty.worker.count", Integer.valueOf(workerCount));
        return rpcConfig;
    }

    /** Creates the RPC settings for a TLS endpoint: the shared settings plus key store and optional trust store. */
    private RpcConfig newTlsRpcConfig(int workerCount, TLSConfig tlsConfig) {
        RpcConfig rpcConfig = newRpcConfig(workerCount);
        rpcConfig.put("tcp.tls", true);
        rpcConfig.put("tls.keystore.path", tlsConfig.getTlsKeyStorePath());
        rpcConfig.put("tls.keystore.password", tlsConfig.getTlsKeyStorePassword());
        rpcConfig.put("tls.twoway.authentic", Boolean.valueOf(tlsConfig.isTlsTwoWayAuthEnable()));
        // The trust store is only needed when the client certificate is verified too.
        if (tlsConfig.isTlsTwoWayAuthEnable()) {
            rpcConfig.put("tls.truststore.path", tlsConfig.getTlsTrustStorePath());
            rpcConfig.put("tls.truststore.password", tlsConfig.getTlsTrustStorePassword());
        }
        return rpcConfig;
    }

    /**
     * Stops the heartbeat manager and closes both counter groups.
     * Does nothing when the server is not currently marked as started.
     */
    @Override // org.apache.tubemq.server.Server
    public void stop() {
        // compareAndSet makes sure only one caller performs the shutdown.
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        this.heartbeatManager.stop();
        this.putCounterGroup.close(-1L);
        this.getCounterGroup.close(-1L);
        logger.info("BrokerService server stopped");
    }

    /**
     * Returns the live registration map (not a copy), keyed by partition string.
     *
     * @return the map of registered consumers per partition
     */
    public Map<String, ConsumerNodeInfo> getConsumerRegisterMap() {
        return consumerRegisterMap;
    }

    /**
     * Looks up the consumer registered under the given key.
     *
     * @param str key into the registration map
     * @return the registered consumer, or {@code null} when there is none
     */
    public ConsumerNodeInfo getConsumerNodeInfo(String str) {
        final ConsumerNodeInfo registered = consumerRegisterMap.get(str);
        return registered;
    }

    /**
     * Computes a registration time for a consumer as its heartbeat timeout time minus the
     * heartbeat manager's consumer timeout delta.
     *
     * @param str  first argument passed to {@code getHeartbeatNodeId}
     * @param str2 second argument passed to {@code getHeartbeatNodeId}
     * @return the computed time, or {@code null} when no heartbeat entry exists for the node id
     */
    public Long getConsumerRegisterTime(String str, String str2) {
        final TimeoutInfo entry = heartbeatManager.getConsumerRegMap().get(getHeartbeatNodeId(str, str2));
        return entry == null
                ? null
                : Long.valueOf(entry.getTimeoutTime() - heartbeatManager.getConsumerTimeoutDlt());
    }

    /**
     * Handles a consumer's get-message request.
     *
     * <p>The request is rejected with an error response (never an exception) when the service
     * is not started or the read service is stopped (503), when client id, group or topic fail
     * validation, when the partition has no registered consumer (411), when it is registered to
     * a different consumer (412), when the heartbeat refresh fails (411), when the topic is
     * reported closed (403), or when reading from the store fails (500 or the store's own code).
     *
     * @param getMessageRequestC2B the consumer request
     * @param str                  remote address of the caller (not used by this method)
     * @param z                    whether the call arrived over TLS (not used by this method)
     * @return the response; {@code success} is true only when messages were read
     * @throws Throwable declared by the service interface
     */
    public ClientBroker.GetMessageResponseB2C getMessagesC2B(ClientBroker.GetMessageRequestC2B getMessageRequestC2B, String str, boolean z) throws Throwable {
        ClientBroker.GetMessageResponseB2C.Builder newBuilder = ClientBroker.GetMessageResponseB2C.newBuilder();
        newBuilder.setSuccess(false);
        newBuilder.setCurrOffset(-1L);
        newBuilder.setEscFlowCtrl(false);
        newBuilder.setCurrDataDlt(-1L);
        newBuilder.setMinLimitTime(0);
        StringBuilder sb = new StringBuilder(512);
        if (!this.started.get() || ServiceStatusHolder.isReadServiceStop()) {
            newBuilder.setErrCode(503);
            newBuilder.setErrMsg("Read StoreService temporary unavailable!");
            return newBuilder.build();
        }
        // Validate and normalize the request parameters.
        ParamCheckResult checkClientId = PBParameterUtils.checkClientId(getMessageRequestC2B.getClientId(), sb);
        if (!checkClientId.result) {
            newBuilder.setErrCode(checkClientId.errCode);
            newBuilder.setErrMsg(checkClientId.errMsg);
            return newBuilder.build();
        }
        String clientId = (String) checkClientId.checkData;
        ParamCheckResult checkGroupName = PBParameterUtils.checkGroupName(getMessageRequestC2B.getGroupName(), sb);
        if (!checkGroupName.result) {
            newBuilder.setErrCode(checkGroupName.errCode);
            newBuilder.setErrMsg(checkGroupName.errMsg);
            return newBuilder.build();
        }
        String groupName = (String) checkGroupName.checkData;
        ParamCheckResult checkConsumeTopicName = PBParameterUtils.checkConsumeTopicName(getMessageRequestC2B.getTopicName(), this.metadataManager, sb);
        if (!checkConsumeTopicName.result) {
            newBuilder.setErrCode(checkConsumeTopicName.errCode);
            newBuilder.setErrMsg(checkConsumeTopicName.errMsg);
            return newBuilder.build();
        }
        String topicName = (String) checkConsumeTopicName.checkData;
        int partitionId = getMessageRequestC2B.getPartitionId();
        boolean escFlowCtrl = getMessageRequestC2B.hasEscFlowCtrl() && getMessageRequestC2B.getEscFlowCtrl();
        String partStr = getPartStr(groupName, topicName, partitionId);
        // The partition must be registered, and registered to this very consumer.
        String registeredConsumerId = null;
        ConsumerNodeInfo consumerNodeInfo = this.consumerRegisterMap.get(partStr);
        if (consumerNodeInfo != null) {
            registeredConsumerId = consumerNodeInfo.getConsumerId();
        }
        if (registeredConsumerId == null) {
            logger.warn(sb.append("[UnRegistered Consumer]").append(clientId).append("#").append(partStr).toString());
            sb.delete(0, sb.length());
            newBuilder.setErrCode(411);
            newBuilder.setErrMsg(sb.append("UnRegistered Consumer:").append(clientId).append(", you have to register firstly!").toString());
            return newBuilder.build();
        }
        if (!clientId.equals(registeredConsumerId)) {
            sb.append("[Duplicated Request] Partition=").append(partStr).append(" of Broker=").append(this.tubeConfig.getBrokerId()).append(" has been consumed by ").append(registeredConsumerId).append(";Current consumer ").append(clientId);
            logger.warn(sb.toString());
            newBuilder.setErrCode(412);
            newBuilder.setErrMsg(sb.toString());
            return newBuilder.build();
        }
        String rmtAddrInfo = consumerNodeInfo.getRmtAddrInfo();
        // Refresh the consumer's heartbeat; only this call is expected to raise HeartbeatException.
        try {
            this.heartbeatManager.updConsumerNode(getHeartbeatNodeId(clientId, partStr));
        } catch (HeartbeatException e) {
            logger.warn(sb.append("[Invalid Request]").append(clientId).append("#").append(topicName).append(":").append(partitionId).toString());
            newBuilder.setErrCode(411);
            newBuilder.setErrMsg(e.getMessage());
            return newBuilder.build();
        }
        Integer closedTopicStatusId = this.metadataManager.getClosedTopicStatusId(topicName);
        if (closedTopicStatusId != null && closedTopicStatusId.intValue() > 1) {
            sb.append("[Partition Closed] Partition has been closed, for topic=").append(topicName).append(",partitionId=").append(partitionId).append(" of Broker=").append(this.tubeConfig.getBrokerId());
            logger.warn(sb.toString());
            newBuilder.setErrCode(403);
            newBuilder.setErrMsg(sb.toString());
            return newBuilder.build();
        }
        // Remember whether the store was resolved so a failure can be attributed correctly.
        MessageStore messageStore = null;
        try {
            messageStore = this.storeManager.getOrCreateMessageStore(topicName, partitionId);
            GetMessageResult messages = getMessages(messageStore, consumerNodeInfo, groupName, topicName, partitionId, getMessageRequestC2B.getLastPackConsumed(), getMessageRequestC2B.getManualCommitOffset(), clientId, this.tubeConfig.getHostName(), rmtAddrInfo, escFlowCtrl, sb);
            if (!messages.isSuccess) {
                newBuilder.setErrCode(messages.getRetCode());
                newBuilder.setErrMsg(messages.errInfo);
                newBuilder.setMinLimitTime((int) messages.waitTime);
                return newBuilder.build();
            }
            consumerNodeInfo.setLastProcInfo(System.currentTimeMillis(), messages.lastRdDataOffset, messages.totalMsgSize);
            this.getCounterGroup.add(messages.tmpCounters);
            newBuilder.setEscFlowCtrl(false);
            newBuilder.setRequireSlow(messages.isSlowFreq);
            newBuilder.setSuccess(true);
            newBuilder.setErrCode(200);
            newBuilder.setCurrOffset(messages.reqOffset);
            newBuilder.setCurrDataDlt(messages.waitTime);
            newBuilder.setErrMsg("OK!");
            newBuilder.addAllMessages(messages.transferedMessageList);
            newBuilder.setMaxOffset(messages.getMaxOffset());
            return newBuilder.build();
        } catch (Throwable th) {
            sb.delete(0, sb.length());
            newBuilder.setErrCode(500);
            if (messageStore != null) {
                // The store exists, so the failure happened while reading from it.
                sb.append("[GetMessage] Throwable error while getMessage,").append(th.getMessage()).append(", position is").append(this.tubeConfig.getBrokerId()).append(":").append(topicName).append(":").append(partitionId);
                logger.error(sb.toString(), th);
                newBuilder.setErrMsg(th.getMessage() == null ? sb.toString() : th.getMessage());
            } else {
                // The store itself could not be obtained; keep the cause in the broker log.
                sb.append("Get the store of topic ").append(topicName).append(" in partition ").append(partitionId).append(" failure!");
                logger.warn(sb.toString(), th);
                newBuilder.setErrMsg(sb.toString());
            }
            return newBuilder.build();
        }
    }

    /**
     * Reads the next batch of messages for one group/topic/partition from the given store.
     *
     * <p>Returns a failed result without touching the store when the offset service reports a
     * negative offset (404) or when the consumer's currently allowed size is not positive
     * (452 for consumers that support limiting, otherwise 404). Any failure while reading or
     * booking the offset is logged and turned into a 500 result.
     *
     * @param messageStore     store to read from
     * @param consumerNodeInfo registration of the requesting consumer
     * @param str              group name
     * @param str2             topic name
     * @param i                partition id
     * @param z                "last pack consumed" flag from the request
     * @param z2               "manual commit offset" flag from the request
     * @param str3             consumer client id (used in the statistics key)
     * @param str4             broker host name (used in the statistics key)
     * @param str5             consumer remote address info (used in the statistics key)
     * @param z3               flow-control escape flag from the request
     * @param sb               scratch buffer shared with the caller
     * @return the read result, successful or not
     * @throws IOException declared for callers; not raised by the visible code paths inside the try block
     */
    private GetMessageResult getMessages(MessageStore messageStore, ConsumerNodeInfo consumerNodeInfo, String str, String str2, int i, boolean z, boolean z2, String str3, String str4, String str5, boolean z3, StringBuilder sb) throws IOException {
        final long requestOffset = this.offsetManager.getOffset(messageStore, str, str2, i, z2, z, sb);
        if (requestOffset < 0) {
            // A negative value signals "nothing to read"; its magnitude is reported back.
            return new GetMessageResult(false, 404, -requestOffset, 0, "The request offset reached maxOffset!");
        }
        final long maxDataOffset = messageStore.getDataMaxOffset();
        final int qryPriorityId = getRealQryPriorityId(consumerNodeInfo);
        final int allowedSize = consumerNodeInfo.getCurrentAllowedSize(messageStore.getStoreKey(), this.metadataManager.getFlowCtrlRuleHandler(), maxDataOffset, this.storeManager.getMaxMsgTransferSize(), z3);
        if (allowedSize <= 0) {
            if (consumerNodeInfo.isSupportLimit()) {
                return new GetMessageResult(false, 452, requestOffset, 0, -allowedSize, "RpcServer consume speed limit!");
            }
            return new GetMessageResult(false, 404, requestOffset, 0, "RpcServer consume speed limit!");
        }
        try {
            // Statistics key: topic#brokerHost#clientId#remoteAddr#group#partition.
            sb.append(str2).append("#").append(str4).append("#").append(str3).append("#");
            sb.append(str5).append("#").append(str).append("#").append(i);
            final String statsKey = sb.toString();
            sb.delete(0, sb.length());
            final GetMessageResult result = messageStore.getMessages(qryPriorityId, requestOffset, i, consumerNodeInfo, statsKey, allowedSize);
            this.offsetManager.bookOffset(str, str2, i, result.lastReadOffset, z2, result.transferedMessageList.isEmpty(), sb);
            result.setWaitTime(maxDataOffset - result.lastRdDataOffset);
            return result;
        } catch (Throwable failure) {
            sb.delete(0, sb.length());
            sb.append("[Store Manager] get message failure, requestOffset=").append(requestOffset);
            sb.append(",group=").append(str).append(",topic=").append(str2).append(",partitionId=").append(i);
            logger.warn(sb.toString(), failure);
            sb.delete(0, sb.length());
            final String errInfo = sb.append("Get message failure, errMsg=").append(failure.getMessage()).toString();
            return new GetMessageResult(false, 500, requestOffset, 0, errInfo);
        }
    }

    /**
     * Appends a JSON snapshot of the most recent messages of a topic to {@code sb}.
     *
     * <p>When {@code i} is {@code -1} the first non-null store of the topic is used and the
     * partition id passed to the store manager becomes {@code storeId * 10000}; otherwise the
     * store for the given partition is resolved directly. At most {@code i2} of the returned
     * messages (the last ones) are emitted, each Base64-encoded. Every failure is reported as
     * a JSON object with {@code "result":false} instead of being thrown.
     *
     * @param str topic name
     * @param i   partition id, or {@code -1} to pick any store of the topic
     * @param i2  maximum number of messages to include
     * @param set filter set handed to the store manager
     * @param sb  buffer the JSON is appended to
     * @return {@code sb}
     * @throws Exception declared for callers; failures inside the lookup are caught and reported in the JSON
     */
    public StringBuilder getMessageSnapshot(String str, int i, int i2, Set<String> set, StringBuilder sb) throws Exception {
        if (!this.started.get() || ServiceStatusHolder.isReadServiceStop()) {
            sb.append("{\"result\":false,\"errCode\":").append(503).append(",\"errMsg\":\"Read StoreService temporary unavailable!\"}");
            return sb;
        }
        try {
            MessageStore messageStore = null;
            int partitionId = i;
            if (partitionId == -1) {
                Collection<MessageStore> messageStoresByTopic = this.storeManager.getMessageStoresByTopic(str);
                if (messageStoresByTopic == null || messageStoresByTopic.isEmpty()) {
                    sb.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"").append("Invalid parameter: not found the store by topicName(").append(str).append(")!\"}");
                    return sb;
                }
                // Take the first usable store of the topic.
                for (MessageStore candidate : messageStoresByTopic) {
                    if (candidate != null) {
                        messageStore = candidate;
                        partitionId = candidate.getStoreId() * 10000;
                        break;
                    }
                }
                if (messageStore == null) {
                    sb.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"").append("Invalid parameter: all store is null by topicName(").append(str).append(")!\"}");
                    return sb;
                }
            } else {
                messageStore = this.storeManager.getOrCreateMessageStore(str, partitionId);
                if (messageStore == null) {
                    sb.append("{\"result\":false,\"errCode\":400,\"errMsg\":\"").append("Invalid parameter: not found the store by topicName + partitionId(").append(str).append(":").append(partitionId).append(")!\"}");
                    return sb;
                }
            }
            GetMessageResult messages = this.storeManager.getMessages(messageStore, str, partitionId, i2, set);
            if (messages.transferedMessageList == null || messages.transferedMessageList.isEmpty()) {
                sb.append("{\"result\":false,\"errCode\":401,\"errMsg\":\"").append("Could not find message at position by topic (").append(str).append(")!\"}");
                return sb;
            }
            List<?> converted = DataConverterUtil.convertMessage(str, messages.transferedMessageList);
            // Encode everything before touching sb, so a failure here cannot leave half a
            // success document in the buffer ahead of the error document.
            List<String> encoded = new ArrayList<String>();
            for (int idx = Math.max(0, converted.size() - i2); idx < converted.size(); idx++) {
                byte[] base64 = Base64.encodeBase64(((Message) converted.get(idx)).getData());
                // Base64 output is pure ASCII; decode it explicitly rather than with the platform default charset.
                encoded.add(new String(base64, java.nio.charset.StandardCharsets.US_ASCII));
            }
            sb.append("{\"result\":true,\"errCode\":200,\"errMsg\":\"Success!\",\"dataSet\":[");
            for (int index = 0; index < encoded.size(); index++) {
                if (index > 0) {
                    sb.append(",");
                }
                sb.append("{\"index\":").append(index).append(",\"data\":\"").append(encoded.get(index)).append("\"}");
            }
            sb.append("]}");
            return sb;
        } catch (Throwable th) {
            // NOTE(review): the exception message is embedded without JSON escaping, as before;
            // a message containing a double quote yields malformed JSON.
            sb.append("{\"result\":false,\"errCode\":501,\"errMsg\":\"Get Message failure, exception is ").append(th.getMessage()).append("\"}");
            return sb;
        }
    }

    /**
     * Handles a producer's send-message request.
     *
     * <p>The request passes through, in order: service availability (503), identity
     * validation, client id and topic/partition validation, payload length checks (400),
     * checksum verification (403) and produce authorization. The payload is then appended to
     * the message store; a refused append yields 419 and any thrown failure yields 500.
     *
     * @param sendMessageRequestP2B the producer request
     * @param str                   remote address of the caller, passed to the authorization check
     * @param z                     whether the call arrived over TLS (not used by this method)
     * @return the response; on success the error message carries the assigned message id
     * @throws Throwable declared by the service interface
     */
    public ClientBroker.SendMessageResponseB2P sendMessageP2B(ClientBroker.SendMessageRequestP2B sendMessageRequestP2B, String str, boolean z) throws Throwable {
        final StringBuilder buffer = new StringBuilder(512);
        final ClientBroker.SendMessageResponseB2P.Builder response = ClientBroker.SendMessageResponseB2P.newBuilder();
        response.setSuccess(false);

        // 1. Service availability.
        if (!this.started.get() || ServiceStatusHolder.isWriteServiceStop()) {
            response.setErrCode(503);
            response.setErrMsg("Write StoreService temporary unavailable!");
            return response.build();
        }

        // 2. Identity and request parameters.
        final CertifiedResult identity = this.serverAuthHandler.identityValidUserInfo(sendMessageRequestP2B.getAuthInfo(), true);
        if (!identity.result) {
            response.setErrCode(identity.errCode);
            response.setErrMsg(identity.errInfo);
            return response.build();
        }
        final ParamCheckResult clientIdCheck = PBParameterUtils.checkClientId(sendMessageRequestP2B.getClientId(), buffer);
        if (!clientIdCheck.result) {
            response.setErrCode(clientIdCheck.errCode);
            response.setErrMsg(clientIdCheck.errMsg);
            return response.build();
        }
        final ParamCheckResult topicCheck = PBParameterUtils.checkExistTopicNameInfo(sendMessageRequestP2B.getTopicName(), sendMessageRequestP2B.getPartitionId(), this.metadataManager, buffer);
        if (!topicCheck.result) {
            response.setErrCode(topicCheck.errCode);
            response.setErrMsg(topicCheck.errMsg);
            return response.build();
        }
        final String topicName = (String) topicCheck.checkData;
        final int partitionId = sendMessageRequestP2B.getPartitionId();

        // 3. Optional message type; its hash code is stored with the message (-1 when absent).
        String msgType = null;
        int msgTypeCode = -1;
        if (TStringUtils.isNotBlank(sendMessageRequestP2B.getMsgType())) {
            msgType = sendMessageRequestP2B.getMsgType().trim();
            msgTypeCode = msgType.hashCode();
        }

        // 4. Payload size limits.
        final byte[] payload = sendMessageRequestP2B.getData().toByteArray();
        final int payloadLength = payload.length;
        if (payloadLength <= 0) {
            response.setErrCode(400);
            response.setErrMsg("data length is zero!");
            return response.build();
        }
        if (payloadLength > 1049600) {
            response.setErrCode(400);
            response.setErrMsg(buffer.append("data length over max length, allowed max length is ").append(1049600).append(", data length is ").append(payloadLength).toString());
            return response.build();
        }

        // 5. Checksum: -1 from the client means "not supplied".
        final int checkSum = CheckSum.crc32(payload);
        if (sendMessageRequestP2B.getCheckSum() != -1 && checkSum != sendMessageRequestP2B.getCheckSum()) {
            response.setErrCode(403);
            response.setErrMsg(buffer.append("Checksum msg data failure: ").append(sendMessageRequestP2B.getCheckSum()).append(" of ").append(topicName).append(" not equal to the data's checksum of ").append(checkSum).toString());
            return response.build();
        }

        // 6. Produce authorization.
        final CertifiedResult authorization = this.serverAuthHandler.validProduceAuthorizeInfo(identity.userName, topicName, msgType, str);
        if (!authorization.result) {
            response.setErrCode(authorization.errCode);
            response.setErrMsg(authorization.errInfo);
            return response.build();
        }

        // 7. Append to the store and record statistics.
        try {
            final MessageStore store = this.storeManager.getOrCreateMessageStore(topicName, partitionId);
            final AppendResult appended = new AppendResult();
            final boolean accepted = store.appendMsg(appended, payloadLength, checkSum, payload, msgTypeCode, sendMessageRequestP2B.getFlag(), partitionId, sendMessageRequestP2B.getSentAddr());
            if (!accepted) {
                response.setErrCode(419);
                response.setErrMsg(buffer.append("Put message failed from ").append(this.tubeConfig.getHostName()).append(", server receive message overflow!").toString());
                return response.build();
            }
            // Statistics key: topic#senderIp#brokerHost#partition#msgTime.
            final String statsKey = buffer.append(topicName).append("#").append(AddressUtils.intToIp(sendMessageRequestP2B.getSentAddr())).append("#").append(this.tubeConfig.getHostName()).append("#").append(sendMessageRequestP2B.getPartitionId()).append("#").append(sendMessageRequestP2B.getMsgTime()).toString();
            this.putCounterGroup.add(statsKey, 1L, payloadLength);
            response.setSuccess(true);
            response.setRequireAuth(identity.reAuth);
            response.setErrCode(200);
            response.setErrMsg(String.valueOf(appended.getMsgId()));
            response.setMessageId(appended.getMsgId());
            response.setAppendTime(appended.getAppendTime());
            response.setAppendOffset(appended.getAppendIndexOffset());
            return response.build();
        } catch (Throwable failure) {
            logger.error("Put message failed ", failure);
            buffer.delete(0, buffer.length());
            response.setSuccess(false);
            response.setErrCode(500);
            buffer.append("Put message failed from ").append(this.tubeConfig.getHostName()).append(" ");
            buffer.append(failure.getMessage() != null ? failure.getMessage() : " ");
            response.setErrMsg(buffer.toString());
            return response.build();
        }
    }

    public ClientBroker.RegisterResponseB2C consumerRegisterC2B(ClientBroker.RegisterRequestC2B registerRequestC2B, String str, boolean z) throws Throwable {
        StringBuilder sb = new StringBuilder(512);
        ClientBroker.RegisterResponseB2C.Builder newBuilder = ClientBroker.RegisterResponseB2C.newBuilder();
        newBuilder.setSuccess(false);
        newBuilder.setCurrOffset(-1L);
        CertifiedResult identityValidUserInfo = this.serverAuthHandler.identityValidUserInfo(registerRequestC2B.getAuthInfo(), false);
        if (!this.started.get()) {
            newBuilder.setErrCode(503);
            newBuilder.setErrMsg("StoreService temporary unavailable!");
            return newBuilder.build();
        }
        if (!identityValidUserInfo.result) {
            newBuilder.setErrCode(identityValidUserInfo.errCode);
            newBuilder.setErrMsg(identityValidUserInfo.errInfo);
            return newBuilder.build();
        }
        ParamCheckResult checkClientId = PBParameterUtils.checkClientId(registerRequestC2B.getClientId(), sb);
        if (!checkClientId.result) {
            newBuilder.setErrCode(checkClientId.errCode);
            newBuilder.setErrMsg(checkClientId.errMsg);
            return newBuilder.build();
        }
        String str2 = (String) checkClientId.checkData;
        ParamCheckResult checkConsumeTopicName = PBParameterUtils.checkConsumeTopicName(registerRequestC2B.getTopicName(), this.metadataManager, sb);
        if (!checkConsumeTopicName.result) {
            newBuilder.setErrCode(checkConsumeTopicName.errCode);
            newBuilder.setErrMsg(checkConsumeTopicName.errMsg);
            return newBuilder.build();
        }
        String str3 = (String) checkConsumeTopicName.checkData;
        ParamCheckResult checkGroupName = PBParameterUtils.checkGroupName(registerRequestC2B.getGroupName(), sb);
        if (!checkGroupName.result) {
            newBuilder.setErrCode(checkGroupName.errCode);
            newBuilder.setErrMsg(checkGroupName.errMsg);
            return newBuilder.build();
        }
        String str4 = (String) checkGroupName.checkData;
        boolean z2 = registerRequestC2B.getOpType() == 31;
        HashSet hashSet = new HashSet();
        if (registerRequestC2B.getFilterCondStrList() != null && !registerRequestC2B.getFilterCondStrList().isEmpty()) {
            for (String str5 : registerRequestC2B.getFilterCondStrList()) {
                if (TStringUtils.isNotBlank(str5)) {
                    hashSet.add(str5.trim());
                }
            }
        }
        CertifiedResult validConsumeAuthorizeInfo = this.serverAuthHandler.validConsumeAuthorizeInfo(identityValidUserInfo.userName, str4, str3, hashSet, z2, str);
        if (!validConsumeAuthorizeInfo.result) {
            newBuilder.setErrCode(validConsumeAuthorizeInfo.errCode);
            newBuilder.setErrMsg(validConsumeAuthorizeInfo.errInfo);
            return newBuilder.build();
        }
        String partStr = getPartStr(str4, str3, registerRequestC2B.getPartitionId());
        try {
            try {
                Integer lock = this.brokerRowLock.getLock((Integer) null, StringUtils.getBytesUtf8(str2), true);
                try {
                    Integer lock2 = this.brokerRowLock.getLock((Integer) null, StringUtils.getBytesUtf8(partStr), true);
                    if (registerRequestC2B.getOpType() == 31) {
                        ClientBroker.RegisterResponseB2C inProcessConsumerRegister = inProcessConsumerRegister(str2, str4, str3, partStr, hashSet, z, registerRequestC2B, newBuilder, sb);
                        if (lock2 != null) {
                            this.brokerRowLock.releaseRowLock(lock2);
                        }
                        if (lock != null) {
                            this.brokerRowLock.releaseRowLock(lock);
                        }
                        return inProcessConsumerRegister;
                    }
                    if (registerRequestC2B.getOpType() == 32) {
                        ClientBroker.RegisterResponseB2C inProcessConsumerUnregister = inProcessConsumerUnregister(str2, str4, str3, partStr, registerRequestC2B, z, newBuilder, sb);
                        if (lock2 != null) {
                            this.brokerRowLock.releaseRowLock(lock2);
                        }
                        if (lock != null) {
                            this.brokerRowLock.releaseRowLock(lock);
                        }
                        return inProcessConsumerUnregister;
                    }
                    String sb2 = sb.append("Invalid request:").append(registerRequestC2B.getOpType()).toString();
                    logger.info(sb2);
                    newBuilder.setErrCode(400);
                    newBuilder.setErrMsg(sb2);
                    ClientBroker.RegisterResponseB2C build = newBuilder.build();
                    if (lock2 != null) {
                        this.brokerRowLock.releaseRowLock(lock2);
                    }
                    if (lock != null) {
                        this.brokerRowLock.releaseRowLock(lock);
                    }
                    return build;
                } catch (Throwable th) {
                    if (0 != 0) {
                        this.brokerRowLock.releaseRowLock(null);
                    }
                    throw th;
                }
            } catch (Throwable th2) {
                if (0 != 0) {
                    this.brokerRowLock.releaseRowLock(null);
                }
                throw th2;
            }
        } catch (IOException e) {
            sb.delete(0, sb.length());
            logger.warn("Failed to lock.", e);
            newBuilder.setErrCode(400);
            newBuilder.setErrMsg(sb.append("Failed to lock.").append(e.getMessage()).toString());
            ClientBroker.RegisterResponseB2C build2 = newBuilder.build();
            if (0 != 0) {
                this.brokerRowLock.releaseRowLock(null);
            }
            return build2;
        }
    }

    /**
     * Registers a consumer on one partition and reports the offset it should start from.
     *
     * <p>If the partition key is already bound to a different consumer id the request is
     * rejected with code 410; when that other consumer has no heartbeat entry or its
     * heartbeat has timed out, the stale binding is removed as a side effect (the request
     * is still rejected). Otherwise a new {@link ConsumerNodeInfo} is stored for the
     * partition key, the consumer is added to the heartbeat manager, and the offset is
     * loaded from the offset manager.
     *
     * @param str                client id of the registering consumer
     * @param str2               consumer group name
     * @param str3               topic name
     * @param str4               partition key as produced by {@code getPartStr}
     * @param set                filter condition strings handed to the new node info
     * @param z                  value logged as {@code isOverTLS}
     * @param registerRequestC2B the register request (partition id, offsets, session data)
     * @param builder            response builder to fill in
     * @param sb                 scratch buffer used to assemble log and error messages
     * @return the built response: 200 on success, 410 on duplicated register, 403 when the
     *         message store does not exist, 500 on any other failure
     */
    private ClientBroker.RegisterResponseB2C inProcessConsumerRegister(String str, String str2, String str3, String str4, Set<String> set, boolean z, ClientBroker.RegisterRequestC2B registerRequestC2B, ClientBroker.RegisterResponseB2C.Builder builder, StringBuilder sb) {
        String registeredConsumerId = null;
        ConsumerNodeInfo previousNodeInfo = this.consumerRegisterMap.get(str4);
        if (previousNodeInfo != null) {
            registeredConsumerId = previousNodeInfo.getConsumerId();
        }
        if (!TStringUtils.isEmpty(registeredConsumerId) && !registeredConsumerId.equals(str)) {
            TimeoutInfo timeoutInfo = this.heartbeatManager.getConsumerRegMap().get(getHeartbeatNodeId(registeredConsumerId, str4));
            if (timeoutInfo == null || System.currentTimeMillis() >= timeoutInfo.getTimeoutTime()) {
                // The other consumer is gone or expired: drop its binding, but still reject this call.
                this.consumerRegisterMap.remove(str4);
                sb.append("[Duplicated Register] Remove Invalid Consumer Register ").append(registeredConsumerId).append("#").append(str4);
            } else {
                sb.append("[Duplicated Register] Partition ").append(this.tubeConfig.getBrokerId()).append("#").append(str4).append(" has been registered by ").append(registeredConsumerId);
            }
            logger.warn(sb.toString());
            builder.setErrCode(410);
            builder.setErrMsg(sb.toString());
            return builder.build();
        }
        long currOffset = registerRequestC2B.hasCurrOffset() ? registerRequestC2B.getCurrOffset() : -1L;
        long sessionTime = registerRequestC2B.hasSessionTime() ? registerRequestC2B.getSessionTime() : -1L;
        String sessionKey = registerRequestC2B.hasSessionKey() ? registerRequestC2B.getSessionKey() : null;
        int qryPriorityId = registerRequestC2B.hasQryPriorityId() ? registerRequestC2B.getQryPriorityId() : -2;
        // Keep a handle on the node info that is actually registered; the value read from the
        // map above is the previous entry and is null on a first registration.
        ConsumerNodeInfo newNodeInfo = new ConsumerNodeInfo(this.storeManager, qryPriorityId, str, set, sessionKey, sessionTime, true, str4);
        this.consumerRegisterMap.put(str4, newNodeInfo);
        this.heartbeatManager.regConsumerNode(getHeartbeatNodeId(str, str4), str, str4);
        try {
            MessageStore messageStore = this.storeManager.getOrCreateMessageStore(str3, registerRequestC2B.getPartitionId());
            if (messageStore == null) {
                // NOTE: the registration made above is left in place on this path.
                builder.setErrCode(403);
                builder.setErrMsg(sb.append("Topic ").append(str3).append("-").append(registerRequestC2B.getPartitionId()).append(" not existed, please check your configure").toString());
                return builder.build();
            }
            OffsetStorageInfo loadOffset = this.offsetManager.loadOffset(messageStore, str2, str3, registerRequestC2B.getPartitionId(), registerRequestC2B.getReadStatus(), currOffset, sb);
            logger.info(sb.append("[Consumer Register]").append(str).append("#").append(str4).append("#").append(loadOffset).append(", reqOffset=").append(currOffset).append(", reqQryPriorityId=").append(qryPriorityId).append(", isOverTLS=").append(z).toString());
            builder.setSuccess(true);
            builder.setErrCode(200);
            builder.setErrMsg("OK!");
            builder.setCurrOffset(loadOffset.getOffset());
            // Choose which max offset to report from the priority of the consumer just registered.
            if (getRealQryPriorityId(newNodeInfo) <= 1) {
                builder.setMaxOffset(messageStore.getFileIndexMaxOffset());
            } else {
                builder.setMaxOffset(messageStore.getIndexMaxOffset());
            }
            return builder.build();
        } catch (Throwable th) {
            sb.delete(0, sb.length());
            logger.warn("Register broker failure!", th);
            builder.setErrCode(500);
            builder.setErrMsg(sb.append("Register broker failure!").append(", exception is ").append(th.getMessage()).toString());
            return builder.build();
        }
    }

    /**
     * Unregisters a consumer from one partition.
     *
     * <p>The request is refused with code 411 when nothing is registered under the partition
     * key and with code 412 when the partition key is bound to a different consumer id.
     * Otherwise the offset is committed through the offset manager, the binding is removed
     * and the consumer is dropped from the heartbeat manager.
     *
     * @param str                client id of the consumer asking to unregister
     * @param str2               consumer group name
     * @param str3               topic name
     * @param str4               partition key as produced by {@code getPartStr}
     * @param registerRequestC2B the request (partition id and read status are read from it)
     * @param z                  value logged as {@code isOverTLS}
     * @param builder            response builder to fill in
     * @param sb                 scratch buffer used to assemble log and error messages
     * @return the built response: 200 on success, 411/412 on refusal, 500 on exception
     */
    private ClientBroker.RegisterResponseB2C inProcessConsumerUnregister(String str, String str2, String str3, String str4, ClientBroker.RegisterRequestC2B registerRequestC2B, boolean z, ClientBroker.RegisterResponseB2C.Builder builder, StringBuilder sb) {
        sb.append("[Consumer Unregister]").append(str).append(", isOverTLS=").append(z);
        logger.info(sb.toString());
        sb.setLength(0);

        final ConsumerNodeInfo registered = this.consumerRegisterMap.get(str4);
        if (registered == null) {
            sb.append("[UnRegistered Consumer2]").append(str).append("#").append(str4);
            logger.warn(sb.toString());
            sb.setLength(0);
            builder.setErrCode(411);
            sb.append("UnRegistered Consumer ").append(str).append(", you have to register firstly!");
            builder.setErrMsg(sb.toString());
            return builder.build();
        }

        if (!str.equals(registered.getConsumerId())) {
            sb.append("[Duplicated Request]").append("Partition ").append(str4)
                    .append(" has been consumed by ").append(registered.getConsumerId())
                    .append(";Current consumer ").append(str);
            logger.warn(sb.toString());
            builder.setErrCode(412);
            // The buffer is intentionally not cleared: the error message is the log line plus the broker host.
            sb.append(", broker=").append(this.tubeConfig.getHostName());
            builder.setErrMsg(sb.toString());
            return builder.build();
        }

        try {
            sb.append("[Unregister Offset] update lastOffset, ").append(str2)
                    .append(" topic:").append(str3)
                    .append(" partition:").append(registerRequestC2B.getPartitionId())
                    .append(" updatedOffset:")
                    .append(this.offsetManager.commitOffset(str2, str3, registerRequestC2B.getPartitionId(), registerRequestC2B.getReadStatus() == 0));
            logger.info(sb.toString());
            sb.setLength(0);
            this.consumerRegisterMap.remove(str4);
            this.heartbeatManager.unRegConsumerNode(getHeartbeatNodeId(str, str4));
            builder.setSuccess(true);
            builder.setErrCode(200);
            builder.setErrMsg("OK!");
            return builder.build();
        } catch (Exception e) {
            sb.setLength(0);
            sb.append("Unregister consumer:").append(str).append(" failed.");
            logger.warn(sb.toString(), e);
            builder.setErrCode(500);
            // The error message extends the logged text with the exception message.
            sb.append(" exception is ").append(e.getMessage());
            builder.setErrMsg(sb.toString());
            return builder.build();
        }
    }

    /**
     * Processes a consumer heartbeat covering a list of partitions.
     *
     * <p>After the service-started, identity, client id and group name checks pass, each
     * partition in the request is examined:
     * <ul>
     *   <li>no registration for the partition key: recorded as a {@code 411:<partition>} failure;</li>
     *   <li>registered to this client: the heartbeat entry is refreshed and the stored query
     *       priority is updated when it differs from the requested one; a
     *       {@link HeartbeatException} is recorded as a {@code 411:<partition>} failure;</li>
     *   <li>registered to another client: recorded as a {@code 412:<partition>} failure.</li>
     * </ul>
     * Consume authorization is validated once per request, on the first partition found to
     * be owned by this client, using that partition's topic and filter conditions; if it
     * fails the whole request is rejected immediately.
     *
     * <p>Per-partition failures do not fail the request: the response is still marked
     * successful with code 200 and carries the failure list.
     *
     * @param heartBeatRequestC2B the heartbeat request
     * @param str                 forwarded to the authorization check (presumably the
     *                            caller's remote address; confirm against the caller)
     * @param z                   not used by this method
     * @return the built heartbeat response
     * @throws Throwable declared by the method signature
     */
    public ClientBroker.HeartBeatResponseB2C consumerHeartbeatC2B(ClientBroker.HeartBeatRequestC2B heartBeatRequestC2B, String str, boolean z) throws Throwable {
        ClientBroker.HeartBeatResponseB2C.Builder newBuilder = ClientBroker.HeartBeatResponseB2C.newBuilder();
        StringBuilder sb = new StringBuilder(512);
        newBuilder.setSuccess(false);
        if (!this.started.get()) {
            newBuilder.setErrCode(503);
            newBuilder.setErrMsg("StoreService temporary unavailable!");
            return newBuilder.build();
        }
        CertifiedResult identityValidUserInfo = this.serverAuthHandler.identityValidUserInfo(heartBeatRequestC2B.getAuthInfo(), false);
        if (!identityValidUserInfo.result) {
            newBuilder.setErrCode(identityValidUserInfo.errCode);
            newBuilder.setErrMsg(identityValidUserInfo.errInfo);
            return newBuilder.build();
        }
        ParamCheckResult checkClientId = PBParameterUtils.checkClientId(heartBeatRequestC2B.getClientId(), sb);
        if (!checkClientId.result) {
            newBuilder.setErrCode(checkClientId.errCode);
            newBuilder.setErrMsg(checkClientId.errMsg);
            return newBuilder.build();
        }
        String clientId = (String) checkClientId.checkData;
        ParamCheckResult checkGroupName = PBParameterUtils.checkGroupName(heartBeatRequestC2B.getGroupName(), sb);
        if (!checkGroupName.result) {
            newBuilder.setErrCode(checkGroupName.errCode);
            newBuilder.setErrMsg(checkGroupName.errMsg);
            return newBuilder.build();
        }
        String groupName = (String) checkGroupName.checkData;
        int qryPriorityId = heartBeatRequestC2B.hasQryPriorityId() ? heartBeatRequestC2B.getQryPriorityId() : -2;
        List<Partition> partitions = DataConverterUtil.convertPartitionInfo(heartBeatRequestC2B.getPartitionInfoList());
        boolean authChecked = false;
        // Typed list of "<code>:<partition>" entries reported back to the client.
        List<String> failureInfo = new ArrayList<String>();
        for (Partition partition : partitions) {
            String topic = partition.getTopic();
            int partitionId = partition.getPartitionId();
            String partStr = getPartStr(groupName, topic, partitionId);
            ConsumerNodeInfo consumerNodeInfo = this.consumerRegisterMap.get(partStr);
            if (consumerNodeInfo == null) {
                failureInfo.add(sb.append(411).append(":").append(partition.toString()).toString());
                sb.delete(0, sb.length());
                logger.warn(sb.append("[Heartbeat Check] UnRegistered Consumer:").append(clientId).append("#").append(partStr).toString());
                sb.delete(0, sb.length());
            } else if (clientId.equals(consumerNodeInfo.getConsumerId())) {
                if (!authChecked) {
                    // Authorization is validated only once per heartbeat request.
                    CertifiedResult validConsumeAuthorizeInfo = this.serverAuthHandler.validConsumeAuthorizeInfo(identityValidUserInfo.userName, groupName, topic, consumerNodeInfo.getFilterCondStrs(), true, str);
                    if (!validConsumeAuthorizeInfo.result) {
                        newBuilder.setRequireAuth(validConsumeAuthorizeInfo.reAuth);
                        newBuilder.setErrCode(validConsumeAuthorizeInfo.errCode);
                        newBuilder.setErrMsg(validConsumeAuthorizeInfo.errInfo);
                        return newBuilder.build();
                    }
                    authChecked = true;
                }
                try {
                    this.heartbeatManager.updConsumerNode(getHeartbeatNodeId(clientId, partStr));
                    if (consumerNodeInfo.getQryPriorityId() != qryPriorityId) {
                        consumerNodeInfo.setQryPriorityId(qryPriorityId);
                    }
                } catch (HeartbeatException e) {
                    failureInfo.add(sb.append(411).append(":").append(partition.toString()).toString());
                    sb.delete(0, sb.length());
                    logger.warn(sb.append("[Heartbeat Check] Invalid Request").append(clientId).append("#").append(topic).append(":").append(partitionId).toString());
                    sb.delete(0, sb.length());
                }
            } else {
                failureInfo.add(sb.append(412).append(":").append(partition.toString()).toString());
                sb.delete(0, sb.length());
                sb.append("[Heartbeat Check] Duplicated partition: Partition ").append(partStr).append(" has been consumed by ").append(consumerNodeInfo.getConsumerId()).append(";Current consumer ").append(clientId);
                logger.warn(sb.toString());
                sb.delete(0, sb.length());
            }
        }
        newBuilder.setRequireAuth(identityValidUserInfo.reAuth);
        newBuilder.setSuccess(true);
        newBuilder.setErrCode(200);
        newBuilder.setHasPartFailure(false);
        if (!failureInfo.isEmpty()) {
            newBuilder.setHasPartFailure(true);
            newBuilder.addAllFailureInfo(failureInfo);
        }
        newBuilder.setErrMsg("OK!");
        return newBuilder.build();
    }

    /**
     * Commits the consume offset of one partition on behalf of a consumer.
     *
     * <p>The request is validated (service started, client id, group name, topic and
     * partition), then the partition key must be registered to the calling client id;
     * otherwise code 401 is returned. On success the committed offset and a max offset
     * taken from the message store are returned with code 200.
     *
     * @param commitOffsetRequestC2B the commit request
     * @param str                    not used by this method
     * @param z                      not used by this method
     * @return the built response: 200 on success, 503 when the service is not started,
     *         401 when the partition is unregistered or owned by another consumer, 403
     *         when the message store does not exist, 500 when the commit throws, or the
     *         code reported by a failed parameter check
     * @throws Throwable declared by the method signature
     */
    public ClientBroker.CommitOffsetResponseB2C consumerCommitC2B(ClientBroker.CommitOffsetRequestC2B commitOffsetRequestC2B, String str, boolean z) throws Throwable {
        ClientBroker.CommitOffsetResponseB2C.Builder newBuilder = ClientBroker.CommitOffsetResponseB2C.newBuilder();
        StringBuilder sb = new StringBuilder(512);
        newBuilder.setSuccess(false);
        newBuilder.setCurrOffset(-1L);
        if (!this.started.get()) {
            newBuilder.setErrCode(503);
            newBuilder.setErrMsg("StoreService temporary unavailable!");
            return newBuilder.build();
        }
        ParamCheckResult checkClientId = PBParameterUtils.checkClientId(commitOffsetRequestC2B.getClientId(), sb);
        if (!checkClientId.result) {
            newBuilder.setErrCode(checkClientId.errCode);
            newBuilder.setErrMsg(checkClientId.errMsg);
            return newBuilder.build();
        }
        String clientId = (String) checkClientId.checkData;
        ParamCheckResult checkGroupName = PBParameterUtils.checkGroupName(commitOffsetRequestC2B.getGroupName(), sb);
        if (!checkGroupName.result) {
            newBuilder.setErrCode(checkGroupName.errCode);
            newBuilder.setErrMsg(checkGroupName.errMsg);
            return newBuilder.build();
        }
        String groupName = (String) checkGroupName.checkData;
        int partitionId = commitOffsetRequestC2B.getPartitionId();
        ParamCheckResult checkExistTopicNameInfo = PBParameterUtils.checkExistTopicNameInfo(commitOffsetRequestC2B.getTopicName(), partitionId, this.metadataManager, sb);
        if (!checkExistTopicNameInfo.result) {
            newBuilder.setErrCode(checkExistTopicNameInfo.errCode);
            newBuilder.setErrMsg(checkExistTopicNameInfo.errMsg);
            return newBuilder.build();
        }
        String topicName = (String) checkExistTopicNameInfo.checkData;
        String partStr = getPartStr(groupName, topicName, partitionId);
        ConsumerNodeInfo consumerNodeInfo = this.consumerRegisterMap.get(partStr);
        if (consumerNodeInfo == null) {
            newBuilder.setErrCode(401);
            newBuilder.setErrMsg("The partition not registered by consumers");
            logger.error(sb.append("[consumerCommitC2B error] partition not registered by consumers: commit consumer is: ").append(clientId).append(", partition is : ").append(partStr).toString());
            return newBuilder.build();
        }
        if (!clientId.equals(consumerNodeInfo.getConsumerId())) {
            newBuilder.setErrCode(401);
            newBuilder.setErrMsg(sb.append("The partition has been registered by other consumer: ").append(consumerNodeInfo.getConsumerId()).toString());
            sb.delete(0, sb.length());
            logger.error(sb.append("[consumerCommitC2B error] partition has been registered by other consumer: commit consumer is: ").append(clientId).append(", registered consumer is: ").append(consumerNodeInfo.getConsumerId()).append(", partition is : ").append(partStr).toString());
            return newBuilder.build();
        }
        // The last package counts as consumed unless the request explicitly says otherwise.
        boolean lastPackConsumed = !commitOffsetRequestC2B.hasLastPackConsumed() || commitOffsetRequestC2B.getLastPackConsumed();
        try {
            long commitOffset = this.offsetManager.commitOffset(groupName, topicName, partitionId, lastPackConsumed);
            MessageStore messageStore = this.storeManager.getOrCreateMessageStore(topicName, partitionId);
            if (messageStore == null) {
                newBuilder.setErrCode(403);
                newBuilder.setErrMsg(sb.append("Topic ").append(topicName).append("-").append(commitOffsetRequestC2B.getPartitionId()).append(" not existed, please check your configure").toString());
                return newBuilder.build();
            }
            newBuilder.setSuccess(true);
            newBuilder.setErrCode(200);
            newBuilder.setErrMsg("OK!");
            newBuilder.setCurrOffset(commitOffset);
            if (getRealQryPriorityId(consumerNodeInfo) <= 1) {
                newBuilder.setMaxOffset(messageStore.getFileIndexMaxOffset());
            } else {
                newBuilder.setMaxOffset(messageStore.getIndexMaxOffset());
            }
        } catch (Exception e) {
            // Protobuf string setters reject null, and exceptions such as a bare
            // NullPointerException carry no message; fall back to toString() so the
            // handler itself cannot throw while reporting the failure.
            String errMsg = e.getMessage();
            newBuilder.setErrMsg(errMsg != null ? errMsg : e.toString());
            newBuilder.setErrCode(500);
            logger.error("[commitOffset error]", e);
        }
        return newBuilder.build();
    }

    /**
     * Builds the partition key used to index consumer registrations.
     *
     * @param str  first key component (callers in this file pass the group name)
     * @param str2 second key component (callers in this file pass the topic name)
     * @param i    partition id
     * @return the three components joined with {@code ':'}
     */
    private String getPartStr(String str, String str2, int i) {
        return str + ":" + str2 + ":" + i;
    }

    /**
     * Builds the node id under which a consumer/partition pair is tracked by the heartbeat manager.
     *
     * @param str  first component (callers in this file pass a consumer id)
     * @param str2 second component (callers in this file pass a partition key)
     * @return the two components joined with {@code '#'}
     */
    private String getHeartbeatNodeId(String str, String str2) {
        return str + "#" + str2;
    }

    /**
     * Resolves the effective query priority id for a consumer.
     *
     * <p>The consumer's own priority id is used when it is positive; otherwise the priority
     * id of the flow-control rule handler is used when that is positive; otherwise
     * {@code TServerConstants.CFG_DEFAULT_CONSUME_RULE} is returned.
     *
     * <p>Each source is read exactly once, so a concurrent update between the check and the
     * return cannot leak a non-positive value to the caller.
     *
     * @param consumerNodeInfo the consumer whose priority is resolved; must not be null
     * @return the effective query priority id
     */
    private int getRealQryPriorityId(ConsumerNodeInfo consumerNodeInfo) {
        int consumerQryPriorityId = consumerNodeInfo.getQryPriorityId();
        if (consumerQryPriorityId > 0) {
            return consumerQryPriorityId;
        }
        int ruleQryPriorityId = this.metadataManager.getFlowCtrlRuleHandler().getQryPriorityId();
        if (ruleQryPriorityId > 0) {
            return ruleQryPriorityId;
        }
        return TServerConstants.CFG_DEFAULT_CONSUME_RULE;
    }
}
