package org.apache.inlong.agent.plugin.sinks;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.core.task.PositionManager;
import org.apache.inlong.agent.message.BatchProxyMessage;
import org.apache.inlong.agent.metrics.AgentMetricItem;
import org.apache.inlong.agent.metrics.AgentMetricItemSet;
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ThreadUtils;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/SenderManager.class */
public class SenderManager {
    private DefaultMessageSender sender;
    private LinkedBlockingQueue<AgentSenderCallback> resendQueue;
    private ThreadFactory SHARED_FACTORY;
    private final String managerHost;
    private final int managerPort;
    private final String netTag;
    private final String localhost;
    private final boolean isLocalVisit;
    private final int totalAsyncBufSize;
    private final int aliveConnectionNum;
    private final boolean isCompress;
    private final int msgType;
    private final boolean isFile;
    private final long maxSenderTimeout;
    private final int maxSenderRetry;
    private final long retrySleepTime;
    private final String inlongGroupId;
    private final int maxSenderPerGroup;
    private final String sourcePath;
    private final boolean proxySend;
    private AgentMetricItemSet metricItemSet;
    private Map<String, String> dimensions;
    private PositionManager positionManager;
    private int ioThreadNum;
    private boolean enableBusyWait;
    private String authSecretId;
    private String authSecretKey;
    protected int batchFlushInterval;
    private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
    private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
    private static final AtomicLong METRIC_INDEX = new AtomicLong(0);
    private final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
    private final ExecutorService resendExecutorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new AgentThreadFactory("SendManager-Resend"));
    private volatile boolean shutdown = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/SenderManager$AgentSenderCallback.class */
    public class AgentSenderCallback implements SendMessageCallback {
        private final int retry;
        private final BatchProxyMessage batchMessage;
        private final int msgCnt;

        AgentSenderCallback(BatchProxyMessage batchProxyMessage, int i) {
            this.batchMessage = batchProxyMessage;
            this.retry = i;
            this.msgCnt = batchProxyMessage.getDataList().size();
        }

        public void onMessageAck(SendResult sendResult) {
            String groupId = this.batchMessage.getGroupId();
            String streamId = this.batchMessage.getStreamId();
            String jobId = this.batchMessage.getJobId();
            long dataTime = this.batchMessage.getDataTime();
            if (sendResult != null && sendResult.equals(SendResult.OK)) {
                SenderManager.this.getMetricItem(groupId, streamId).pluginSendSuccessCount.addAndGet(this.msgCnt);
                PositionManager.getInstance().updateSinkPosition(this.batchMessage.getJobId(), SenderManager.this.sourcePath, this.msgCnt, false);
            } else {
                SenderManager.LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime {} fail with times {}, error {}", new Object[]{groupId, streamId, jobId, Long.valueOf(dataTime), Integer.valueOf(this.retry), sendResult});
                SenderManager.this.getMetricItem(groupId, streamId).pluginSendFailCount.addAndGet(this.msgCnt);
                SenderManager.this.putInResendQueue(new AgentSenderCallback(this.batchMessage, this.retry));
            }
        }

        public void onException(Throwable th) {
            SenderManager.this.getMetricItem(this.batchMessage.getGroupId(), this.batchMessage.getStreamId()).pluginSendFailCount.addAndGet(this.msgCnt);
            SenderManager.LOGGER.error("exception caught", th);
        }
    }

    public SenderManager(JobProfile jobProfile, String str, String str2) {
        AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
        this.managerHost = agentConf.get("agent.manager.vip.http.host");
        this.managerPort = agentConf.getInt("agent.manager.vip.http.port");
        this.proxySend = jobProfile.getBoolean("job.proxySend", false);
        this.localhost = jobProfile.get("proxy.localHost", CommonConstants.DEFAULT_PROXY_LOCALHOST);
        this.netTag = jobProfile.get("proxy.net.tag", "");
        this.isLocalVisit = jobProfile.getBoolean("proxy.isLocalVisit", true);
        this.totalAsyncBufSize = jobProfile.getInt("proxy.total.async.proxy.size", 209715200);
        this.aliveConnectionNum = jobProfile.getInt("proxy.alive.connection.num", 10);
        this.isCompress = jobProfile.getBoolean("proxy.is.compress", true);
        this.maxSenderPerGroup = jobProfile.getInt("proxy.max.sender.per.group", 10);
        this.msgType = jobProfile.getInt("proxy.msgType", 7);
        this.maxSenderTimeout = jobProfile.getInt("proxy.sender.maxTimeout", 20);
        this.maxSenderRetry = jobProfile.getInt("proxy.sender.maxRetry", 5);
        this.retrySleepTime = jobProfile.getLong("proxy.retry.sleep", 500L);
        this.isFile = jobProfile.getBoolean("proxy.isFile", false);
        this.positionManager = PositionManager.getInstance();
        this.ioThreadNum = jobProfile.getInt("client.iothread.num", CommonConstants.DEFAULT_PROXY_CLIENT_IO_THREAD_NUM);
        this.enableBusyWait = jobProfile.getBoolean("client.enable.busy.wait", false);
        this.batchFlushInterval = jobProfile.getInt("proxy.batch.flush.interval", 100);
        this.authSecretId = agentConf.get("agent.manager.auth.secretId");
        this.authSecretKey = agentConf.get("agent.manager.auth.secretKey");
        this.sourcePath = str2;
        this.inlongGroupId = str;
        this.dimensions = new HashMap();
        this.dimensions.put("pluginId", getClass().getSimpleName());
        this.metricItemSet = new AgentMetricItemSet(String.join("-", getClass().getSimpleName(), String.valueOf(METRIC_INDEX.incrementAndGet())));
        MetricRegister.register(this.metricItemSet);
        this.resendQueue = new LinkedBlockingQueue<>();
    }

    public void Start() throws Exception {
        this.sender = createMessageSender(this.inlongGroupId);
        this.resendExecutorService.execute(flushResendQueue());
    }

    public void Stop() {
        this.shutdown = true;
        this.resendExecutorService.shutdown();
        this.sender.close();
        cleanResendQueue();
    }

    private void cleanResendQueue() {
        while (!this.resendQueue.isEmpty()) {
            try {
                this.resendQueue.poll(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOGGER.error("clean resend queue error{}", e.getMessage());
            }
        }
    }

    private AgentMetricItem getMetricItem(Map<String, String> map) {
        HashMap hashMap = new HashMap();
        hashMap.put("pluginId", getClass().getSimpleName());
        hashMap.putAll(map);
        return this.metricItemSet.findMetricItem(hashMap);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public AgentMetricItem getMetricItem(String str, String str2) {
        HashMap hashMap = new HashMap();
        hashMap.put("inlongGroupId", str);
        hashMap.put("inlongStreamId", str2);
        return getMetricItem(hashMap);
    }

    private DefaultMessageSender createMessageSender(String str) throws Exception {
        ProxyClientConfig proxyClientConfig = new ProxyClientConfig(this.localhost, this.isLocalVisit, this.managerHost, this.managerPort, str, this.authSecretId, this.authSecretKey);
        proxyClientConfig.setTotalAsyncCallbackSize(this.totalAsyncBufSize);
        proxyClientConfig.setFile(this.isFile);
        proxyClientConfig.setAliveConnections(this.aliveConnectionNum);
        proxyClientConfig.setIoThreadNum(this.ioThreadNum);
        proxyClientConfig.setEnableBusyWait(this.enableBusyWait);
        proxyClientConfig.setProtocolType("TCP");
        this.SHARED_FACTORY = new DefaultThreadFactory("agent-client-" + this.sourcePath, Thread.currentThread().isDaemon());
        DefaultMessageSender defaultMessageSender = new DefaultMessageSender(proxyClientConfig, this.SHARED_FACTORY);
        defaultMessageSender.setMsgtype(this.msgType);
        defaultMessageSender.setCompress(this.isCompress);
        return defaultMessageSender;
    }

    public void sendBatch(BatchProxyMessage batchProxyMessage) {
        sendBatchWithRetryCount(batchProxyMessage, 0);
    }

    private void sendBatchWithRetryCount(BatchProxyMessage batchProxyMessage, int i) {
        boolean z = false;
        while (!z) {
            try {
                if (this.resendQueue.isEmpty()) {
                    this.sender.asyncSendMessage(new AgentSenderCallback(batchProxyMessage, i), batchProxyMessage.getDataList(), batchProxyMessage.getGroupId(), batchProxyMessage.getStreamId(), batchProxyMessage.getDataTime(), SEQUENTIAL_ID.getNextUuid(), this.maxSenderTimeout, TimeUnit.SECONDS, batchProxyMessage.getExtraMap(), this.proxySend);
                    getMetricItem(batchProxyMessage.getGroupId(), batchProxyMessage.getStreamId()).pluginSendCount.addAndGet(batchProxyMessage.getMsgCnt());
                    z = true;
                } else {
                    AgentUtils.silenceSleepInMs(this.retrySleepTime);
                }
            } catch (Exception e) {
                z = false;
                if (i <= this.maxSenderRetry) {
                    LOGGER.error("Exception caught", e);
                } else if (i % 10 == 0) {
                    LOGGER.error("max retry reached, sample log Exception caught", e);
                }
                i++;
                AgentUtils.silenceSleepInMs(this.retrySleepTime);
            }
        }
    }

    private Runnable flushResendQueue() {
        return () -> {
            LOGGER.info("start flush resend queue {}:{}", this.inlongGroupId, this.sourcePath);
            while (!this.shutdown) {
                try {
                    AgentSenderCallback poll = this.resendQueue.poll(1L, TimeUnit.SECONDS);
                    if (poll != null) {
                        sendBatchWithRetryCount(poll.batchMessage, poll.retry + 1);
                    }
                } catch (Exception e) {
                    LOGGER.error("error caught", e);
                } catch (Throwable th) {
                    ThreadUtils.threadThrowableHandler(Thread.currentThread(), th);
                } finally {
                    AgentUtils.silenceSleepInMs(this.batchFlushInterval);
                }
            }
            LOGGER.info("stop flush resend queue {}:{}", this.inlongGroupId, this.sourcePath);
        };
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void putInResendQueue(AgentSenderCallback agentSenderCallback) {
        try {
            this.resendQueue.put(agentSenderCallback);
        } catch (Throwable th) {
            LOGGER.error("putInResendQueue e = {}", th);
        }
    }
}
