package org.apache.seatunnel.connectors.seatunnel.jdbc;

import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.lifecycle.Startables;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.class */
/**
 * Base class for JDBC connector end-to-end tests.
 *
 * <p>{@link #startUp()} starts the database container supplied by {@link #initContainer()},
 * opens a JDBC connection to it, creates the source and sink tables and loads the test rows.
 * {@link #testJdbcDb(TestContainer)} then runs every job config listed in the {@link JdbcCase},
 * delegates verification to {@link #compareResult()} and truncates the sink table.
 *
 * <p>Subclasses describe the concrete database through the abstract methods below and may
 * override {@link #quoteIdentifier(String)} / {@link #createSchemaIfNeeded()} where the
 * defaults do not fit their dialect.
 */
public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResource {
    private static final Logger log = LoggerFactory.getLogger(AbstractJdbcIT.class);

    /** Placeholder token that is replaced by the database container's host in the JDBC URL. */
    protected static final String HOST = "HOST";

    /**
     * Downloads the JDBC driver jar (from {@link #driverUrl()}) into the Jdbc plugin lib
     * directory of the engine container and fails if the shell command exits non-zero.
     */
    @TestContainerExtension
    protected final ContainerExtendedFactory extendedFactory = genericContainer -> {
        Container.ExecResult downloadResult = genericContainer.execInContainer(
                "bash",
                "-c",
                "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && wget " + driverUrl());
        Assertions.assertEquals(0, downloadResult.getExitCode(), downloadResult.getStderr());
    };

    /** Database container; assigned in {@link #startUp()} from {@link #initContainer()}. */
    protected GenericContainer<?> dbServer;

    /** Test case description; assigned in {@link #startUp()} from {@link #getJdbcCase()}. */
    protected JdbcCase jdbcCase;

    /** Connection to the database container, opened with auto-commit disabled. */
    protected Connection connection;

    /** Supplies the case description that {@link #startUp()} stores in {@link #jdbcCase}. */
    abstract JdbcCase getJdbcCase();

    /**
     * Verifies the outcome of the executed jobs. Called by {@link #testJdbcDb(TestContainer)}
     * after every configured job has exited with code 0.
     */
    abstract void compareResult() throws SQLException, IOException;

    /**
     * URL of the JDBC driver jar. It is used both as the {@code wget} target inside the engine
     * container and as the class path of the class loader that loads the driver locally.
     */
    abstract String driverUrl();

    /**
     * Produces the test data. Not called from this class; presumably used by subclasses when
     * they build their {@link JdbcCase} (confirm against the implementations).
     */
    abstract Pair<String[], List<SeaTunnelRow>> initTestData();

    /** Creates (but does not start) the database container; {@link #startUp()} starts it. */
    abstract GenericContainer<?> initContainer();

    /**
     * Loads the JDBC driver from {@link #driverUrl()} and opens {@link #connection} with
     * auto-commit disabled.
     *
     * <p>This method is retried by {@link #startUp()} until it succeeds, so a connection that
     * was opened but could not be configured is closed again instead of being leaked.
     *
     * @param str JDBC URL in which the {@link #HOST} token is replaced by the container host
     * @throws SQLException if the driver rejects the URL or the connection cannot be set up
     */
    protected void initializeJdbcConnection(String str) throws SQLException, ClassNotFoundException, MalformedURLException, InstantiationException, IllegalAccessException {
        URLClassLoader driverClassLoader = new URLClassLoader(new URL[]{new URL(driverUrl())}, AbstractJdbcIT.class.getClassLoader());
        Thread.currentThread().setContextClassLoader(driverClassLoader);
        Driver driver = (Driver) driverClassLoader.loadClass(this.jdbcCase.getDriverClass()).newInstance();

        Properties properties = new Properties();
        if (StringUtils.isNotBlank(this.jdbcCase.getUserName())) {
            properties.put("user", this.jdbcCase.getUserName());
        }
        if (StringUtils.isNotBlank(this.jdbcCase.getPassword())) {
            properties.put("password", this.jdbcCase.getPassword());
        }

        String resolvedUrl = str.replace(HOST, this.dbServer.getHost());
        Connection candidate = driver.connect(resolvedUrl, properties);
        if (candidate == null) {
            // Driver.connect returns null when the driver does not recognise the URL.
            throw new SQLException("Driver " + this.jdbcCase.getDriverClass() + " does not accept JDBC URL: " + resolvedUrl);
        }
        try {
            candidate.setAutoCommit(false);
        } catch (SQLException e) {
            try {
                candidate.close();
            } catch (SQLException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.connection = candidate;
    }

    /**
     * Inserts every row of the case's test data with one batched statement and commits.
     *
     * @throws SeaTunnelRuntimeException with {@code INSERT_DATA_FAILED} on any failure
     */
    protected void insertTestData() {
        try (PreparedStatement insertStatement = this.connection.prepareStatement(this.jdbcCase.getInsertSql())) {
            List<SeaTunnelRow> rows = this.jdbcCase.getTestData().getValue();
            for (SeaTunnelRow row : rows) {
                for (int index = 0; index < row.getArity(); index++) {
                    // JDBC parameter indexes are 1-based.
                    insertStatement.setObject(index + 1, row.getField(index));
                }
                insertStatement.addBatch();
            }
            insertStatement.executeBatch();
            this.connection.commit();
        } catch (Exception e) {
            log.error(ExceptionUtils.getMessage(e));
            throw new SeaTunnelRuntimeException(JdbcITErrorCode.INSERT_DATA_FAILED, e);
        }
    }

    /** Hook for dialects that need a schema before tables are created; no-op by default. */
    protected void createSchemaIfNeeded() {
    }

    /**
     * Creates the source and the sink table by formatting the case's create-SQL template with
     * the respective qualified table name, then commits.
     *
     * @throws SeaTunnelRuntimeException with {@code CREATE_TABLE_FAILED} on any failure
     */
    protected void createNeededTables() {
        try (Statement statement = this.connection.createStatement()) {
            String createTemplate = this.jdbcCase.getCreateSql();
            String database = this.jdbcCase.getDatabase();
            String createSource = String.format(createTemplate, buildTableInfoWithSchema(database, this.jdbcCase.getSourceTable()));
            String createSink = String.format(createTemplate, buildTableInfoWithSchema(database, this.jdbcCase.getSinkTable()));
            statement.execute(createSource);
            statement.execute(createSink);
            this.connection.commit();
        } catch (Exception e) {
            log.error(ExceptionUtils.getMessage(e));
            throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, e);
        }
    }

    /**
     * Builds a parameterised INSERT statement for the given table.
     *
     * @param str schema/database name; may be blank, in which case the table is unqualified
     * @param str2 table name
     * @param strArr column names; each one is quoted and gets one {@code ?} placeholder
     * @return the INSERT SQL text
     */
    public String insertTable(String str, String str2, String... strArr) {
        String columns = Arrays.stream(strArr).map(this::quoteIdentifier).collect(Collectors.joining(", "));
        String placeholders = Arrays.stream(strArr).map(column -> "?").collect(Collectors.joining(", "));
        return "INSERT INTO " + buildTableInfoWithSchema(str, str2) + " (" + columns + " ) VALUES (" + placeholders + ")";
    }

    /**
     * Truncates the given table and commits, because the connection runs with auto-commit off.
     *
     * @param str schema/database name; may be blank
     * @param str2 table name
     * @throws SeaTunnelRuntimeException with {@code CLEAR_TABLE_FAILED} on an SQL error
     */
    public void clearTable(String str, String str2) {
        try (Statement statement = this.connection.createStatement()) {
            statement.execute("TRUNCATE TABLE " + buildTableInfoWithSchema(str, str2));
            this.connection.commit();
        } catch (SQLException e) {
            throw new SeaTunnelRuntimeException(JdbcITErrorCode.CLEAR_TABLE_FAILED, e);
        }
    }

    /** Wraps an identifier in backticks; override for dialects with another quote character. */
    public String quoteIdentifier(String str) {
        return "`" + str + "`";
    }

    /**
     * Returns the quoted table name, prefixed with the quoted schema when one is given.
     *
     * @param str schema/database name; blank means "no qualifier"
     * @param str2 table name
     */
    public String buildTableInfoWithSchema(String str, String str2) {
        if (StringUtils.isNotBlank(str)) {
            return quoteIdentifier(str) + "." + quoteIdentifier(str2);
        }
        return quoteIdentifier(str2);
    }

    /**
     * Starts the database container, waits up to 360 seconds for a JDBC connection to succeed,
     * then creates the schema and tables and loads the test data.
     */
    @BeforeAll
    public void startUp() {
        this.dbServer = initContainer();
        this.jdbcCase = getJdbcCase();
        Startables.deepStart(Stream.of(this.dbServer)).join();
        // The database may still be booting after the container is up, so retry the connect.
        Awaitility.given()
                .ignoreExceptions()
                .await()
                .atMost(360L, TimeUnit.SECONDS)
                .untilAsserted(() -> initializeJdbcConnection(this.jdbcCase.getJdbcUrl()));
        createSchemaIfNeeded();
        createNeededTables();
        insertTestData();
    }

    /**
     * Releases the JDBC connection and the database container.
     *
     * <p>The connection is closed first, while the server is still up, and the container is
     * closed in a {@code finally} block so a failing connection close cannot leave it running.
     */
    @AfterAll
    public void tearDown() throws SQLException {
        try {
            if (this.connection != null) {
                this.connection.close();
            }
        } finally {
            if (this.dbServer != null) {
                this.dbServer.close();
            }
        }
    }

    /**
     * Runs every job config of the case in the given engine container, asserting exit code 0
     * for each, and then verifies the result via {@link #compareResult()}.
     *
     * <p>The sink table is truncated in a {@code finally} block so that a failed run does not
     * leave rows behind for the next invocation of this template.
     */
    @TestTemplate
    public void testJdbcDb(TestContainer testContainer) throws IOException, InterruptedException, SQLException {
        try {
            for (String configFile : this.jdbcCase.getConfigFile()) {
                Container.ExecResult jobResult = testContainer.executeJob(configFile);
                Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
            }
            compareResult();
        } finally {
            clearTable(this.jdbcCase.getDatabase(), this.jdbcCase.getSinkTable());
        }
    }
}
