package org.apache.inlong.sort.standalone.sink.clickhouse;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.sort.standalone.channel.ProfileEvent;
import org.apache.inlong.sort.standalone.dispatch.DispatchProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sort/standalone/sink/clickhouse/ClickHouseChannelWorker.class */
public class ClickHouseChannelWorker extends Thread {
    public static final Logger LOG = LoggerFactory.getLogger(ClickHouseChannelWorker.class);
    private final ClickHouseSinkContext context;
    private final int workerIndex;
    private LifecycleState status = LifecycleState.IDLE;
    private IEventHandler handler;
    private Connection conn;

    public ClickHouseChannelWorker(ClickHouseSinkContext clickHouseSinkContext, int i) {
        this.context = clickHouseSinkContext;
        this.workerIndex = i;
        this.handler = this.context.createEventHandler();
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        this.status = LifecycleState.START;
        LOG.info("start to ClickHouseChannelWorker:{},status:{},index:{}", new Object[]{this.context.getTaskName(), this.status, Integer.valueOf(this.workerIndex)});
        while (this.status == LifecycleState.START) {
            try {
                doRun();
            } catch (Throwable th) {
                LOG.error(th.getMessage(), th);
            }
        }
    }

    public void doRun() {
        DispatchProfile poll = this.context.getDispatchQueue().poll();
        try {
            if (poll == null) {
                sleepOneInterval();
                return;
            }
            ClickHouseIdConfig idConfig = this.context.getIdConfig(poll.getUid());
            if (idConfig == null) {
                this.context.addSendFailMetric("idConfig is null", poll);
                poll.ack();
                return;
            }
            String insertSql = idConfig.getInsertSql();
            if (insertSql == null) {
                this.context.addSendFailMetric("sql is null", poll);
                poll.ack();
                return;
            }
            if (this.conn == null) {
                reconnect();
            }
            try {
                PreparedStatement prepareStatement = this.conn.prepareStatement(insertSql);
                Throwable th = null;
                try {
                    try {
                        Iterator<ProfileEvent> it = poll.getEvents().iterator();
                        while (it.hasNext()) {
                            this.handler.setValue(idConfig, this.handler.parse(idConfig, it.next()), prepareStatement);
                            prepareStatement.addBatch();
                        }
                        prepareStatement.executeBatch();
                        this.conn.commit();
                        if (prepareStatement != null) {
                            if (0 != 0) {
                                try {
                                    prepareStatement.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                prepareStatement.close();
                            }
                        }
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                    }
                } catch (Throwable th4) {
                    if (prepareStatement != null) {
                        if (th != null) {
                            try {
                                prepareStatement.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            prepareStatement.close();
                        }
                    }
                    throw th4;
                }
            } catch (Exception e) {
                reconnect();
            }
        } catch (Throwable th6) {
            LOG.error(th6.getMessage(), th6);
            if (poll != null) {
                this.context.getDispatchQueue().add(poll);
            }
            sleepOneInterval();
        }
    }

    public void close() {
        this.status = LifecycleState.STOP;
    }

    private void sleepOneInterval() {
        try {
            Thread.sleep(this.context.getProcessInterval());
        } catch (InterruptedException e) {
            LOG.error(e.getMessage(), e);
        }
    }

    private void reconnect() throws SQLException {
        if (this.conn != null) {
            try {
                this.conn.close();
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
            }
            this.conn = null;
        }
        this.conn = DriverManager.getConnection(this.context.getJdbcUrl(), this.context.getJdbcUsername(), this.context.getJdbcPassword());
        this.conn.setAutoCommit(false);
    }
}
