package org.apache.inlong.dataproxy.sink.pulsarzone;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.SinkContext;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/pulsarzone/PulsarZoneSinkContext.class */
/**
 * Sink context for the Pulsar zone sink.
 *
 * <p>Holds the dispatch queue shared with the sink, the producer sub-configuration, the
 * id-topic and cache-cluster configuration holders, and helper methods that record send
 * metrics into the {@link DataProxyMetricItem} set inherited from {@link SinkContext}.
 */
public class PulsarZoneSinkContext extends SinkContext {
    /** Common-properties key holding the node id. */
    public static final String KEY_NODE_ID = "nodeId";
    /** Prefix of the sink properties that are forwarded to the producer context. */
    public static final String PREFIX_PRODUCER = "producer.";
    /** Common-properties key holding the compression type name. */
    public static final String KEY_COMPRESS_TYPE = "compressType";

    private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
    private final String nodeId;
    private final Context producerContext;
    private final IdTopicConfigHolder idTopicHolder;
    private final CacheClusterConfigHolder cacheHolder;
    private final ProxySdk.INLONG_COMPRESSED_TYPE compressType;

    /**
     * Creates the context and configures (but does not start) both configuration holders.
     *
     * @param str forwarded unchanged to the {@link SinkContext} constructor
     * @param context sink configuration; its {@code producer.}-prefixed sub-properties become
     *     the producer context
     * @param channel forwarded unchanged to the {@link SinkContext} constructor
     * @param linkedBlockingQueue queue that receives profiles re-queued by
     *     {@link #processSendFail(DispatchProfile, String, long)}
     * @throws IllegalArgumentException presumably when the configured compress type does not
     *     name an {@code INLONG_COMPRESSED_TYPE} constant (standard enum {@code valueOf}
     *     behavior - confirm against the generated enum)
     */
    public PulsarZoneSinkContext(String str, Context context, Channel channel, LinkedBlockingQueue<DispatchProfile> linkedBlockingQueue) {
        super(str, context, channel);
        this.dispatchQueue = linkedBlockingQueue;
        // Node id falls back to the loopback address when it is not configured.
        this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
        // Compression falls back to INLONG_SNAPPY when it is not configured.
        String compressTypeName = CommonPropertiesHolder.getString(
                KEY_COMPRESS_TYPE, ProxySdk.INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name());
        this.compressType = ProxySdk.INLONG_COMPRESSED_TYPE.valueOf(compressTypeName);
        this.producerContext = new Context(context.getSubProperties(PREFIX_PRODUCER));
        // Both holders are configured from the common properties, not from the sink context.
        Context commonContext = new Context(CommonPropertiesHolder.get());
        this.idTopicHolder = new IdTopicConfigHolder();
        this.idTopicHolder.configure(commonContext);
        this.cacheHolder = new CacheClusterConfigHolder();
        this.cacheHolder.configure(commonContext);
    }

    /** Starts the parent context first, then the id-topic holder and the cache-cluster holder. */
    @Override // org.apache.inlong.dataproxy.sink.SinkContext
    public void start() {
        super.start();
        this.idTopicHolder.start();
        this.cacheHolder.start();
    }

    /** Closes the parent context first, then the id-topic holder and the cache-cluster holder. */
    @Override // org.apache.inlong.dataproxy.sink.SinkContext
    public void close() {
        super.close();
        this.idTopicHolder.close();
        this.cacheHolder.close();
    }

    /**
     * @return the dispatch queue supplied at construction (the same instance, not a copy)
     */
    public LinkedBlockingQueue<DispatchProfile> getDispatchQueue() {
        return this.dispatchQueue;
    }

    /**
     * @return the context built from the {@code producer.}-prefixed sink properties
     */
    public Context getProducerContext() {
        return this.producerContext;
    }

    /**
     * @return the id-topic configuration holder owned by this context
     */
    public IdTopicConfigHolder getIdTopicHolder() {
        return this.idTopicHolder;
    }

    /**
     * @return the cache-cluster configuration holder owned by this context
     */
    public CacheClusterConfigHolder getCacheHolder() {
        return this.cacheHolder;
    }

    /**
     * @return the compression type read from the common properties at construction
     */
    public ProxySdk.INLONG_COMPRESSED_TYPE getCompressType() {
        return this.compressType;
    }

    /**
     * @return the node id read from the common properties at construction
     */
    public String getNodeId() {
        return this.nodeId;
    }

    /**
     * Adds the profile's event count and byte size to the {@code sendCount} and
     * {@code sendSize} counters of the matching metric item.
     *
     * @param dispatchProfile profile being sent
     * @param str value stored under the {@link DataProxyMetricItem#KEY_SINK_DATA_ID} dimension
     */
    public void addSendMetric(DispatchProfile dispatchProfile, String str) {
        DataProxyMetricItem metricItem = findSendMetricItem(dispatchProfile, str);
        long count = dispatchProfile.getCount();
        long size = dispatchProfile.getSize();
        metricItem.sendCount.addAndGet(count);
        metricItem.sendSize.addAndGet(size);
    }

    /**
     * Increments the {@code readFailCount} counter of the metric item keyed by cluster id,
     * sink id and the current time aligned to the audit format interval.
     *
     * <p>NOTE(review): the method name says "send fail" but the counter touched is
     * {@code readFailCount}; kept as-is because existing metric consumers may depend on it -
     * confirm whether {@code sendFailCount} was intended.
     */
    public void addSendFailMetric() {
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("clusterId", getProxyClusterId());
        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, getSinkName());
        dimensions.put("msgTime", String.valueOf(alignToAuditInterval(System.currentTimeMillis())));
        DataProxyMetricItem metricItem = (DataProxyMetricItem) getMetricItemSet().findMetricItem(dimensions);
        metricItem.readFailCount.incrementAndGet();
    }

    /**
     * Writes the profile's group id and stream id into {@code map} under the keys
     * {@code inlongGroupId} and {@code inlongStreamId}; a blank id is replaced by {@code "-"}.
     *
     * @param dispatchProfile source of the ids
     * @param map dimension map to fill; modified in place
     */
    public static void fillInlongId(DispatchProfile dispatchProfile, Map<String, String> map) {
        String groupId = dispatchProfile.getInlongGroupId();
        String streamId = dispatchProfile.getInlongStreamId();
        map.put("inlongGroupId", StringUtils.isBlank(groupId) ? "-" : groupId);
        map.put("inlongStreamId", StringUtils.isBlank(streamId) ? "-" : streamId);
    }

    /**
     * Handles a failed send: a profile that reports {@code isResend()} is put back on the
     * dispatch queue and recorded as a send failure; any other profile is failed directly.
     *
     * @param dispatchProfile profile whose send failed
     * @param str value stored under the {@link DataProxyMetricItem#KEY_SINK_DATA_ID} dimension
     * @param j forwarded to {@link #addSendResultMetric}; unused there on the failure path
     */
    public void processSendFail(DispatchProfile dispatchProfile, String str, long j) {
        if (!dispatchProfile.isResend()) {
            dispatchProfile.fail();
            return;
        }
        // NOTE(review): the result of offer() is ignored; if the queue were bounded and full
        // the profile would be dropped silently - confirm how the queue is constructed.
        this.dispatchQueue.offer(dispatchProfile);
        addSendResultMetric(dispatchProfile, str, false, j);
    }

    /**
     * Records the outcome of a send attempt.
     *
     * <p>On failure only {@code sendFailCount} and {@code sendFailSize} are updated. On success
     * {@code sendSuccessCount} and {@code sendSuccessSize} are updated, every event is reported
     * to {@link AuditUtils}, and - when {@code j > 0} - the sink, node and whole durations are
     * accumulated once per event.
     *
     * @param dispatchProfile profile that was sent
     * @param str value stored under the {@link DataProxyMetricItem#KEY_SINK_DATA_ID} dimension
     * @param z {@code true} when the send succeeded, {@code false} when it failed
     * @param j timestamp subtracted from the current time to obtain the sink duration;
     *     durations are skipped when it is not positive (presumably the send start time in
     *     milliseconds - confirm against callers)
     */
    public void addSendResultMetric(DispatchProfile dispatchProfile, String str, boolean z, long j) {
        DataProxyMetricItem metricItem = findSendMetricItem(dispatchProfile, str);
        long count = dispatchProfile.getCount();
        long size = dispatchProfile.getSize();
        if (!z) {
            metricItem.sendFailCount.addAndGet(count);
            metricItem.sendFailSize.addAndGet(size);
            return;
        }
        metricItem.sendSuccessCount.addAndGet(count);
        metricItem.sendSuccessSize.addAndGet(size);
        // NOTE(review): 5 is an inlined audit id; presumably the "send success" id - confirm
        // against the constants declared in AuditUtils.
        dispatchProfile.getEvents().forEach(event -> AuditUtils.add(5, event));
        if (j > 0) {
            long now = System.currentTimeMillis();
            // Loop-invariant: identical for every event, but still added once per event.
            long sinkDuration = now - j;
            dispatchProfile.getEvents().forEach(event -> {
                long nodeDuration = now - event.getSourceTime();
                long wholeDuration = now - event.getMsgTime();
                metricItem.sinkDuration.addAndGet(sinkDuration);
                metricItem.nodeDuration.addAndGet(nodeDuration);
                metricItem.wholeDuration.addAndGet(wholeDuration);
            });
        }
    }

    /** Looks up the metric item keyed by cluster, inlong ids, sink, sink data id and aligned dispatch time. */
    private DataProxyMetricItem findSendMetricItem(DispatchProfile dispatchProfile, String sinkDataId) {
        Map<String, String> dimensions = new HashMap<>();
        dimensions.put("clusterId", getProxyClusterId());
        fillInlongId(dispatchProfile, dimensions);
        dimensions.put(DataProxyMetricItem.KEY_SINK_ID, getSinkName());
        dimensions.put(DataProxyMetricItem.KEY_SINK_DATA_ID, sinkDataId);
        dimensions.put("msgTime", String.valueOf(alignToAuditInterval(dispatchProfile.getDispatchTime())));
        return (DataProxyMetricItem) getMetricItemSet().findMetricItem(dimensions);
    }

    /** Rounds a timestamp down to a multiple of the configured audit format interval. */
    private static long alignToAuditInterval(long timeMillis) {
        return timeMillis - (timeMillis % CommonPropertiesHolder.getAuditFormatInterval());
    }
}
