package org.apache.inlong.sdk.dataproxy;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.inlong.sdk.dataproxy.codec.EncodeObject;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigEntry;
import org.apache.inlong.sdk.dataproxy.config.ProxyConfigManager;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.network.Sender;
import org.apache.inlong.sdk.dataproxy.network.SequentialID;
import org.apache.inlong.sdk.dataproxy.network.Utils;
import org.apache.inlong.sdk.dataproxy.threads.IndexCollectThread;
import org.apache.inlong.sdk.dataproxy.threads.ManagerFetcherThread;
import org.apache.inlong.sdk.dataproxy.utils.ProxyUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/dataproxy/DefaultMessageSender.class */
/**
 * Default {@link MessageSender} implementation: validates the caller's input, wraps the
 * body in an {@link EncodeObject} and hands it to an internal {@link Sender}.
 *
 * <p>The wire layout is selected by {@link #getMsgtype()}:
 * <ul>
 *   <li>types 7 and 8 pass group id, stream id, dt and attributes as separate
 *       {@code EncodeObject} constructor arguments;</li>
 *   <li>types 3 and 5 pack them into one {@code key=value&key=value} attribute string;</li>
 *   <li>any other type is not handled: synchronous methods return {@code null} and
 *       asynchronous methods return without sending or invoking the callback.</li>
 * </ul>
 *
 * <p>Every accepted message also bumps a per {@code groupId|streamId} counter that is
 * shared with an {@link IndexCollectThread} started by the constructor.
 */
public class DefaultMessageSender implements MessageSender {
    private static final Logger logger = LoggerFactory.getLogger(DefaultMessageSender.class);

    /** Timeout used by the short {@code asyncSendMessage(groupId, streamId, body, callback)} overloads. */
    private static final long DEFAULT_SEND_TIMEOUT = 100;
    private static final TimeUnit DEFAULT_SEND_TIMEUNIT = TimeUnit.MILLISECONDS;

    /** Senders created through {@link #generateSenderByClusterId}, keyed by cluster id. */
    private static final ConcurrentHashMap<String, DefaultMessageSender> cacheSender = new ConcurrentHashMap<>();

    /** Guards the one-time start of the class-wide {@link #managerFetcherThread}. */
    private static final AtomicBoolean ManagerFetcherThreadStarted = new AtomicBoolean(false);

    /** Shared by all instances; stays {@code null} unless some constructor call started it. */
    private static ManagerFetcherThread managerFetcherThread;

    private final Sender sender;
    private final SequentialID idGenerator;
    private final IndexCollectThread indexCol;

    /** Message counters keyed by {@code groupId + "|" + streamId}; also read by {@link #indexCol}. */
    private final Map<String, Long> storeIndex = new ConcurrentHashMap<>();

    private String groupId;
    private int msgtype = 7;
    private boolean isCompress = true;
    private boolean isGroupIdTransfer = false;
    private boolean isReport = false;
    private boolean isSupportLF = false;
    /** Single-body messages are compressed only when longer than this many bytes. */
    private int cpsSize = ConfigConstants.COMPRESS_SIZE;

    /** @return the flag copied onto every type 7/8 {@code EncodeObject} via {@code setSupportLF} */
    public boolean isSupportLF() {
        return this.isSupportLF;
    }

    /** @param z new value for the support-LF flag */
    public void setSupportLF(boolean z) {
        this.isSupportLF = z;
    }

    /** @return the group-id-transfer flag passed to type 7/8 {@code EncodeObject}s */
    public boolean isGroupIdTransfer() {
        return this.isGroupIdTransfer;
    }

    /** @param z new value for the group-id-transfer flag */
    public void setGroupIdTransfer(boolean z) {
        this.isGroupIdTransfer = z;
    }

    /** @return the report flag passed to type 7/8 {@code EncodeObject}s */
    public boolean isReport() {
        return this.isReport;
    }

    /** @param z new value for the report flag */
    public void setReport(boolean z) {
        this.isReport = z;
    }

    /** @return the body length (bytes) above which single-body messages are compressed */
    public int getCpsSize() {
        return this.cpsSize;
    }

    /** @param i new compression threshold in bytes */
    public void setCpsSize(int i) {
        this.cpsSize = i;
    }

    /** @return the message type that selects the encoding path (defaults to 7) */
    public int getMsgtype() {
        return this.msgtype;
    }

    /** @param i new message type; only 3, 5, 7 and 8 are handled by the send methods */
    public void setMsgtype(int i) {
        this.msgtype = i;
    }

    /** @return whether compression is enabled (defaults to {@code true}) */
    public boolean isCompress() {
        return this.isCompress;
    }

    /** @param z {@code true} to enable compression */
    public void setCompress(boolean z) {
        this.isCompress = z;
    }

    /** @return the group id, initially taken from the client config */
    public String getGroupId() {
        return this.groupId;
    }

    /** @param str new group id */
    public void setGroupId(String str) {
        this.groupId = str;
    }

    /**
     * Creates a sender without a custom thread factory.
     *
     * @param proxyClientConfig client configuration
     * @throws Exception if validation or construction of the underlying sender fails
     */
    public DefaultMessageSender(ProxyClientConfig proxyClientConfig) throws Exception {
        this(proxyClientConfig, null);
    }

    /**
     * Validates the configuration, creates the underlying {@link Sender} and starts the
     * index collecting thread. The class-wide {@link ManagerFetcherThread} is started at
     * most once, and only when the config enables saving manager VIPs with local visit.
     *
     * @param proxyClientConfig client configuration
     * @param threadFactory     forwarded to the {@link Sender}; may be {@code null}
     * @throws Exception if validation or construction of the underlying sender fails
     */
    public DefaultMessageSender(ProxyClientConfig proxyClientConfig, ThreadFactory threadFactory) throws Exception {
        ProxyUtils.validClientConfig(proxyClientConfig);
        this.sender = new Sender(proxyClientConfig, threadFactory);
        this.idGenerator = new SequentialID(Utils.getLocalIp());
        this.groupId = proxyClientConfig.getGroupId();
        this.indexCol = new IndexCollectThread(this.storeIndex);
        this.indexCol.start();
        if (proxyClientConfig.isEnableSaveManagerVIps() && proxyClientConfig.isLocalVisit()
                && ManagerFetcherThreadStarted.compareAndSet(false, true)) {
            managerFetcherThread = new ManagerFetcherThread(proxyClientConfig);
            managerFetcherThread.start();
        }
    }

    /**
     * Same as {@link #generateSenderByClusterId(ProxyClientConfig, ThreadFactory)} with a
     * {@code null} thread factory.
     */
    public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig proxyClientConfig) throws Exception {
        return generateSenderByClusterId(proxyClientConfig, null);
    }

    /**
     * Returns the cached sender for the cluster that serves the config's group id,
     * creating and caching one if none exists yet.
     *
     * @param proxyClientConfig client configuration
     * @param threadFactory     forwarded to a newly created sender; may be {@code null}
     * @return the shared sender for the resolved cluster id
     * @throws Exception if resolving the group configuration or creating the sender fails
     */
    public static DefaultMessageSender generateSenderByClusterId(ProxyClientConfig proxyClientConfig, ThreadFactory threadFactory) throws Exception {
        ProxyConfigManager proxyConfigManager = new ProxyConfigManager(proxyClientConfig, Utils.getLocalIp(), null);
        proxyConfigManager.setGroupId(proxyClientConfig.getGroupId());
        ProxyConfigEntry groupIdConfigure = proxyConfigManager.getGroupIdConfigure();
        // Lookup, creation and publication must be one step: otherwise two concurrent
        // callers could each build a sender and the overwritten one would never be closed.
        synchronized (cacheSender) {
            DefaultMessageSender cached = cacheSender.get(groupIdConfigure.getClusterId());
            if (cached != null) {
                return cached;
            }
            DefaultMessageSender created = new DefaultMessageSender(proxyClientConfig, threadFactory);
            cacheSender.put(groupIdConfigure.getClusterId(), created);
            return created;
        }
    }

    /** Closes every sender held in the cluster cache and then empties the cache. */
    public static void finallyCleanup() {
        // close() removes entries from the cache while we iterate; ConcurrentHashMap
        // iterators tolerate that.
        for (DefaultMessageSender cached : cacheSender.values()) {
            cached.close();
        }
        cacheSender.clear();
    }

    /** @return the SDK version constant */
    public String getSDKVersion() {
        return ConfigConstants.PROXY_SDK_VERSION;
    }

    /**
     * Sends one body synchronously without any validation or index counting.
     *
     * @param bArr     message body
     * @param str      second {@code EncodeObject} constructor argument (presumably the attribute string; confirm)
     * @param str2     message id handed to the sender
     * @param j        timeout
     * @param timeUnit unit of {@code j}
     * @return the result reported by the underlying sender
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    @Deprecated
    public SendResult sendMessage(byte[] bArr, String str, String str2, long j, TimeUnit timeUnit) {
        return this.sender.syncSendMessage(new EncodeObject(bArr, str, this.idGenerator.getNextId()), str2, j, timeUnit);
    }

    /**
     * Sends one body synchronously.
     *
     * @param bArr     message body; compressed only when compression is enabled and the
     *                 body is longer than {@link #getCpsSize()}
     * @param str      group id
     * @param str2     stream id
     * @param j        data time, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3     message id handed to the sender
     * @param j2       timeout
     * @param timeUnit unit of {@code j2}
     * @return the sender's result, {@link SendResult#INVALID_ATTRIBUTES} when body or dt is
     *         invalid, or {@code null} when the message type is not 3, 5, 7 or 8
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public SendResult sendMessage(byte[] bArr, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit) {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(bArr) || !ProxyUtils.isDtValid(dt)) {
            return SendResult.INVALID_ATTRIBUTES;
        }
        addIndexCnt(str, str2, 1L);
        boolean compress = this.isCompress && bArr.length > this.cpsSize;
        if (this.msgtype == 7 || this.msgtype == 8) {
            // dt / 1000: presumably milliseconds to seconds -- confirm against EncodeObject.
            EncodeObject encodeObject = new EncodeObject(bArr, this.msgtype, compress, this.isReport,
                    this.isGroupIdTransfer, dt / 1000, this.idGenerator.getNextInt(), str, str2, "");
            encodeObject.setSupportLF(this.isSupportLF);
            return this.sender.syncSendMessage(encodeObject, str3, j2, timeUnit);
        }
        if (this.msgtype == 3 || this.msgtype == 5) {
            String attrs = "groupId=" + str + "&streamId=" + str2 + "&dt=" + dt;
            if (compress) {
                attrs += "&cp=snappy";
            }
            return this.sender.syncSendMessage(
                    new EncodeObject(bArr, attrs, this.idGenerator.getNextId(), getMsgtype(), compress, str),
                    str3, j2, timeUnit);
        }
        return null;
    }

    /**
     * Sends one body synchronously together with caller-supplied attributes.
     *
     * @param bArr     message body; compressed only when compression is enabled and the
     *                 body is longer than {@link #getCpsSize()}
     * @param str      group id
     * @param str2     stream id
     * @param j        data time, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3     message id handed to the sender
     * @param j2       timeout
     * @param timeUnit unit of {@code j2}
     * @param map      extra attributes; keys are checked with {@code ProxyUtils.isAttrKeysValid}
     * @return the sender's result, {@link SendResult#INVALID_ATTRIBUTES} when body, dt or
     *         attribute keys are invalid, or {@code null} when the message type is not 3, 5, 7 or 8
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public SendResult sendMessage(byte[] bArr, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit, Map<String, String> map) {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(bArr) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(map)) {
            return SendResult.INVALID_ATTRIBUTES;
        }
        addIndexCnt(str, str2, 1L);
        StringBuilder attrs = ProxyUtils.convertAttrToStr(map);
        boolean compress = this.isCompress && bArr.length > this.cpsSize;
        if (this.msgtype == 7 || this.msgtype == 8) {
            EncodeObject encodeObject = new EncodeObject(bArr, this.msgtype, compress, this.isReport,
                    this.isGroupIdTransfer, dt / 1000, this.idGenerator.getNextInt(), str, str2, attrs.toString());
            encodeObject.setSupportLF(this.isSupportLF);
            return this.sender.syncSendMessage(encodeObject, str3, j2, timeUnit);
        }
        if (this.msgtype != 3 && this.msgtype != 5) {
            return null;
        }
        attrs.append("&groupId=").append(str).append("&streamId=").append(str2).append("&dt=").append(dt);
        if (compress) {
            attrs.append("&cp=snappy");
        }
        return this.sender.syncSendMessage(
                new EncodeObject(bArr, attrs.toString(), this.idGenerator.getNextId(), getMsgtype(), compress, str),
                str3, j2, timeUnit);
    }

    /**
     * Sends a batch of bodies synchronously. Unlike the single-body overloads, the
     * compression flag is used as is, without a size threshold.
     *
     * @param list     message bodies
     * @param str      group id
     * @param str2     stream id
     * @param j        data time, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3     message id handed to the sender
     * @param j2       timeout
     * @param timeUnit unit of {@code j2}
     * @return the sender's result, {@link SendResult#INVALID_ATTRIBUTES} when bodies or dt are
     *         invalid, or {@code null} when the message type is not 3, 5, 7 or 8
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public SendResult sendMessage(List<byte[]> list, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit) {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(list) || !ProxyUtils.isDtValid(dt)) {
            return SendResult.INVALID_ATTRIBUTES;
        }
        addIndexCnt(str, str2, list.size());
        boolean compress = this.isCompress;
        if (this.msgtype == 7 || this.msgtype == 8) {
            EncodeObject encodeObject = new EncodeObject(list, this.msgtype, compress, this.isReport,
                    this.isGroupIdTransfer, dt / 1000, this.idGenerator.getNextInt(), str, str2, "");
            encodeObject.setSupportLF(this.isSupportLF);
            return this.sender.syncSendMessage(encodeObject, str3, j2, timeUnit);
        }
        if (this.msgtype == 3 || this.msgtype == 5) {
            String attrs = "groupId=" + str + "&streamId=" + str2 + "&dt=" + dt
                    + (compress ? "&cp=snappy" : "") + "&cnt=" + list.size();
            return this.sender.syncSendMessage(
                    new EncodeObject(list, attrs, this.idGenerator.getNextId(), getMsgtype(), compress, str),
                    str3, j2, timeUnit);
        }
        return null;
    }

    /**
     * Sends a batch of bodies synchronously together with caller-supplied attributes.
     *
     * @param list     message bodies
     * @param str      group id
     * @param str2     stream id
     * @param j        data time, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3     message id handed to the sender
     * @param j2       timeout
     * @param timeUnit unit of {@code j2}
     * @param map      extra attributes; keys are checked with {@code ProxyUtils.isAttrKeysValid}
     * @return the sender's result, {@link SendResult#INVALID_ATTRIBUTES} when bodies, dt or
     *         attribute keys are invalid, or {@code null} when the message type is not 3, 5, 7 or 8
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public SendResult sendMessage(List<byte[]> list, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit, Map<String, String> map) {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(list) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(map)) {
            return SendResult.INVALID_ATTRIBUTES;
        }
        addIndexCnt(str, str2, list.size());
        StringBuilder attrs = ProxyUtils.convertAttrToStr(map);
        boolean compress = this.isCompress;
        if (this.msgtype == 7 || this.msgtype == 8) {
            EncodeObject encodeObject = new EncodeObject(list, this.msgtype, compress, this.isReport,
                    this.isGroupIdTransfer, dt / 1000, this.idGenerator.getNextInt(), str, str2, attrs.toString());
            encodeObject.setSupportLF(this.isSupportLF);
            return this.sender.syncSendMessage(encodeObject, str3, j2, timeUnit);
        }
        if (this.msgtype != 3 && this.msgtype != 5) {
            return null;
        }
        attrs.append("&groupId=").append(str).append("&streamId=").append(str2)
                .append("&dt=").append(dt).append("&cnt=").append(list.size());
        if (compress) {
            attrs.append("&cp=snappy");
        }
        return this.sender.syncSendMessage(
                new EncodeObject(list, attrs.toString(), this.idGenerator.getNextId(), getMsgtype(), compress, str),
                str3, j2, timeUnit);
    }

    /**
     * Sends one body asynchronously without any validation or index counting.
     *
     * @param sendMessageCallback callback handed to the sender
     * @param bArr                message body
     * @param str                 second {@code EncodeObject} constructor argument (presumably the attribute string; confirm)
     * @param str2                message id handed to the sender
     * @param j                   timeout
     * @param timeUnit            unit of {@code j}
     * @throws ProxysdkException propagated from the underlying sender
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    @Deprecated
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, byte[] bArr, String str, String str2, long j, TimeUnit timeUnit) throws ProxysdkException {
        this.sender.asyncSendMessage(new EncodeObject(bArr, str, this.idGenerator.getNextId()), sendMessageCallback, str2, j, timeUnit);
    }

    /**
     * Sends one body asynchronously. Nothing is sent, and the callback is not invoked,
     * when the message type is not 3, 5, 7 or 8.
     *
     * @param sendMessageCallback callback handed to the sender
     * @param bArr                message body; compressed only when compression is enabled
     *                            and the body is longer than {@link #getCpsSize()}
     * @param str                 group id
     * @param str2                stream id
     * @param j                   data time, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3                message id handed to the sender
     * @param j2                  timeout
     * @param timeUnit            unit of {@code j2}
     * @throws ProxysdkException when body or dt is invalid, or propagated from the sender
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, byte[] bArr, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit) throws ProxysdkException {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(bArr) || !ProxyUtils.isDtValid(dt)) {
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }
        addIndexCnt(str, str2, 1L);
        boolean compress = this.isCompress && bArr.length > this.cpsSize;
        if (this.msgtype == 7 || this.msgtype == 8) {
            EncodeObject encodeObject = new EncodeObject(bArr, getMsgtype(), compress, this.isReport,
                    this.isGroupIdTransfer, dt / 1000, this.idGenerator.getNextInt(), str, str2, "");
            encodeObject.setSupportLF(this.isSupportLF);
            this.sender.asyncSendMessage(encodeObject, sendMessageCallback, str3, j2, timeUnit);
        } else if (this.msgtype == 3 || this.msgtype == 5) {
            String attrs = "groupId=" + str + "&streamId=" + str2 + "&dt=" + dt;
            if (compress) {
                attrs += "&cp=snappy";
            }
            this.sender.asyncSendMessage(
                    new EncodeObject(bArr, attrs, this.idGenerator.getNextId(), getMsgtype(), compress, str),
                    sendMessageCallback, str3, j2, timeUnit);
        }
    }

    /**
     * Sends one body asynchronously together with caller-supplied attributes. Nothing is
     * sent, and the callback is not invoked, when the message type is not 3, 5, 7 or 8.
     *
     * @param sendMessageCallback callback handed to the sender
     * @param bArr                message body; compressed only when compression is enabled
     *                            and the body is longer than {@link #getCpsSize()}
     * @param str                 group id
     * @param str2                stream id
     * @param j                   data time, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3                message id handed to the sender
     * @param j2                  timeout
     * @param timeUnit            unit of {@code j2}
     * @param map                 extra attributes; keys are checked with {@code ProxyUtils.isAttrKeysValid}
     * @throws ProxysdkException when body, dt or attribute keys are invalid, or propagated from the sender
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, byte[] bArr, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit, Map<String, String> map) throws ProxysdkException {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(bArr) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(map)) {
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }
        addIndexCnt(str, str2, 1L);
        StringBuilder attrs = ProxyUtils.convertAttrToStr(map);
        boolean compress = this.isCompress && bArr.length > this.cpsSize;
        if (this.msgtype == 7 || this.msgtype == 8) {
            EncodeObject encodeObject = new EncodeObject(bArr, getMsgtype(), compress, this.isReport,
                    this.isGroupIdTransfer, dt / 1000, this.idGenerator.getNextInt(), str, str2, attrs.toString());
            encodeObject.setSupportLF(this.isSupportLF);
            this.sender.asyncSendMessage(encodeObject, sendMessageCallback, str3, j2, timeUnit);
        } else if (this.msgtype == 3 || this.msgtype == 5) {
            attrs.append("&groupId=").append(str).append("&streamId=").append(str2).append("&dt=").append(dt);
            if (compress) {
                attrs.append("&cp=snappy");
            }
            this.sender.asyncSendMessage(
                    new EncodeObject(bArr, attrs.toString(), this.idGenerator.getNextId(), getMsgtype(), compress, str),
                    sendMessageCallback, str3, j2, timeUnit);
        }
    }

    /**
     * Sends a batch of bodies asynchronously. The compression flag is used as is, without
     * a size threshold. Nothing is sent, and the callback is not invoked, when the message
     * type is not 3, 5, 7 or 8.
     *
     * @param sendMessageCallback callback handed to the sender
     * @param list                message bodies
     * @param str                 group id
     * @param str2                stream id
     * @param j                   data time, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3                message id handed to the sender
     * @param j2                  timeout
     * @param timeUnit            unit of {@code j2}
     * @throws ProxysdkException when bodies or dt are invalid, or propagated from the sender
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, List<byte[]> list, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit) throws ProxysdkException {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(list) || !ProxyUtils.isDtValid(dt)) {
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }
        addIndexCnt(str, str2, list.size());
        boolean compress = this.isCompress;
        if (this.msgtype == 7 || this.msgtype == 8) {
            EncodeObject encodeObject = new EncodeObject(list, getMsgtype(), compress, this.isReport,
                    this.isGroupIdTransfer, dt / 1000, this.idGenerator.getNextInt(), str, str2, "");
            encodeObject.setSupportLF(this.isSupportLF);
            this.sender.asyncSendMessage(encodeObject, sendMessageCallback, str3, j2, timeUnit);
        } else if (this.msgtype == 3 || this.msgtype == 5) {
            String attrs = "groupId=" + str + "&streamId=" + str2 + "&dt=" + dt
                    + (compress ? "&cp=snappy" : "") + "&cnt=" + list.size();
            this.sender.asyncSendMessage(
                    new EncodeObject(list, attrs, this.idGenerator.getNextId(), getMsgtype(), compress, str),
                    sendMessageCallback, str3, j2, timeUnit);
        }
    }

    /**
     * Sends a batch of bodies asynchronously together with caller-supplied attributes.
     * Nothing is sent, and the callback is not invoked, when the message type is not
     * 3, 5, 7 or 8.
     *
     * @param sendMessageCallback callback handed to the sender
     * @param list                message bodies
     * @param str                 group id
     * @param str2                stream id
     * @param j                   data time, normalized through {@code ProxyUtils.covertZeroDt}
     * @param str3                message id handed to the sender
     * @param j2                  timeout
     * @param timeUnit            unit of {@code j2}
     * @param map                 extra attributes; keys are checked with {@code ProxyUtils.isAttrKeysValid}
     * @throws ProxysdkException when bodies, dt or attribute keys are invalid, or propagated from the sender
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, List<byte[]> list, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit, Map<String, String> map) throws ProxysdkException {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(list) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(map)) {
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }
        addIndexCnt(str, str2, list.size());
        StringBuilder attrs = ProxyUtils.convertAttrToStr(map);
        boolean compress = this.isCompress;
        if (this.msgtype == 7 || this.msgtype == 8) {
            EncodeObject encodeObject = new EncodeObject(list, getMsgtype(), compress, this.isReport,
                    this.isGroupIdTransfer, dt / 1000, this.idGenerator.getNextInt(), str, str2, attrs.toString());
            encodeObject.setSupportLF(this.isSupportLF);
            this.sender.asyncSendMessage(encodeObject, sendMessageCallback, str3, j2, timeUnit);
        } else if (this.msgtype == 3 || this.msgtype == 5) {
            attrs.append("&groupId=").append(str).append("&streamId=").append(str2)
                    .append("&dt=").append(dt).append("&cnt=").append(list.size());
            if (compress) {
                attrs.append("&cp=snappy");
            }
            this.sender.asyncSendMessage(
                    new EncodeObject(list, attrs.toString(), this.idGenerator.getNextId(), getMsgtype(), compress, str),
                    sendMessageCallback, str3, j2, timeUnit);
        }
    }

    /**
     * Convenience overload: sends one body asynchronously with the current time as dt, a
     * freshly generated message id and the default timeout.
     *
     * @param str                 group id
     * @param str2                stream id
     * @param bArr                message body
     * @param sendMessageCallback callback handed to the sender
     * @throws ProxysdkException when the body is invalid, or propagated from the sender
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(String str, String str2, byte[] bArr, SendMessageCallback sendMessageCallback) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, bArr, str, str2, System.currentTimeMillis(),
                this.idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT);
    }

    /**
     * Convenience overload: sends a batch asynchronously with the current time as dt, a
     * freshly generated message id and the default timeout.
     *
     * @param str                 group id
     * @param str2                stream id
     * @param list                message bodies
     * @param sendMessageCallback callback handed to the sender
     * @throws ProxysdkException when the bodies are invalid, or propagated from the sender
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(String str, String str2, List<byte[]> list, SendMessageCallback sendMessageCallback) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, list, str, str2, System.currentTimeMillis(),
                this.idGenerator.getNextId(), DEFAULT_SEND_TIMEOUT, DEFAULT_SEND_TIMEUNIT);
    }

    /**
     * Adds {@code j} to the counter stored under {@code str + "|" + str2}. Failures are
     * logged and never propagated, so counting cannot break a send.
     *
     * @param str  group id
     * @param str2 stream id
     * @param j    number of messages to add
     */
    private void addIndexCnt(String str, String str2, long j) {
        try {
            // merge() is a single atomic step on ConcurrentHashMap, so increments from
            // concurrent senders are not lost and a concurrent removal cannot cause an NPE.
            this.storeIndex.merge(str + "|" + str2, j, Long::sum);
        } catch (Exception e) {
            logger.error("add index count failed", e);
        }
    }

    /**
     * Sends a batch asynchronously through the sender's index channel, tagged "data".
     * Only message types 7 and 8 are handled; for any other type nothing is sent.
     *
     * @param fileCallback callback handed to the sender
     * @param list         message bodies
     * @param str          group id
     * @param str2         stream id
     * @param j            data time, normalized through {@code ProxyUtils.covertZeroDt}
     * @param i            caller-supplied sequence id placed in the {@code EncodeObject}
     * @param z            support-LF flag for this message (the instance flag is not used)
     * @param str3         message id handed to the sender
     * @param j2           timeout
     * @param timeUnit     unit of {@code j2}
     * @param map          extra attributes; keys are checked with {@code ProxyUtils.isAttrKeysValid}
     * @throws ProxysdkException when bodies, dt or attribute keys are invalid, or propagated from the sender
     */
    public void asyncsendMessageData(FileCallback fileCallback, List<byte[]> list, String str, String str2, long j, int i, boolean z, String str3, long j2, TimeUnit timeUnit, Map<String, String> map) throws ProxysdkException {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(list) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(map)) {
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }
        addIndexCnt(str, str2, list.size());
        StringBuilder attrs = ProxyUtils.convertAttrToStr(map);
        if (this.msgtype == 7 || this.msgtype == 8) {
            EncodeObject encodeObject = new EncodeObject(list, this.msgtype, this.isCompress, this.isReport,
                    this.isGroupIdTransfer, dt / 1000, i, str, str2, attrs.toString(), "data", "");
            encodeObject.setSupportLF(z);
            this.sender.asyncSendMessageIndex(encodeObject, fileCallback, str3, j2, timeUnit);
        }
    }

    /**
     * Shared implementation of the asynchronous "minute" and "file" sends: an uncompressed
     * single-body message on the sender's index channel, without index counting. Only
     * message types 7 and 8 are handled; for any other type nothing is sent.
     *
     * @param str5 tag passed to the {@code EncodeObject} ("minute" or "file" from the callers in this class)
     * @param str3 last {@code EncodeObject} constructor argument; its meaning is not visible here
     * @throws ProxysdkException when body or dt is invalid, or propagated from the sender
     */
    private void asyncSendMetric(FileCallback fileCallback, byte[] bArr, String str, String str2, long j, int i, String str3, String str4, long j2, TimeUnit timeUnit, String str5) throws ProxysdkException {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(bArr) || !ProxyUtils.isDtValid(dt)) {
            throw new ProxysdkException(SendResult.INVALID_ATTRIBUTES.toString());
        }
        if (this.msgtype == 7 || this.msgtype == 8) {
            EncodeObject encodeObject = new EncodeObject(bArr, this.msgtype, false, this.isReport,
                    this.isGroupIdTransfer, dt / 1000, i, str, str2, "", str5, str3);
            this.sender.asyncSendMessageIndex(encodeObject, fileCallback, str4, j2, timeUnit);
        }
    }

    /**
     * Asynchronously sends a body tagged "minute" on the index channel.
     *
     * @param str  group id
     * @param str2 stream id
     * @param j    data time
     * @param i    caller-supplied sequence id
     * @param str3 forwarded as the last {@code EncodeObject} constructor argument
     * @param str4 message id handed to the sender
     * @param j2   timeout, in {@code timeUnit}
     * @throws ProxysdkException when body or dt is invalid, or propagated from the sender
     */
    public void asyncsendMessageProxy(FileCallback fileCallback, byte[] bArr, String str, String str2, long j, int i, String str3, String str4, long j2, TimeUnit timeUnit) throws ProxysdkException {
        asyncSendMetric(fileCallback, bArr, str, str2, j, i, str3, str4, j2, timeUnit, "minute");
    }

    /**
     * Asynchronously sends a body tagged "file" on the index channel.
     *
     * @param str  group id
     * @param str2 stream id
     * @param j    data time
     * @param i    caller-supplied sequence id
     * @param str3 message id handed to the sender
     * @param j2   timeout, in {@code timeUnit}
     * @throws ProxysdkException when body or dt is invalid, or propagated from the sender
     */
    public void asyncsendMessageFile(FileCallback fileCallback, byte[] bArr, String str, String str2, long j, int i, String str3, long j2, TimeUnit timeUnit) throws ProxysdkException {
        asyncSendMetric(fileCallback, bArr, str, str2, j, i, "", str3, j2, timeUnit, "file");
    }

    /**
     * Synchronous counterpart of {@link #asyncsendMessageData}.
     *
     * @param list     message bodies
     * @param str      group id
     * @param str2     stream id
     * @param j        data time, normalized through {@code ProxyUtils.covertZeroDt}
     * @param i        caller-supplied sequence id placed in the {@code EncodeObject}
     * @param z        support-LF flag for this message (the instance flag is not used)
     * @param str3     message id handed to the sender
     * @param j2       timeout
     * @param timeUnit unit of {@code j2}
     * @param map      extra attributes; keys are checked with {@code ProxyUtils.isAttrKeysValid}
     * @return the sender's result string, {@code INVALID_ATTRIBUTES} as text when validation
     *         fails, or {@code null} when the message type is not 7 or 8
     */
    public String sendMessageData(List<byte[]> list, String str, String str2, long j, int i, boolean z, String str3, long j2, TimeUnit timeUnit, Map<String, String> map) {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(list) || !ProxyUtils.isDtValid(dt) || !ProxyUtils.isAttrKeysValid(map)) {
            return SendResult.INVALID_ATTRIBUTES.toString();
        }
        addIndexCnt(str, str2, list.size());
        StringBuilder attrs = ProxyUtils.convertAttrToStr(map);
        if (this.msgtype != 7 && this.msgtype != 8) {
            return null;
        }
        EncodeObject encodeObject = new EncodeObject(list, this.msgtype, this.isCompress, this.isReport,
                this.isGroupIdTransfer, dt / 1000, i, str, str2, attrs.toString(), "data", "");
        encodeObject.setSupportLF(z);
        return this.sender.syncSendMessageIndex(encodeObject, str3, j2, timeUnit);
    }

    /**
     * Shared implementation of the synchronous "minute" and "file" sends: an uncompressed
     * single-body message on the sender's index channel, without index counting.
     *
     * @param str5 tag passed to the {@code EncodeObject} ("minute" or "file" from the callers in this class)
     * @param str3 last {@code EncodeObject} constructor argument; its meaning is not visible here
     * @return the sender's result string, {@code INVALID_ATTRIBUTES} as text when validation
     *         fails, or {@code null} when the message type is not 7 or 8
     */
    private String sendMetric(byte[] bArr, String str, String str2, long j, int i, String str3, String str4, long j2, TimeUnit timeUnit, String str5) {
        long dt = ProxyUtils.covertZeroDt(j);
        if (!ProxyUtils.isBodyValid(bArr) || !ProxyUtils.isDtValid(dt)) {
            return SendResult.INVALID_ATTRIBUTES.toString();
        }
        if (this.msgtype != 7 && this.msgtype != 8) {
            return null;
        }
        EncodeObject encodeObject = new EncodeObject(bArr, this.msgtype, false, this.isReport,
                this.isGroupIdTransfer, dt / 1000, i, str, str2, "", str5, str3);
        return this.sender.syncSendMessageIndex(encodeObject, str4, j2, timeUnit);
    }

    /**
     * Synchronously sends a body tagged "minute" on the index channel.
     *
     * @param str  group id
     * @param str2 stream id
     * @param j    data time
     * @param i    caller-supplied sequence id
     * @param str3 forwarded as the last {@code EncodeObject} constructor argument
     * @param str4 message id handed to the sender
     * @param j2   timeout, in {@code timeUnit}
     * @return see {@link #sendMessageData} for the possible values
     */
    public String sendMessageProxy(byte[] bArr, String str, String str2, long j, int i, String str3, String str4, long j2, TimeUnit timeUnit) {
        return sendMetric(bArr, str, str2, j, i, str3, str4, j2, timeUnit, "minute");
    }

    /**
     * Synchronously sends a body tagged "file" on the index channel.
     *
     * @param str  group id
     * @param str2 stream id
     * @param j    data time
     * @param i    caller-supplied sequence id
     * @param str3 message id handed to the sender
     * @param j2   timeout, in {@code timeUnit}
     * @return see {@link #sendMessageData} for the possible values
     */
    public String sendMessageFile(byte[] bArr, String str, String str2, long j, int i, String str3, long j2, TimeUnit timeUnit) {
        return sendMetric(bArr, str, str2, j, i, "", str3, j2, timeUnit, "file");
    }

    /**
     * Stops this instance's index collector and the class-wide manager fetcher thread (if
     * one was ever started), then re-arms the start flag so a later instance can start a
     * new fetcher.
     */
    private void shutdownInternalThreads() {
        this.indexCol.shutDown();
        // The fetcher only exists when a constructor call met the config conditions;
        // without this guard close() threw NullPointerException for every other config.
        ManagerFetcherThread fetcher = managerFetcherThread;
        if (fetcher != null) {
            fetcher.shutdown();
        }
        ManagerFetcherThreadStarted.set(false);
    }

    /**
     * Removes this sender from the cluster cache (when its sender reports a cluster id),
     * closes the underlying sender and stops the internal threads.
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void close() {
        logger.info("ready to close resources, may need five minutes !");
        if (this.sender.getClusterId() != null) {
            cacheSender.remove(this.sender.getClusterId());
        }
        this.sender.close();
        shutdownInternalThreads();
    }
}
