package org.apache.inlong.sort.standalone.sink;

import com.google.common.collect.ImmutableMap;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.metric.MetricRegister;
import org.apache.inlong.common.pojo.sort.TaskConfig;
import org.apache.inlong.common.pojo.sort.dataflow.DataFlowConfig;
import org.apache.inlong.common.pojo.sort.dataflow.SourceConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.CsvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.dataType.KvConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.FieldConfig;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.BasicFormatInfo;
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.sdk.transform.decode.SourceDecoder;
import org.apache.inlong.sdk.transform.decode.SourceDecoderFactory;
import org.apache.inlong.sdk.transform.pojo.CsvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.KvSourceInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.apache.inlong.sdk.transform.process.converter.TypeConverter;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.holder.v2.SortConfigHolder;
import org.apache.inlong.sort.standalone.metrics.SortMetricItemSet;
import org.apache.inlong.sort.standalone.sink.http.DefaultEvent2HttpRequestHandler;
import org.apache.inlong.sort.standalone.utils.BufferQueue;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/SinkContext.class */
public class SinkContext {
    public static final Logger LOG = InlongLoggerFactory.getLogger(SinkContext.class);
    public static final String KEY_MAX_THREADS = "maxThreads";
    public static final String KEY_PROCESSINTERVAL = "processInterval";
    public static final String KEY_RELOADINTERVAL = "reloadInterval";
    public static final String KEY_TASK_NAME = "taskName";
    public static final String KEY_MAX_BUFFERQUEUE_SIZE_KB = "maxBufferQueueSizeKb";
    public static final int DEFAULT_MAX_BUFFERQUEUE_SIZE_KB = 131072;
    protected final String clusterId;
    protected final String taskName;
    protected final String sinkName;
    protected final Context sinkContext;
    protected TaskConfig taskConfig;

    @Deprecated
    protected SortTaskConfig sortTaskConfig;
    protected final Channel channel;
    protected final int maxThreads;
    protected final long processInterval;
    protected final long reloadInterval;
    protected final boolean unifiedConfiguration = CommonPropertiesHolder.useUnifiedConfiguration();
    protected final SortMetricItemSet metricItemSet;
    protected Timer reloadTimer;

    public SinkContext(String str, Context context, Channel channel) {
        this.sinkName = str;
        this.sinkContext = context;
        this.channel = channel;
        this.clusterId = this.sinkContext.getString("clusterId");
        this.taskName = this.sinkContext.getString("taskName");
        this.maxThreads = this.sinkContext.getInteger(KEY_MAX_THREADS, 10).intValue();
        this.processInterval = this.sinkContext.getInteger(KEY_PROCESSINTERVAL, 100).intValue();
        this.reloadInterval = this.sinkContext.getLong("reloadInterval", 60000L).longValue();
        this.metricItemSet = new SortMetricItemSet(str);
        MetricRegister.register(this.metricItemSet);
    }

    public void start() {
        try {
            reload();
            setReloadTimer();
        } catch (Exception e) {
            LOG.error("failed to start sink context", e);
        }
    }

    public void close() {
        try {
            this.reloadTimer.cancel();
        } catch (Exception e) {
            LOG.error("failed to close sink context", e);
        }
    }

    protected void setReloadTimer() {
        this.reloadTimer = new Timer(true);
        this.reloadTimer.schedule(new TimerTask() { // from class: org.apache.inlong.sort.standalone.sink.SinkContext.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                SinkContext.this.reload();
            }
        }, new Date(System.currentTimeMillis() + this.reloadInterval), this.reloadInterval);
    }

    public void reload() {
        try {
            this.sortTaskConfig = SortClusterConfigHolder.getTaskConfig(this.taskName);
            this.taskConfig = SortConfigHolder.getTaskConfig(this.taskName);
        } catch (Throwable th) {
            LOG.error("failed to stop sink context", th);
        }
    }

    public String getClusterId() {
        return this.clusterId;
    }

    public String getTaskName() {
        return this.taskName;
    }

    public String getSinkName() {
        return this.sinkName;
    }

    public Context getSinkContext() {
        return this.sinkContext;
    }

    public TaskConfig getTaskConfig() {
        return this.taskConfig;
    }

    public SortTaskConfig getSortTaskConfig() {
        return this.sortTaskConfig;
    }

    public boolean isUnifiedConfiguration() {
        return this.unifiedConfiguration;
    }

    public Channel getChannel() {
        return this.channel;
    }

    public int getMaxThreads() {
        return this.maxThreads;
    }

    public long getProcessInterval() {
        return this.processInterval;
    }

    public long getReloadInterval() {
        return this.reloadInterval;
    }

    public SortMetricItemSet getMetricItemSet() {
        return this.metricItemSet;
    }

    public static void fillInlongId(ProfileEvent profileEvent, Map<String, String> map) {
        String inlongGroupId = profileEvent.getInlongGroupId();
        String str = StringUtils.isBlank(inlongGroupId) ? "-" : inlongGroupId;
        String inlongStreamId = profileEvent.getInlongStreamId();
        String str2 = StringUtils.isBlank(inlongStreamId) ? "-" : inlongStreamId;
        map.put(DefaultEvent2HttpRequestHandler.INLONG_GROUP_ID_HEADER, str);
        map.put(DefaultEvent2HttpRequestHandler.INLONG_STREAM_ID_HEADER, str2);
    }

    public static <U> BufferQueue<U> createBufferQueue() {
        return new BufferQueue<>(CommonPropertiesHolder.getInteger("maxBufferQueueSizeKb", 131072).intValue());
    }

    public TransformConfig createTransformConfig(DataFlowConfig dataFlowConfig) {
        return new TransformConfig(dataFlowConfig.getTransformSql(), globalConfiguration());
    }

    public Map<String, Object> globalConfiguration() {
        HashMap hashMap = new HashMap();
        hashMap.putAll(CommonPropertiesHolder.get());
        hashMap.putAll(this.sinkContext.getParameters());
        return ImmutableMap.copyOf(hashMap);
    }

    public SourceDecoder<String> createSourceDecoder(SourceConfig sourceConfig) {
        CsvConfig dataTypeConfig = sourceConfig.getDataTypeConfig();
        List list = (List) sourceConfig.getFieldConfigs().stream().map(this::convertToTransformFieldInfo).collect(Collectors.toList());
        if (dataTypeConfig instanceof CsvConfig) {
            CsvConfig csvConfig = dataTypeConfig;
            return SourceDecoderFactory.createCsvDecoder(CsvSourceInfo.builder().delimiter(csvConfig.getDelimiter()).escapeChar(csvConfig.getEscapeChar()).fields(list).charset(sourceConfig.getEncodingType()).build());
        }
        if (!(dataTypeConfig instanceof KvConfig)) {
            throw new IllegalArgumentException("do not support data type=" + dataTypeConfig.getClass().getName());
        }
        KvConfig kvConfig = (KvConfig) dataTypeConfig;
        return SourceDecoderFactory.createKvDecoder(KvSourceInfo.builder().charset(sourceConfig.getEncodingType()).fields(list).kvDelimiter(kvConfig.getKvSplitter()).entryDelimiter(kvConfig.getEntrySplitter()).lineDelimiter(kvConfig.getLineSeparator()).escapeChar(kvConfig.getEscapeChar()).build());
    }

    public FieldInfo convertToTransformFieldInfo(FieldConfig fieldConfig) {
        return new FieldInfo(fieldConfig.getName(), deriveTypeConverter(fieldConfig.getFormatInfo()));
    }

    public TypeConverter deriveTypeConverter(FormatInfo formatInfo) {
        return formatInfo instanceof BasicFormatInfo ? str -> {
            return ((BasicFormatInfo) formatInfo).deserialize(str);
        } : str2 -> {
            return str2;
        };
    }
}
