package org.apache.inlong.agent.plugin.sinks;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.agent.common.AgentThreadFactory;
import org.apache.inlong.agent.conf.JobProfile;
import org.apache.inlong.agent.message.EndMessage;
import org.apache.inlong.agent.message.ProxyMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.MessageFilter;
import org.apache.inlong.agent.plugin.fetcher.constants.CommandConstants;
import org.apache.inlong.agent.plugin.message.PackProxyMessage;
import org.apache.inlong.agent.plugin.metrics.SinkJmxMetric;
import org.apache.inlong.agent.plugin.metrics.SinkMetrics;
import org.apache.inlong.agent.plugin.metrics.SinkPrometheusMetrics;
import org.apache.inlong.agent.utils.AgentUtils;
import org.apache.inlong.agent.utils.ConfigUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sinks/ProxySink.class */
public class ProxySink extends AbstractSink {
    private static final String PROXY_SINK_TAG_NAME = "AgentProxySinkMetric";
    private MessageFilter messageFilter;
    private SenderManager senderManager;
    private byte[] fieldSplitter;
    private String inlongGroupId;
    private String inlongStreamId;
    private String sourceName;
    private String jobInstanceId;
    private int maxBatchSize;
    private int maxBatchTimeoutMs;
    private int batchFlushInterval;
    private int maxQueueNumber;
    private final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, (BlockingQueue<Runnable>) new LinkedBlockingQueue(), (ThreadFactory) new AgentThreadFactory("ProxySink"));
    private volatile boolean shutdown = false;
    private ConcurrentHashMap<String, PackProxyMessage> cache;
    private long dataTime;
    private final SinkMetrics sinkMetrics;
    private static final Logger LOGGER = LoggerFactory.getLogger(ProxySink.class);
    private static AtomicLong index = new AtomicLong(0);

    public ProxySink() {
        if (ConfigUtil.isPrometheusEnabled()) {
            this.sinkMetrics = new SinkPrometheusMetrics(AgentUtils.getUniqId(PROXY_SINK_TAG_NAME, index.incrementAndGet()));
        } else {
            this.sinkMetrics = new SinkJmxMetric(AgentUtils.getUniqId(PROXY_SINK_TAG_NAME, index.incrementAndGet()));
        }
    }

    public void write(Message message) {
        if (message != null) {
            message.getHeader().put("inlongGroupId", this.inlongGroupId);
            message.getHeader().put("inlongStreamId", this.inlongStreamId);
            extractStreamFromMessage(message, this.fieldSplitter);
            if (message instanceof EndMessage) {
                this.sinkMetrics.incSinkFailCount();
                return;
            }
            ProxyMessage parse = ProxyMessage.parse(message);
            this.cache.compute(parse.getInlongStreamId(), (str, packProxyMessage) -> {
                if (packProxyMessage == null) {
                    packProxyMessage = new PackProxyMessage(this.maxBatchSize, this.maxQueueNumber, this.maxBatchTimeoutMs, parse.getInlongStreamId());
                }
                packProxyMessage.addProxyMessage(parse);
                return packProxyMessage;
            });
            AuditUtils.add(4, this.inlongGroupId, this.inlongStreamId, System.currentTimeMillis());
            this.sinkMetrics.incSinkSuccessCount();
        }
    }

    private void extractStreamFromMessage(Message message, byte[] bArr) {
        if (this.messageFilter != null) {
            message.getHeader().put("inlongStreamId", this.messageFilter.filterStreamId(message, bArr));
        } else {
            message.getHeader().put("inlongStreamId", this.inlongStreamId);
        }
    }

    public void setSourceName(String str) {
        this.sourceName = str;
    }

    private Runnable flushCache() {
        return () -> {
            LOGGER.info("start flush cache thread for {} ProxySink", this.inlongGroupId);
            while (!this.shutdown) {
                try {
                    this.cache.forEach((str, packProxyMessage) -> {
                        Pair<String, List<byte[]>> fetchBatch = packProxyMessage.fetchBatch();
                        if (fetchBatch != null) {
                            this.senderManager.sendBatch(this.jobInstanceId, this.inlongGroupId, (String) fetchBatch.getKey(), (List) fetchBatch.getValue(), 0, this.dataTime);
                            LOGGER.info("send group id {} with message size {}, the job id is {}, read source is {}dataTime is {}", new Object[]{this.inlongGroupId, Integer.valueOf(((List) fetchBatch.getRight()).size()), this.jobInstanceId, this.sourceName, Long.valueOf(this.dataTime)});
                        }
                    });
                    AgentUtils.silenceSleepInMs(this.batchFlushInterval);
                } catch (Exception e) {
                    LOGGER.error("error caught", e);
                }
            }
        };
    }

    public void init(JobProfile jobProfile) {
        this.maxBatchSize = jobProfile.getInt("proxy.package.maxSize", 200000);
        this.maxQueueNumber = jobProfile.getInt("proxy.group.queue.maxNumber", 10000);
        this.maxBatchTimeoutMs = jobProfile.getInt("proxy.package.maxTimeout.ms", 4000);
        this.jobInstanceId = jobProfile.get("job.instance.id");
        this.batchFlushInterval = jobProfile.getInt("proxy.batch.flush.interval", 2000);
        this.cache = new ConcurrentHashMap<>(10);
        this.dataTime = AgentUtils.timeStrConvertToMillSec(jobProfile.get("job.dataTime", ""), jobProfile.get("job.cycleUnit", ""));
        this.inlongGroupId = jobProfile.get("proxy.inlongGroupId");
        this.inlongStreamId = jobProfile.get("proxy.inlongStreamId", "");
        this.messageFilter = initMessageFilter(jobProfile);
        this.fieldSplitter = jobProfile.get("proxy.field.splitter", "|").getBytes(StandardCharsets.UTF_8);
        this.executorService.execute(flushCache());
        this.senderManager = new SenderManager(jobProfile, this.inlongGroupId, this.sourceName);
        try {
            this.senderManager.addMessageSender();
        } catch (Exception e) {
            LOGGER.error("error while init sender for group id {}", this.inlongGroupId);
            throw new IllegalStateException(e);
        }
    }

    private HashMap<String, String> parseAttrFromJobProfile(JobProfile jobProfile) {
        HashMap<String, String> hashMap = new HashMap<>();
        String str = jobProfile.get("job.additionStr", "");
        if (!str.isEmpty()) {
            hashMap.putAll(AgentUtils.getAdditionAttr(str));
        }
        if (jobProfile.getBoolean("job.retry", false)) {
            hashMap.put("f", "bl");
        }
        hashMap.put(CommandConstants.ID, jobProfile.get("job.id"));
        hashMap.put("agentip", jobProfile.get("job.ip"));
        return hashMap;
    }

    public void destroy() {
        LOGGER.info("destroy sink which sink from source name {}", this.sourceName);
        while (!sinkFinish()) {
            LOGGER.info("job {} wait until cache all flushed to proxy", this.jobInstanceId);
            AgentUtils.silenceSleepInMs(this.batchFlushInterval);
        }
        this.shutdown = true;
        this.executorService.shutdown();
    }

    private boolean sinkFinish() {
        return this.cache.values().stream().allMatch((v0) -> {
            return v0.isEmpty();
        });
    }
}
