package org.apache.paimon.flink.action.cdc.mysql;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Stream;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.lifecycle.Startables;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mysql/MySqlActionITCaseBase.class */
/**
 * Base class for MySQL CDC action integration tests.
 *
 * <p>Owns a shared {@link MySqlContainer} and offers helpers to open JDBC statements against it,
 * build the basic MySQL / table option maps, launch sync actions on a Flink environment, and poll
 * Paimon tables until they reach an expected schema and content.
 */
public class MySqlActionITCaseBase extends ActionITCaseBase {
    private static final Logger LOG = LoggerFactory.getLogger(MySqlActionITCaseBase.class);

    private static final String USER = "paimonuser";
    private static final String PASSWORD = "paimonpw";

    /** Interval between polls of the job status, table schema and table content. */
    private static final long POLL_INTERVAL_MILLIS = 1000L;

    /** Interval between polls of the catalog's table list. */
    private static final long TABLE_POLL_INTERVAL_MILLIS = 100L;

    protected static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V5_7);

    /** Stops the shared MySQL container once all tests of the class have run. */
    @AfterAll
    public static void stopContainers() {
        LOG.info("Stopping containers...");
        MYSQL_CONTAINER.stop();
        LOG.info("Containers are stopped.");
    }

    /**
     * Builds (but does not start) the MySQL container used by the tests.
     *
     * @param mySqlVersion the MySQL version the container is created for
     * @return the configured container
     */
    private static MySqlContainer createMySqlContainer(MySqlVersion mySqlVersion) {
        // The fluent setters inherited from the testcontainers base classes may be typed as the
        // base container, so the chain is cast back to MySqlContainer explicitly.
        return (MySqlContainer)
                new MySqlContainer(mySqlVersion)
                        .withConfigurationOverride("mysql/my.cnf")
                        .withUsername(USER)
                        .withPassword(PASSWORD)
                        .withEnv("TZ", "America/Los_Angeles")
                        .withLogConsumer(new Slf4jLogConsumer(LOG));
    }

    /** Starts the shared MySQL container and blocks until start-up has completed. */
    public static void start() {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
        LOG.info("Containers are started.");
    }

    /**
     * Opens a new JDBC connection to the MySQL container and creates a statement on it.
     *
     * <p>The connection backing the returned statement is not closed by this method; it stays
     * reachable through {@link Statement#getConnection()}.
     *
     * @return a statement bound to a freshly opened connection
     * @throws SQLException if the connection or the statement cannot be created
     */
    public Statement getStatement() throws SQLException {
        return DriverManager.getConnection(
                        MYSQL_CONTAINER.getJdbcUrl(),
                        MYSQL_CONTAINER.getUsername(),
                        MYSQL_CONTAINER.getPassword())
                .createStatement();
    }

    /**
     * Blocks until {@code table} exposes the schema described by {@code rowType} and its content,
     * compared as sorted lists, equals {@code expected}.
     *
     * <p>The primary keys are asserted once up front. This method itself has no timeout.
     *
     * @param expected the expected rows in their string form, in any order
     * @param table the table to poll; refreshed with its latest schema while waiting
     * @param rowType the expected field names and types, in order
     * @param primaryKeys the expected primary keys of the table
     * @throws Exception if reading the table fails or the thread is interrupted while waiting
     */
    public void waitForResult(
            List<String> expected, FileStoreTable table, RowType rowType, List<String> primaryKeys)
            throws Exception {
        Assertions.assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);

        // Phase 1: wait for schema evolution to catch up with the expected row type.
        while (!schemaMatches(table, rowType)) {
            table = table.copyWithLatestSchema();
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }

        // Phase 2: wait for the table content to match, ignoring row order.
        List<String> sortedExpected = new ArrayList<>(expected);
        Collections.sort(sortedExpected);
        while (true) {
            ReadBuilder readBuilder = table.newReadBuilder();
            TableScan.Plan plan = readBuilder.newScan().plan();
            List<String> sortedActual =
                    new ArrayList<>(
                            getResult(
                                    readBuilder.newRead(),
                                    plan == null ? Collections.emptyList() : plan.splits(),
                                    rowType));
            Collections.sort(sortedActual);
            if (sortedExpected.equals(sortedActual)) {
                return;
            }
            Thread.sleep(POLL_INTERVAL_MILLIS);
        }
    }

    /** Returns whether the table's fields equal {@code rowType} in count, order, name and type. */
    private static boolean schemaMatches(FileStoreTable table, RowType rowType) {
        List<DataField> fields = table.schema().fields();
        if (rowType.getFieldCount() != fields.size()) {
            return false;
        }
        List<String> expectedNames = rowType.getFieldNames();
        List<?> expectedTypes = rowType.getFieldTypes();
        for (int i = 0; i < fields.size(); i++) {
            DataField field = fields.get(i);
            if (!field.name().equals(expectedNames.get(i))
                    || !field.type().equals(expectedTypes.get(i))) {
                return false;
            }
        }
        return true;
    }

    /**
     * Builds the MySQL connection options that point at the shared container.
     *
     * @return a new mutable map with hostname, port, username, password and server time zone
     */
    public Map<String, String> getBasicMySqlConfig() {
        Map<String, String> config = new HashMap<>();
        config.put("hostname", MYSQL_CONTAINER.getHost());
        config.put("port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
        config.put("username", USER);
        config.put("password", PASSWORD);
        // NOTE(review): the container is created with TZ=America/Los_Angeles while this option
        // says America/New_York; presumably intentional, confirm before changing.
        config.put("server-time-zone", ZoneId.of("America/New_York").toString());
        return config;
    }

    /**
     * Builds table options with a randomized bucket count and sink parallelism, each in [1, 3].
     *
     * @return a new mutable map with the {@code bucket} and {@code sink.parallelism} options
     */
    public Map<String, String> getBasicTableConfig() {
        Map<String, String> config = new HashMap<>();
        ThreadLocalRandom random = ThreadLocalRandom.current();
        config.put("bucket", String.valueOf(random.nextInt(3) + 1));
        config.put("sink.parallelism", String.valueOf(random.nextInt(3) + 1));
        return config;
    }

    /**
     * Builds the table sync action on the basic environment, submits it asynchronously and waits
     * until the job is running.
     *
     * @param mySqlSyncTableAction the action to run
     * @return the client of the submitted job
     * @throws Exception if building, submitting or waiting for the job fails
     */
    public JobClient runActionWithDefaultEnv(MySqlSyncTableAction mySqlSyncTableAction)
            throws Exception {
        StreamExecutionEnvironment env = getBasicEnv();
        mySqlSyncTableAction.build(env);
        JobClient client = env.executeAsync();
        waitJobRunning(client);
        return client;
    }

    /**
     * Builds the database sync action on the basic environment, submits it asynchronously and
     * waits until the job is running.
     *
     * @param mySqlSyncDatabaseAction the action to run
     * @throws Exception if building, submitting or waiting for the job fails
     */
    public void runActionWithDefaultEnv(MySqlSyncDatabaseAction mySqlSyncDatabaseAction)
            throws Exception {
        StreamExecutionEnvironment env = getBasicEnv();
        mySqlSyncDatabaseAction.build(env);
        waitJobRunning(env.executeAsync());
    }

    /**
     * Creates the execution environment used by the tests: parallelism 2, checkpointing every
     * second and no restart on failure.
     *
     * @return the configured environment
     */
    public StreamExecutionEnvironment getBasicEnv() {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.enableCheckpointing(1000L);
        env.setRestartStrategy(RestartStrategies.noRestart());
        return env;
    }

    /**
     * Blocks until the job reports {@link JobStatus#RUNNING}.
     *
     * @param jobClient the client of the job to watch
     * @throws IllegalStateException if the job reaches a globally terminal state first, since it
     *     can then never become running and waiting would not end
     * @throws Exception if querying the status fails or the thread is interrupted while waiting
     */
    public void waitJobRunning(JobClient jobClient) throws Exception {
        JobStatus status = jobClient.getJobStatus().get();
        while (status != JobStatus.RUNNING) {
            if (status.isGloballyTerminalState()) {
                throw new IllegalStateException(
                        "Job reached terminal state " + status + " before it was running.");
            }
            Thread.sleep(POLL_INTERVAL_MILLIS);
            status = jobClient.getJobStatus().get();
        }
    }

    /**
     * Loads a table of the test database through a short-lived catalog.
     *
     * @param tableName the name of the table inside the test database
     * @return the table as a {@link FileStoreTable}
     * @throws Exception if the catalog cannot be opened or closed, or the table cannot be loaded
     */
    public FileStoreTable getFileStoreTable(String tableName) throws Exception {
        Identifier identifier = Identifier.create(this.database, tableName);
        try (Catalog catalog = catalog()) {
            return (FileStoreTable) catalog.getTable(identifier);
        }
    }

    /**
     * Blocks until every given table exists in the test database.
     *
     * @param tableNames the names of the tables to wait for
     * @throws Exception if listing tables fails or the thread is interrupted while waiting
     */
    public void waitingTables(String... tableNames) throws Exception {
        waitingTables(Arrays.asList(tableNames));
    }

    /**
     * Blocks until every given table exists in the test database, polling the catalog's table
     * list. The catalog is closed when the wait ends, whether normally or with an exception.
     *
     * @param tableNames the names of the tables to wait for
     * @throws Exception if listing tables fails or the thread is interrupted while waiting
     */
    public void waitingTables(List<String> tableNames) throws Exception {
        LOG.info("Waiting for tables '{}'", tableNames);
        try (Catalog catalog = catalog()) {
            while (!catalog.listTables(this.database).containsAll(tableNames)) {
                Thread.sleep(TABLE_POLL_INTERVAL_MILLIS);
            }
        }
    }
}
