package org.apache.inlong.sdk.dataproxy;

import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.inlong.common.util.MessageUtils;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.apache.inlong.sdk.dataproxy.utils.Tuple2;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/dataproxy/DefaultMessageSender.class */
/**
 * Default {@link MessageSender} implementation: validates outgoing messages, wraps them in
 * {@link EncodeObject}s and hands them to a {@link Sender}.
 *
 * <p>Two encodings are produced depending on {@link #getMsgtype()}:
 * <ul>
 *   <li>types 7 and 8 pass group id, stream id and timestamp as dedicated {@code EncodeObject}
 *       constructor arguments;</li>
 *   <li>types 3 and 5 serialize them into the attribute string
 *       ({@code groupId=...&streamId=...&dt=...}).</li>
 * </ul>
 * For any other message type the synchronous methods return {@code null} and the asynchronous
 * methods do nothing.
 *
 * <p>Parameter naming used by all send methods: {@code str} is the group id, {@code str2} the
 * stream id, {@code j} the message timestamp, {@code str3} an identifier forwarded to the
 * {@code Sender} together with the encoded message, and {@code z} requests the proxy-send flag.
 */
public class DefaultMessageSender implements MessageSender {

    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultMessageSender.class);

    /** Senders created through {@link #generateSenderByClusterId}, keyed by cluster id. */
    private static final ConcurrentHashMap<Integer, DefaultMessageSender> CACHE_SENDER = new ConcurrentHashMap<>();
    private static final AtomicBoolean MANAGER_FETCHER_THREAD_STARTED = new AtomicBoolean(false);
    private static final SequentialID idGenerator = new SequentialID();

    private final Sender sender;
    private final IndexCollectThread indexCol;

    /** Message counters keyed by {@code groupId|streamId}; shared with {@link #indexCol}. */
    private final Map<String, Long> storeIndex = new ConcurrentHashMap<>();
    private String groupId;
    private int msgtype = 7;
    private boolean isCompress = true;
    private boolean isGroupIdTransfer = false;
    private boolean isReport = false;
    private boolean isSupportLF = false;
    private int maxPacketLength = -1;
    private int cpsSize = ConfigConstants.COMPRESS_SIZE;
    private final int senderMaxAttempt;

    /**
     * Creates a sender using the default thread factory of the underlying {@link Sender}.
     *
     * @param proxyClientConfig client configuration; validated before use
     * @throws Exception if validation or sender start-up fails
     */
    public DefaultMessageSender(ProxyClientConfig proxyClientConfig) throws Exception {
        this(proxyClientConfig, null);
    }

    /**
     * Creates and starts the underlying {@link Sender} and the index collector thread.
     *
     * @param proxyClientConfig client configuration; validated before use
     * @param threadFactory thread factory handed to the {@code Sender}; may be {@code null}
     * @throws Exception if validation or sender start-up fails
     */
    public DefaultMessageSender(ProxyClientConfig proxyClientConfig, ThreadFactory threadFactory) throws Exception {
        ProxyUtils.validClientConfig(proxyClientConfig);
        this.sender = new Sender(proxyClientConfig, threadFactory);
        this.sender.start();
        this.groupId = proxyClientConfig.getInlongGroupId();
        this.indexCol = new IndexCollectThread(this.storeIndex);
        this.senderMaxAttempt = proxyClientConfig.getSenderMaxAttempt();
        this.indexCol.start();
    }

    /**
     * Returns the cached sender for the cluster the configured group belongs to, creating it on
     * first use.
     *
     * @param proxyClientConfig client configuration; its protocol type is forced to {@code TCP}
     * @return the shared sender for the resolved cluster
     * @throws Exception if the group configuration cannot be resolved or the sender cannot start
     */
    public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig proxyClientConfig) throws Exception {
        return generateSenderByClusterId(proxyClientConfig, null);
    }

    /**
     * Returns the cached sender for the cluster the configured group belongs to, creating it on
     * first use.
     *
     * @param proxyClientConfig client configuration; its protocol type is forced to {@code TCP}
     * @param threadFactory thread factory for a newly created sender; may be {@code null}
     * @return the shared sender for the resolved cluster
     * @throws Exception if the group configuration cannot be resolved (the message is the error
     *         text returned by the configuration manager) or the sender cannot start
     */
    public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig proxyClientConfig, ThreadFactory threadFactory) throws Exception {
        if (!"TCP".equals(proxyClientConfig.getProtocolType())) {
            proxyClientConfig.setProtocolType("TCP");
        }
        LOGGER.info("Initial tcp sender, configure is {}", proxyClientConfig);
        Tuple2<ProxyConfigEntry, String> configResult = new ProxyConfigManager(proxyClientConfig).getGroupIdConfigure(true);
        ProxyConfigEntry configEntry = configResult.getF0();
        if (configEntry == null) {
            throw new Exception(configResult.getF1());
        }
        Integer clusterKey = Integer.valueOf(configEntry.getClusterId());
        DefaultMessageSender cached = CACHE_SENDER.get(clusterKey);
        if (cached != null) {
            return cached;
        }
        // Serialize creation so two callers racing on the same cluster cannot both start a
        // sender, which would leave the overwritten one running and never closed.
        synchronized (CACHE_SENDER) {
            cached = CACHE_SENDER.get(clusterKey);
            if (cached != null) {
                return cached;
            }
            DefaultMessageSender created = new DefaultMessageSender(proxyClientConfig, threadFactory);
            created.setMaxPacketLength(configEntry.getMaxPacketLength());
            CACHE_SENDER.put(clusterKey, created);
            return created;
        }
    }

    /** Closes every cached sender and empties the cache. */
    public static void finallyCleanup() {
        for (DefaultMessageSender cachedSender : CACHE_SENDER.values()) {
            cachedSender.close();
        }
        CACHE_SENDER.clear();
    }

    /**
     * Closes the underlying {@link Sender}, stops the internal threads and evicts this instance
     * from the sender cache.
     */
    @Override
    public void close() {
        LOGGER.info("ready to close resources, may need five minutes !");
        int clusterId = this.sender.getClusterId();
        if (clusterId != -1) {
            // Evict only if the cached entry is this instance; a different sender registered
            // under the same cluster id must stay reachable.
            CACHE_SENDER.remove(Integer.valueOf(clusterId), this);
        }
        this.sender.close();
        shutdownInternalThreads();
    }

    /** Returns the configuration held by the underlying {@link Sender}. */
    public ProxyClientConfig getProxyClientConfig() {
        return this.sender.getConfigure();
    }

    /** Returns the flag that is copied onto type 7/8 {@link EncodeObject}s via {@code setSupportLF}. */
    public boolean isSupportLF() {
        return this.isSupportLF;
    }

    /** Sets the flag that is copied onto type 7/8 {@link EncodeObject}s via {@code setSupportLF}. */
    public void setSupportLF(boolean z) {
        this.isSupportLF = z;
    }

    /** Returns the group-id-transfer flag passed to type 7/8 {@link EncodeObject}s. */
    public boolean isGroupIdTransfer() {
        return this.isGroupIdTransfer;
    }

    /** Sets the group-id-transfer flag passed to type 7/8 {@link EncodeObject}s. */
    public void setGroupIdTransfer(boolean z) {
        this.isGroupIdTransfer = z;
    }

    /** Returns the report flag passed to type 7/8 {@link EncodeObject}s. */
    public boolean isReport() {
        return this.isReport;
    }

    /** Sets the report flag passed to type 7/8 {@link EncodeObject}s. */
    public void setReport(boolean z) {
        this.isReport = z;
    }

    /** Returns the body length above which a single-body message is compressed. */
    public int getCpsSize() {
        return this.cpsSize;
    }

    /** Sets the body length above which a single-body message is compressed. */
    public void setCpsSize(int i) {
        this.cpsSize = i;
    }

    /** Returns the message type used to select the encoding (defaults to 7). */
    public int getMsgtype() {
        return this.msgtype;
    }

    /** Sets the message type; only 3, 5, 7 and 8 result in a message being sent. */
    public void setMsgtype(int i) {
        this.msgtype = i;
    }

    /** Returns whether compression is enabled. */
    public boolean isCompress() {
        return this.isCompress;
    }

    /** Enables or disables compression. */
    public void setCompress(boolean z) {
        this.isCompress = z;
    }

    /** Returns the group id taken from the client configuration (or set explicitly). */
    public String getGroupId() {
        return this.groupId;
    }

    /** Overrides the stored group id. */
    public void setGroupId(String str) {
        this.groupId = str;
    }

    /** Returns the limit handed to {@code ProxyUtils.isBodyLengthValid} (defaults to -1). */
    public int getMaxPacketLength() {
        return this.maxPacketLength;
    }

    /** Sets the limit handed to {@code ProxyUtils.isBodyLengthValid}. */
    public void setMaxPacketLength(int i) {
        this.maxPacketLength = i;
    }

    /** Returns the SDK version constant. */
    public String getSDKVersion() {
        return ConfigConstants.PROXY_SDK_VERSION;
    }

    /**
     * Invokes {@code function} up to {@code senderMaxAttempt} times, stopping at the first
     * {@link SendResult#OK}.
     *
     * @return the last result obtained, or {@code null} when {@code senderMaxAttempt} is not
     *         positive (no attempt is made in that case)
     */
    private SendResult attemptSendMessage(Function<Sender, SendResult> function) {
        SendResult lastResult = null;
        for (int attempt = 0; attempt < this.senderMaxAttempt; attempt++) {
            lastResult = function.apply(this.sender);
            if (SendResult.OK.equals(lastResult)) {
                return lastResult;
            }
        }
        return lastResult;
    }

    /**
     * String-result counterpart of {@link #attemptSendMessage}: retries until the result starts
     * with the textual form of {@link SendResult#OK}.
     */
    private String attemptSendMessageIndex(Function<Sender, String> function) {
        String okPrefix = SendResult.OK.toString();
        String lastResult = null;
        for (int attempt = 0; attempt < this.senderMaxAttempt; attempt++) {
            lastResult = function.apply(this.sender);
            if (lastResult != null && lastResult.startsWith(okPrefix)) {
                return lastResult;
            }
        }
        return lastResult;
    }

    /** True for message types 7 and 8 (group/stream/dt passed as dedicated EncodeObject arguments). */
    private boolean isFieldEncodedType() {
        int type = this.msgtype;
        return type == 7 || type == 8;
    }

    /** True for message types 3 and 5 (group/stream/dt serialized into the attribute string). */
    private boolean isAttrEncodedType() {
        int type = this.msgtype;
        return type == 3 || type == 5;
    }

    /** A single body is compressed only when compression is on and it is longer than {@code cpsSize}. */
    private boolean shouldCompress(byte[] body) {
        return this.isCompress && body.length > this.cpsSize;
    }

    /** Synchronously sends one body without the proxy-send flag. */
    @Override
    public SendResult sendMessage(byte[] bArr, String str, String str2, long j, String str3) {
        return sendMessage(bArr, str, str2, j, str3, false);
    }

    /**
     * Synchronously sends one body.
     *
     * @param bArr message body
     * @param str group id
     * @param str2 stream id
     * @param j message timestamp, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3 identifier forwarded to the {@code Sender}
     * @param z when {@code true}, {@code proxySend=true} is added to the attributes
     * @return the send result; {@code INVALID_ATTRIBUTES} / {@code BODY_EXCEED_MAX_LEN} on
     *         validation failure; {@code null} for an unsupported message type
     */
    public SendResult sendMessage(byte[] bArr, String str, String str2, long j, String str3, boolean z) {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(bArr) || !ProxyUtils.isDtValid(dt)) {
            return SendResult.INVALID_ATTRIBUTES;
        }
        if (!ProxyUtils.isBodyLengthValid(bArr, this.maxPacketLength)) {
            return SendResult.BODY_EXCEED_MAX_LEN;
        }
        addIndexCnt(str, str2, 1L);
        String proxyAttr = z ? "proxySend=true" : "";
        boolean compress = shouldCompress(bArr);
        if (isFieldEncodedType()) {
            EncodeObject encodeObject = new EncodeObject(Collections.singletonList(bArr), this.msgtype, compress, this.isReport, this.isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), str, str2, proxyAttr);
            encodeObject.setSupportLF(this.isSupportLF);
            return attemptSendMessage(s -> s.syncSendMessage(encodeObject, str3));
        }
        if (!isAttrEncodedType()) {
            return null;
        }
        String proxySuffix = z ? "&" + proxyAttr : proxyAttr;
        String attrs = "groupId=" + str + "&streamId=" + str2 + "&dt=" + dt + (compress ? "&cp=snappy" : "") + proxySuffix;
        // A fresh EncodeObject (and id) is built for every attempt.
        return attemptSendMessage(s -> s.syncSendMessage(
                new EncodeObject(Collections.singletonList(bArr), attrs, idGenerator.getNextId(), getMsgtype(), compress, str), str3));
    }

    /** Synchronously sends one body with extra attributes and without the proxy-send flag. */
    @Override
    public SendResult sendMessage(byte[] bArr, String str, String str2, long j, String str3, Map<String, String> map) {
        return sendMessage(bArr, str, str2, j, str3, map, false);
    }

    /**
     * Synchronously sends one body with extra attributes.
     *
     * @param bArr message body
     * @param str group id
     * @param str2 stream id
     * @param j message timestamp, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3 identifier forwarded to the {@code Sender}
     * @param map extra attributes; when {@code z} is set, {@code proxySend=true} is put into this
     *        map (the caller's map is modified)
     * @param z whether to add the proxy-send attribute
     * @return the send result; {@code INVALID_ATTRIBUTES} / {@code BODY_EXCEED_MAX_LEN} on
     *         validation failure; {@code null} for an unsupported message type
     */
    public SendResult sendMessage(byte[] bArr, String str, String str2, long j, String str3, Map<String, String> map, boolean z) {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(bArr) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(map)) {
            return SendResult.INVALID_ATTRIBUTES;
        }
        if (!ProxyUtils.isBodyLengthValid(bArr, this.maxPacketLength)) {
            return SendResult.BODY_EXCEED_MAX_LEN;
        }
        addIndexCnt(str, str2, 1L);
        if (z) {
            map.put("proxySend", "true");
        }
        StringBuilder attrBuilder = MessageUtils.convertAttrToStr(map);
        boolean compress = shouldCompress(bArr);
        if (isFieldEncodedType()) {
            EncodeObject encodeObject = new EncodeObject(Collections.singletonList(bArr), this.msgtype, compress, this.isReport, this.isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), str, str2, attrBuilder.toString());
            encodeObject.setSupportLF(this.isSupportLF);
            return attemptSendMessage(s -> s.syncSendMessage(encodeObject, str3));
        }
        if (!isAttrEncodedType()) {
            return null;
        }
        attrBuilder.append("&groupId=").append(str).append("&streamId=").append(str2).append("&dt=").append(dt);
        if (compress) {
            attrBuilder.append("&cp=snappy");
        }
        String attrs = attrBuilder.toString();
        return attemptSendMessage(s -> s.syncSendMessage(
                new EncodeObject(Collections.singletonList(bArr), attrs, idGenerator.getNextId(), getMsgtype(), compress, str), str3));
    }

    /** Synchronously sends a batch of bodies without the extra flag attribute. */
    @Override
    public SendResult sendMessage(List<byte[]> list, String str, String str2, long j, String str3) {
        return sendMessage(list, str, str2, j, str3, false);
    }

    /**
     * Synchronously sends a batch of bodies.
     *
     * @param list message bodies
     * @param str group id
     * @param str2 stream id
     * @param j message timestamp, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3 identifier forwarded to the {@code Sender}
     * @param z when {@code true}, {@code syncSend=true} is added to the attributes
     * @return the send result; {@code INVALID_ATTRIBUTES} / {@code BODY_EXCEED_MAX_LEN} on
     *         validation failure; {@code null} for an unsupported message type
     */
    public SendResult sendMessage(List<byte[]> list, String str, String str2, long j, String str3, boolean z) {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(list) || !ProxyUtils.isDtValid(dt)) {
            return SendResult.INVALID_ATTRIBUTES;
        }
        if (!ProxyUtils.isBodyLengthValid(list, this.maxPacketLength)) {
            return SendResult.BODY_EXCEED_MAX_LEN;
        }
        addIndexCnt(str, str2, list.size());
        // NOTE(review): every sibling overload emits "proxySend=true" here; "syncSend" looks like
        // a copy-paste slip but is kept because it is on-the-wire text - confirm with the server side.
        String flagAttr = z ? "syncSend=true" : "";
        boolean compress = this.isCompress;
        if (isFieldEncodedType()) {
            EncodeObject encodeObject = new EncodeObject(list, this.msgtype, compress, this.isReport, this.isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), str, str2, flagAttr);
            encodeObject.setSupportLF(this.isSupportLF);
            return attemptSendMessage(s -> s.syncSendMessage(encodeObject, str3));
        }
        if (!isAttrEncodedType()) {
            return null;
        }
        String flagSuffix = z ? "&" + flagAttr : flagAttr;
        String attrs = "groupId=" + str + "&streamId=" + str2 + "&dt=" + dt + (compress ? "&cp=snappy" : "") + "&cnt=" + list.size() + flagSuffix;
        return attemptSendMessage(s -> s.syncSendMessage(
                new EncodeObject(list, attrs, idGenerator.getNextId(), getMsgtype(), compress, str), str3));
    }

    /** Synchronously sends a batch of bodies with extra attributes and without the proxy-send flag. */
    @Override
    public SendResult sendMessage(List<byte[]> list, String str, String str2, long j, String str3, Map<String, String> map) {
        return sendMessage(list, str, str2, j, str3, map, false);
    }

    /** Asynchronously sends one body with extra attributes and without the proxy-send flag. */
    @Override
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, byte[] bArr, String str, String str2, long j, String str3, Map<String, String> map) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, bArr, str, str2, j, str3, map, false);
    }

    /** Asynchronously sends one body without the proxy-send flag. */
    @Override
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, byte[] bArr, String str, String str2, long j, String str3) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, bArr, str, str2, j, str3, false);
    }

    /** Asynchronously sends a batch of bodies without the proxy-send flag. */
    @Override
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, List<byte[]> list, String str, String str2, long j, String str3) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, list, str, str2, j, str3, false);
    }

    /** Asynchronously sends a batch of bodies with extra attributes and without the proxy-send flag. */
    @Override
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, List<byte[]> list, String str, String str2, long j, String str3, Map<String, String> map) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, list, str, str2, j, str3, map, false);
    }

    /**
     * Synchronously sends a batch of bodies with extra attributes.
     *
     * @param list message bodies
     * @param str group id
     * @param str2 stream id
     * @param j message timestamp, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3 identifier forwarded to the {@code Sender}
     * @param map extra attributes; when {@code z} is set, {@code proxySend=true} is put into this
     *        map (the caller's map is modified)
     * @param z whether to add the proxy-send attribute
     * @return the send result; {@code INVALID_ATTRIBUTES} / {@code BODY_EXCEED_MAX_LEN} on
     *         validation failure; {@code null} for an unsupported message type
     */
    public SendResult sendMessage(List<byte[]> list, String str, String str2, long j, String str3, Map<String, String> map, boolean z) {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(list) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(map)) {
            return SendResult.INVALID_ATTRIBUTES;
        }
        if (!ProxyUtils.isBodyLengthValid(list, this.maxPacketLength)) {
            return SendResult.BODY_EXCEED_MAX_LEN;
        }
        addIndexCnt(str, str2, list.size());
        if (z) {
            map.put("proxySend", "true");
        }
        StringBuilder attrBuilder = MessageUtils.convertAttrToStr(map);
        boolean compress = this.isCompress;
        if (isFieldEncodedType()) {
            EncodeObject encodeObject = new EncodeObject(list, this.msgtype, compress, this.isReport, this.isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), str, str2, attrBuilder.toString());
            encodeObject.setSupportLF(this.isSupportLF);
            return attemptSendMessage(s -> s.syncSendMessage(encodeObject, str3));
        }
        if (!isAttrEncodedType()) {
            return null;
        }
        attrBuilder.append("&groupId=").append(str).append("&streamId=").append(str2).append("&dt=").append(dt).append("&cnt=").append(list.size());
        if (compress) {
            attrBuilder.append("&cp=snappy");
        }
        String attrs = attrBuilder.toString();
        return attemptSendMessage(s -> s.syncSendMessage(
                new EncodeObject(list, attrs, idGenerator.getNextId(), getMsgtype(), compress, str), str3));
    }

    /**
     * Asynchronously sends one body.
     *
     * @param sendMessageCallback callback handed to the {@code Sender}
     * @param bArr message body
     * @param str group id
     * @param str2 stream id
     * @param j message timestamp, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3 identifier forwarded to the {@code Sender}
     * @param z when {@code true}, {@code proxySend=true} is added to the attributes
     * @throws ProxysdkException if the body, timestamp or body length is invalid
     */
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, byte[] bArr, String str, String str2, long j, String str3, boolean z) throws ProxysdkException {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(bArr) || !ProxyUtils.isDtValid(dt)) {
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }
        if (!ProxyUtils.isBodyLengthValid(bArr, this.maxPacketLength)) {
            throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
        }
        addIndexCnt(str, str2, 1L);
        String proxyAttr = z ? "proxySend=true" : "";
        boolean compress = shouldCompress(bArr);
        if (isFieldEncodedType()) {
            EncodeObject encodeObject = new EncodeObject(Collections.singletonList(bArr), getMsgtype(), compress, this.isReport, this.isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), str, str2, proxyAttr);
            encodeObject.setSupportLF(this.isSupportLF);
            this.sender.asyncSendMessage(encodeObject, sendMessageCallback, str3);
        } else if (isAttrEncodedType()) {
            // The separator is needed on both the compressed and the uncompressed path;
            // without it the flag would be glued onto the timestamp ("dt=123proxySend=true").
            String proxySuffix = z ? "&" + proxyAttr : proxyAttr;
            String attrs = "groupId=" + str + "&streamId=" + str2 + "&dt=" + dt + (compress ? "&cp=snappy" : "") + proxySuffix;
            this.sender.asyncSendMessage(
                    new EncodeObject(Collections.singletonList(bArr), attrs, idGenerator.getNextId(), getMsgtype(), compress, str),
                    sendMessageCallback, str3);
        }
    }

    /**
     * Asynchronously sends one body with extra attributes.
     *
     * @param sendMessageCallback callback handed to the {@code Sender}
     * @param bArr message body
     * @param str group id
     * @param str2 stream id
     * @param j message timestamp, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3 identifier forwarded to the {@code Sender}
     * @param map extra attributes; when {@code z} is set, {@code proxySend=true} is put into this
     *        map (the caller's map is modified)
     * @param z whether to add the proxy-send attribute
     * @throws ProxysdkException if the body, timestamp, attributes or body length is invalid
     */
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, byte[] bArr, String str, String str2, long j, String str3, Map<String, String> map, boolean z) throws ProxysdkException {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(bArr) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(map)) {
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }
        if (!ProxyUtils.isBodyLengthValid(bArr, this.maxPacketLength)) {
            throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
        }
        addIndexCnt(str, str2, 1L);
        if (z) {
            map.put("proxySend", "true");
        }
        StringBuilder attrBuilder = MessageUtils.convertAttrToStr(map);
        boolean compress = shouldCompress(bArr);
        if (isFieldEncodedType()) {
            EncodeObject encodeObject = new EncodeObject(Collections.singletonList(bArr), getMsgtype(), compress, this.isReport, this.isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), str, str2, attrBuilder.toString());
            encodeObject.setSupportLF(this.isSupportLF);
            this.sender.asyncSendMessage(encodeObject, sendMessageCallback, str3);
        } else if (isAttrEncodedType()) {
            attrBuilder.append("&groupId=").append(str).append("&streamId=").append(str2).append("&dt=").append(dt);
            if (compress) {
                attrBuilder.append("&cp=snappy");
            }
            this.sender.asyncSendMessage(
                    new EncodeObject(Collections.singletonList(bArr), attrBuilder.toString(), idGenerator.getNextId(), getMsgtype(), compress, str),
                    sendMessageCallback, str3);
        }
    }

    /**
     * Asynchronously sends a batch of bodies.
     *
     * @param sendMessageCallback callback handed to the {@code Sender}
     * @param list message bodies
     * @param str group id
     * @param str2 stream id
     * @param j message timestamp, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3 identifier forwarded to the {@code Sender}
     * @param z when {@code true}, {@code proxySend=true} is added to the attributes
     * @throws ProxysdkException if the bodies, timestamp or body length is invalid
     */
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, List<byte[]> list, String str, String str2, long j, String str3, boolean z) throws ProxysdkException {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(list) || !ProxyUtils.isDtValid(dt)) {
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }
        if (!ProxyUtils.isBodyLengthValid(list, this.maxPacketLength)) {
            throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
        }
        addIndexCnt(str, str2, list.size());
        String proxyAttr = z ? "proxySend=true" : "";
        boolean compress = this.isCompress;
        if (isFieldEncodedType()) {
            EncodeObject encodeObject = new EncodeObject(list, getMsgtype(), compress, this.isReport, this.isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), str, str2, proxyAttr);
            encodeObject.setSupportLF(this.isSupportLF);
            this.sender.asyncSendMessage(encodeObject, sendMessageCallback, str3);
        } else if (isAttrEncodedType()) {
            String proxySuffix = z ? "&" + proxyAttr : proxyAttr;
            String attrs = "groupId=" + str + "&streamId=" + str2 + "&dt=" + dt + (compress ? "&cp=snappy" : "") + "&cnt=" + list.size() + proxySuffix;
            this.sender.asyncSendMessage(
                    new EncodeObject(list, attrs, idGenerator.getNextId(), getMsgtype(), compress, str),
                    sendMessageCallback, str3);
        }
    }

    /**
     * Asynchronously sends a batch of bodies with extra attributes.
     *
     * @param sendMessageCallback callback handed to the {@code Sender}
     * @param list message bodies
     * @param str group id
     * @param str2 stream id
     * @param j message timestamp, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3 identifier forwarded to the {@code Sender}
     * @param map extra attributes; when {@code z} is set, {@code proxySend=true} is put into this
     *        map (the caller's map is modified)
     * @param z whether to add the proxy-send attribute
     * @throws ProxysdkException if the bodies, timestamp, attributes or body length is invalid
     */
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, List<byte[]> list, String str, String str2, long j, String str3, Map<String, String> map, boolean z) throws ProxysdkException {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(list) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(map)) {
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }
        if (!ProxyUtils.isBodyLengthValid(list, this.maxPacketLength)) {
            throw new ProxysdkException(SendResult.BODY_EXCEED_MAX_LEN.toString());
        }
        addIndexCnt(str, str2, list.size());
        if (z) {
            map.put("proxySend", "true");
        }
        StringBuilder attrBuilder = MessageUtils.convertAttrToStr(map);
        boolean compress = this.isCompress;
        if (isFieldEncodedType()) {
            EncodeObject encodeObject = new EncodeObject(list, getMsgtype(), compress, this.isReport, this.isGroupIdTransfer, dt / 1000, idGenerator.getNextInt(), str, str2, attrBuilder.toString());
            encodeObject.setSupportLF(this.isSupportLF);
            this.sender.asyncSendMessage(encodeObject, sendMessageCallback, str3);
        } else if (isAttrEncodedType()) {
            attrBuilder.append("&groupId=").append(str).append("&streamId=").append(str2).append("&dt=").append(dt).append("&cnt=").append(list.size());
            if (compress) {
                attrBuilder.append("&cp=snappy");
            }
            this.sender.asyncSendMessage(
                    new EncodeObject(list, attrBuilder.toString(), idGenerator.getNextId(), getMsgtype(), compress, str),
                    sendMessageCallback, str3);
        }
    }

    /** Asynchronously sends one body stamped with the current time and a generated identifier. */
    @Override
    public void asyncSendMessage(String str, String str2, byte[] bArr, SendMessageCallback sendMessageCallback) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, bArr, str, str2, System.currentTimeMillis(), idGenerator.getNextId());
    }

    /**
     * Asynchronously sends one body stamped with the current time and a generated identifier.
     *
     * @param z whether to add the proxy-send attribute
     */
    public void asyncSendMessage(String str, String str2, byte[] bArr, SendMessageCallback sendMessageCallback, boolean z) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, bArr, str, str2, System.currentTimeMillis(), idGenerator.getNextId(), z);
    }

    /** Asynchronously sends a batch stamped with the current time and a generated identifier. */
    @Override
    public void asyncSendMessage(String str, String str2, List<byte[]> list, SendMessageCallback sendMessageCallback) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, list, str, str2, System.currentTimeMillis(), idGenerator.getNextId());
    }

    /**
     * Asynchronously sends a batch stamped with the current time and a generated identifier.
     *
     * @param z whether to add the proxy-send attribute
     */
    public void asyncSendMessage(String str, String str2, List<byte[]> list, SendMessageCallback sendMessageCallback, boolean z) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, list, str, str2, System.currentTimeMillis(), idGenerator.getNextId(), z);
    }

    /**
     * Adds {@code j} to the counter stored under {@code str|str2}.
     *
     * <p>Counting is best-effort: a failure is logged and never propagated to the send path.
     */
    private void addIndexCnt(String str, String str2, long j) {
        try {
            // merge() performs the read-modify-write atomically on the ConcurrentHashMap, so
            // concurrent senders cannot overwrite each other's increments.
            this.storeIndex.merge(str + "|" + str2, Long.valueOf(j), Long::sum);
        } catch (Exception e) {
            LOGGER.error("Failed to update index count for {}|{}", str, str2, e);
        }
    }

    /** Stops the index collector thread and resets the manager-fetcher flag. */
    private void shutdownInternalThreads() {
        this.indexCol.shutDown();
        MANAGER_FETCHER_THREAD_STARTED.set(false);
    }
}
