package org.apache.paimon.flink.action.cdc.mysql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.Path;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mysql/MySqlSyncDatabaseActionITCase.class */
public class MySqlSyncDatabaseActionITCase extends MySqlActionITCaseBase {
    private static final String DATABASE_NAME = "paimon_sync_database";

    @Timeout(60)
    @Test
    public void testSchemaEvolution() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(2);
        executionEnvironment.enableCheckpointing(1000L);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        ThreadLocalRandom current = ThreadLocalRandom.current();
        HashMap hashMap = new HashMap();
        hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
        hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
        new MySqlSyncDatabaseAction(basicMySqlConfig, this.warehouse, this.database, false, Collections.emptyMap(), hashMap).build(executionEnvironment);
        waitJobRunning(executionEnvironment.executeAsync());
        Connection connection = DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME), MYSQL_CONTAINER.getUsername(), MYSQL_CONTAINER.getPassword());
        Throwable th = null;
        try {
            Statement createStatement = connection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    testSchemaEvolutionImpl(createStatement);
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    if (connection != null) {
                        if (0 == 0) {
                            connection.close();
                            return;
                        }
                        try {
                            connection.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    connection.close();
                }
            }
            throw th8;
        }
    }

    private void testSchemaEvolutionImpl(Statement statement) throws Exception {
        FileStoreTable fileStoreTable = getFileStoreTable("t1");
        FileStoreTable fileStoreTable2 = getFileStoreTable("t2");
        statement.executeUpdate("USE paimon_sync_database");
        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two', 20, 200)");
        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four', 40, 400)");
        statement.executeUpdate("INSERT INTO t3 VALUES (-1)");
        RowType of = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"k", "v1"});
        List<String> singletonList = Collections.singletonList("k");
        waitForResult(Arrays.asList("+I[1, one]", "+I[3, three]"), fileStoreTable, of, singletonList);
        RowType of2 = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10).notNull(), DataTypes.INT(), DataTypes.BIGINT()}, new String[]{"k1", "k2", "v1", "v2"});
        List<String> asList = Arrays.asList("k1", "k2");
        waitForResult(Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]"), fileStoreTable2, of2, asList);
        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v2 INT");
        statement.executeUpdate("INSERT INTO t1 VALUES (5, 'five', 50)");
        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v3 VARCHAR(10)");
        statement.executeUpdate("INSERT INTO t2 VALUES (6, 'six', 60, 600, 'string_6')");
        statement.executeUpdate("INSERT INTO t1 VALUES (7, 'seven', 70)");
        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 80, 800, 'string_8')");
        waitForResult(Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()}, new String[]{"k", "v1", "v2"}), singletonList);
        waitForResult(Arrays.asList("+I[2, two, 20, 200, NULL]", "+I[4, four, 40, 400, NULL]", "+I[6, six, 60, 600, string_6]", "+I[8, eight, 80, 800, string_8]"), fileStoreTable2, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10).notNull(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR(10)}, new String[]{"k1", "k2", "v1", "v2", "v3"}), asList);
        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v2 BIGINT");
        statement.executeUpdate("INSERT INTO t1 VALUES (9, 'nine', 9000000000000)");
        statement.executeUpdate("ALTER TABLE t2 MODIFY COLUMN v3 VARCHAR(20)");
        statement.executeUpdate("INSERT INTO t2 VALUES (10, 'ten', 100, 1000, 'long_long_string_10')");
        waitForResult(Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]", "+I[9, nine, 9000000000000]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()}, new String[]{"k", "v1", "v2"}), singletonList);
        waitForResult(Arrays.asList("+I[2, two, 20, 200, NULL]", "+I[4, four, 40, 400, NULL]", "+I[6, six, 60, 600, string_6]", "+I[8, eight, 80, 800, string_8]", "+I[10, ten, 100, 1000, long_long_string_10]"), fileStoreTable2, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10).notNull(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.VARCHAR(20)}, new String[]{"k1", "k2", "v1", "v2", "v3"}), asList);
    }

    @Test
    public void testSpecifiedMySqlTable() {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "my_table");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        MySqlSyncDatabaseAction mySqlSyncDatabaseAction = new MySqlSyncDatabaseAction(basicMySqlConfig, this.warehouse, this.database, false, Collections.emptyMap(), Collections.emptyMap());
        Assertions.assertThat((IllegalArgumentException) org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> {
            mySqlSyncDatabaseAction.build(executionEnvironment);
        }, "Expecting IllegalArgumentException")).hasMessage("table-name cannot be set for mysql-sync-database. If you want to sync several MySQL tables into one Paimon table, use mysql-sync-table instead.");
    }

    @Test
    public void testInvalidDatabase() {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", "invalid");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        MySqlSyncDatabaseAction mySqlSyncDatabaseAction = new MySqlSyncDatabaseAction(basicMySqlConfig, this.warehouse, this.database, false, Collections.emptyMap(), Collections.emptyMap());
        Assertions.assertThat((IllegalArgumentException) org.junit.jupiter.api.Assertions.assertThrows(IllegalArgumentException.class, () -> {
            mySqlSyncDatabaseAction.build(executionEnvironment);
        }, "Expecting IllegalArgumentException")).hasMessage("No tables found in MySQL database invalid, or MySQL database does not exist.");
    }

    @Timeout(60)
    @Test
    public void testIgnoreIncompatibleTables() throws Exception {
        Catalog createCatalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(this.warehouse)));
        createCatalog.createDatabase(this.database, true);
        createCatalog.createTable(Identifier.create(this.database, "incompatible"), Schema.newBuilder().column("k", DataTypes.STRING()).column("v1", DataTypes.STRING()).primaryKey(new String[]{"k"}).build(), false);
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", "paimon_sync_database_ignore_incompatible");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(2);
        executionEnvironment.enableCheckpointing(1000L);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        ThreadLocalRandom current = ThreadLocalRandom.current();
        HashMap hashMap = new HashMap();
        hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
        hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
        new MySqlSyncDatabaseAction(basicMySqlConfig, this.warehouse, this.database, true, Collections.emptyMap(), hashMap).build(executionEnvironment);
        waitJobRunning(executionEnvironment.executeAsync());
        Connection connection = DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME), MYSQL_CONTAINER.getUsername(), MYSQL_CONTAINER.getPassword());
        Throwable th = null;
        try {
            Statement createStatement = connection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    FileStoreTable fileStoreTable = getFileStoreTable("compatible");
                    createStatement.executeUpdate("USE paimon_sync_database_ignore_incompatible");
                    createStatement.executeUpdate("INSERT INTO compatible VALUES (2, 'two', 20, 200)");
                    createStatement.executeUpdate("INSERT INTO compatible VALUES (4, 'four', 40, 400)");
                    waitForResult(Arrays.asList("+I[2, two, 20, 200]", "+I[4, four, 40, 400]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10).notNull(), DataTypes.INT(), DataTypes.BIGINT()}, new String[]{"k1", "k2", "v1", "v2"}), Arrays.asList("k1", "k2"));
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    if (connection != null) {
                        if (0 == 0) {
                            connection.close();
                            return;
                        }
                        try {
                            connection.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    connection.close();
                }
            }
            throw th8;
        }
    }

    @Timeout(60)
    @Test
    public void testTableAffix() throws Exception {
        Catalog createCatalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(this.warehouse)));
        createCatalog.createDatabase(this.database, true);
        createCatalog.createTable(Identifier.create(this.database, "test_prefix_t1_test_suffix"), Schema.newBuilder().column("k1", DataTypes.INT().notNull()).column("v0", DataTypes.VARCHAR(10)).primaryKey(new String[]{"k1"}).build(), false);
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", "paimon_sync_database_affix");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(2);
        executionEnvironment.enableCheckpointing(1000L);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        ThreadLocalRandom current = ThreadLocalRandom.current();
        HashMap hashMap = new HashMap();
        hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
        hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
        new MySqlSyncDatabaseAction(basicMySqlConfig, this.warehouse, this.database, false, "test_prefix_", "_test_suffix", (String) null, (String) null, Collections.emptyMap(), hashMap).build(executionEnvironment);
        waitJobRunning(executionEnvironment.executeAsync());
        Connection connection = DriverManager.getConnection(MYSQL_CONTAINER.getJdbcUrl(DATABASE_NAME), MYSQL_CONTAINER.getUsername(), MYSQL_CONTAINER.getPassword());
        Throwable th = null;
        try {
            Statement createStatement = connection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    testTableAffixImpl(createStatement);
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    if (connection != null) {
                        if (0 == 0) {
                            connection.close();
                            return;
                        }
                        try {
                            connection.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (connection != null) {
                if (0 != 0) {
                    try {
                        connection.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    connection.close();
                }
            }
            throw th8;
        }
    }

    private void testTableAffixImpl(Statement statement) throws Exception {
        FileStoreTable fileStoreTable = getFileStoreTable("test_prefix_t1_test_suffix");
        FileStoreTable fileStoreTable2 = getFileStoreTable("test_prefix_t2_test_suffix");
        statement.executeUpdate("USE paimon_sync_database_affix");
        statement.executeUpdate("INSERT INTO t1 VALUES (1, 'one')");
        statement.executeUpdate("INSERT INTO t2 VALUES (2, 'two')");
        statement.executeUpdate("INSERT INTO t1 VALUES (3, 'three')");
        statement.executeUpdate("INSERT INTO t2 VALUES (4, 'four')");
        RowType of = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"k1", "v0"});
        List<String> singletonList = Collections.singletonList("k1");
        waitForResult(Arrays.asList("+I[1, one]", "+I[3, three]"), fileStoreTable, of, singletonList);
        RowType of2 = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"k2", "v0"});
        List<String> singletonList2 = Collections.singletonList("k2");
        waitForResult(Arrays.asList("+I[2, two]", "+I[4, four]"), fileStoreTable2, of2, singletonList2);
        statement.executeUpdate("ALTER TABLE t1 ADD COLUMN v1 INT");
        statement.executeUpdate("INSERT INTO t1 VALUES (5, 'five', 50)");
        statement.executeUpdate("ALTER TABLE t2 ADD COLUMN v1 VARCHAR(10)");
        statement.executeUpdate("INSERT INTO t2 VALUES (6, 'six', 's_6')");
        statement.executeUpdate("INSERT INTO t1 VALUES (7, 'seven', 70)");
        statement.executeUpdate("INSERT INTO t2 VALUES (8, 'eight', 's_8')");
        waitForResult(Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()}, new String[]{"k1", "v0", "v1"}), singletonList);
        waitForResult(Arrays.asList("+I[2, two, NULL]", "+I[4, four, NULL]", "+I[6, six, s_6]", "+I[8, eight, s_8]"), fileStoreTable2, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.VARCHAR(10)}, new String[]{"k2", "v0", "v1"}), singletonList2);
        statement.executeUpdate("ALTER TABLE t1 MODIFY COLUMN v1 BIGINT");
        statement.executeUpdate("INSERT INTO t1 VALUES (9, 'nine', 9000000000000)");
        statement.executeUpdate("ALTER TABLE t2 MODIFY COLUMN v1 VARCHAR(20)");
        statement.executeUpdate("INSERT INTO t2 VALUES (10, 'ten', 'long_s_10')");
        waitForResult(Arrays.asList("+I[1, one, NULL]", "+I[3, three, NULL]", "+I[5, five, 50]", "+I[7, seven, 70]", "+I[9, nine, 9000000000000]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()}, new String[]{"k1", "v0", "v1"}), singletonList);
        waitForResult(Arrays.asList("+I[2, two, NULL]", "+I[4, four, NULL]", "+I[6, six, s_6]", "+I[8, eight, s_8]", "+I[10, ten, long_s_10]"), fileStoreTable2, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.VARCHAR(20)}, new String[]{"k2", "v0", "v1"}), singletonList2);
    }

    @Timeout(60)
    @Test
    public void testIncludingTables() throws Exception {
        includingAndExcludingTablesImpl("paimon_sync_database_including", "flink|paimon.+", null, Arrays.asList("flink", "paimon_1", "paimon_2"), Collections.singletonList("ignored"));
    }

    @Timeout(60)
    @Test
    public void testExcludingTables() throws Exception {
        includingAndExcludingTablesImpl("paimon_sync_database_excluding", null, "flink|paimon.+", Collections.singletonList("sync"), Arrays.asList("flink", "paimon_1", "paimon_2"));
    }

    @Timeout(60)
    @Test
    public void testIncludingAndExcludingTables() throws Exception {
        includingAndExcludingTablesImpl("paimon_sync_database_in_excluding", "flink|paimon.+", "paimon_1", Arrays.asList("flink", "paimon_2"), Arrays.asList("paimon_1", "test"));
    }

    private void includingAndExcludingTablesImpl(String str, @Nullable String str2, @Nullable String str3, List<String> list, List<String> list2) throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", str);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setParallelism(2);
        executionEnvironment.enableCheckpointing(1000L);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        ThreadLocalRandom current = ThreadLocalRandom.current();
        HashMap hashMap = new HashMap();
        hashMap.put("bucket", String.valueOf(current.nextInt(3) + 1));
        hashMap.put("sink.parallelism", String.valueOf(current.nextInt(3) + 1));
        new MySqlSyncDatabaseAction(basicMySqlConfig, this.warehouse, this.database, false, (String) null, (String) null, str2, str3, Collections.emptyMap(), hashMap).build(executionEnvironment);
        waitJobRunning(executionEnvironment.executeAsync());
        assertTableExists(list);
        assertTableNotExists(list2);
    }

    private FileStoreTable getFileStoreTable(String str) throws Exception {
        return CatalogFactory.createCatalog(CatalogContext.create(new Path(this.warehouse))).getTable(Identifier.create(this.database, str));
    }

    private void assertTableExists(List<String> list) {
        Catalog createCatalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(this.warehouse)));
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            Assertions.assertThat(createCatalog.tableExists(Identifier.create(this.database, it.next()))).isTrue();
        }
    }

    private void assertTableNotExists(List<String> list) {
        Catalog createCatalog = CatalogFactory.createCatalog(CatalogContext.create(new Path(this.warehouse)));
        Iterator<String> it = list.iterator();
        while (it.hasNext()) {
            Assertions.assertThat(createCatalog.tableExists(Identifier.create(this.database, it.next()))).isFalse();
        }
    }
}
