package org.apache.inlong.dataproxy.sink.mq;

import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.conf.Configurable;
import org.apache.inlong.dataproxy.config.holder.CacheClusterConfigHolder;
import org.apache.inlong.dataproxy.config.holder.CommonPropertiesHolder;
import org.apache.inlong.dataproxy.config.holder.IdTopicConfigHolder;
import org.apache.inlong.dataproxy.metrics.DataProxyMetricItem;
import org.apache.inlong.dataproxy.metrics.audit.AuditUtils;
import org.apache.inlong.dataproxy.sink.common.SinkContext;
import org.apache.inlong.dataproxy.utils.BufferQueue;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/MessageQueueZoneSinkContext.class */
public class MessageQueueZoneSinkContext extends SinkContext {
    public static final String KEY_NODE_ID = "nodeId";
    public static final String PREFIX_PRODUCER = "producer.";
    public static final String KEY_COMPRESS_TYPE = "compressType";
    public static final String KEY_CACHE_CLUSTER_SELECTOR = "cacheClusterSelector";
    private final BufferQueue<BatchPackProfile> dispatchQueue;
    private final String proxyClusterId;
    private final String nodeId;
    private final Context producerContext;
    private final IdTopicConfigHolder idTopicHolder;
    private final CacheClusterConfigHolder cacheHolder;
    private final ProxySdk.INLONG_COMPRESSED_TYPE compressType;

    public MessageQueueZoneSinkContext(String str, Context context, Channel channel, BufferQueue<BatchPackProfile> bufferQueue) {
        super(str, context, channel);
        this.dispatchQueue = bufferQueue;
        this.proxyClusterId = CommonPropertiesHolder.getString("proxy.cluster.name");
        this.nodeId = CommonPropertiesHolder.getString(KEY_NODE_ID, "127.0.0.1");
        this.compressType = ProxySdk.INLONG_COMPRESSED_TYPE.valueOf(CommonPropertiesHolder.getString(KEY_COMPRESS_TYPE, ProxySdk.INLONG_COMPRESSED_TYPE.INLONG_SNAPPY.name()));
        this.producerContext = new Context(context.getSubProperties(PREFIX_PRODUCER));
        Context context2 = new Context(CommonPropertiesHolder.get());
        this.idTopicHolder = new IdTopicConfigHolder();
        this.idTopicHolder.configure(context2);
        this.cacheHolder = new CacheClusterConfigHolder();
        this.cacheHolder.configure(context2);
    }

    @Override // org.apache.inlong.dataproxy.sink.common.SinkContext
    public void start() {
        super.start();
        this.idTopicHolder.start();
        this.cacheHolder.start();
    }

    @Override // org.apache.inlong.dataproxy.sink.common.SinkContext
    public void close() {
        super.close();
        this.idTopicHolder.close();
        this.cacheHolder.close();
    }

    public String getProxyClusterId() {
        return this.proxyClusterId;
    }

    public BufferQueue<BatchPackProfile> getDispatchQueue() {
        return this.dispatchQueue;
    }

    public Context getProducerContext() {
        return this.producerContext;
    }

    public IdTopicConfigHolder getIdTopicHolder() {
        return this.idTopicHolder;
    }

    public CacheClusterConfigHolder getCacheHolder() {
        return this.cacheHolder;
    }

    public ProxySdk.INLONG_COMPRESSED_TYPE getCompressType() {
        return this.compressType;
    }

    public String getNodeId() {
        return this.nodeId;
    }

    public void addSendResultMetric(BatchPackProfile batchPackProfile, String str, String str2, boolean z, long j) {
        if (batchPackProfile instanceof SimpleBatchPackProfileV0) {
            AuditUtils.add(6, ((SimpleBatchPackProfileV0) batchPackProfile).getSimpleProfile());
            return;
        }
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getProxyClusterId());
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_ID, "-");
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, "-");
        fillInlongId(batchPackProfile, hashMap);
        hashMap.put(DataProxyMetricItem.KEY_SINK_ID, str);
        hashMap.put(DataProxyMetricItem.KEY_SINK_DATA_ID, str2);
        long currentTimeMillis = System.currentTimeMillis();
        batchPackProfile.getEvents().forEach(proxyEvent -> {
            long msgTime = proxyEvent.getMsgTime();
            hashMap.put("msgTime", String.valueOf(msgTime - (msgTime % CommonPropertiesHolder.getAuditFormatInterval())));
            DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) getMetricItemSet().findMetricItem(hashMap);
            if (!z) {
                dataProxyMetricItem.sendFailCount.addAndGet(1L);
                dataProxyMetricItem.sendFailSize.addAndGet(proxyEvent.getBody().length);
                return;
            }
            dataProxyMetricItem.sendSuccessCount.addAndGet(1L);
            dataProxyMetricItem.sendSuccessSize.addAndGet(proxyEvent.getBody().length);
            if (j > 0) {
                long sourceTime = currentTimeMillis - proxyEvent.getSourceTime();
                dataProxyMetricItem.sinkDuration.addAndGet(currentTimeMillis - j);
                dataProxyMetricItem.nodeDuration.addAndGet(sourceTime);
                dataProxyMetricItem.wholeDuration.addAndGet(currentTimeMillis - msgTime);
            }
            AuditUtils.add(6, proxyEvent);
        });
    }

    public void addSendMetric(BatchPackProfile batchPackProfile, String str, String str2, int i) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getProxyClusterId());
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_ID, "-");
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, "-");
        fillInlongId(batchPackProfile, hashMap);
        hashMap.put(DataProxyMetricItem.KEY_SINK_ID, str);
        hashMap.put(DataProxyMetricItem.KEY_SINK_DATA_ID, str2);
        long dispatchTime = batchPackProfile.getDispatchTime();
        hashMap.put("msgTime", String.valueOf(dispatchTime - (dispatchTime % CommonPropertiesHolder.getAuditFormatInterval())));
        DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) getMetricItemSet().findMetricItem(hashMap);
        long count = batchPackProfile.getCount();
        long size = batchPackProfile.getSize();
        dataProxyMetricItem.sendCount.addAndGet(count);
        dataProxyMetricItem.sendSize.addAndGet(size);
        dataProxyMetricItem.sendPackCount.incrementAndGet();
        dataProxyMetricItem.sendPackSize.addAndGet(i);
    }

    public void addSendFailMetric() {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getProxyClusterId());
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_ID, "-");
        hashMap.put(DataProxyMetricItem.KEY_SOURCE_DATA_ID, "-");
        hashMap.put("inlongGroupId", "-");
        hashMap.put("inlongStreamId", "-");
        hashMap.put(DataProxyMetricItem.KEY_SINK_ID, getSinkName());
        hashMap.put(DataProxyMetricItem.KEY_SINK_DATA_ID, "-");
        long currentTimeMillis = System.currentTimeMillis();
        hashMap.put("msgTime", String.valueOf(currentTimeMillis - (currentTimeMillis % CommonPropertiesHolder.getAuditFormatInterval())));
        DataProxyMetricItem dataProxyMetricItem = (DataProxyMetricItem) getMetricItemSet().findMetricItem(hashMap);
        dataProxyMetricItem.sendFailCount.incrementAndGet();
        dataProxyMetricItem.sendFailSize.incrementAndGet();
    }

    public static void fillInlongId(BatchPackProfile batchPackProfile, Map<String, String> map) {
        String inlongGroupId = batchPackProfile.getInlongGroupId();
        String str = StringUtils.isBlank(inlongGroupId) ? "-" : inlongGroupId;
        String inlongStreamId = batchPackProfile.getInlongStreamId();
        String str2 = StringUtils.isBlank(inlongStreamId) ? "-" : inlongStreamId;
        map.put("inlongGroupId", str);
        map.put("inlongStreamId", str2);
    }

    public void processSendFail(BatchPackProfile batchPackProfile, String str, String str2, long j) {
        if (!batchPackProfile.isResend()) {
            batchPackProfile.fail();
        } else {
            this.dispatchQueue.offer(batchPackProfile);
            addSendResultMetric(batchPackProfile, str, str2, false, j);
        }
    }

    public CacheClusterSelector createCacheClusterSelector() {
        String string = CommonPropertiesHolder.getString(KEY_CACHE_CLUSTER_SELECTOR, AllCacheClusterSelector.class.getName());
        try {
            Object newInstance = ClassUtils.getClass(string).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            if (newInstance instanceof Configurable) {
                ((Configurable) newInstance).configure(new Context(CommonPropertiesHolder.get()));
            }
            if (newInstance instanceof CacheClusterSelector) {
                return (CacheClusterSelector) newInstance;
            }
            return null;
        } catch (Throwable th) {
            LOG.error("Fail to init CacheClusterSelector,selectorClass:{},error:{}", new Object[]{string, th.getMessage(), th});
            return null;
        }
    }
}
