package org.apache.inlong.agent.plugin.sinks.filecollect;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.message.filecollect.SenderMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager.class */
public class SenderManager {
    // Data-proxy SDK sender; assigned in createMessageSender() and closed in closeMessageSender().
    private DefaultMessageSender sender;
    // Batches whose ack came back with a non-OK result; drained by the flushResendQueue() task.
    private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
    // Thread factory passed to the DefaultMessageSender; created in createMessageSender().
    private ThreadFactory SHARED_FACTORY;
    // Manager address, read from the agent configuration ("agent.manager.vip.http.host"/"...port").
    private final String managerHost;
    private final int managerPort;
    // Read from "proxy.net.tag"; not referenced by any method shown in this class.
    private final String netTag;
    // Proxy client settings read from the instance profile in the constructor.
    private final String localhost;
    private final boolean isLocalVisit;
    private final int totalAsyncBufSize;
    private final int aliveConnectionNum;
    private final boolean isCompress;
    private final int msgType;
    private final boolean isFile;
    // Passed to the sender together with TimeUnit.SECONDS in sendBatchWithRetryCount().
    private final long maxSenderTimeout;
    // Only selects full vs. sampled error logging in sendBatchWithRetryCount(); it does not cap retries.
    private final int maxSenderRetry;
    // Sleep, in milliseconds, between send retries and while sendBatch() waits for the resend queue.
    private final long retrySleepTime;
    // Constructor argument; handed to createMessageSender() and used in log messages.
    private final String inlongGroupId;
    // Read from "proxy.max.sender.per.group"; not referenced by any method shown in this class.
    private final int maxSenderPerGroup;
    // Constructor argument; used in the sender thread-factory name and in log messages.
    private final String sourcePath;
    // Read from "job.proxySend"; forwarded as the last argument of asyncSendMessage().
    private final boolean proxySend;
    // Metric set registered with MetricRegister in the constructor.
    private AgentMetricItemSet metricItemSet;
    // Populated with "pluginId" in the constructor; not read again in this class.
    private Map<String, String> dimensions;
    private int ioThreadNum;
    private boolean enableBusyWait;
    // Credentials read from the agent configuration and passed to ProxyClientConfig.
    private String authSecretId;
    private String authSecretKey;
    // Sleep, in milliseconds, applied after every iteration of the resend loop.
    protected int batchFlushInterval;
    protected InstanceProfile profile;
    private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
    private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
    // Shared by all SenderManager instances; runs the resend task submitted in Start().
    private static final ThreadPoolExecutor EXECUTOR_SERVICE = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new SynchronousQueue(), (ThreadFactory) new AgentThreadFactory("sender-manager"));
    // Suffix source for the metric item set name, so each instance registers under a distinct name.
    private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
    // Set by Stop(); checked by sendBatch() and by the resend loop.
    private volatile boolean shutdown = false;
    // True while the resend task is inside its loop; Stop() waits for it to become false.
    private volatile boolean resendRunning = false;
    // Set at the end of Start(); Stop() only waits and closes the sender when it is true.
    private volatile boolean started = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/filecollect/SenderManager$AgentSenderCallback.class */
    public class AgentSenderCallback implements SendMessageCallback {
        private final int retry;
        private final SenderMessage message;
        private final int msgCnt;

        AgentSenderCallback(SenderMessage senderMessage, int i) {
            this.message = senderMessage;
            this.retry = i;
            this.msgCnt = senderMessage.getDataList().size();
        }

        public void onMessageAck(SendResult sendResult) {
            String groupId = this.message.getGroupId();
            String streamId = this.message.getStreamId();
            String taskId = this.message.getTaskId();
            String instanceId = this.message.getInstanceId();
            long dataTime = this.message.getDataTime();
            if (sendResult != null && sendResult.equals(SendResult.OK)) {
                this.message.getOffsetAckList().forEach(offsetAckInfo -> {
                    offsetAckInfo.setHasAck(true);
                });
                SenderManager.this.getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(this.msgCnt);
                AuditUtils.add(4, groupId, streamId, dataTime, this.message.getMsgCnt(), this.message.getTotalSize());
                AuditUtils.add(30002, groupId, streamId, AgentUtils.getCurrentTime(), this.message.getMsgCnt(), this.message.getTotalSize());
                return;
            }
            SenderManager.LOGGER.warn("send groupId {}, streamId {}, taskId {}, instanceId {}, dataTime {} fail with times {}, error {}", new Object[]{groupId, streamId, taskId, instanceId, Long.valueOf(dataTime), Integer.valueOf(this.retry), sendResult});
            SenderManager.this.getMetricItem(groupId, streamId).pluginSendFailCount.addAndGet(this.msgCnt);
            SenderManager.this.putInResendQueue(new AgentSenderCallback(this.message, this.retry));
            AuditUtils.add(10004, groupId, streamId, dataTime, this.message.getMsgCnt(), this.message.getTotalSize());
            AuditUtils.add(30011, groupId, streamId, AgentUtils.getCurrentTime(), this.message.getMsgCnt(), this.message.getTotalSize());
        }

        public void onException(Throwable th) {
            SenderManager.this.getMetricItem(this.message.getGroupId(), this.message.getStreamId()).pluginSendFailCount.addAndGet(this.msgCnt);
            SenderManager.LOGGER.error("exception caught", th);
        }
    }

    public SenderManager(InstanceProfile instanceProfile, String str, String str2) {
        AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
        this.profile = instanceProfile;
        this.managerHost = agentConf.get("agent.manager.vip.http.host");
        this.managerPort = agentConf.getInt("agent.manager.vip.http.port");
        this.proxySend = instanceProfile.getBoolean("job.proxySend", false);
        this.localhost = instanceProfile.get("proxy.localHost", CommonConstants.DEFAULT_PROXY_LOCALHOST);
        this.netTag = instanceProfile.get("proxy.net.tag", "");
        this.isLocalVisit = instanceProfile.getBoolean("proxy.isLocalVisit", true);
        this.totalAsyncBufSize = instanceProfile.getInt("proxy.total.async.proxy.size", 209715200);
        this.aliveConnectionNum = instanceProfile.getInt("proxy.alive.connection.num", 10);
        this.isCompress = instanceProfile.getBoolean("proxy.is.compress", true);
        this.maxSenderPerGroup = instanceProfile.getInt("proxy.max.sender.per.group", 10);
        this.msgType = instanceProfile.getInt("proxy.msgType", 7);
        this.maxSenderTimeout = instanceProfile.getInt("proxy.sender.maxTimeout", 20);
        this.maxSenderRetry = instanceProfile.getInt("proxy.sender.maxRetry", 5);
        this.retrySleepTime = instanceProfile.getLong("proxy.retry.sleep", 500L);
        this.isFile = instanceProfile.getBoolean("proxy.isFile", false);
        this.ioThreadNum = instanceProfile.getInt("client.iothread.num", CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
        this.enableBusyWait = instanceProfile.getBoolean("client.enable.busy.wait", false);
        this.batchFlushInterval = instanceProfile.getInt("proxy.batch.flush.interval", 100);
        this.authSecretId = agentConf.get("agent.manager.auth.secretId");
        this.authSecretKey = agentConf.get("agent.manager.auth.secretKey");
        this.sourcePath = str2;
        this.inlongGroupId = str;
        this.dimensions = new HashMap();
        this.dimensions.put("pluginId", getClass().getSimpleName());
        this.metricItemSet = new AgentMetricItemSet(String.join("-", getClass().getSimpleName(), String.valueOf(METRIC_INDEX.incrementAndGet())));
        MetricRegister.register(this.metricItemSet);
        this.resendQueue = new LinkedBlockingQueue<>();
    }

    /**
     * Creates the message sender for this manager's group and submits the resend task
     * to the shared executor. The manager is marked as started only after both steps succeed.
     *
     * @throws Exception if the message sender cannot be created
     */
    public void Start() throws Exception {
        createMessageSender(inlongGroupId);
        final Runnable resendTask = flushResendQueue();
        EXECUTOR_SERVICE.execute(resendTask);
        started = true;
    }

    /**
     * Requests shutdown. If the manager was started, waits (polling every millisecond)
     * until the resend task has left its loop and then closes the message sender.
     */
    public void Stop() {
        LOGGER.info("stop send manager");
        shutdown = true;
        if (!started) {
            // Nothing was created by Start(), so there is nothing to wait for or close.
            return;
        }
        while (resendRunning) {
            AgentUtils.silenceSleepInMs(1L);
        }
        closeMessageSender();
        LOGGER.info("stop send manager end");
    }

    /** Closes the message sender if one has been created; otherwise does nothing. */
    private void closeMessageSender() {
        if (sender == null) {
            return;
        }
        sender.close();
    }

    /**
     * Looks up the metric item for the given dimensions, after adding this plugin's
     * "pluginId" dimension. Entries in {@code map} override the plugin id if they use the same key.
     *
     * @param map additional dimensions identifying the metric item
     * @return the metric item returned by the metric set for the merged dimensions
     */
    private AgentMetricItem getMetricItem(Map<String, String> map) {
        final Map<String, String> merged = new HashMap<>();
        merged.put("pluginId", getClass().getSimpleName());
        merged.putAll(map);
        return metricItemSet.findMetricItem(merged);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Convenience lookup of the metric item for a group/stream pair.
     *
     * @param str  inlong group id
     * @param str2 inlong stream id
     * @return the metric item for the given group and stream
     */
    public AgentMetricItem getMetricItem(String str, String str2) {
        final Map<String, String> groupAndStream = new HashMap<>();
        groupAndStream.put("inlongGroupId", str);
        groupAndStream.put("inlongStreamId", str2);
        return getMetricItem(groupAndStream);
    }

    /**
     * Builds the proxy client configuration from this manager's settings, creates the
     * thread factory and the {@link DefaultMessageSender}, and stores the sender in {@code sender}.
     *
     * @param groupId inlong group id the sender is created for
     * @throws Exception if the configuration or the sender cannot be created
     */
    private void createMessageSender(String groupId) throws Exception {
        final ProxyClientConfig clientConfig = new ProxyClientConfig(
                localhost, isLocalVisit, managerHost, managerPort, groupId, authSecretId, authSecretKey);
        clientConfig.setTotalAsyncCallbackSize(totalAsyncBufSize);
        clientConfig.setFile(isFile);
        clientConfig.setAliveConnections(aliveConnectionNum);
        clientConfig.setIoThreadNum(ioThreadNum);
        clientConfig.setEnableBusyWait(enableBusyWait);
        clientConfig.setProtocolType("TCP");

        // Sender threads are named after the source path and inherit the daemon flag of the caller.
        final String threadPrefix = "agent-sender-manager-" + sourcePath;
        final boolean daemon = Thread.currentThread().isDaemon();
        SHARED_FACTORY = new DefaultThreadFactory(threadPrefix, daemon);

        final DefaultMessageSender created = new DefaultMessageSender(clientConfig, SHARED_FACTORY);
        created.setMsgtype(msgType);
        created.setCompress(isCompress);
        sender = created;
    }

    /**
     * Sends a batch, first waiting until the resend queue is empty so that previously
     * failed batches are handled before new ones. Returns without sending if shutdown
     * has been requested.
     *
     * @param senderMessage the batch to send
     */
    public void sendBatch(SenderMessage senderMessage) {
        // Hold new data back while failed batches are still waiting to be resent.
        while (!shutdown && !resendQueue.isEmpty()) {
            AgentUtils.silenceSleepInMs(retrySleepTime);
        }
        if (!shutdown) {
            sendBatchWithRetryCount(senderMessage, 0);
        }
    }

    /**
     * Hands a batch to the message sender, retrying after {@code retrySleepTime} milliseconds
     * whenever the hand-off throws.
     *
     * <p>Retries are not capped: {@code maxSenderRetry} only switches error logging from
     * "every failure" to "every tenth failure". The loop is abandoned once shutdown has been
     * requested, so a sender that keeps failing cannot trap the calling thread forever (which
     * would also keep {@code Stop()} waiting on the resend task).
     *
     * @param senderMessage the batch to send
     * @param i             retry count the first attempt is made with
     */
    private void sendBatchWithRetryCount(SenderMessage senderMessage, int i) {
        int attempt = i;
        while (true) {
            try {
                asyncSendByMessageSender(new AgentSenderCallback(senderMessage, attempt), senderMessage.getDataList(),
                        senderMessage.getGroupId(), senderMessage.getStreamId(), senderMessage.getDataTime(),
                        SEQUENTIAL_ID.getNextUuid(), this.maxSenderTimeout, TimeUnit.SECONDS,
                        senderMessage.getExtraMap(), this.proxySend);
                getMetricItem(senderMessage.getGroupId(), senderMessage.getStreamId()).pluginSendCount
                        .addAndGet(senderMessage.getMsgCnt());
                return;
            } catch (Exception e) {
                if (attempt <= this.maxSenderRetry) {
                    LOGGER.error("Exception caught", e);
                } else if (attempt % 10 == 0) {
                    // Past the configured retry count only every tenth failure is logged.
                    LOGGER.error("max retry reached, sample log Exception caught", e);
                }
                attempt++;
                if (this.shutdown) {
                    // Stop() has been requested: give up instead of retrying indefinitely.
                    LOGGER.warn("shutdown requested, stop retrying send for groupId {}, streamId {}",
                            senderMessage.getGroupId(), senderMessage.getStreamId());
                    return;
                }
                AgentUtils.silenceSleepInMs(this.retrySleepTime);
            }
        }
    }

    /**
     * Forwards one batch to {@code sender.asyncSendMessage} with the arguments unchanged
     * and in the same order.
     *
     * @param callback    callback invoked by the sender with the result
     * @param bodyList    message bodies of the batch
     * @param groupId     inlong group id
     * @param streamId    inlong stream id
     * @param dataTime    data time of the batch
     * @param msgUuid     unique id for this send
     * @param timeout     send timeout
     * @param timeoutUnit unit of {@code timeout}
     * @param extraAttrs  extra attributes sent with the batch
     * @param isProxySend proxy-send flag passed through to the sender
     * @throws ProxysdkException if the sender rejects the request
     */
    private void asyncSendByMessageSender(
            SendMessageCallback callback,
            List<byte[]> bodyList,
            String groupId,
            String streamId,
            long dataTime,
            String msgUuid,
            long timeout,
            TimeUnit timeoutUnit,
            Map<String, String> extraAttrs,
            boolean isProxySend) throws ProxysdkException {
        sender.asyncSendMessage(callback, bodyList, groupId, streamId, dataTime, msgUuid, timeout, timeoutUnit,
                extraAttrs, isProxySend);
    }

    /**
     * Builds the background task that drains the resend queue.
     *
     * <p>Until shutdown is requested the task polls the queue (waiting up to one second),
     * resends any batch it finds with its retry count increased by one, and then sleeps
     * {@code batchFlushInterval} milliseconds. {@code resendRunning} is set while the task is
     * in its loop and is always cleared when the task ends, even if something escapes the
     * loop, because {@code Stop()} waits on that flag.
     *
     * @return the resend task
     */
    private Runnable flushResendQueue() {
        return () -> {
            AgentThreadFactory.nameThread("flushResendQueue-" + this.profile.getTaskId() + "-" + this.profile.getInstanceId());
            LOGGER.info("start flush resend queue {}:{}", this.inlongGroupId, this.sourcePath);
            this.resendRunning = true;
            try {
                while (!this.shutdown) {
                    try {
                        AgentSenderCallback poll = this.resendQueue.poll(1L, TimeUnit.SECONDS);
                        if (poll != null) {
                            sendBatchWithRetryCount(poll.message, poll.retry + 1);
                        }
                    } catch (Exception e) {
                        LOGGER.error("error caught", e);
                    } catch (Throwable th) {
                        ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
                    } finally {
                        AgentUtils.silenceSleepInMs(this.batchFlushInterval);
                    }
                }
                LOGGER.info("stop flush resend queue {}:{}", this.inlongGroupId, this.sourcePath);
            } finally {
                // Must be cleared on every exit path; otherwise Stop() would wait forever.
                this.resendRunning = false;
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Places a failed batch on the resend queue. Never throws.
     *
     * <p>If the calling thread is interrupted while enqueueing, the interrupt flag is restored
     * and the batch is enqueued with a non-blocking {@code offer} so it is not silently lost.
     * Any other failure is logged with its stack trace.
     *
     * @param agentSenderCallback callback wrapping the batch to resend
     */
    public void putInResendQueue(AgentSenderCallback agentSenderCallback) {
        try {
            this.resendQueue.put(agentSenderCallback);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller, then retry without blocking.
            Thread.currentThread().interrupt();
            if (!this.resendQueue.offer(agentSenderCallback)) {
                LOGGER.error("putInResendQueue interrupted and resend queue is full, batch dropped", e);
            }
        } catch (Throwable th) {
            // The throwable is passed as the exception argument, so no "{}" placeholder is used.
            LOGGER.error("putInResendQueue failed", th);
        }
    }

    /**
     * Reports whether sending has finished.
     *
     * @return always {@code true}; this implementation does not inspect the resend queue
     *         or any in-flight batches
     */
    public boolean sendFinished() {
        return true;
    }
}
