package org.apache.inlong.common.reporpter;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.time.Instant;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.common.reporpter.dto.StreamConfigLogInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/common/reporpter/StreamConfigLogMetric.class */
public class StreamConfigLogMetric implements Runnable {
    public static final String CONFIG_LOG_REPORT_ENABLE = "report.config.log.enable";
    public static final String CONFIG_LOG_REPORT_SERVER_URL = "report.config.log.server.url";
    public static final String CONFIG_LOG_REPORT_INTERVAL = "report.config.log.interval";
    public static final String CONFIG_LOG_REPORT_CLIENT_VERSION = "report.config.log.client.version";
    public static final String CONFIG_LOG_PULSAR_PRODUCER = "pulsar-producer";
    public static final String CONFIG_LOG_PULSAR_CLIENT = "pulsar-client";
    private StreamConfigLogReporter streamConfigLogReporter;
    private String moduleName;
    private String clientVersion;
    private String localIp;
    private long reportInterval;
    public ConcurrentHashMap<String, StreamConfigLogInfo> dataCacheMap = new ConcurrentHashMap<>();
    public static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) StreamConfigLogMetric.class);
    private static ScheduledExecutorService statExecutor = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder().setNameFormat("StreamConfigLogMetric-Report").setUncaughtExceptionHandler((thread, th) -> {
        LOGGER.error(thread.getName() + " has an uncaught exception: ", th);
    }).build());

    public StreamConfigLogMetric(String str, String str2, long j, String str3, String str4) {
        this.streamConfigLogReporter = new StreamConfigLogReporter(str2);
        this.reportInterval = j;
        this.moduleName = str;
        this.localIp = str3;
        this.clientVersion = str4;
        statExecutor.scheduleWithFixedDelay(this, j, j, TimeUnit.MILLISECONDS);
    }

    public void updateConfigLog(String str, String str2, String str3, ConfigLogTypeEnum configLogTypeEnum, String str4) {
        String str5 = this.moduleName + "-" + str + "-" + str2 + "-" + str3;
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("updateConfigLog key = {}", str5);
        }
        this.dataCacheMap.compute(str5, (str6, streamConfigLogInfo) -> {
            if (streamConfigLogInfo == null) {
                streamConfigLogInfo = new StreamConfigLogInfo();
            }
            updateDataValue(streamConfigLogInfo, str, str2, str3, configLogTypeEnum, str4);
            return streamConfigLogInfo;
        });
    }

    private void updateDataValue(StreamConfigLogInfo streamConfigLogInfo, String str, String str2, String str3, ConfigLogTypeEnum configLogTypeEnum, String str4) {
        streamConfigLogInfo.setComponentName(this.moduleName);
        streamConfigLogInfo.setConfigName(str3);
        streamConfigLogInfo.setInlongGroupId(str);
        streamConfigLogInfo.setInlongStreamId(str2);
        streamConfigLogInfo.setIp(this.localIp);
        streamConfigLogInfo.setVersion(this.clientVersion);
        streamConfigLogInfo.setLogInfo(str4);
        streamConfigLogInfo.setReportTime(Instant.now().toEpochMilli());
        streamConfigLogInfo.setLogType(Integer.valueOf(configLogTypeEnum.getType()));
    }

    @Override // java.lang.Runnable
    public void run() {
        try {
            Set<Map.Entry<String, StreamConfigLogInfo>> entrySet = this.dataCacheMap.entrySet();
            long epochMilli = Instant.now().toEpochMilli();
            for (Map.Entry<String, StreamConfigLogInfo> entry : entrySet) {
                StreamConfigLogInfo value = entry.getValue();
                if (epochMilli - value.getReportTime() < this.reportInterval) {
                    if (LOGGER.isDebugEnabled()) {
                        LOGGER.debug("Report metric data config key = {}!", value.getConfigName());
                    }
                    this.streamConfigLogReporter.asyncReportData(value);
                } else {
                    this.dataCacheMap.remove(entry.getKey());
                    LOGGER.info("Remove expired config key {}", entry.getKey());
                }
            }
        } catch (Exception e) {
            LOGGER.error("Report streamConfigLogMetric has exception = {}", (Throwable) e);
        }
    }
}
