package org.apache.inlong.agent.plugin.sinks;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.constants.CommonConstants;
import org.apache.inlong.agent.core.task.TaskPositionManager;
import org.apache.inlong.agent.plugin.fetcher.constants.FetcherConstants;
import org.apache.inlong.agent.plugin.message.SequentialID;
import org.apache.inlong.agent.plugin.metrics.PluginJmxMetric;
import org.apache.inlong.agent.plugin.metrics.PluginMetric;
import org.apache.inlong.agent.plugin.metrics.PluginPrometheusMetric;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ConfigUtil;
import org.apache.inlong.sdk.dataproxy.DefaultMessageSender;
import org.apache.inlong.sdk.dataproxy.ProxyClientConfig;
import org.apache.inlong.sdk.dataproxy.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.SendResult;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/SenderManager.class */
public class SenderManager {
    private static final String SENDER_MANAGER_TAG_NAME = "AgentSenderManager";
    private final String managerHost;
    private final int managerPort;
    private final String netTag;
    private final String localhost;
    private final boolean isLocalVisit;
    private final int totalAsyncBufSize;
    private final int aliveConnectionNum;
    private final boolean isCompress;
    private final int msgType;
    private final boolean isFile;
    private final long maxSenderTimeout;
    private final int maxSenderRetry;
    private final long retrySleepTime;
    private final String inlongGroupId;
    private TaskPositionManager taskPositionManager;
    private final int maxSenderPerGroup;
    private final String sourcePath;
    private final PluginMetric metric;
    private static final Logger LOGGER = LoggerFactory.getLogger(SenderManager.class);
    private static final SequentialID SEQUENTIAL_ID = SequentialID.getInstance();
    private static final AtomicInteger SENDER_INDEX = new AtomicInteger(0);
    private static final ConcurrentHashMap<String, List<DefaultMessageSender>> SENDER_MAP = new ConcurrentHashMap<>();
    private static final NioClientSocketChannelFactory SHARED_FACTORY = new NioClientSocketChannelFactory(new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new SynchronousQueue(), (ThreadFactory) new AgentThreadFactory("SenderManager")), new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, (BlockingQueue<Runnable>) new SynchronousQueue(), (ThreadFactory) new AgentThreadFactory("SenderManager")));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/SenderManager$AgentSenderCallback.class */
    public class AgentSenderCallback implements SendMessageCallback {
        private final int retry;
        private final String groupId;
        private final List<byte[]> bodyList;
        private final String streamId;
        private final long dataTime;
        private final String jobId;

        AgentSenderCallback(String str, String str2, String str3, List<byte[]> list, int i, long j) {
            this.retry = i;
            this.groupId = str2;
            this.streamId = str3;
            this.bodyList = list;
            this.jobId = str;
            this.dataTime = j;
        }

        public void onMessageAck(SendResult sendResult) {
            if (sendResult == null || !sendResult.equals(SendResult.OK)) {
                SenderManager.LOGGER.warn("send groupId {}, streamId {}, jobId {}, dataTime {} fail with times {}", new Object[]{this.groupId, this.streamId, this.jobId, Long.valueOf(this.dataTime), Integer.valueOf(this.retry)});
                SenderManager.this.sendBatch(this.jobId, this.groupId, this.streamId, this.bodyList, this.retry + 1, this.dataTime);
            } else {
                SenderManager.this.metric.incSendSuccessNum(this.bodyList.size());
                SenderManager.this.taskPositionManager.updateSinkPosition(this.jobId, SenderManager.this.sourcePath, this.bodyList.size());
            }
        }

        public void onException(Throwable th) {
            SenderManager.LOGGER.error("exception caught", th);
        }
    }

    public SenderManager(JobProfile jobProfile, String str, String str2) {
        AgentConfiguration agentConf = AgentConfiguration.getAgentConf();
        this.managerHost = agentConf.get(FetcherConstants.AGENT_MANAGER_VIP_HTTP_HOST);
        this.managerPort = agentConf.getInt(FetcherConstants.AGENT_MANAGER_VIP_HTTP_PORT);
        this.localhost = jobProfile.get("proxy.localHost", CommonConstants.DEFAULT_PROXY_LOCALHOST);
        this.netTag = jobProfile.get("proxy.net.tag", "");
        this.isLocalVisit = jobProfile.getBoolean("proxy.isLocalVisit", true);
        this.totalAsyncBufSize = jobProfile.getInt("proxy.total.async.proxy.size", 209715200);
        this.aliveConnectionNum = jobProfile.getInt("proxy.alive.connection.num", 10);
        this.isCompress = jobProfile.getBoolean("proxy.is.compress", true);
        this.maxSenderPerGroup = jobProfile.getInt("proxy.max.sender.per.group", 10);
        this.msgType = jobProfile.getInt("proxy.msgType", 7);
        this.maxSenderTimeout = jobProfile.getInt("proxy.sender.maxTimeout", 20);
        this.maxSenderRetry = jobProfile.getInt("proxy.sender.maxRetry", 5);
        this.retrySleepTime = jobProfile.getLong("proxy.retry.sleep", 500L);
        this.isFile = jobProfile.getBoolean("proxy.isFile", false);
        this.taskPositionManager = TaskPositionManager.getTaskPositionManager();
        this.sourcePath = str2;
        this.inlongGroupId = str;
        if (ConfigUtil.isPrometheusEnabled()) {
            this.metric = new PluginPrometheusMetric(SENDER_MANAGER_TAG_NAME);
        } else {
            this.metric = new PluginJmxMetric(SENDER_MANAGER_TAG_NAME);
        }
    }

    private DefaultMessageSender selectSender(String str) {
        List<DefaultMessageSender> list = SENDER_MAP.get(str);
        return list.get((SENDER_INDEX.getAndIncrement() & Integer.MAX_VALUE) % list.size());
    }

    private DefaultMessageSender createMessageSender(String str) throws Exception {
        ProxyClientConfig proxyClientConfig = new ProxyClientConfig(this.localhost, this.isLocalVisit, this.managerHost, this.managerPort, str, this.netTag);
        proxyClientConfig.setTotalAsyncCallbackSize(this.totalAsyncBufSize);
        proxyClientConfig.setFile(this.isFile);
        proxyClientConfig.setAliveConnections(this.aliveConnectionNum);
        DefaultMessageSender defaultMessageSender = new DefaultMessageSender(proxyClientConfig, SHARED_FACTORY);
        defaultMessageSender.setMsgtype(this.msgType);
        defaultMessageSender.setCompress(this.isCompress);
        return defaultMessageSender;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v3, types: [java.util.List] */
    public void addMessageSender() throws Exception {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = (List) SENDER_MAP.putIfAbsent(this.inlongGroupId, arrayList);
        if (arrayList2 == null) {
            arrayList2 = arrayList;
        }
        if (arrayList2.size() > this.maxSenderPerGroup) {
            return;
        }
        arrayList2.add(createMessageSender(this.inlongGroupId));
    }

    public void sendBatch(String str, String str2, String str3, List<byte[]> list, int i, long j) {
        if (i > this.maxSenderRetry) {
            LOGGER.warn("max retry reached, retry count is {}, sleep and send again", Integer.valueOf(i));
            AgentUtils.silenceSleepInMs(this.retrySleepTime);
        }
        try {
            selectSender(str2).asyncSendMessage(new AgentSenderCallback(str, str2, str3, list, i, j), list, str2, str3, j, SEQUENTIAL_ID.getNextUuid(), this.maxSenderTimeout, TimeUnit.SECONDS);
        } catch (Exception e) {
            LOGGER.error("Exception caught", e);
            try {
                TimeUnit.SECONDS.sleep(1L);
                sendBatch(str, str2, str3, list, i + 1, j);
            } catch (Exception e2) {
            }
        }
    }
}
