package org.apache.inlong.sort.standalone.sink.kafka;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ClassUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.node.KafkaNodeConfig;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.CacheClusterConfig;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortConfigMetricReporter;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/kafka/KafkaFederationSinkContext.class */
public class KafkaFederationSinkContext extends SinkContext {
    public static final Logger LOG = InlongLoggerFactory.getLogger(KafkaFederationSinkContext.class);
    public static final String KEY_EVENT_HANDLER = "eventHandler";
    private KafkaNodeConfig kafkaNodeConfig;
    private CacheClusterConfig cacheClusterConfig;
    private Map<String, KafkaIdConfig> idConfigMap;

    public KafkaFederationSinkContext(String str, Context context, Channel channel) {
        super(str, context, channel);
        this.idConfigMap = new ConcurrentHashMap();
    }

    @Override // org.apache.inlong.sort.standalone.sink.SinkContext
    public void reload() {
        LOG.info("reload KafkaFederationSinkContext.");
        try {
            TaskConfig taskConfig = SortConfigHolder.getTaskConfig(this.taskName);
            SortTaskConfig taskConfig2 = SortClusterConfigHolder.getTaskConfig(this.taskName);
            if (taskConfig == null && taskConfig2 == null) {
                LOG.error("newSortTaskConfig is null.");
                return;
            }
            if (this.taskConfig != null && this.taskConfig.equals(taskConfig) && this.sortTaskConfig != null && this.sortTaskConfig.equals(taskConfig2)) {
                LOG.info("Same sortTaskConfig, do nothing.");
                return;
            }
            if (taskConfig != null) {
                KafkaNodeConfig nodeConfig = taskConfig.getNodeConfig();
                if (this.kafkaNodeConfig == null || nodeConfig.getVersion().intValue() > this.kafkaNodeConfig.getVersion().intValue()) {
                    this.kafkaNodeConfig = nodeConfig;
                }
            }
            this.taskConfig = taskConfig;
            this.sortTaskConfig = taskConfig2;
            CacheClusterConfig cacheClusterConfig = new CacheClusterConfig();
            cacheClusterConfig.setClusterName(this.taskName);
            cacheClusterConfig.setParams(this.sortTaskConfig.getSinkParams());
            this.cacheClusterConfig = cacheClusterConfig;
            Map<String, KafkaIdConfig> fromTaskConfig = fromTaskConfig(this.taskConfig);
            Map<String, KafkaIdConfig> fromSortTaskConfig = fromSortTaskConfig(this.sortTaskConfig);
            SortConfigMetricReporter.reportClusterDiff(this.clusterId, this.taskName, fromTaskConfig, fromSortTaskConfig);
            this.idConfigMap = this.unifiedConfiguration ? fromTaskConfig : fromSortTaskConfig;
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    public Map<String, KafkaIdConfig> fromTaskConfig(TaskConfig taskConfig) {
        return taskConfig == null ? new HashMap() : (Map) taskConfig.getClusterTagConfigs().stream().map((v0) -> {
            return v0.getDataFlowConfigs();
        }).flatMap((v0) -> {
            return v0.stream();
        }).map(KafkaIdConfig::create).collect(Collectors.toMap(kafkaIdConfig -> {
            return InlongId.generateUid(kafkaIdConfig.getInlongGroupId(), kafkaIdConfig.getInlongStreamId());
        }, kafkaIdConfig2 -> {
            return kafkaIdConfig2;
        }, (kafkaIdConfig3, kafkaIdConfig4) -> {
            return kafkaIdConfig3;
        }));
    }

    public Map<String, KafkaIdConfig> fromSortTaskConfig(SortTaskConfig sortTaskConfig) {
        if (sortTaskConfig == null) {
            return new HashMap();
        }
        List idParams = sortTaskConfig.getIdParams();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        Iterator it = idParams.iterator();
        while (it.hasNext()) {
            try {
                KafkaIdConfig kafkaIdConfig = new KafkaIdConfig((Map<String, String>) it.next());
                concurrentHashMap.put(kafkaIdConfig.getUid(), kafkaIdConfig);
            } catch (Exception e) {
                LOG.error("fail to parse kafka id config", e);
            }
        }
        return concurrentHashMap;
    }

    public KafkaNodeConfig getNodeConfig() {
        return this.kafkaNodeConfig;
    }

    public CacheClusterConfig getCacheClusterConfig() {
        return this.cacheClusterConfig;
    }

    public String getTopic(String str) {
        KafkaIdConfig kafkaIdConfig = this.idConfigMap.get(str);
        if (Objects.isNull(kafkaIdConfig)) {
            return null;
        }
        return kafkaIdConfig.getTopic();
    }

    public KafkaIdConfig getIdConfig(String str) {
        KafkaIdConfig kafkaIdConfig = this.idConfigMap.get(str);
        if (kafkaIdConfig == null) {
            throw new NullPointerException("uid " + str + "got null KafkaIdConfig");
        }
        return kafkaIdConfig;
    }

    public void addSendMetric(ProfileEvent profileEvent, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put("taskName", getTaskName());
        fillInlongId(profileEvent, hashMap);
        hashMap.put("sinkId", getSinkName());
        hashMap.put("sinkDataId", str);
        long rawLogTime = profileEvent.getRawLogTime();
        hashMap.put("msgTime", String.valueOf(rawLogTime - (rawLogTime % CommonPropertiesHolder.getAuditFormatInterval())));
        SortMetricItem findMetricItem = getMetricItemSet().findMetricItem(hashMap);
        long length = profileEvent.getBody().length;
        findMetricItem.sendCount.addAndGet(1L);
        findMetricItem.sendSize.addAndGet(length);
    }

    public void addSendFailMetric() {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put("sinkId", getSinkName());
        long currentTimeMillis = System.currentTimeMillis();
        hashMap.put("msgTime", String.valueOf(currentTimeMillis - (currentTimeMillis % CommonPropertiesHolder.getAuditFormatInterval())));
        getMetricItemSet().findMetricItem(hashMap).readFailCount.incrementAndGet();
    }

    public void addSendResultMetric(ProfileEvent profileEvent, String str, boolean z, long j) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put("taskName", getTaskName());
        fillInlongId(profileEvent, hashMap);
        hashMap.put("sinkId", getSinkName());
        hashMap.put("sinkDataId", str);
        long rawLogTime = profileEvent.getRawLogTime();
        hashMap.put("msgTime", String.valueOf(rawLogTime - (rawLogTime % CommonPropertiesHolder.getAuditFormatInterval())));
        SortMetricItem findMetricItem = getMetricItemSet().findMetricItem(hashMap);
        long length = profileEvent.getBody().length;
        if (!z) {
            findMetricItem.sendFailCount.addAndGet(1L);
            findMetricItem.sendFailSize.addAndGet(length);
            return;
        }
        findMetricItem.sendSuccessCount.addAndGet(1L);
        findMetricItem.sendSuccessSize.addAndGet(length);
        AuditUtils.add(8, profileEvent);
        if (j > 0) {
            long currentTimeMillis = System.currentTimeMillis();
            long j2 = currentTimeMillis - j;
            long fetchTime = currentTimeMillis - profileEvent.getFetchTime();
            long rawLogTime2 = currentTimeMillis - profileEvent.getRawLogTime();
            findMetricItem.sinkDuration.addAndGet(j2 * 1);
            findMetricItem.nodeDuration.addAndGet(fetchTime * 1);
            findMetricItem.wholeDuration.addAndGet(rawLogTime2 * 1);
        }
    }

    public IEvent2KafkaRecordHandler createEventHandler() {
        String string = CommonPropertiesHolder.getString("eventHandler", DefaultEvent2KafkaRecordHandler.class.getName());
        try {
            Object newInstance = ClassUtils.getClass(string).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            if (newInstance instanceof IEvent2KafkaRecordHandler) {
                return (IEvent2KafkaRecordHandler) newInstance;
            }
            return null;
        } catch (Throwable th) {
            LOG.error("Fail to init IEvent2KafkaRecordHandler,handlerClass:{},error:{}", new Object[]{string, th.getMessage(), th});
            return null;
        }
    }
}
