package org.apache.inlong.sort.standalone.sink.clickhouse;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.ClassUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.inlong.common.pojo.sortstandalone.SortTaskConfig;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.config.holder.CommonPropertiesHolder;
import org.apache.inlong.sort.standalone.config.holder.SortClusterConfigHolder;
import org.apache.inlong.sort.standalone.config.pojo.InlongId;
import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.apache.inlong.sort.standalone.metrics.SortMetricItem;
import org.apache.inlong.sort.standalone.metrics.audit.AuditUtils;
import org.apache.inlong.sort.standalone.sink.SinkContext;
import org.apache.inlong.sort.standalone.utils.InlongLoggerFactory;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseSinkContext.class */
public class ClickHouseSinkContext extends SinkContext {
    public static final Logger LOG = InlongLoggerFactory.getLogger(ClickHouseSinkContext.class);
    public static final String KEY_NODE_ID = "nodeId";
    public static final String KEY_JDBC_DRIVER = "jdbcDriver";
    public static final String DEFAULT_JDBC_DRIVER = "ru.yandex.clickhouse.ClickHouseDriver";
    public static final String KEY_JDBC_URL = "jdbcUrl";
    public static final String KEY_JDBC_USERNAME = "jdbcUsername";
    public static final String KEY_JDBC_PASSWORD = "jdbcPassword";
    public static final String KEY_EVENT_HANDLER = "clickHouseEventHandler";
    private Context parentContext;
    private String nodeId;
    private Map<String, ClickHouseIdConfig> idConfigMap;
    private final LinkedBlockingQueue<DispatchProfile> dispatchQueue;
    private String jdbcDriver;
    private String jdbcUrl;
    private String jdbcUsername;
    private String jdbcPassword;

    public ClickHouseSinkContext(String str, Context context, Channel channel, LinkedBlockingQueue<DispatchProfile> linkedBlockingQueue) {
        super(str, context, channel);
        this.idConfigMap = new ConcurrentHashMap();
        this.parentContext = context;
        this.dispatchQueue = linkedBlockingQueue;
        this.nodeId = CommonPropertiesHolder.getString("nodeId", NetworkUtils.getLocalIp());
    }

    @Override // org.apache.inlong.sort.standalone.sink.SinkContext
    public void reload() {
        try {
            SortTaskConfig taskConfig = SortClusterConfigHolder.getTaskConfig(this.taskName);
            LOG.info("start to get SortTaskConfig:taskName:{}:config:{}", this.taskName, new ObjectMapper().writeValueAsString(taskConfig));
            if (this.sortTaskConfig == null || !this.sortTaskConfig.equals(taskConfig)) {
                ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
                List<Map> idParams = taskConfig.getIdParams();
                ObjectMapper objectMapper = new ObjectMapper();
                for (Map map : idParams) {
                    concurrentHashMap.put(InlongId.generateUid((String) map.get("inlongGroupId"), (String) map.get("inlongStreamId")), (ClickHouseIdConfig) objectMapper.readValue(objectMapper.writeValueAsString(map), ClickHouseIdConfig.class));
                }
                Context context = new Context(this.parentContext.getParameters());
                context.putAll(taskConfig.getSinkParams());
                this.jdbcDriver = context.getString(KEY_JDBC_DRIVER, DEFAULT_JDBC_DRIVER);
                this.jdbcUrl = context.getString(KEY_JDBC_URL);
                this.jdbcUsername = context.getString(KEY_JDBC_USERNAME);
                this.jdbcPassword = context.getString(KEY_JDBC_PASSWORD);
                Class.forName(this.jdbcDriver);
                initIdConfig(concurrentHashMap);
                this.sortTaskConfig = taskConfig;
                this.idConfigMap = concurrentHashMap;
                LOG.info("end to get SortTaskConfig,taskName:{},newIdConfigMap:{},currentContext:{}", new Object[]{this.taskName, new ObjectMapper().writeValueAsString(concurrentHashMap), context});
            }
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    private void initIdConfig(Map<String, ClickHouseIdConfig> map) throws SQLException {
        Connection connection = DriverManager.getConnection(this.jdbcUrl, this.jdbcUsername, this.jdbcPassword);
        Throwable th = null;
        try {
            Statement createStatement = connection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    Iterator<Map.Entry<String, ClickHouseIdConfig>> it = map.entrySet().iterator();
                    while (it.hasNext()) {
                        ClickHouseIdConfig value = it.next().getValue();
                        value.setContentFieldList(ClickHouseIdConfig.parseFieldNames(value.getContentFieldNames()));
                        HashMap hashMap = new HashMap();
                        try {
                            ResultSet executeQuery = createStatement.executeQuery("select * from " + value.getTableName());
                            Throwable th3 = null;
                            try {
                                try {
                                    ResultSetMetaData metaData = executeQuery.getMetaData();
                                    int columnCount = metaData.getColumnCount();
                                    for (int i = 1; i <= columnCount; i++) {
                                        hashMap.put(metaData.getColumnName(i), Integer.valueOf(metaData.getColumnType(i)));
                                    }
                                    if (executeQuery != null) {
                                        if (0 != 0) {
                                            try {
                                                executeQuery.close();
                                            } catch (Throwable th4) {
                                                th3.addSuppressed(th4);
                                            }
                                        } else {
                                            executeQuery.close();
                                        }
                                    }
                                } catch (Throwable th5) {
                                    th3 = th5;
                                    throw th5;
                                    break;
                                }
                            } catch (Throwable th6) {
                                if (executeQuery != null) {
                                    if (th3 != null) {
                                        try {
                                            executeQuery.close();
                                        } catch (Throwable th7) {
                                            th3.addSuppressed(th7);
                                        }
                                    } else {
                                        executeQuery.close();
                                    }
                                }
                                throw th6;
                                break;
                            }
                        } catch (Exception e) {
                            LOG.error("Can not get metadata,group:{},stream:{},error:{}", new Object[]{value.getInlongGroupId(), value.getInlongStreamId(), e.getMessage(), e});
                        }
                        List<String> parseFieldNames = ClickHouseIdConfig.parseFieldNames(value.getDbFieldNames());
                        ArrayList arrayList = new ArrayList(parseFieldNames.size());
                        parseFieldNames.forEach(str -> {
                            arrayList.add(new Pair(str, hashMap.getOrDefault(str, 12)));
                        });
                        value.setDbFieldList(arrayList);
                        StringBuilder sb = new StringBuilder();
                        sb.append("insert into ").append(value.getTableName()).append(" (");
                        value.getDbFieldList().forEach(pair -> {
                            sb.append((String) pair.getKey()).append(',');
                        });
                        sb.deleteCharAt(sb.length() - 1);
                        sb.append(") values (");
                        value.getDbFieldList().forEach(pair2 -> {
                            sb.append("?,");
                        });
                        sb.deleteCharAt(sb.length() - 1);
                        sb.append(")");
                        value.setInsertSql(sb.toString());
                    }
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th8) {
                                th2.addSuppressed(th8);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    if (connection != null) {
                        if (0 == 0) {
                            connection.close();
                            return;
                        }
                        try {
                            connection.close();
                        } catch (Throwable th9) {
                            th.addSuppressed(th9);
                        }
                    }
                } catch (Throwable th10) {
                    th2 = th10;
                    throw th10;
                }
            } catch (Throwable th11) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th12) {
                            th2.addSuppressed(th12);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th11;
            }
        } catch (Throwable th13) {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th14) {
                        th.addSuppressed(th14);
                    }
                } else {
                    connection.close();
                }
            }
            throw th13;
        }
    }

    public void addSendMetric(DispatchProfile dispatchProfile) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put(SinkContext.KEY_TASK_NAME, getTaskName());
        hashMap.put("sourceId", "-");
        hashMap.put("sourceDataId", "-");
        hashMap.put("sinkId", getSinkName());
        hashMap.put("sinkDataId", "-");
        hashMap.put("inlongGroupId", dispatchProfile.getInlongGroupId());
        hashMap.put("inlongStreamId", dispatchProfile.getInlongStreamId());
        long dispatchTime = dispatchProfile.getDispatchTime();
        hashMap.put("msgTime", String.valueOf(dispatchTime - (dispatchTime % CommonPropertiesHolder.getAuditFormatInterval())));
        SortMetricItem findMetricItem = getMetricItemSet().findMetricItem(hashMap);
        findMetricItem.sendCount.addAndGet(dispatchProfile.getCount());
        findMetricItem.sendSize.addAndGet(dispatchProfile.getSize());
    }

    public void addSendFailMetric(String str, DispatchProfile dispatchProfile) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put(SinkContext.KEY_TASK_NAME, getTaskName());
        hashMap.put("sourceId", "-");
        hashMap.put("sourceDataId", "-");
        hashMap.put("sinkId", getSinkName());
        hashMap.put("sinkDataId", str);
        hashMap.put("inlongGroupId", dispatchProfile.getInlongGroupId());
        hashMap.put("inlongStreamId", dispatchProfile.getInlongStreamId());
        long currentTimeMillis = System.currentTimeMillis();
        hashMap.put("msgTime", String.valueOf(currentTimeMillis - (currentTimeMillis % CommonPropertiesHolder.getAuditFormatInterval())));
        SortMetricItem findMetricItem = getMetricItemSet().findMetricItem(hashMap);
        findMetricItem.readFailCount.addAndGet(dispatchProfile.getCount());
        findMetricItem.readFailSize.addAndGet(dispatchProfile.getSize());
    }

    public void addSendFailMetric(String str, ProfileEvent profileEvent) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put(SinkContext.KEY_TASK_NAME, getTaskName());
        hashMap.put("sourceId", "-");
        hashMap.put("sourceDataId", "-");
        hashMap.put("sinkId", getSinkName());
        hashMap.put("sinkDataId", str);
        hashMap.put("inlongGroupId", profileEvent.getInlongGroupId());
        hashMap.put("inlongStreamId", profileEvent.getInlongStreamId());
        long currentTimeMillis = System.currentTimeMillis();
        hashMap.put("msgTime", String.valueOf(currentTimeMillis - (currentTimeMillis % CommonPropertiesHolder.getAuditFormatInterval())));
        SortMetricItem findMetricItem = getMetricItemSet().findMetricItem(hashMap);
        findMetricItem.readFailCount.incrementAndGet();
        findMetricItem.readFailSize.addAndGet(profileEvent.getBody().length);
    }

    public void addSendFailMetric(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put(SinkContext.KEY_TASK_NAME, getTaskName());
        hashMap.put("sourceId", "-");
        hashMap.put("sourceDataId", "-");
        hashMap.put("sinkId", getSinkName());
        hashMap.put("sinkDataId", str);
        hashMap.put("inlongGroupId", "-");
        hashMap.put("inlongStreamId", "-");
        long currentTimeMillis = System.currentTimeMillis();
        hashMap.put("msgTime", String.valueOf(currentTimeMillis - (currentTimeMillis % CommonPropertiesHolder.getAuditFormatInterval())));
        getMetricItemSet().findMetricItem(hashMap).readFailCount.incrementAndGet();
    }

    public void addSendSuccessMetric(DispatchProfile dispatchProfile, long j) {
        HashMap hashMap = new HashMap();
        hashMap.put("clusterId", getClusterId());
        hashMap.put(SinkContext.KEY_TASK_NAME, getTaskName());
        hashMap.put("sourceId", "-");
        hashMap.put("sourceDataId", "-");
        hashMap.put("sinkId", getSinkName());
        hashMap.put("sinkDataId", "-");
        hashMap.put("inlongGroupId", dispatchProfile.getInlongGroupId());
        hashMap.put("inlongStreamId", dispatchProfile.getInlongStreamId());
        long currentTimeMillis = System.currentTimeMillis();
        for (ProfileEvent profileEvent : dispatchProfile.getEvents()) {
            long rawLogTime = profileEvent.getRawLogTime();
            hashMap.put("msgTime", String.valueOf(rawLogTime - (rawLogTime % CommonPropertiesHolder.getAuditFormatInterval())));
            SortMetricItem findMetricItem = getMetricItemSet().findMetricItem(hashMap);
            findMetricItem.sendSuccessCount.incrementAndGet();
            findMetricItem.sendSuccessSize.addAndGet(profileEvent.getBody().length);
            long j2 = currentTimeMillis - j;
            long fetchTime = currentTimeMillis - profileEvent.getFetchTime();
            findMetricItem.sinkDuration.addAndGet(j2);
            findMetricItem.nodeDuration.addAndGet(fetchTime);
            findMetricItem.wholeDuration.addAndGet(currentTimeMillis - rawLogTime);
            AuditUtils.add(8, profileEvent);
        }
    }

    public String getNodeId() {
        return this.nodeId;
    }

    public void setNodeId(String str) {
        this.nodeId = str;
    }

    public String getJdbcDriver() {
        return this.jdbcDriver;
    }

    public void setJdbcDriver(String str) {
        this.jdbcDriver = str;
    }

    public String getJdbcUrl() {
        return this.jdbcUrl;
    }

    public void setJdbcUrl(String str) {
        this.jdbcUrl = str;
    }

    public String getJdbcUsername() {
        return this.jdbcUsername;
    }

    public void setJdbcUsername(String str) {
        this.jdbcUsername = str;
    }

    public String getJdbcPassword() {
        return this.jdbcPassword;
    }

    public void setJdbcPassword(String str) {
        this.jdbcPassword = str;
    }

    public LinkedBlockingQueue<DispatchProfile> getDispatchQueue() {
        return this.dispatchQueue;
    }

    public ClickHouseIdConfig getIdConfig(String str) {
        return this.idConfigMap.get(str);
    }

    public IEventHandler createEventHandler() {
        String string = CommonPropertiesHolder.getString(KEY_EVENT_HANDLER, DefaultEventHandler.class.getName());
        try {
            Object newInstance = ClassUtils.getClass(string).getDeclaredConstructor(new Class[0]).newInstance(new Object[0]);
            if (newInstance instanceof IEventHandler) {
                return (IEventHandler) newInstance;
            }
            return null;
        } catch (Throwable th) {
            LOG.error("Fail to init IEventHandler,handlerClass:{},error:{}", new Object[]{string, th.getMessage(), th});
            return null;
        }
    }
}
