package org.apache.inlong.sort.standalone.sink.cls;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.tencentcloudapi.cls.producer.AsyncProducerClient;
import com.tencentcloudapi.cls.producer.AsyncProducerConfig;
import com.tencentcloudapi.cls.producer.errors.ProducerException;
import com.tencentcloudapi.cls.producer.util.NetworkUtils;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/cls/ClsSinkContext.class */
public class ClsSinkContext extends SinkContext {
    private static final Logger LOG = InlongLoggerFactory.getLogger(ClsSinkContext.class);
    private static final String KEY_TOTAL_SIZE_IN_BYTES = "totalSizeInBytes";
    private static final String KEY_MAX_SEND_THREAD_COUNT = "maxSendThreadCount";
    private static final String KEY_MAX_BLOCK_SEC = "maxBlockSec";
    private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize";
    private static final String KEY_MAX_BATCH_COUNT = "maxBatchCount";
    private static final String KEY_LINGER_MS = "lingerMs";
    private static final String KEY_RETRIES = "retries";
    private static final String KEY_MAX_RESERVED_ATTEMPTS = "maxReservedAttempts";
    private static final String KEY_BASE_RETRY_BACKOFF_MS = "baseRetryBackoffMs";
    private static final String KEY_MAX_RETRY_BACKOFF_MS = "maxRetryBackoffMs";
    private static final String KEY_MAX_KEYWORD_LENGTH = "maxKeywordLength";
    private static final String KEY_EVENT_LOG_ITEM_HANDLER = "logItemHandler";
    public static final String KEY_TOPIC_ID = "topicId";
    private static final int DEFAULT_KEYWORD_MAX_LENGTH = 32767;
    private int keywordMaxLength;
    private final Map<String, AsyncProducerClient> clientMap;
    private List<AsyncProducerClient> deletingClients;
    private Context sinkContext;
    private Map<String, ClsIdConfig> idConfigMap;
    private IEvent2LogItemHandler event2LogItemHandler;

    public ClsSinkContext(String str, Context context, Channel channel) {
        super(str, context, channel);
        this.keywordMaxLength = 32767;
        this.deletingClients = new ArrayList();
        this.idConfigMap = new ConcurrentHashMap();
        this.clientMap = new ConcurrentHashMap();
    }

    @Override // org.apache.inlong.sort.standalone.sink.SinkContext
    public void reload() {
        try {
            this.deletingClients.forEach(asyncProducerClient -> {
                try {
                    asyncProducerClient.close();
                } catch (ProducerException e) {
                    LOG.error("close client failed, got ProducerException" + e.getMessage(), e);
                } catch (InterruptedException e2) {
                    LOG.error("close client failed, got InterruptedException" + e2.getMessage(), e2);
                }
            });
            SortTaskConfig taskConfig = SortClusterConfigHolder.getTaskConfig(this.taskName);
            if (this.sortTaskConfig == null || !this.sortTaskConfig.equals(taskConfig)) {
                LOG.info("get new SortTaskConfig:taskName:{}:config:{}", this.taskName, new ObjectMapper().writeValueAsString(taskConfig));
                this.sortTaskConfig = taskConfig;
                this.sinkContext = new Context(this.sortTaskConfig.getSinkParams());
                reloadIdParams();
                reloadClients();
                reloadHandler();
                this.keywordMaxLength = this.sinkContext.getInteger(KEY_MAX_KEYWORD_LENGTH, 32767).intValue();
            }
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    private void reloadHandler() {
        String string = CommonPropertiesHolder.getString(KEY_EVENT_LOG_ITEM_HANDLER, DefaultEvent2LogItemHandler.class.getName());
        try {
            Object newInstance = ClassUtils.getClass(string).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            if (newInstance instanceof IEvent2LogItemHandler) {
                this.event2LogItemHandler = (IEvent2LogItemHandler) newInstance;
            } else {
                LOG.error("{} is not the instance of IEvent2LogItemHandler", string);
            }
        } catch (Throwable th) {
            LOG.error("Fail to init IEvent2LogItemHandler, handlerClass:{}, error:{}", string, th.getMessage());
        }
    }

    private void reloadIdParams() throws JsonProcessingException {
        List<Map> idParams = this.sortTaskConfig.getIdParams();
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        ObjectMapper objectMapper = new ObjectMapper();
        for (Map map : idParams) {
            String generateUid = InlongId.generateUid((String) map.get("inlongGroupId"), (String) map.get("inlongStreamId"));
            ClsIdConfig clsIdConfig = (ClsIdConfig) objectMapper.readValue(objectMapper.writeValueAsString(map), ClsIdConfig.class);
            clsIdConfig.getFieldList();
            concurrentHashMap.put(generateUid, clsIdConfig);
        }
        this.idConfigMap = concurrentHashMap;
    }

    private void reloadClients() {
        Map map = (Map) this.idConfigMap.values().stream().collect(Collectors.toMap((v0) -> {
            return v0.getSecretId();
        }, clsIdConfig -> {
            return clsIdConfig;
        }, (clsIdConfig2, clsIdConfig3) -> {
            return clsIdConfig2;
        }));
        this.clientMap.keySet().stream().filter(str -> {
            return !map.containsKey(str);
        }).forEach(this::removeExpireClient);
        map.values().stream().filter(clsIdConfig4 -> {
            return !this.clientMap.containsKey(clsIdConfig4.getSecretId());
        }).forEach(this::startNewClient);
    }

    private void startNewClient(ClsIdConfig clsIdConfig) {
        AsyncProducerConfig asyncProducerConfig = new AsyncProducerConfig(clsIdConfig.getEndpoint(), clsIdConfig.getSecretId(), clsIdConfig.getSecretKey(), NetworkUtils.getLocalMachineIP());
        setCommonClientConfig(asyncProducerConfig);
        this.clientMap.put(clsIdConfig.getSecretId(), new AsyncProducerClient(asyncProducerConfig));
    }

    private void setCommonClientConfig(AsyncProducerConfig asyncProducerConfig) {
        Optional ofNullable = Optional.ofNullable(this.sinkContext.getInteger(KEY_TOTAL_SIZE_IN_BYTES));
        asyncProducerConfig.getClass();
        ofNullable.ifPresent((v1) -> {
            r1.setTotalSizeInBytes(v1);
        });
        Optional ofNullable2 = Optional.ofNullable(this.sinkContext.getInteger(KEY_MAX_SEND_THREAD_COUNT));
        asyncProducerConfig.getClass();
        ofNullable2.ifPresent((v1) -> {
            r1.setSendThreadCount(v1);
        });
        Optional ofNullable3 = Optional.ofNullable(this.sinkContext.getInteger(KEY_MAX_BLOCK_SEC));
        asyncProducerConfig.getClass();
        ofNullable3.ifPresent((v1) -> {
            r1.setMaxBlockMs(v1);
        });
        Optional ofNullable4 = Optional.ofNullable(this.sinkContext.getInteger(KEY_MAX_BATCH_SIZE));
        asyncProducerConfig.getClass();
        ofNullable4.ifPresent((v1) -> {
            r1.setBatchSizeThresholdInBytes(v1);
        });
        Optional ofNullable5 = Optional.ofNullable(this.sinkContext.getInteger(KEY_MAX_BATCH_COUNT));
        asyncProducerConfig.getClass();
        ofNullable5.ifPresent((v1) -> {
            r1.setBatchCountThreshold(v1);
        });
        Optional ofNullable6 = Optional.ofNullable(this.sinkContext.getInteger(KEY_LINGER_MS));
        asyncProducerConfig.getClass();
        ofNullable6.ifPresent((v1) -> {
            r1.setLingerMs(v1);
        });
        Optional ofNullable7 = Optional.ofNullable(this.sinkContext.getInteger(KEY_RETRIES));
        asyncProducerConfig.getClass();
        ofNullable7.ifPresent((v1) -> {
            r1.setRetries(v1);
        });
        Optional ofNullable8 = Optional.ofNullable(this.sinkContext.getInteger(KEY_MAX_RESERVED_ATTEMPTS));
        asyncProducerConfig.getClass();
        ofNullable8.ifPresent((v1) -> {
            r1.setMaxReservedAttempts(v1);
        });
        Optional ofNullable9 = Optional.ofNullable(this.sinkContext.getInteger(KEY_BASE_RETRY_BACKOFF_MS));
        asyncProducerConfig.getClass();
        ofNullable9.ifPresent((v1) -> {
            r1.setBaseRetryBackoffMs(v1);
        });
        Optional ofNullable10 = Optional.ofNullable(this.sinkContext.getInteger(KEY_MAX_RETRY_BACKOFF_MS));
        asyncProducerConfig.getClass();
        ofNullable10.ifPresent((v1) -> {
            r1.setMaxRetryBackoffMs(v1);
        });
    }

    private void removeExpireClient(String str) {
        if (this.clientMap.get(str) == null) {
            LOG.error("Remove client failed, there is not client of {}", str);
        } else {
            this.deletingClients.add(this.clientMap.remove(str));
        }
    }

    public void addSendResultMetric(ProfileEvent profileEvent, String str, boolean z, long j) {
        SortMetricItem findMetricItem = getMetricItemSet().findMetricItem(getDimensions(profileEvent, str));
        long length = profileEvent.getBody().length;
        if (!z) {
            findMetricItem.sendFailCount.addAndGet(1L);
            findMetricItem.sendFailSize.addAndGet(length);
            return;
        }
        findMetricItem.sendSuccessCount.addAndGet(1L);
        findMetricItem.sendSuccessSize.addAndGet(length);
        AuditUtils.add(8, profileEvent);
        if (j > 0) {
            long currentTimeMillis = System.currentTimeMillis();
            long j2 = currentTimeMillis - j;
            long j3 = currentTimeMillis - NumberUtils.toLong("sourceTime", profileEvent.getRawLogTime());
            long rawLogTime = currentTimeMillis - profileEvent.getRawLogTime();
            findMetricItem.sinkDuration.addAndGet(j2 * 1);
            findMetricItem.nodeDuration.addAndGet(j3 * 1);
            findMetricItem.wholeDuration.addAndGet(rawLogTime * 1);
        }
    }

    private Map<String, String> getDimensions(ProfileEvent profileEvent, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put(SinkContext.KEY_TASK_NAME, getTaskName());
        fillInlongId(profileEvent, hashMap);
        hashMap.put("sinkId", getSinkName());
        hashMap.put("sinkDataId", str);
        long rawLogTime = profileEvent.getRawLogTime();
        hashMap.put("msgTime", String.valueOf(rawLogTime - (rawLogTime % CommonPropertiesHolder.getAuditFormatInterval())));
        return hashMap;
    }

    public ClsIdConfig getIdConfig(String str) {
        return this.idConfigMap.get(str);
    }

    public int getKeywordMaxLength() {
        return this.keywordMaxLength;
    }

    public IEvent2LogItemHandler getLogItemHandler() {
        return this.event2LogItemHandler;
    }

    public AsyncProducerClient getClient(String str) {
        return this.clientMap.get(str);
    }
}
