package org.apache.inlong.dataproxy.sink.mqzone;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
import org.apache.inlong.dataproxy.dispatch.DispatchManager;
import org.apache.inlong.dataproxy.dispatch.DispatchProfile;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItemSet;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mqzone/AbstractZoneSinkContext.class */
public abstract class AbstractZoneSinkContext {
    public static final Logger LOG = LoggerFactory.getLogger(AbstractZoneSinkContext.class);
    public static final String KEY_MAX_THREADS = "maxThreads";
    public static final String KEY_PROCESS_INTERVAL = "processInterval";
    public static final String KEY_RELOAD_INTERVAL = "reloadInterval";
    protected final String sinkName;
    protected final Context sinkContext;
    protected final Channel channel;
    protected final int maxThreads;
    protected final long processInterval;
    protected final long reloadInterval;
    protected final DataProxyMetricItemSet metricItemSet;
    protected Timer reloadTimer;
    public static final String KEY_NODE_ID = "nodeId";
    public static final String PREFIX_PRODUCER = "producer.";
    public static final String KEY_COMPRESS_TYPE = "compressType";
    protected ArrayList<LinkedBlockingQueue<DispatchProfile>> dispatchQueues;
    protected final String proxyClusterId;
    protected final String nodeId;
    protected final Context producerContext;
    protected final IdTopicConfigHolder idTopicHolder;
    protected final CacheClusterConfigHolder cacheHolder;
    protected final ProxySdk.INLONG_COMPRESSED_TYPE compressType;

    public AbstractZoneSinkContext(String str, Context context, Channel channel, ArrayList<LinkedBlockingQueue<DispatchProfile>> arrayList) {
        this.dispatchQueues = new ArrayList<>();
        this.sinkName = str;
        this.sinkContext = context;
        this.channel = channel;
        this.maxThreads = this.sinkContext.getInteger("maxThreads", 10).intValue();
        this.processInterval = this.sinkContext.getInteger("processInterval", 100).intValue();
        this.reloadInterval = this.sinkContext.getLong("reloadInterval", Long.valueOf(DispatchManager.MINUTE_MS)).longValue();
        this.metricItemSet = new DataProxyMetricItemSet(str);
        MetricRegister.register(this.metricItemSet);
        this.dispatchQueues = arrayList;
        this.proxyClusterId = CommonPropertiesHolder.getString("proxy.cluster.name");
        this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
        this.compressType = ProxySdk.INLONG_COMPRESSED_TYPE.valueOf(CommonPropertiesHolder.getString(KEY_COMPRESS_TYPE, ProxySdk.INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name()));
        this.producerContext = new Context(context.getSubProperties("producer."));
        Context context2 = new Context(CommonPropertiesHolder.get());
        this.idTopicHolder = new IdTopicConfigHolder();
        this.idTopicHolder.configure(context2);
        this.cacheHolder = new CacheClusterConfigHolder();
        this.cacheHolder.configure(context2);
    }

    public void start() {
        try {
            reload();
            setReloadTimer();
            this.idTopicHolder.start();
            this.cacheHolder.start();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public void close() {
        try {
            this.reloadTimer.cancel();
            this.idTopicHolder.close();
            this.cacheHolder.close();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    protected void setReloadTimer() {
        this.reloadTimer = new Timer(true);
        this.reloadTimer.schedule(new TimerTask() { // from class: org.apache.inlong.dataproxy.sink.mqzone.AbstractZoneSinkContext.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                AbstractZoneSinkContext.this.reload();
            }
        }, new Date(System.currentTimeMillis() + this.reloadInterval), this.reloadInterval);
    }

    public void reload() {
    }

    public String getSinkName() {
        return this.sinkName;
    }

    public Context getSinkContext() {
        return this.sinkContext;
    }

    public Channel getChannel() {
        return this.channel;
    }

    public int getMaxThreads() {
        return this.maxThreads;
    }

    public long getProcessInterval() {
        return this.processInterval;
    }

    public long getReloadInterval() {
        return this.reloadInterval;
    }

    public DataProxyMetricItemSet getMetricItemSet() {
        return this.metricItemSet;
    }

    public String getProxyClusterId() {
        return this.proxyClusterId;
    }

    public Context getProducerContext() {
        return this.producerContext;
    }

    public IdTopicConfigHolder getIdTopicHolder() {
        return this.idTopicHolder;
    }

    public CacheClusterConfigHolder getCacheHolder() {
        return this.cacheHolder;
    }

    public ProxySdk.INLONG_COMPRESSED_TYPE getCompressType() {
        return this.compressType;
    }

    public String getNodeId() {
        return this.nodeId;
    }

    public static void fillInlongId(DispatchProfile dispatchProfile, Map<String, String> map) {
        String inlongGroupId = dispatchProfile.getInlongGroupId();
        String str = StringUtils.isBlank(inlongGroupId) ? "-" : inlongGroupId;
        String inlongStreamId = dispatchProfile.getInlongStreamId();
        String str2 = StringUtils.isBlank(inlongStreamId) ? "-" : inlongStreamId;
        map.put("inlongGroupId", str);
        map.put("inlongStreamId", str2);
    }

    public void addSendResultMetric(DispatchProfile dispatchProfile, String str, boolean z, long j) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getProxyClusterId());
        fillInlongId(dispatchProfile, hashMap);
        hashMap.put(DataProxyMetricItem.KEY_SINK_ID, getSinkName());
        hashMap.put(DataProxyMetricItem.KEY_SINK_DATA_ID, str);
        long dispatchTime = dispatchProfile.getDispatchTime();
        hashMap.put("msgTime", String.valueOf(dispatchTime - (dispatchTime % CommonPropertiesHolder.getAuditFormatInterval())));
        DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) getMetricItemSet().findMetricItem(hashMap);
        long count = dispatchProfile.getCount();
        long size = dispatchProfile.getSize();
        if (!z) {
            dataProxyMetricItem.sendFailCount.addAndGet(count);
            dataProxyMetricItem.sendFailSize.addAndGet(size);
            return;
        }
        dataProxyMetricItem.sendSuccessCount.addAndGet(count);
        dataProxyMetricItem.sendSuccessSize.addAndGet(size);
        dispatchProfile.getEvents().forEach(proxyEvent -> {
            AuditUtils.add(6, proxyEvent);
        });
        if (j > 0) {
            long currentTimeMillis = System.currentTimeMillis();
            dispatchProfile.getEvents().forEach(proxyEvent2 -> {
                long sourceTime = currentTimeMillis - proxyEvent2.getSourceTime();
                long msgTime = currentTimeMillis - proxyEvent2.getMsgTime();
                dataProxyMetricItem.sinkDuration.addAndGet(currentTimeMillis - j);
                dataProxyMetricItem.nodeDuration.addAndGet(sourceTime);
                dataProxyMetricItem.wholeDuration.addAndGet(msgTime);
            });
        }
    }

    public ArrayList<LinkedBlockingQueue<DispatchProfile>> getDispatchQueues() {
        return this.dispatchQueues;
    }

    public void setDispatchQueues(ArrayList<LinkedBlockingQueue<DispatchProfile>> arrayList) {
        this.dispatchQueues = arrayList;
    }

    public void processSendFail(DispatchProfile dispatchProfile, String str, long j) {
        if (!dispatchProfile.isResend()) {
            dispatchProfile.fail();
        } else {
            this.dispatchQueues.get(dispatchProfile.getSendIndex() % this.maxThreads).offer(dispatchProfile);
            addSendResultMetric(dispatchProfile, str, false, j);
        }
    }

    public void addSendFailMetric() {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getProxyClusterId());
        hashMap.put(DataProxyMetricItem.KEY_SINK_ID, getSinkName());
        long currentTimeMillis = System.currentTimeMillis();
        hashMap.put("msgTime", String.valueOf(currentTimeMillis - (currentTimeMillis % CommonPropertiesHolder.getAuditFormatInterval())));
        ((DataProxyMetricItem) getMetricItemSet().findMetricItem(hashMap)).sendFailCount.incrementAndGet();
    }

    public void addSendMetric(DispatchProfile dispatchProfile, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getProxyClusterId());
        fillInlongId(dispatchProfile, hashMap);
        hashMap.put(DataProxyMetricItem.KEY_SINK_ID, getSinkName());
        hashMap.put(DataProxyMetricItem.KEY_SINK_DATA_ID, str);
        long dispatchTime = dispatchProfile.getDispatchTime();
        hashMap.put("msgTime", String.valueOf(dispatchTime - (dispatchTime % CommonPropertiesHolder.getAuditFormatInterval())));
        DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) getMetricItemSet().findMetricItem(hashMap);
        long count = dispatchProfile.getCount();
        long size = dispatchProfile.getSize();
        dataProxyMetricItem.sendCount.addAndGet(count);
        dataProxyMetricItem.sendSize.addAndGet(size);
    }
}
