package org.apache.inlong.sort.standalone.sink.hive;

import com.alibaba.fastjson.JSON;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.config.pojo.SortTaskConfig;
import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/hive/HiveSinkContext.class */
public class HiveSinkContext extends SinkContext {
    public static final Logger LOG = InlongLoggerFactory.getLogger(HiveSinkContext.class);
    public static final String KEY_NODE_ID = "nodeId";
    public static final String HIVE_DRIVER = "org.apache.hive.jdbc.HiveDriver";
    public static final String KEY_HDFS_PATH = "hdfsPath";
    public static final String KEY_MAX_FILE_OPEN_DELAY = "maxFileOpenDelayMinute";
    public static final long DEFAULT_MAX_FILE_OPEN_DELAY = 5;
    public static final String KEY_EVENT_FORMAT_HANDLER = "eventFormatHandler";
    public static final String KEY_TOKEN_OVERTIME = "tokenOvertimeMinute";
    public static final long DEFAULT_TOKEN_OVERTIME = 60;
    public static final String KEY_MAX_OUTPUT_FILE_SIZE = "maxOutputFileSizeGb";
    public static final long DEFAULT_MAX_OUTPUT_FILE_SIZE = 2;
    public static final long MINUTE_MS = 60000;
    public static final long GB_BYTES = 1073741824;
    public static final long KB_BYTES = 1024;
    public static final String KEY_HIVE_JDBC_URL = "hiveJdbcUrl";
    public static final String KEY_HIVE_DATABASE = "hiveDatabase";
    public static final String KEY_HIVE_USERNAME = "hiveUsername";
    public static final String KEY_HIVE_PASSWORD = "hivePassword";
    private Context parentContext;
    private String nodeId;
    private Map<String, HdfsIdConfig> idConfigMap;
    private LinkedBlockingQueue<DispatchProfile> dispatchQueue;
    private String hdfsPath;
    private long maxFileOpenDelayMinute;
    private long fileArchiveDelayMinute;
    private long tokenOvertimeMinute;
    private long maxOutputFileSizeGb;
    private String hiveJdbcUrl;
    private String hiveDatabase;
    private String hiveUsername;
    private String hivePassword;
    private ExecutorService outputPool;
    private ExecutorService partitionCreatePool;
    private IEventFormatHandler eventFormatHandler;

    public HiveSinkContext(String str, Context context, Channel channel, LinkedBlockingQueue<DispatchProfile> linkedBlockingQueue) {
        super(str, context, channel);
        this.idConfigMap = new ConcurrentHashMap();
        this.dispatchQueue = new LinkedBlockingQueue<>();
        this.maxFileOpenDelayMinute = 5L;
        this.fileArchiveDelayMinute = 2 * this.maxFileOpenDelayMinute;
        this.tokenOvertimeMinute = 60L;
        this.maxOutputFileSizeGb = 2L;
        this.parentContext = context;
        this.dispatchQueue = linkedBlockingQueue;
        this.nodeId = CommonPropertiesHolder.getString("nodeId");
        this.outputPool = Executors.newFixedThreadPool(getMaxThreads());
        this.partitionCreatePool = Executors.newFixedThreadPool(getMaxThreads());
        String string = CommonPropertiesHolder.getString(KEY_EVENT_FORMAT_HANDLER, DefaultEventFormatHandler.class.getName());
        try {
            Object newInstance = ClassUtils.getClass(string).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            if (newInstance instanceof IEventFormatHandler) {
                this.eventFormatHandler = (IEventFormatHandler) newInstance;
            }
        } catch (Throwable th) {
            LOG.error("Fail to init IEventFormatHandler,handlerClass:{},error:{}", string, th.getMessage());
        }
    }

    @Override // org.apache.inlong.sort.standalone.sink.SinkContext
    public void reload() {
        try {
            SortTaskConfig taskConfig = SortClusterConfigHolder.getTaskConfig(this.taskName);
            LOG.info("start to get SortTaskConfig:taskName:{}:config:{}", this.taskName, JSON.toJSONString(taskConfig));
            if (this.sortTaskConfig == null || !this.sortTaskConfig.equals(taskConfig)) {
                this.sortTaskConfig = taskConfig;
                this.parentContext = new Context(this.sortTaskConfig.getSinkParams());
                ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                for (Map map : this.sortTaskConfig.getIdParams()) {
                    concurrentHashMap.put(InlongId.generateUid((String) map.get("inlongGroupId"), (String) map.get("inlongStreamId")), (HdfsIdConfig) JSON.parseObject(JSON.toJSONString(map), HdfsIdConfig.class));
                }
                this.idConfigMap = concurrentHashMap;
                this.hdfsPath = this.parentContext.getString(KEY_HDFS_PATH);
                this.maxFileOpenDelayMinute = this.parentContext.getLong(KEY_MAX_FILE_OPEN_DELAY, 5L).longValue();
                this.fileArchiveDelayMinute = this.maxFileOpenDelayMinute + 1;
                this.tokenOvertimeMinute = this.parentContext.getLong(KEY_TOKEN_OVERTIME, 60L).longValue();
                this.maxOutputFileSizeGb = this.parentContext.getLong(KEY_MAX_OUTPUT_FILE_SIZE, 2L).longValue();
                this.hiveJdbcUrl = this.parentContext.getString(KEY_HIVE_JDBC_URL);
                this.hiveDatabase = this.parentContext.getString(KEY_HIVE_DATABASE);
                this.hiveUsername = this.parentContext.getString(KEY_HIVE_USERNAME);
                this.hivePassword = this.parentContext.getString(KEY_HIVE_PASSWORD);
                Class.forName(HIVE_DRIVER);
                LOG.info("end to get SortTaskConfig:taskName:{}:newIdConfigMap:{}", this.taskName, JSON.toJSONString(concurrentHashMap));
            }
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    public void addSendMetric(DispatchProfile dispatchProfile, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put("taskName", getTaskName());
        fillInlongId(dispatchProfile, hashMap);
        hashMap.put("sinkId", getSinkName());
        hashMap.put("sinkDataId", str);
        long dispatchTime = dispatchProfile.getDispatchTime();
        hashMap.put("msgTime", String.valueOf(dispatchTime - (dispatchTime % CommonPropertiesHolder.getAuditFormatInterval())));
        SortMetricItem findMetricItem = getMetricItemSet().findMetricItem(hashMap);
        long count = dispatchProfile.getCount();
        long size = dispatchProfile.getSize();
        findMetricItem.sendCount.addAndGet(count);
        findMetricItem.sendSize.addAndGet(size);
    }

    public void addSendFailMetric() {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put("sinkId", getSinkName());
        long currentTimeMillis = System.currentTimeMillis();
        hashMap.put("msgTime", String.valueOf(currentTimeMillis - (currentTimeMillis % CommonPropertiesHolder.getAuditFormatInterval())));
        getMetricItemSet().findMetricItem(hashMap).readFailCount.incrementAndGet();
    }

    public static void fillInlongId(DispatchProfile dispatchProfile, Map<String, String> map) {
        String inlongGroupId = dispatchProfile.getInlongGroupId();
        String str = StringUtils.isBlank(inlongGroupId) ? "-" : inlongGroupId;
        String inlongStreamId = dispatchProfile.getInlongStreamId();
        String str2 = StringUtils.isBlank(inlongStreamId) ? "-" : inlongStreamId;
        map.put("inlongGroupId", str);
        map.put("inlongStreamId", str2);
    }

    public void addSendResultMetric(DispatchProfile dispatchProfile, String str, boolean z, long j) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put("taskName", getTaskName());
        fillInlongId(dispatchProfile, hashMap);
        hashMap.put("sinkId", getSinkName());
        hashMap.put("sinkDataId", str);
        long dispatchTime = dispatchProfile.getDispatchTime();
        hashMap.put("msgTime", String.valueOf(dispatchTime - (dispatchTime % CommonPropertiesHolder.getAuditFormatInterval())));
        SortMetricItem findMetricItem = getMetricItemSet().findMetricItem(hashMap);
        long count = dispatchProfile.getCount();
        long size = dispatchProfile.getSize();
        if (!z) {
            findMetricItem.sendFailCount.addAndGet(count);
            findMetricItem.sendFailSize.addAndGet(size);
            return;
        }
        findMetricItem.sendSuccessCount.addAndGet(count);
        findMetricItem.sendSuccessSize.addAndGet(size);
        dispatchProfile.getEvents().forEach(profileEvent -> {
            AuditUtils.add(8, profileEvent);
        });
        if (j > 0) {
            long currentTimeMillis = System.currentTimeMillis();
            long j2 = currentTimeMillis - j;
            long j3 = currentTimeMillis - NumberUtils.toLong("sourceTime", dispatchTime);
            findMetricItem.sinkDuration.addAndGet(j2 * count);
            findMetricItem.nodeDuration.addAndGet(j3 * count);
            findMetricItem.wholeDuration.addAndGet((currentTimeMillis - dispatchTime) * count);
        }
    }

    public Connection getHiveConnection() throws SQLException, ClassNotFoundException {
        Class.forName(HIVE_DRIVER);
        String str = this.hiveJdbcUrl + "/" + this.hiveDatabase;
        Connection connection = DriverManager.getConnection(str, this.hiveUsername, this.hivePassword);
        LOG.info("Connect to hive {} successfully", str);
        return connection;
    }

    public String getHiveDatabase() {
        return this.hiveDatabase;
    }

    public Context getProducerContext() {
        return this.parentContext;
    }

    public LinkedBlockingQueue<DispatchProfile> getDispatchQueue() {
        return this.dispatchQueue;
    }

    public HdfsIdConfig getIdConfig(String str) {
        return this.idConfigMap.get(str);
    }

    public String getHdfsPath() {
        return this.hdfsPath;
    }

    public String getNodeId() {
        return this.nodeId;
    }

    public long getMaxFileOpenDelayMinute() {
        return this.maxFileOpenDelayMinute;
    }

    public long getFileArchiveDelayMinute() {
        return this.fileArchiveDelayMinute;
    }

    public long getTokenOvertimeMinute() {
        return this.tokenOvertimeMinute;
    }

    public long getMaxOutputFileSizeGb() {
        return this.maxOutputFileSizeGb;
    }

    public Map<String, HdfsIdConfig> getIdConfigMap() {
        return this.idConfigMap;
    }

    public ExecutorService getOutputPool() {
        return this.outputPool;
    }

    public ExecutorService getPartitionCreatePool() {
        return this.partitionCreatePool;
    }

    public IEventFormatHandler getEventFormatHandler() {
        return this.eventFormatHandler;
    }

    public String getHiveJdbcUrl() {
        return this.hiveJdbcUrl;
    }

    public void setHiveJdbcUrl(String str) {
        this.hiveJdbcUrl = str;
    }

    public String getHiveUsername() {
        return this.hiveUsername;
    }

    public void setHiveUsername(String str) {
        this.hiveUsername = str;
    }

    public String getHivePassword() {
        return this.hivePassword;
    }

    public void setHivePassword(String str) {
        this.hivePassword = str;
    }
}
