package org.apache.flink.cdc.connectors.mysql.source;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.common.data.binary.BinaryStringData;
import org.apache.flink.cdc.common.event.AddColumnEvent;
import org.apache.flink.cdc.common.event.AlterColumnTypeEvent;
import org.apache.flink.cdc.common.event.CreateTableEvent;
import org.apache.flink.cdc.common.event.DataChangeEvent;
import org.apache.flink.cdc.common.event.DropColumnEvent;
import org.apache.flink.cdc.common.event.Event;
import org.apache.flink.cdc.common.event.RenameColumnEvent;
import org.apache.flink.cdc.common.event.TableId;
import org.apache.flink.cdc.common.schema.Column;
import org.apache.flink.cdc.common.schema.Schema;
import org.apache.flink.cdc.common.types.BinaryType;
import org.apache.flink.cdc.common.types.CharType;
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;
import org.apache.flink.cdc.common.types.RowType;
import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.CloseableIterator;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.testcontainers.lifecycle.Startables;

/* loaded from: input_file:org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.class */
public class MySqlPipelineITCase extends MySqlSourceTestBase {
    protected static final MySqlContainer MYSQL8_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
    private final UniqueDatabase inventoryDatabase = new UniqueDatabase(MYSQL8_CONTAINER, "inventory", MySqSourceTestUtils.TEST_USER, MySqSourceTestUtils.TEST_PASSWORD);
    private final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    @BeforeClass
    public static void startContainers() {
        LOG.info("Starting containers...");
        Startables.deepStart(Stream.of(MYSQL8_CONTAINER)).join();
        LOG.info("Containers are started.");
    }

    @AfterClass
    public static void stopContainers() {
        LOG.info("Stopping containers...");
        MYSQL8_CONTAINER.stop();
        LOG.info("Containers are stopped.");
    }

    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        this.env.setParallelism(4);
        this.env.enableCheckpointing(2000L);
        this.env.setRestartStrategy(RestartStrategies.noRestart());
    }

    @Test
    public void testInitialStartupMode() throws Exception {
        this.inventoryDatabase.createAndInitialize();
        CloseableIterator executeAndCollect = this.env.fromSource(new MySqlDataSource(new MySqlSourceConfigFactory().hostname(MYSQL8_CONTAINER.getHost()).port(MYSQL8_CONTAINER.getDatabasePort()).username(MySqSourceTestUtils.TEST_USER).password(MySqSourceTestUtils.TEST_PASSWORD).databaseList(new String[]{this.inventoryDatabase.getDatabaseName()}).tableList(new String[]{this.inventoryDatabase.getDatabaseName() + "\\.products"}).startupOptions(StartupOptions.initial()).serverId(MySqSourceTestUtils.getServerId(this.env.getParallelism())).serverTimeZone("UTC").includeSchemaChanges(((Boolean) MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED.defaultValue()).booleanValue())).getEventSourceProvider().getSource(), WatermarkStrategy.noWatermarks(), "mysql", new EventTypeInfo()).executeAndCollect();
        Thread.sleep(10000L);
        TableId tableId = TableId.tableId(this.inventoryDatabase.getDatabaseName(), "products");
        CreateTableEvent productsCreateTableEvent = getProductsCreateTableEvent(tableId);
        List<Event> snapshotExpected = getSnapshotExpected(tableId);
        ArrayList arrayList = new ArrayList();
        Connection jdbcConnection = this.inventoryDatabase.getJdbcConnection();
        Throwable th = null;
        try {
            Statement createStatement = jdbcConnection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    arrayList.addAll(executeAlterAndProvideExpected(tableId, createStatement));
                    BinaryRecordDataGenerator binaryRecordDataGenerator = new BinaryRecordDataGenerator(RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(255).notNull(), DataTypes.FLOAT(), DataTypes.VARCHAR(45), DataTypes.VARCHAR(55)}, new String[]{"id", "name", "weight", "col1", "col2"}));
                    createStatement.execute(String.format("INSERT INTO `%s`.`products` VALUES (default,'scooter',5.5,'c-10','c-20');", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{110, BinaryStringData.fromString("scooter"), Float.valueOf(5.5f), BinaryStringData.fromString("c-10"), BinaryStringData.fromString("c-20")})));
                    createStatement.execute(String.format("INSERT INTO `%s`.`products` VALUES (default,'football',6.6,'c-11','c-21');", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{111, BinaryStringData.fromString("football"), Float.valueOf(6.6f), BinaryStringData.fromString("c-11"), BinaryStringData.fromString("c-21")})));
                    createStatement.execute(String.format("UPDATE `%s`.`products` SET `col1`='c-12', `col2`='c-22' WHERE id=110;", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(DataChangeEvent.updateEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{110, BinaryStringData.fromString("scooter"), Float.valueOf(5.5f), BinaryStringData.fromString("c-10"), BinaryStringData.fromString("c-20")}), binaryRecordDataGenerator.generate(new Object[]{110, BinaryStringData.fromString("scooter"), Float.valueOf(5.5f), BinaryStringData.fromString("c-12"), BinaryStringData.fromString("c-22")})));
                    createStatement.execute(String.format("DELETE FROM `%s`.`products` WHERE `id` = 111;", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(DataChangeEvent.deleteEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{111, BinaryStringData.fromString("football"), Float.valueOf(6.6f), BinaryStringData.fromString("c-11"), BinaryStringData.fromString("c-21")})));
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    List fetchResults = MySqSourceTestUtils.fetchResults(executeAndCollect, 1 + snapshotExpected.size() + arrayList.size());
                    Assertions.assertThat(fetchResults.get(0)).isEqualTo(productsCreateTableEvent);
                    Assertions.assertThat(fetchResults.subList(1, 10)).containsExactlyInAnyOrder(snapshotExpected.toArray(new Event[0]));
                    Assertions.assertThat(fetchResults.subList(10, fetchResults.size())).isEqualTo(arrayList);
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (jdbcConnection != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    jdbcConnection.close();
                }
            }
        }
    }

    @Test
    public void testParseAlterStatement() throws Exception {
        this.env.setParallelism(1);
        this.inventoryDatabase.createAndInitialize();
        CloseableIterator executeAndCollect = this.env.fromSource(new MySqlDataSource(new MySqlSourceConfigFactory().hostname(MYSQL8_CONTAINER.getHost()).port(MYSQL8_CONTAINER.getDatabasePort()).username(MySqSourceTestUtils.TEST_USER).password(MySqSourceTestUtils.TEST_PASSWORD).databaseList(new String[]{this.inventoryDatabase.getDatabaseName()}).tableList(new String[]{this.inventoryDatabase.getDatabaseName() + "\\.products"}).startupOptions(StartupOptions.latest()).serverId(MySqSourceTestUtils.getServerId(this.env.getParallelism())).serverTimeZone("UTC").includeSchemaChanges(((Boolean) MySqlDataSourceOptions.SCHEMA_CHANGE_ENABLED.defaultValue()).booleanValue())).getEventSourceProvider().getSource(), WatermarkStrategy.noWatermarks(), "mysql", new EventTypeInfo()).executeAndCollect();
        Thread.sleep(5000L);
        TableId tableId = TableId.tableId(this.inventoryDatabase.getDatabaseName(), "products");
        ArrayList arrayList = new ArrayList();
        arrayList.add(getProductsCreateTableEvent(tableId));
        Connection jdbcConnection = this.inventoryDatabase.getJdbcConnection();
        Throwable th = null;
        try {
            Statement createStatement = jdbcConnection.createStatement();
            Throwable th2 = null;
            try {
                try {
                    arrayList.addAll(executeAlterAndProvideExpected(tableId, createStatement));
                    createStatement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN (`cols1` VARCHAR(45), `cols2` VARCHAR(55));", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(new AddColumnEvent(tableId, Arrays.asList(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("cols1", DataTypes.VARCHAR(45))), new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("cols2", DataTypes.VARCHAR(55))))));
                    createStatement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN (`cols3` VARCHAR(45), `cols4` VARCHAR(55));", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(new AddColumnEvent(tableId, Arrays.asList(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("cols3", DataTypes.VARCHAR(45))), new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("cols4", DataTypes.VARCHAR(55))))));
                    createStatement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `cols5` BIT NULL;", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(new AddColumnEvent(tableId, Collections.singletonList(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("cols5", DataTypes.BOOLEAN())))));
                    createStatement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `cols6` BINARY(0) NULL;", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(new AddColumnEvent(tableId, Collections.singletonList(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("cols6", BinaryType.ofEmptyLiteral())))));
                    createStatement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `cols7` BINARY NULL;", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(new AddColumnEvent(tableId, Collections.singletonList(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("cols7", DataTypes.BINARY(1))))));
                    createStatement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `cols8` CHAR(0) NULL;", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(new AddColumnEvent(tableId, Collections.singletonList(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("cols8", CharType.ofEmptyLiteral())))));
                    createStatement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `cols9` CHAR NULL;", this.inventoryDatabase.getDatabaseName()));
                    arrayList.add(new AddColumnEvent(tableId, Collections.singletonList(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("cols9", DataTypes.CHAR(1))))));
                    if (createStatement != null) {
                        if (0 != 0) {
                            try {
                                createStatement.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            createStatement.close();
                        }
                    }
                    Assertions.assertThat(MySqSourceTestUtils.fetchResults(executeAndCollect, arrayList.size())).isEqualTo(arrayList);
                } finally {
                }
            } catch (Throwable th4) {
                if (createStatement != null) {
                    if (th2 != null) {
                        try {
                            createStatement.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        createStatement.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (jdbcConnection != null) {
                if (0 != 0) {
                    try {
                        jdbcConnection.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    jdbcConnection.close();
                }
            }
        }
    }

    private CreateTableEvent getProductsCreateTableEvent(TableId tableId) {
        return new CreateTableEvent(tableId, Schema.newBuilder().physicalColumn("id", DataTypes.INT().notNull()).physicalColumn("name", DataTypes.VARCHAR(255).notNull()).physicalColumn("description", DataTypes.VARCHAR(512)).physicalColumn("weight", DataTypes.FLOAT()).primaryKey(Collections.singletonList("id")).build());
    }

    private List<Event> getSnapshotExpected(TableId tableId) {
        BinaryRecordDataGenerator binaryRecordDataGenerator = new BinaryRecordDataGenerator(RowType.of(new DataType[]{DataTypes.INT().notNull(), DataTypes.VARCHAR(255).notNull(), DataTypes.VARCHAR(512), DataTypes.FLOAT()}, new String[]{"id", "name", "description", "weight"}));
        ArrayList arrayList = new ArrayList();
        arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{101, BinaryStringData.fromString("scooter"), BinaryStringData.fromString("Small 2-wheel scooter"), Float.valueOf(3.14f)})));
        arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{102, BinaryStringData.fromString("car battery"), BinaryStringData.fromString("12V car battery"), Float.valueOf(8.1f)})));
        arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{103, BinaryStringData.fromString("12-pack drill bits"), BinaryStringData.fromString("12-pack of drill bits with sizes ranging from #40 to #3"), Float.valueOf(0.8f)})));
        arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{104, BinaryStringData.fromString("hammer"), BinaryStringData.fromString("12oz carpenter's hammer"), Float.valueOf(0.75f)})));
        arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{105, BinaryStringData.fromString("hammer"), BinaryStringData.fromString("14oz carpenter's hammer"), Float.valueOf(0.875f)})));
        arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{106, BinaryStringData.fromString("hammer"), BinaryStringData.fromString("16oz carpenter's hammer"), Float.valueOf(1.0f)})));
        arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{107, BinaryStringData.fromString("rocks"), BinaryStringData.fromString("box of assorted rocks"), Float.valueOf(5.3f)})));
        arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{108, BinaryStringData.fromString("jacket"), BinaryStringData.fromString("water resistent black wind breaker"), Float.valueOf(0.1f)})));
        arrayList.add(DataChangeEvent.insertEvent(tableId, binaryRecordDataGenerator.generate(new Object[]{109, BinaryStringData.fromString("spare tire"), BinaryStringData.fromString("24 inch spare tire"), Float.valueOf(22.2f)})));
        return arrayList;
    }

    private List<Event> executeAlterAndProvideExpected(TableId tableId, Statement statement) throws SQLException {
        ArrayList arrayList = new ArrayList();
        statement.execute(String.format("ALTER TABLE `%s`.`products` CHANGE COLUMN `description` `desc` VARCHAR(255) NULL DEFAULT NULL;", this.inventoryDatabase.getDatabaseName()));
        arrayList.add(new AlterColumnTypeEvent(tableId, Collections.singletonMap("description", DataTypes.VARCHAR(255))));
        arrayList.add(new RenameColumnEvent(tableId, Collections.singletonMap("description", "desc")));
        statement.execute(String.format("ALTER TABLE `%s`.`products` CHANGE COLUMN `desc` `desc2` VARCHAR(400) NULL DEFAULT NULL;", this.inventoryDatabase.getDatabaseName()));
        arrayList.add(new AlterColumnTypeEvent(tableId, Collections.singletonMap("desc", DataTypes.VARCHAR(400))));
        arrayList.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc", "desc2")));
        statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `desc1` VARCHAR(45) NULL AFTER `weight`;", this.inventoryDatabase.getDatabaseName()));
        arrayList.add(new AddColumnEvent(tableId, Collections.singletonList(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("desc1", DataTypes.VARCHAR(45)), AddColumnEvent.ColumnPosition.AFTER, "weight"))));
        statement.execute(String.format("ALTER TABLE `%s`.`products` ADD COLUMN `col1` VARCHAR(45) NULL AFTER `weight`, ADD COLUMN `col2` VARCHAR(55) NULL AFTER `desc1`;", this.inventoryDatabase.getDatabaseName()));
        arrayList.add(new AddColumnEvent(tableId, Collections.singletonList(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("col1", DataTypes.VARCHAR(45)), AddColumnEvent.ColumnPosition.AFTER, "weight"))));
        arrayList.add(new AddColumnEvent(tableId, Collections.singletonList(new AddColumnEvent.ColumnWithPosition(Column.physicalColumn("col2", DataTypes.VARCHAR(55)), AddColumnEvent.ColumnPosition.AFTER, "desc1"))));
        statement.execute(String.format("ALTER TABLE `%s`.`products` DROP COLUMN `desc2`, CHANGE COLUMN `desc1` `desc1` VARCHAR(65) NULL DEFAULT NULL;", this.inventoryDatabase.getDatabaseName()));
        arrayList.add(new DropColumnEvent(tableId, Collections.singletonList("desc2")));
        arrayList.add(new AlterColumnTypeEvent(tableId, Collections.singletonMap("desc1", DataTypes.VARCHAR(65))));
        statement.execute(String.format("ALTER TABLE `%s`.`products` RENAME COLUMN `desc1` TO `desc3`;", this.inventoryDatabase.getDatabaseName()));
        arrayList.add(new RenameColumnEvent(tableId, Collections.singletonMap("desc1", "desc3")));
        statement.execute(String.format("ALTER TABLE `%s`.`products` DROP COLUMN `DESC3`;", this.inventoryDatabase.getDatabaseName()));
        arrayList.add(new DropColumnEvent(tableId, Collections.singletonList("DESC3")));
        statement.execute(String.format("ALTER TABLE `%s`.`orders` ADD COLUMN `desc1` VARCHAR(45) NULL;", this.inventoryDatabase.getDatabaseName()));
        return arrayList;
    }
}
