package org.apache.inlong.agent.plugin.sources.reader;

import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.utils.AgentDbUtils;
import org.apache.inlong.agent.utils.AgentUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/agent/plugin/sources/reader/SqlReader.class */
public class SqlReader extends org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader {
    private static final String SQL_READER_TAG_NAME = "AgentSqlMetric";
    private static final String JOB_DATABASE_USER = "job.sql.user";
    private static final String JOB_DATABASE_PASSWORD = "job.sql.password";
    private static final String JOB_DATABASE_HOSTNAME = "job.sql.hostname";
    private static final String JOB_DATABASE_PORT = "job.sql.port";
    private static final String JOB_DATABASE_BATCH_SIZE = "job.sql.batchSize";
    private static final int DEFAULT_JOB_DATABASE_BATCH_SIZE = 1000;
    private static final String JOB_DATABASE_DRIVER_CLASS = "job.database.driverClass";
    private static final String DEFAULT_JOB_DATABASE_DRIVER_CLASS = "com.mysql.jdbc.Driver";
    private static final String JOB_DATABASE_TYPE = "job.database.type";
    private static final String MYSQL = "mysql";
    private static final String STD_FIELD_SEPARATOR_SHORT = "\u0001";
    private static final String JOB_DATABASE_SEPARATOR = "job.sql.separator";
    private final String sql;
    private Statement statement;
    private PreparedStatement preparedStatement;
    private Connection conn;
    private ResultSet resultSet;
    private int columnCount;
    private String[] columnTypeNames;
    private int[] columnTypeCodes;
    private boolean finished = false;
    private String separator;
    private static final Logger LOGGER = LoggerFactory.getLogger(SqlReader.class);
    private static final String[] NEW_LINE_CHARS = {String.valueOf('\r'), String.valueOf('\n')};
    private static final String[] EMPTY_CHARS = {"", ""};

    public SqlReader(String str) {
        this.sql = str;
    }

    public Message read() {
        try {
            if (!this.resultSet.next()) {
                this.finished = true;
                return null;
            }
            ArrayList arrayList = new ArrayList();
            for (int i = 1; i <= this.columnCount; i++) {
                int i2 = this.columnTypeCodes[i - 1];
                arrayList.add((i2 == 2004 || i2 == -2 || i2 == -3 || i2 == -4 || this.columnTypeNames[i - 1].contains("BLOB")) ? new String(Base64.encodeBase64(this.resultSet.getBytes(i), false), StandardCharsets.UTF_8) : StringUtils.replaceEachRepeatedly(this.resultSet.getString(i), NEW_LINE_CHARS, EMPTY_CHARS));
            }
            AuditUtils.add(3, this.inlongGroupId, this.inlongStreamId, System.currentTimeMillis(), 1, arrayList.stream().mapToLong(str -> {
                return str.length();
            }).sum());
            this.readerMetric.pluginReadSuccessCount.incrementAndGet();
            this.readerMetric.pluginReadCount.incrementAndGet();
            return generateMessage(arrayList);
        } catch (Exception e) {
            LOGGER.error("error while reading data", e);
            this.readerMetric.pluginReadFailCount.incrementAndGet();
            this.readerMetric.pluginReadCount.incrementAndGet();
            throw new RuntimeException(e);
        }
    }

    private Message generateMessage(List<String> list) {
        return new DefaultMessage(StringUtils.join(list, this.separator).getBytes(StandardCharsets.UTF_8));
    }

    public boolean isFinished() {
        return this.finished;
    }

    public String getReadSource() {
        return this.sql;
    }

    public void setReadTimeout(long j) {
    }

    public void setWaitMillisecond(long j) {
    }

    public String getSnapshot() {
        return "";
    }

    public void finishRead() {
        destroy();
    }

    public boolean isSourceExist() {
        return true;
    }

    private void initColumnMeta() throws Exception {
        this.columnCount = this.resultSet.getMetaData().getColumnCount();
        this.columnTypeNames = new String[this.columnCount];
        this.columnTypeCodes = new int[this.columnCount];
        for (int i = 0; i < this.columnCount; i++) {
            this.columnTypeCodes[i] = this.resultSet.getMetaData().getColumnType(i + 1);
            String columnTypeName = this.resultSet.getMetaData().getColumnTypeName(i + 1);
            if (columnTypeName != null) {
                this.columnTypeNames[i] = columnTypeName.toUpperCase();
            }
        }
    }

    @Override // org.apache.inlong.agent.plugin.sources.reader.file.AbstractReader
    public void init(InstanceProfile instanceProfile) {
        super.init(instanceProfile);
        int i = instanceProfile.getInt(JOB_DATABASE_BATCH_SIZE, 1000);
        String str = instanceProfile.get(JOB_DATABASE_USER);
        String str2 = instanceProfile.get(JOB_DATABASE_PASSWORD);
        String str3 = instanceProfile.get(JOB_DATABASE_HOSTNAME);
        int i2 = instanceProfile.getInt(JOB_DATABASE_PORT);
        String str4 = instanceProfile.get(JOB_DATABASE_DRIVER_CLASS, DEFAULT_JOB_DATABASE_DRIVER_CLASS);
        this.separator = instanceProfile.get(JOB_DATABASE_SEPARATOR, STD_FIELD_SEPARATOR_SHORT);
        this.finished = false;
        try {
            String str5 = instanceProfile.get(JOB_DATABASE_TYPE, MYSQL);
            this.conn = AgentDbUtils.getConnectionFailover(str4, String.format("jdbc:%s://%s:%d", str5, str3, Integer.valueOf(i2)), str, str2);
            if (str5.equals(MYSQL)) {
                this.statement = this.conn.createStatement(1003, 1007);
                this.statement.setFetchSize(Integer.MIN_VALUE);
                this.resultSet = this.statement.executeQuery(this.sql);
            } else {
                this.preparedStatement = this.conn.prepareStatement(this.sql);
                this.preparedStatement.setFetchSize(i);
                this.resultSet = this.preparedStatement.executeQuery();
            }
            initColumnMeta();
        } catch (Exception e) {
            LOGGER.error("error create statement", e);
            destroy();
            throw new RuntimeException(e);
        }
    }

    public void destroy() {
        this.finished = true;
        AgentUtils.finallyClose(this.resultSet);
        AgentUtils.finallyClose(this.statement);
        AgentUtils.finallyClose(this.preparedStatement);
        AgentUtils.finallyClose(this.conn);
    }
}
