package org.apache.paimon.flink.action.cdc.mysql;

import java.sql.SQLException;
import java.sql.Statement;
import java.time.DayOfWeek;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.WeekFields;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.testutils.assertj.PaimonAssertions;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.JsonSerdeUtil;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mysql/MySqlSyncTableActionITCase.class */
public class MySqlSyncTableActionITCase extends MySqlActionITCaseBase {
    private static final String DATABASE_NAME = "paimon_sync_table";

    @BeforeAll
    public static void startContainers() {
        MYSQL_CONTAINER.withSetupSQL("mysql/sync_table_setup.sql");
        start();
    }

    @Timeout(60)
    @Test
    public void testSchemaEvolution() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "schema_evolution_\\d+");
        runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withCatalogConfig(Collections.singletonMap(CatalogOptions.METASTORE.key(), "test-alter-table")).withTableConfig(getBasicTableConfig()).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").build());
        checkTableSchema("[{\"id\":0,\"name\":\"pt\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"_id\"},{\"id\":2,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"}]");
        Statement statement = getStatement();
        Throwable th = null;
        try {
            testSchemaEvolutionImpl(statement);
            if (statement != null) {
                if (0 == 0) {
                    statement.close();
                    return;
                }
                try {
                    statement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (statement != null) {
                if (0 != 0) {
                    try {
                        statement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    statement.close();
                }
            }
            throw th3;
        }
    }

    private void checkTableSchema(String str) throws Exception {
        Assertions.assertThat(JsonSerdeUtil.toFlatJson(getFileStoreTable().schema().fields())).isEqualTo(str);
    }

    private void testSchemaEvolutionImpl(Statement statement) throws Exception {
        FileStoreTable fileStoreTable = getFileStoreTable();
        statement.executeUpdate("USE paimon_sync_table");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 1, 'one')");
        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 2, 'two'), (2, 4, 'four')");
        RowType of = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10)}, new String[]{"pt", "_id", "v1"});
        List<String> asList = Arrays.asList("pt", "_id");
        waitForResult(Arrays.asList("+I[1, 1, one]", "+I[1, 2, two]", "+I[2, 4, four]"), fileStoreTable, of, asList);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v2 INT");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (2, 3, 'three', 30), (1, 5, 'five', 50)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v2 INT");
        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (1, 6, 'six', 60)");
        statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = 'second' WHERE _id = 2");
        waitForResult(Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30]", "+I[2, 4, four, NULL]", "+I[1, 5, five, 50]", "+I[1, 6, six, 60]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT()}, new String[]{"pt", "_id", "v1", "v2"}), asList);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v2 BIGINT");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (2, 7, 'seven', 70000000000)");
        statement.executeUpdate("DELETE FROM schema_evolution_1 WHERE _id = 5");
        statement.executeUpdate("UPDATE schema_evolution_1 SET v2 = 30000000000 WHERE _id = 3");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v2 BIGINT");
        statement.executeUpdate("INSERT INTO schema_evolution_2 VALUES (2, 8, 'eight', 80000000000)");
        waitForResult(Arrays.asList("+I[1, 1, one, NULL]", "+I[1, 2, second, NULL]", "+I[2, 3, three, 30000000000]", "+I[2, 4, four, NULL]", "+I[1, 6, six, 60]", "+I[2, 7, seven, 70000000000]", "+I[2, 8, eight, 80000000000]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.BIGINT()}, new String[]{"pt", "_id", "v1", "v2"}), asList);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v3 NUMERIC(8, 3)");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v4 VARBINARY(10)");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 ADD COLUMN v5 FLOAT");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v1 VARCHAR(20)");
        statement.executeUpdate("INSERT INTO schema_evolution_1 VALUES (1, 9, 'nine', 90000000000, 99999.999, 'nine.bin', 9.9)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v3 NUMERIC(8, 3)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v4 VARBINARY(10)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 ADD COLUMN v5 FLOAT");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v1 VARCHAR(20)");
        statement.executeUpdate("UPDATE schema_evolution_2 SET v1 = 'very long string' WHERE _id = 8");
        waitForResult(Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, NULL, NULL]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110], 9.9]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT(), DataTypes.DECIMAL(8, 3), DataTypes.VARBINARY(10), DataTypes.FLOAT()}, new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"}), asList);
        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v4 VARBINARY(20)");
        statement.executeUpdate("ALTER TABLE schema_evolution_1 MODIFY COLUMN v5 DOUBLE");
        statement.executeUpdate("UPDATE schema_evolution_1 SET v4 = 'nine.bin.long', v5 = 9.00000000009 WHERE _id = 9");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v4 VARBINARY(20)");
        statement.executeUpdate("ALTER TABLE schema_evolution_2 MODIFY COLUMN v5 DOUBLE");
        statement.executeUpdate("UPDATE schema_evolution_2 SET v4 = 'four.bin.long', v5 = 4.00000000004 WHERE _id = 4");
        waitForResult(Arrays.asList("+I[1, 1, one, NULL, NULL, NULL, NULL]", "+I[1, 2, second, NULL, NULL, NULL, NULL]", "+I[2, 3, three, 30000000000, NULL, NULL, NULL]", "+I[2, 4, four, NULL, NULL, [102, 111, 117, 114, 46, 98, 105, 110, 46, 108, 111, 110, 103], 4.00000000004]", "+I[1, 6, six, 60, NULL, NULL, NULL]", "+I[2, 7, seven, 70000000000, NULL, NULL, NULL]", "+I[2, 8, very long string, 80000000000, NULL, NULL, NULL]", "+I[1, 9, nine, 90000000000, 99999.999, [110, 105, 110, 101, 46, 98, 105, 110, 46, 108, 111, 110, 103], 9.00000000009]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT(), DataTypes.DECIMAL(8, 3), DataTypes.VARBINARY(20), DataTypes.DOUBLE()}, new String[]{"pt", "_id", "v1", "v2", "v3", "v4", "v5"}), asList);
        Assertions.assertThat(getFileStoreTable().options()).containsEntry("alter-table-test", "true");
    }

    @Timeout(60)
    @Test
    public void testMultipleSchemaEvolutions() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "schema_evolution_multiple");
        runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).build());
        checkTableSchema("[{\"id\":0,\"name\":\"_id\",\"type\":\"INT NOT NULL\",\"description\":\"primary\"},{\"id\":1,\"name\":\"v1\",\"type\":\"VARCHAR(10)\",\"description\":\"v1\"},{\"id\":2,\"name\":\"v2\",\"type\":\"INT\",\"description\":\"v2\"},{\"id\":3,\"name\":\"v3\",\"type\":\"VARCHAR(10)\",\"description\":\"v3\"}]");
        Statement statement = getStatement();
        Throwable th = null;
        try {
            testSchemaEvolutionMultipleImpl(statement);
            if (statement != null) {
                if (0 == 0) {
                    statement.close();
                    return;
                }
                try {
                    statement.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (statement != null) {
                if (0 != 0) {
                    try {
                        statement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    statement.close();
                }
            }
            throw th3;
        }
    }

    private void testSchemaEvolutionMultipleImpl(Statement statement) throws Exception {
        FileStoreTable fileStoreTable = getFileStoreTable();
        statement.executeUpdate("USE paimon_sync_table");
        statement.executeUpdate("INSERT INTO schema_evolution_multiple VALUES (1, 'one', 10, 'string_1')");
        RowType of = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.INT(), DataTypes.VARCHAR(10)}, new String[]{"_id", "v1", "v2", "v3"});
        List<String> singletonList = Collections.singletonList("_id");
        waitForResult(Collections.singletonList("+I[1, one, 10, string_1]"), fileStoreTable, of, singletonList);
        statement.executeUpdate("ALTER TABLE schema_evolution_multiple ADD v4 INT, MODIFY COLUMN v1 VARCHAR(20), ADD COLUMN (v5 DOUBLE, v6 DECIMAL(5, 3), `$% ^,& *(` VARCHAR(10) COMMENT 'Hi, v700 DOUBLE \\', v701 INT a test'), MODIFY v2 BIGINT");
        statement.executeUpdate("INSERT INTO schema_evolution_multiple VALUES (2, 'long_string_two', 2000000000000, 'string_2', 20, 20.5, 20.002, 'test_2')");
        waitForResult(Arrays.asList("+I[1, one, 10, string_1, NULL, NULL, NULL, NULL]", "+I[2, long_string_two, 2000000000000, string_2, 20, 20.5, 20.002, test_2]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(20), DataTypes.BIGINT(), DataTypes.VARCHAR(10), DataTypes.INT(), DataTypes.DOUBLE(), DataTypes.DECIMAL(5, 3), DataTypes.VARCHAR(10)}, new String[]{"_id", "v1", "v2", "v3", "v4", "v5", "v6", "$% ^,& *("}), singletonList);
    }

    @Timeout(90)
    @Test
    public void testAllTypes() throws Exception {
        for (int i = 0; i < 2; i++) {
            testAllTypesOnce();
        }
    }

    private void testAllTypesOnce() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "all_types_table");
        JobClient runActionWithDefaultEnv = runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withPartitionKeys("pt").withPrimaryKeys("pt", "_id").build());
        Statement statement = getStatement();
        Throwable th = null;
        try {
            try {
                testAllTypesImpl(statement);
                if (statement != null) {
                    if (0 != 0) {
                        try {
                            statement.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        statement.close();
                    }
                }
                runActionWithDefaultEnv.cancel().get();
            } finally {
            }
        } catch (Throwable th3) {
            if (statement != null) {
                if (th != null) {
                    try {
                        statement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    statement.close();
                }
            }
            throw th3;
        }
    }

    private void testAllTypesImpl(Statement statement) throws Exception {
        RowType of = RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.DECIMAL(2, 1).notNull(), DataTypes.BOOLEAN(), DataTypes.BINARY(8), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.BOOLEAN(), DataTypes.TINYINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.SMALLINT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.DECIMAL(20, 0), DataTypes.DECIMAL(20, 0), DataTypes.DECIMAL(20, 0).notNull(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DOUBLE(), DataTypes.DECIMAL(8, 3), DataTypes.DECIMAL(8, 3), DataTypes.DECIMAL(8, 3), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.DECIMAL(8, 0), DataTypes.DECIMAL(8, 0), DataTypes.DECIMAL(8, 0), DataTypes.DECIMAL(38, 10), DataTypes.DATE(), DataTypes.TIMESTAMP(0), DataTypes.TIMESTAMP(3), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(0), DataTypes.TIMESTAMP(2), DataTypes.TIMESTAMP(6), DataTypes.TIMESTAMP(0), DataTypes.CHAR(10), DataTypes.VARCHAR(20), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.VARBINARY(10), DataTypes.VARBINARY(20), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.BYTES(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT(), DataTypes.TIME(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.ARRAY(DataTypes.STRING())}, new String[]{"_id", "pt", "_bit1", "_bit", "_tinyint1", "_boolean", "_bool", "_tinyint", "_tinyint_unsigned", "_tinyint_unsigned_zerofill", "_smallint", "_smallint_unsigned", "_smallint_unsigned_zerofill", "_mediumint", "_mediumint_unsigned", "_mediumint_unsigned_zerofill", "_int", "_int_unsigned", "_int_unsigned_zerofill", "_bigint", "_bigint_unsigned", "_bigint_unsigned_zerofill", "_serial", "_float", "_float_unsigned", "_float_unsigned_zerofill", "_real", "_real_unsigned", "_real_unsigned_zerofill", "_double", "_double_unsigned", "_double_unsigned_zerofill", "_double_precision", "_double_precision_unsigned", "_double_precision_unsigned_zerofill", "_numeric", "_numeric_unsigned", "_numeric_unsigned_zerofill", "_fixed", "_fixed_unsigned", "_fixed_unsigned_zerofill", "_decimal", "_decimal_unsigned", "_decimal_unsigned_zerofill", "_big_decimal", "_date", "_datetime", "_datetime3", "_datetime6", "_datetime_p", "_datetime_p2", "_timestamp", "_timestamp0", "_char", "_varchar", "_tinytext", "_text", "_mediumtext", "_longtext", "_bin", "_varbin", "_tinyblob", "_blob", "_mediumblob", "_longblob", "_json", "_enum", "_year", "_time", "_point", "_geometry", "_linestring", "_polygon", "_multipoint", "_multiline", "_multipolygon", "_geometrycollection", "_set"});
        FileStoreTable fileStoreTable = getFileStoreTable();
        List<String> asList = Arrays.asList("+I[1, 1.1, " + String.format("true, %s, ", Arrays.toString(new byte[]{0, 0, 0, 0, 0, 0, 7, -57})) + "true, true, false, 1, 2, 3, 1000, 2000, 3000, 100000, 200000, 300000, 1000000, 2000000, 3000000, 10000000000, 20000000000, 30000000000, 40000000000, 1.5, 2.5, 3.5, 1.000001, 2.000002, 3.000003, 1.000011, 2.000022, 3.000033, 1.000111, 2.000222, 3.000333, 12345.110, 12345.220, 12345.330, 123456789876543212345678987654321.11, 123456789876543212345678987654321.22, 123456789876543212345678987654321.33, 11111, 22222, 33333, 2222222222222222300000001111.1234567890, 19439, 2023-03-23T14:30:05, 2023-03-23T14:30:05.123, 2023-03-23T14:30:05.123456, 2023-03-24T14:30, 2023-03-24T14:30:05.120, 2023-03-23T18:00:10.123456, 2023-03-23T03:10, Paimon, Apache Paimon, Apache Paimon MySQL TINYTEXT Test Data, Apache Paimon MySQL Test Data, Apache Paimon MySQL MEDIUMTEXT Test Data, Apache Paimon MySQL Long Test Data, [98, 121, 116, 101, 115, 0, 0, 0, 0, 0], [109, 111, 114, 101, 32, 98, 121, 116, 101, 115], [84, 73, 78, 89, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [77, 69, 68, 73, 85, 77, 66, 76, 79, 66, 32, 116, 121, 112, 101, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], [76, 79, 78, 71, 66, 76, 79, 66, 32, 32, 98, 121, 116, 101, 115, 32, 116, 101, 115, 116, 32, 100, 97, 116, 97], {\"a\": \"b\"}, value1, 2023, 36803000, {\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[3,0],[3,3],[3,5]],\"type\":\"LineString\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,1],[2,2],[1,2],[1,1]]],\"type\":\"Polygon\",\"srid\":0}, {\"coordinates\":[[1,1],[2,2]],\"type\":\"MultiPoint\",\"srid\":0}, {\"coordinates\":[[[1,1],[2,2],[3,3]],[[4,4],[5,5]]],\"type\":\"MultiLineString\",\"srid\":0}, {\"coordinates\":[[[[0,0],[10,0],[10,10],[0,10],[0,0]]],[[[5,5],[7,5],[7,7],[5,7],[5,5]]]],\"type\":\"MultiPolygon\",\"srid\":0}, {\"geometries\":[{\"type\":\"Point\",\"coordinates\":[10,10]},{\"type\":\"Point\",\"coordinates\":[30,30]},{\"type\":\"LineString\",\"coordinates\":[[15,15],[20,20]]}],\"type\":\"GeometryCollection\",\"srid\":0}, [a, b]]", "+I[2, 2.2, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, 50000000000, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL]");
        waitForResult(asList, fileStoreTable, of, Arrays.asList("pt", "_id"));
        try {
            statement.executeUpdate("USE paimon_sync_table");
            statement.executeUpdate("ALTER TABLE all_types_table ADD COLUMN v INT");
            ArrayList arrayList = new ArrayList(of.getFields());
            arrayList.add(new DataField(of.getFieldCount(), "v", DataTypes.INT()));
            waitForResult((List) asList.stream().map(str -> {
                return str.substring(0, str.length() - 1) + ", NULL]";
            }).collect(Collectors.toList()), fileStoreTable, new RowType(arrayList), Arrays.asList("pt", "_id"));
            statement.executeUpdate("ALTER TABLE all_types_table DROP COLUMN v");
            new SchemaManager(fileStoreTable.fileIO(), fileStoreTable.location()).commitChanges(new SchemaChange[]{SchemaChange.dropColumn("v")});
        } catch (Throwable th) {
            statement.executeUpdate("ALTER TABLE all_types_table DROP COLUMN v");
            new SchemaManager(fileStoreTable.fileIO(), fileStoreTable.location()).commitChanges(new SchemaChange[]{SchemaChange.dropColumn("v")});
            throw th;
        }
    }

    @Test
    public void testIncompatibleMySqlTable() {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "incompatible_field_\\d+");
        MySqlSyncTableAction build = syncTableActionBuilder(basicMySqlConfig).build();
        build.getClass();
        Assertions.assertThatThrownBy(build::run).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, "Column v1 have different types when merging schemas.\nCurrent table '{paimon_sync_table.incompatible_field_1}' field: `v1` TIMESTAMP(0) ''\nTo be merged table 'paimon_sync_table.incompatible_field_2' field: `v1` INT ''")});
    }

    @Test
    public void testIncompatiblePaimonTable() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "incompatible_pk_\\d+");
        createFileStoreTable(RowType.of(new DataType[]{DataTypes.INT(), DataTypes.BIGINT(), DataTypes.DOUBLE()}, new String[]{"a", "b", "c"}), Collections.emptyList(), Collections.singletonList("a"), new HashMap());
        MySqlSyncTableAction build = syncTableActionBuilder(basicMySqlConfig).withPrimaryKeys("a").build();
        build.getClass();
        Assertions.assertThatThrownBy(build::run).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, "Paimon schema and source table schema are not compatible.")});
    }

    @Test
    public void testInvalidPrimaryKey() {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "schema_evolution_\\d+");
        MySqlSyncTableAction build = syncTableActionBuilder(basicMySqlConfig).withPrimaryKeys("pk").build();
        build.getClass();
        Assertions.assertThatThrownBy(build::run).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, "Specified primary key 'pk' does not exist in source tables or computed columns [pt, _id, v1].")});
    }

    @Test
    public void testNoPrimaryKey() {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "incompatible_pk_\\d+");
        MySqlSyncTableAction build = syncTableActionBuilder(basicMySqlConfig).build();
        build.getClass();
        Assertions.assertThatThrownBy(build::run).satisfies(new ThrowingConsumer[]{PaimonAssertions.anyCauseMatches(IllegalArgumentException.class, "Primary keys are not specified. Also, can't infer primary keys from source table schemas because source tables have no primary keys or have different primary keys.")});
    }

    @Timeout(60)
    @Test
    public void testComputedColumn() throws Exception {
        int i = 0;
        while (i < 2) {
            innerTestComputedColumn(i == 0);
            i++;
        }
    }

    private void innerTestComputedColumn(boolean z) throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "test_computed_column");
        runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withPartitionKeys("_year_date").withPrimaryKeys("pk", "_year_date").withComputedColumnArgs(Arrays.asList("_year_date=year(_date)", "_year_datetime=year(_datetime)", "_year_timestamp=year(_timestamp)", "_month_date=month(_date)", "_month_datetime=month(_datetime)", "_month_timestamp=month(_timestamp)", "_day_date=day(_date)", "_day_datetime=day(_datetime)", "_day_timestamp=day(_timestamp)", "_hour_date=hour(_date)", "_hour_datetime=hour(_datetime)", "_hour_timestamp=hour(_timestamp)", "_minute_date=minute(_date)", "_minute_datetime=minute(_datetime)", "_minute_timestamp=minute(_timestamp)", "_second_date=second(_date)", "_second_datetime=second(_datetime)", "_second_timestamp=second(_timestamp)", "_date_format_date=date_format(_date,yyyy)", "_date_format_datetime=date_format(_datetime,yyyy-MM-dd)", "_date_format_timestamp=date_format(_timestamp,yyyyMMdd)", "_substring_date1=substring(_date,2)", "_substring_date2=substring(_timestamp,5,10)", "_truncate_date=trUNcate(pk,2)", "_constant=cast(11,INT)")).build());
        if (z) {
            Statement statement = getStatement();
            Throwable th = null;
            try {
                try {
                    statement.execute("USE paimon_sync_table");
                    statement.executeUpdate("INSERT INTO test_computed_column VALUES (1, '2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10')");
                    statement.executeUpdate("INSERT INTO test_computed_column VALUES (2, '2023-03-23', null, null)");
                    if (statement != null) {
                        if (0 != 0) {
                            try {
                                statement.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            statement.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th3) {
                if (statement != null) {
                    if (th != null) {
                        try {
                            statement.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        statement.close();
                    }
                }
                throw th3;
            }
        }
        waitForResult(Arrays.asList("+I[1, 19439, 2022-01-01T14:30, 2021-09-15T15:00:10, 2023, 2022, 2021, 3, 1, 9, 23, 1, 15, 0, 14, 15, 0, 30, 0, 0, 0, 10, 2023, 2022-01-01, 20210915, 23-03-23, 09-15, 0, 11]", "+I[2, 19439, NULL, NULL, 2023, NULL, NULL, 3, NULL, NULL, 23, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 0, NULL, NULL, 2023, NULL, NULL, 23-03-23, NULL, 2, 11]"), getFileStoreTable(), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.TIMESTAMP(0), DataTypes.TIMESTAMP(0), DataTypes.INT().notNull(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.INT().notNull(), DataTypes.INT()}, new String[]{"pk", "_date", "_datetime", "_timestamp", "_year_date", "_year_datetime", "_year_timestamp", "_month_date", "_month_datetime", "_month_timestamp", "_day_date", "_day_datetime", "_day_timestamp", "_hour_date", "_hour_datetime", "_hour_timestamp", "_minute_date", "_minute_datetime", "_minute_timestamp", "_second_date", "_second_datetime", "_second_timestamp", "_date_format_date", "_date_format_datetime", "_date_format_timestamp", "_substring_date1", "_substring_date2", "_truncate_date", "_constant"}), Arrays.asList("pk", "_year_date"));
    }

    @Timeout(60)
    @Test
    public void testTemporalToIntWithEpochTime() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "test_time_to_int_epoch");
        ThreadLocalRandom current = ThreadLocalRandom.current();
        int nextInt = current.nextInt(5);
        String str = (String) Arrays.asList("_second_val0", "_second_val1", "_millis_val", "_micros_val", "_nanos_val").get(nextInt);
        String str2 = (String) Arrays.asList("", ",0", ",3", ",6", ",9").get(nextInt);
        int nextInt2 = current.nextInt(6);
        runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withComputedColumnArgs(String.format("_time_to_int=%s(%s%s)", (String) Arrays.asList("year", "month", "day", "hour", "minute", "second").get(nextInt2), str, str2)).build());
        Statement statement = getStatement();
        Throwable th = null;
        try {
            statement.execute("USE paimon_sync_table");
            insertEpochTime("test_time_to_int_epoch", 1, "2024-01-01T00:01:02.123456789Z", statement);
            insertEpochTime("test_time_to_int_epoch", 2, "2024-12-31T12:59:59.123456789Z", statement);
            if (statement != null) {
                if (0 != 0) {
                    try {
                        statement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    statement.close();
                }
            }
            waitForResult(Arrays.asList("+I[1, 1704067262, 1704067262, 1704067262123, 1704067262123456, 1704067262123456789, " + ((Integer) Arrays.asList(2024, 1, 1, 0, 1, 2).get(nextInt2)).intValue() + "]", "+I[2, 1735649999, 1735649999, 1735649999123, 1735649999123456, 1735649999123456789, " + ((Integer) Arrays.asList(2024, 12, 31, 12, 59, 59).get(nextInt2)).intValue() + "]"), getFileStoreTable(), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.INT()}, new String[]{"pk", "_second_val0", "_second_val1", "_millis_val", "_micros_val", "_nanos_val", "_time_to_int"}), Collections.singletonList("pk"));
        } catch (Throwable th3) {
            if (statement != null) {
                if (0 != 0) {
                    try {
                        statement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    statement.close();
                }
            }
            throw th3;
        }
    }

    @Timeout(60)
    @Test
    public void testDateFormatWithEpochTime() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "test_date_format_epoch");
        runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withComputedColumnArgs(Arrays.asList("_from_second0_default=date_format(_second_val0, yyyy-MM-dd HH:mm:ss)", "_from_second0=date_format(_second_val0, yyyy-MM-dd HH:mm:ss, 0)", "_from_second1=date_format(_second_val1, yyyy-MM-dd HH:mm:ss, 0)", "_from_second1_week=date_format(_second_val1, yyyy-ww, 0)", "_from_millisecond=date_format(_millis_val, yyyy-MM-dd HH:mm:ss.SSS, 3)", "_from_microsecond=date_format(_micros_val, yyyy-MM-dd HH:mm:ss.SSSSSS, 6)", "_from_nanoseconds=date_format(_nanos_val, yyyy-MM-dd HH:mm:ss.SSSSSSSSS, 9)")).build());
        Statement statement = getStatement();
        Throwable th = null;
        try {
            try {
                statement.execute("USE paimon_sync_table");
                insertEpochTime("test_date_format_epoch", 1, "2024-01-07T00:01:02.123456789Z", statement);
                if (statement != null) {
                    if (0 != 0) {
                        try {
                            statement.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        statement.close();
                    }
                }
                waitForResult(Collections.singletonList("+I[1, 1704585662, 1704585662, 1704585662123, 1704585662123456, 1704585662123456789, 2024-01-07 00:01:02, 2024-01-07 00:01:02, 2024-01-07 00:01:02, " + String.format("2024-0%s, ", Integer.valueOf(WeekFields.of(Locale.getDefault()).getFirstDayOfWeek() == DayOfWeek.MONDAY ? 1 : 2)) + "2024-01-07 00:01:02.123, 2024-01-07 00:01:02.123456, 2024-01-07 00:01:02.123456789]"), getFileStoreTable(), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.INT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.BIGINT(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"pk", "_second_val0", "_second_val1", "_millis_val", "_micros_val", "_nanos_val", "_from_second0_default", "_from_second0", "_from_second1", "_from_second1_week", "_from_millisecond", "_from_microsecond", "_from_nanoseconds"}), Collections.singletonList("pk"));
            } finally {
            }
        } catch (Throwable th3) {
            if (statement != null) {
                if (th != null) {
                    try {
                        statement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    statement.close();
                }
            }
            throw th3;
        }
    }

    private void insertEpochTime(String str, int i, String str2, Statement statement) throws SQLException {
        Instant parse = Instant.parse(str2);
        long epochSecond = parse.getEpochSecond();
        statement.executeUpdate(String.format("INSERT INTO %s VALUES (%d, %d, %d, %d, %d, %d)", str, Integer.valueOf(i), Long.valueOf(epochSecond), Long.valueOf(epochSecond), Long.valueOf((epochSecond * 1000) + (r0 / 1000000)), Long.valueOf((epochSecond * 1000000) + (r0 / 1000)), Long.valueOf((epochSecond * 1000000000) + parse.getNano())));
    }

    @Timeout(60)
    @Test
    public void testSyncShards() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        ThreadLocalRandom current = ThreadLocalRandom.current();
        String str = current.nextBoolean() ? "shard_.+" : "shard_1|shard_2";
        String str2 = current.nextBoolean() ? "t.+" : "t1|t2";
        basicMySqlConfig.put("database-name", str);
        basicMySqlConfig.put("table-name", str2);
        runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withPartitionKeys("pt").withPrimaryKeys("pk", "pt").withComputedColumnArgs("pt=substring(_date,5)").build());
        Statement statement = getStatement();
        Throwable th = null;
        try {
            try {
                statement.execute("USE shard_1");
                statement.executeUpdate("INSERT INTO t1 VALUES (1, '2023-07-30')");
                statement.executeUpdate("INSERT INTO t2 VALUES (2, '2023-07-30')");
                statement.execute("USE shard_2");
                statement.executeUpdate("INSERT INTO t1 VALUES (3, '2023-07-31')");
                statement.executeUpdate("INSERT INTO t1 VALUES (4, '2023-07-31')");
                if (statement != null) {
                    if (0 != 0) {
                        try {
                            statement.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        statement.close();
                    }
                }
                waitForResult(Arrays.asList("+I[1, 2023-07-30, 07-30]", "+I[2, 2023-07-30, 07-30]", "+I[3, 2023-07-31, 07-31]", "+I[4, 2023-07-31, 07-31]"), getFileStoreTable(), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.STRING().notNull()}, new String[]{"pk", "_date", "pt"}), Arrays.asList("pk", "pt"));
            } finally {
            }
        } catch (Throwable th3) {
            if (statement != null) {
                if (th != null) {
                    try {
                        statement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    statement.close();
                }
            }
            throw th3;
        }
    }

    @Timeout(60)
    @Test
    public void testOptionsChange() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "test_options_change");
        HashMap hashMap = new HashMap();
        hashMap.put("bucket", "1");
        hashMap.put("sink.parallelism", "1");
        JobClient runActionWithDefaultEnv = runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withPartitionKeys("pt").withPrimaryKeys("pk", "pt").withComputedColumnArgs("pt=substring(_date,5)").withTableConfig(hashMap).build());
        Statement statement = getStatement();
        Throwable th = null;
        try {
            try {
                statement.execute("USE paimon_sync_table");
                statement.executeUpdate("INSERT INTO test_options_change VALUES (1, '2023-03-23', '2022-01-01 14:30', '2021-09-15 15:00:10')");
                statement.executeUpdate("INSERT INTO test_options_change VALUES (2, '2023-03-23', null, null)");
                if (statement != null) {
                    if (0 != 0) {
                        try {
                            statement.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        statement.close();
                    }
                }
                waitingTables(this.tableName);
                runActionWithDefaultEnv.cancel();
                hashMap.put("sink.savepoint.auto-tag", "true");
                hashMap.put("tag.num-retained-max", "5");
                hashMap.put("tag.automatic-creation", "process-time");
                hashMap.put("tag.creation-period", "hourly");
                hashMap.put("tag.creation-delay", "600000");
                hashMap.put("snapshot.time-retained", "1h");
                hashMap.put("snapshot.num-retained.min", "5");
                hashMap.put("snapshot.num-retained.max", "10");
                hashMap.put("changelog-producer", "input");
                runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withPartitionKeys("pt").withPrimaryKeys("pk", "pt").withComputedColumnArgs("pt=substring(_date,5)").withTableConfig(hashMap).build());
                Assertions.assertThat(getFileStoreTable().options()).containsAllEntriesOf(hashMap);
            } finally {
            }
        } catch (Throwable th3) {
            if (statement != null) {
                if (th != null) {
                    try {
                        statement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    statement.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testOptionsChangeInExistingTable() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("bucket", "1");
        hashMap.put("sink.parallelism", "1");
        hashMap.put("sequence.field", "_timestamp");
        createFileStoreTable(RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.TIMESTAMP(0)}, new String[]{"pk", "_date", "_timestamp"}), Collections.emptyList(), Collections.singletonList("pk"), hashMap);
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "test_exist_options_change");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("sequence.field", "_date");
        hashMap2.put("sink.parallelism", "2");
        hashMap2.put("snapshot.expire.limit", "1000");
        runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withPrimaryKeys("pk").withTableConfig(hashMap2).build());
        FileStoreTable fileStoreTable = getFileStoreTable();
        Assertions.assertThat((String) fileStoreTable.options().get("bucket")).isEqualTo("1");
        Assertions.assertThat((String) fileStoreTable.options().get("sequence.field")).isEqualTo("_timestamp");
        Assertions.assertThat((String) fileStoreTable.options().get("sink.parallelism")).isEqualTo("2");
        Assertions.assertThat((String) fileStoreTable.options().get("snapshot.expire.limit")).isEqualTo("1000");
    }

    @Timeout(60)
    @Test
    public void testMetadataColumns() throws Exception {
        Statement statement = getStatement();
        Throwable th = null;
        try {
            statement.execute("USE metadata");
            statement.executeUpdate("INSERT INTO test_metadata_columns VALUES (1, '2023-07-30')");
            statement.executeUpdate("INSERT INTO test_metadata_columns VALUES (2, '2023-07-30')");
            if (statement != null) {
                if (0 != 0) {
                    try {
                        statement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    statement.close();
                }
            }
            Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
            basicMySqlConfig.put("database-name", "metadata");
            basicMySqlConfig.put("table-name", "test_metadata_columns");
            runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withPrimaryKeys("pk").withMetadataColumns("table_name", "database_name", "op_ts").build());
            waitForResult(Arrays.asList("+I[1, 2023-07-30, test_metadata_columns, metadata, 1970-01-01T00:00]", "+I[2, 2023-07-30, test_metadata_columns, metadata, 1970-01-01T00:00]"), getFileStoreTable(), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.STRING().notNull(), DataTypes.STRING().notNull(), DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(3).notNull()}, new String[]{"pk", "_date", "table_name", "database_name", "op_ts"}), Collections.singletonList("pk"));
        } catch (Throwable th3) {
            if (statement != null) {
                if (0 != 0) {
                    try {
                        statement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    statement.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testCatalogAndTableConfig() {
        MySqlSyncTableAction build = syncTableActionBuilder(getBasicMySqlConfig()).withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value")).withTableConfig(Collections.singletonMap("table-key", "table-value")).build();
        Assertions.assertThat(build.catalogConfig()).containsEntry("catalog-key", "catalog-value");
        Assertions.assertThat(build.tableConfig()).containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
    }

    private FileStoreTable getFileStoreTable() throws Exception {
        return getFileStoreTable(this.tableName);
    }

    @Timeout(60)
    @Test
    public void testDefaultCheckpointInterval() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", "default_checkpoint");
        basicMySqlConfig.put("table-name", "t");
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        MySqlSyncTableAction build = syncTableActionBuilder(basicMySqlConfig).build();
        build.withStreamExecutionEnvironment(executionEnvironment);
        Thread thread = new Thread(() -> {
            try {
                build.run();
            } catch (Exception e) {
            }
        });
        thread.start();
        CommonTestUtils.waitUtil(() -> {
            return Boolean.valueOf(executionEnvironment.getCheckpointConfig().isCheckpointingEnabled());
        }, Duration.ofSeconds(5L), Duration.ofMillis(100L));
        Assertions.assertThat(executionEnvironment.getCheckpointInterval()).isEqualTo(180000L);
        thread.interrupt();
        executionEnvironment.close();
    }

    @Timeout(60)
    @Test
    public void testComputedColumnWithCaseInsensitive() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", "computed_column_with_case_insensitive");
        basicMySqlConfig.put("table-name", "t");
        runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withCatalogConfig(Collections.singletonMap(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")).withComputedColumnArgs("SUBSTRING=substring(UPPERCASE_STRING,2)").build());
        Statement statement = getStatement();
        Throwable th = null;
        try {
            statement.execute("USE computed_column_with_case_insensitive");
            statement.executeUpdate("INSERT INTO t VALUES (1, 'apache')");
            statement.executeUpdate("INSERT INTO t VALUES (2, null)");
            if (statement != null) {
                if (0 != 0) {
                    try {
                        statement.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    statement.close();
                }
            }
            waitForResult(Arrays.asList("+I[1, apache, ache]", "+I[2, NULL, NULL]"), getFileStoreTable(), RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(10), DataTypes.STRING()}, new String[]{"pk", "uppercase_string", "substring"}), Collections.singletonList("pk"));
        } catch (Throwable th3) {
            if (statement != null) {
                if (0 != 0) {
                    try {
                        statement.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    statement.close();
                }
            }
            throw th3;
        }
    }

    @Timeout(60)
    @Test
    public void testSpecifyKeysWithCaseInsensitive() throws Exception {
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", "specify_key_with_case_insensitive");
        basicMySqlConfig.put("table-name", "t");
        runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withCatalogConfig(Collections.singletonMap(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")).withPrimaryKeys("ID1", "PART").withPartitionKeys("PART").build());
        FileStoreTable fileStoreTable = getFileStoreTable();
        Assertions.assertThat(fileStoreTable.primaryKeys()).containsExactly(new String[]{"id1", "part"});
        Assertions.assertThat(fileStoreTable.partitionKeys()).containsExactly(new String[]{"part"});
    }

    @Test
    public void testInvalidAlterBucket() throws Exception {
        createFileStoreTable(RowType.of(new DataType[]{DataTypes.INT()}, new String[]{"k"}), Collections.emptyList(), Collections.singletonList("k"), Collections.singletonMap(CoreOptions.BUCKET.key(), "1"));
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", "invalid_alter_bucket");
        basicMySqlConfig.put("table-name", "t");
        MySqlSyncTableAction build = syncTableActionBuilder(basicMySqlConfig).withTableConfig(Collections.singletonMap(CoreOptions.BUCKET.key(), "2")).build();
        build.getClass();
        Assertions.assertThatCode(build::build).doesNotThrowAnyException();
        Assertions.assertThat((String) getFileStoreTable().options().get(CoreOptions.BUCKET.key())).isEqualTo("1");
    }

    @Timeout(60)
    @Test
    public void testColumnCommentChangeInExistingTable() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("bucket", "1");
        hashMap.put("sink.parallelism", "1");
        createFileStoreTable(RowType.builder().field("pk", DataTypes.INT().notNull(), "pk comment").field("c1", DataTypes.DATE(), "c1 comment").field("c2", DataTypes.VARCHAR(10).notNull(), "c2 comment").build(), Collections.emptyList(), Collections.singletonList("pk"), hashMap);
        Map<String, String> basicMySqlConfig = getBasicMySqlConfig();
        basicMySqlConfig.put("database-name", DATABASE_NAME);
        basicMySqlConfig.put("table-name", "test_exist_column_comment_change");
        runActionWithDefaultEnv(syncTableActionBuilder(basicMySqlConfig).withPrimaryKeys("pk").withTableConfig(getBasicTableConfig()).build());
        Map map = (Map) getFileStoreTable().schema().fields().stream().collect(Collectors.toMap((v0) -> {
            return v0.name();
        }, Function.identity()));
        Assertions.assertThat(((DataField) map.get("pk")).description()).isEqualTo("pk comment");
        Assertions.assertThat(((DataField) map.get("c1")).description()).isEqualTo("c1 comment");
        Assertions.assertThat(((DataField) map.get("c2")).description()).isEqualTo("c2 comment");
    }
}
