package org.apache.flink.cdc.connectors.mysql.source;

import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnection;
import io.debezium.jdbc.JdbcConnection;
import java.lang.invoke.SerializedLambda;
import java.sql.SQLException;
import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHook;
import org.apache.flink.cdc.connectors.mysql.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.mysql.table.MySqlDeserializationConverterFactory;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.TestTable;
import org.apache.flink.cdc.connectors.mysql.testutils.TestTableSchemas;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
import org.apache.flink.cdc.debezium.table.MetadataConverter;
import org.apache.flink.cdc.debezium.table.RowDataDebeziumDeserializeSchema;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowUtils;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase.class */
public class MySqlSourceITCase extends MySqlSourceTestBase {
    /** Startup mode passed to the SQL connector when a test does not choose one explicitly. */
    private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";

    /** First element of each {@code parameters()} entry: the table under test. */
    @Parameterized.Parameter
    public String tableName;

    /**
     * Second element of each {@code parameters()} entry: the chunk key column, or {@code null}
     * when no chunk key column is configured.
     */
    // The index is a plain literal on purpose: it is a position in the parameter array and must
    // not change if the hook selector constants below are ever renumbered.
    @Parameterized.Parameter(1)
    public String chunkColumnName;

    /** Hook selector: run the DML hook through {@code setPostLowWatermarkAction}. */
    private static final int USE_POST_LOWWATERMARK_HOOK = 1;

    /** Hook selector: run the DML hook through {@code setPreHighWatermarkAction}. */
    private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;

    /** Hook selector: run the DML hook through {@code setPostHighWatermarkAction}. */
    private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

    /** Fails any single test that runs for more than 300 seconds. */
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300);

    /** Database created from the "customer" template on the shared MySQL container. */
    private final UniqueDatabase customDatabase =
            new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");

    /** Changelog rows expected for the 21 rows the customers table starts with. */
    private final List<String> initialChanges =
            Arrays.asList(
                    "+I[101, user_1, Shanghai, 123567891234]",
                    "+I[102, user_2, Shanghai, 123567891234]",
                    "+I[103, user_3, Shanghai, 123567891234]",
                    "+I[109, user_4, Shanghai, 123567891234]",
                    "+I[110, user_5, Shanghai, 123567891234]",
                    "+I[111, user_6, Shanghai, 123567891234]",
                    "+I[118, user_7, Shanghai, 123567891234]",
                    "+I[121, user_8, Shanghai, 123567891234]",
                    "+I[123, user_9, Shanghai, 123567891234]",
                    "+I[1009, user_10, Shanghai, 123567891234]",
                    "+I[1010, user_11, Shanghai, 123567891234]",
                    "+I[1011, user_12, Shanghai, 123567891234]",
                    "+I[1012, user_13, Shanghai, 123567891234]",
                    "+I[1013, user_14, Shanghai, 123567891234]",
                    "+I[1014, user_15, Shanghai, 123567891234]",
                    "+I[1015, user_16, Shanghai, 123567891234]",
                    "+I[1016, user_17, Shanghai, 123567891234]",
                    "+I[1017, user_18, Shanghai, 123567891234]",
                    "+I[1018, user_19, Shanghai, 123567891234]",
                    "+I[1019, user_20, Shanghai, 123567891234]",
                    "+I[2000, user_21, Shanghai, 123567891234]");

    /**
     * Changelog rows appended to {@link #initialChanges} by the earliest-offset test, which
     * writes its first batch of binlog events before the job starts.
     */
    private final List<String> firstPartBinlogEvents =
            Arrays.asList(
                    "-U[103, user_3, Shanghai, 123567891234]",
                    "+U[103, user_3, Hangzhou, 123567891234]",
                    "-D[102, user_2, Shanghai, 123567891234]",
                    "+I[102, user_2, Shanghai, 123567891234]",
                    "-U[103, user_3, Hangzhou, 123567891234]",
                    "+U[103, user_3, Shanghai, 123567891234]");

    // NOTE(review): presumably the rows expected for a second batch of binlog events; the code
    // that consumes this list is not visible from here -- confirm against its usage.
    private final List<String> secondPartBinlogEvents =
            Arrays.asList(
                    "-U[1010, user_11, Shanghai, 123567891234]",
                    "+I[2001, user_22, Shanghai, 123567891234]",
                    "+I[2002, user_23, Shanghai, 123567891234]",
                    "+I[2003, user_24, Shanghai, 123567891234]",
                    "+U[1010, user_11, Hangzhou, 123567891234]");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase$SleepingRowDataDebeziumDeserializeSchema.class */
    /**
     * Deserialization schema that forwards every record to a wrapped
     * {@link RowDataDebeziumDeserializeSchema} and then puts the calling thread to sleep for a
     * fixed number of milliseconds, slowing down record emission.
     */
    public static class SleepingRowDataDebeziumDeserializeSchema implements DebeziumDeserializationSchema<RowData> {
        private static final long serialVersionUID = 1L;

        /** Pause applied after each delegated {@code deserialize} call, in milliseconds. */
        private final long sleepMs;

        /** Schema that performs the actual conversion. */
        private final RowDataDebeziumDeserializeSchema deserializeSchema;

        /**
         * @param rowDataDebeziumDeserializeSchema schema every call is delegated to
         * @param j pause in milliseconds applied after each deserialized record
         */
        public SleepingRowDataDebeziumDeserializeSchema(RowDataDebeziumDeserializeSchema rowDataDebeziumDeserializeSchema, long j) {
            sleepMs = j;
            deserializeSchema = rowDataDebeziumDeserializeSchema;
        }

        /** Delegates to the wrapped schema first, then sleeps for the configured pause. */
        public void deserialize(SourceRecord sourceRecord, Collector<RowData> collector) throws Exception {
            deserializeSchema.deserialize(sourceRecord, collector);
            Thread.sleep(sleepMs);
        }

        /** Returns the produced type reported by the wrapped schema. */
        public TypeInformation<RowData> getProducedType() {
            final TypeInformation<RowData> delegateType = deserializeSchema.getProducedType();
            return delegateType;
        }
    }

    /**
     * Supplies the {table name, chunk key column} combinations the suite runs with. A
     * {@code null} chunk key column means none is configured.
     */
    @Parameterized.Parameters(name = "table: {0}, chunkColumn: {1}")
    public static Collection<Object[]> parameters() {
        final Object[] customersWithoutChunkColumn = {"customers", null};
        final Object[] customersChunkedById = {"customers", "id"};
        final Object[] customersNoPkChunkedById = {"customers_no_pk", "id"};
        return Arrays.asList(
                customersWithoutChunkColumn, customersChunkedById, customersNoPkChunkedById);
    }

    /** Reads the parameterized table with parallelism 1 and no injected failover. */
    @Test
    public void testReadSingleTableWithSingleParallelism() throws Exception {
        final int parallelism = 1;
        final String[] capturedTables = {this.tableName};
        testMySqlParallelSource(
                parallelism,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                capturedTables);
    }

    /**
     * Reads the parameterized table with parallelism 1, the default startup mode and the
     * snapshot backfill skipped; no failover is injected.
     */
    @Test
    public void testReadSingleTableWithSingleParallelismAndSkipBackFill() throws Exception {
        final int parallelism = 1;
        final boolean skipBackfill = true;
        final String[] capturedTables = {this.tableName};
        testMySqlParallelSource(
                parallelism,
                DEFAULT_SCAN_STARTUP_MODE,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                capturedTables,
                RestartStrategies.fixedDelayRestart(1, 0L),
                skipBackfill);
    }

    /** Reads the parameterized table with parallelism 4 and no injected failover. */
    @Test
    public void testReadSingleTableWithMultipleParallelism() throws Exception {
        final int parallelism = 4;
        final String[] capturedTables = {this.tableName};
        testMySqlParallelSource(
                parallelism,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                capturedTables);
    }

    /**
     * Reads the parameterized table together with {@code customers_1} using parallelism 1 and
     * no injected failover.
     */
    @Test
    public void testReadMultipleTableWithSingleParallelism() throws Exception {
        final int parallelism = 1;
        final String[] capturedTables = {this.tableName, "customers_1"};
        testMySqlParallelSource(
                parallelism,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                capturedTables);
    }

    /**
     * Reads the parameterized table together with {@code customers_1} using parallelism 4 and
     * no injected failover.
     */
    @Test
    public void testReadMultipleTableWithMultipleParallelism() throws Exception {
        final int parallelism = 4;
        final String[] capturedTables = {this.tableName, "customers_1"};
        testMySqlParallelSource(
                parallelism,
                MySqlSourceTestBase.FailoverType.NONE,
                MySqlSourceTestBase.FailoverPhase.NEVER,
                capturedTables);
    }

    /**
     * Reads two tables with the helper's default parallelism while requesting a
     * {@code FailoverType.TM} failover in {@code FailoverPhase.SNAPSHOT}.
     */
    @Test
    public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
        final String[] capturedTables = {this.tableName, "customers_1"};
        testMySqlParallelSource(
                MySqlSourceTestBase.FailoverType.TM,
                MySqlSourceTestBase.FailoverPhase.SNAPSHOT,
                capturedTables);
    }

    /**
     * Reads two tables with the helper's default parallelism while requesting a
     * {@code FailoverType.TM} failover in {@code FailoverPhase.BINLOG}.
     */
    @Test
    public void testTaskManagerFailoverInBinlogPhase() throws Exception {
        final String[] capturedTables = {this.tableName, "customers_1"};
        testMySqlParallelSource(
                MySqlSourceTestBase.FailoverType.TM,
                MySqlSourceTestBase.FailoverPhase.BINLOG,
                capturedTables);
    }

    /**
     * Starts from the "latest-offset" startup mode with parallelism 4 and requests a
     * {@code FailoverType.TM} failover in {@code FailoverPhase.BINLOG}.
     */
    @Test
    public void testTaskManagerFailoverFromLatestOffset() throws Exception {
        final int parallelism = 4;
        final String startupMode = "latest-offset";
        final String[] capturedTables = {this.tableName, "customers_1"};
        testMySqlParallelSource(
                parallelism,
                startupMode,
                MySqlSourceTestBase.FailoverType.TM,
                MySqlSourceTestBase.FailoverPhase.BINLOG,
                capturedTables,
                RestartStrategies.fixedDelayRestart(1, 0L));
    }

    /**
     * Reads two tables with the helper's default parallelism while requesting a
     * {@code FailoverType.JM} failover in {@code FailoverPhase.SNAPSHOT}.
     */
    @Test
    public void testJobManagerFailoverInSnapshotPhase() throws Exception {
        final String[] capturedTables = {this.tableName, "customers_1"};
        testMySqlParallelSource(
                MySqlSourceTestBase.FailoverType.JM,
                MySqlSourceTestBase.FailoverPhase.SNAPSHOT,
                capturedTables);
    }

    /**
     * Reads two tables with the helper's default parallelism while requesting a
     * {@code FailoverType.JM} failover in {@code FailoverPhase.BINLOG}.
     */
    @Test
    public void testJobManagerFailoverInBinlogPhase() throws Exception {
        final String[] capturedTables = {this.tableName, "customers_1"};
        testMySqlParallelSource(
                MySqlSourceTestBase.FailoverType.JM,
                MySqlSourceTestBase.FailoverPhase.BINLOG,
                capturedTables);
    }

    /**
     * Starts from the "latest-offset" startup mode with parallelism 4 and requests a
     * {@code FailoverType.JM} failover in {@code FailoverPhase.BINLOG}.
     */
    @Test
    public void testJobManagerFailoverFromLatestOffset() throws Exception {
        final int parallelism = 4;
        final String startupMode = "latest-offset";
        final String[] capturedTables = {this.tableName, "customers_1"};
        testMySqlParallelSource(
                parallelism,
                startupMode,
                MySqlSourceTestBase.FailoverType.JM,
                MySqlSourceTestBase.FailoverPhase.BINLOG,
                capturedTables,
                RestartStrategies.fixedDelayRestart(1, 0L));
    }

    /**
     * Reads the parameterized table with parallelism 1 while requesting a
     * {@code FailoverType.TM} failover in {@code FailoverPhase.SNAPSHOT}.
     */
    @Test
    public void testTaskManagerFailoverSingleParallelism() throws Exception {
        final int parallelism = 1;
        final String[] capturedTables = {this.tableName};
        testMySqlParallelSource(
                parallelism,
                MySqlSourceTestBase.FailoverType.TM,
                MySqlSourceTestBase.FailoverPhase.SNAPSHOT,
                capturedTables);
    }

    /**
     * Reads the parameterized table with parallelism 1 while requesting a
     * {@code FailoverType.JM} failover in {@code FailoverPhase.SNAPSHOT}.
     */
    @Test
    public void testJobManagerFailoverSingleParallelism() throws Exception {
        final int parallelism = 1;
        final String[] capturedTables = {this.tableName};
        testMySqlParallelSource(
                parallelism,
                MySqlSourceTestBase.FailoverType.JM,
                MySqlSourceTestBase.FailoverPhase.SNAPSHOT,
                capturedTables);
    }

    /**
     * Runs the slow (sleeping) source with parallelism 4 and 5-second checkpoints, triggers a
     * {@code FailoverType.JM} failover as soon as the first record is available, and then
     * expects exactly the rows in {@link #initialChanges} with nothing after them.
     *
     * @throws Exception if the job cannot be started, read from, or cancelled
     */
    @Test
    public void testSnapshotSplitReadingFailCrossCheckpoints() throws Exception {
        this.customDatabase.createAndInitialize();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(5000L);
        // One restart attempt with no delay.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

        DataStreamSource source =
                env.fromSource(buildSleepingSource(), WatermarkStrategy.noWatermarks(), "selfSource");

        // Wire the collect sink by hand so the result iterator exists before the job is
        // submitted and can be attached to the JobClient afterwards.
        TypeSerializer serializer =
                source.getTransformation().getOutputType().createSerializer(env.getConfig());
        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
        CollectSinkOperatorFactory sinkFactory =
                new CollectSinkOperatorFactory(serializer, accumulatorName);
        // NOTE(review): depending on the Flink version getOperator() may need a cast to
        // CollectSinkOperator before getOperatorIdFuture() resolves -- confirm.
        CollectResultIterator results =
                new CollectResultIterator(
                        sinkFactory.getOperator().getOperatorIdFuture(),
                        serializer,
                        accumulatorName,
                        env.getCheckpointConfig());
        CollectStreamSink collectSink = new CollectStreamSink(source, sinkFactory);
        collectSink.name("Data stream collect sink");
        env.addOperator(collectSink.getTransformation());

        JobClient jobClient = env.executeAsync("snapshotSplitTest");
        results.setJobClient(jobClient);
        JobID jobId = jobClient.getJobID();

        // Fail over only once at least one record is available.
        if (results.hasNext()) {
            triggerFailover(
                    MySqlSourceTestBase.FailoverType.JM,
                    jobId,
                    this.miniClusterResource.getMiniCluster(),
                    () -> sleepMs(100L));
        }

        // The expected rows are the shared snapshot fixture, not a private copy of it.
        assertEqualsInAnyOrder(
                this.initialChanges, fetchRowData(results, this.initialChanges.size()));
        Assert.assertFalse(hasNextData(results));
        jobClient.cancel().get();
    }

    /**
     * Starts from {@code StartupOptions.earliest()} and expects the rows of
     * {@link #initialChanges} plus those of {@link #firstPartBinlogEvents}.
     */
    @Test
    public void testStartFromEarliestOffset() throws Exception {
        final List<String> expectedChanges = new ArrayList<>(this.initialChanges);
        expectedChanges.addAll(this.firstPartBinlogEvents);
        testStartingOffset(StartupOptions.earliest(), expectedChanges);
    }

    /** Starts from {@code StartupOptions.latest()} and expects no rows at all. */
    @Test
    public void testStartFromLatestOffset() throws Exception {
        final StartupOptions latestOffset = StartupOptions.latest();
        final List<String> nothingExpected = Collections.emptyList();
        testStartingOffset(latestOffset, nothingExpected);
    }

    /**
     * Snapshot-only startup with the DML hook installed as the post-high-watermark action and
     * backfill not skipped: the 21 fetched rows must equal the unmodified initial rows.
     */
    @Test
    public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
        final List<String> actualRows =
                testBackfillWhenWritingEvents(
                        false, 21, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.snapshot());
        // Same 21 rows, in the same order, as the shared snapshot fixture.
        assertEqualsInAnyOrder(this.initialChanges, actualRows);
    }

    /**
     * Snapshot-only startup with the DML hook installed as the pre-high-watermark action and
     * backfill not skipped: the 21 fetched rows already reflect the hook's insert, update and
     * delete.
     */
    @Test
    public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
        final List<String> expectedRows =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]",
                        "+I[1009, user_10, Shanghai, 123567891234]",
                        "+I[1010, user_11, Shanghai, 123567891234]",
                        "+I[1011, user_12, Shanghai, 123567891234]",
                        "+I[1012, user_13, Shanghai, 123567891234]",
                        "+I[1013, user_14, Shanghai, 123567891234]",
                        "+I[1014, user_15, Shanghai, 123567891234]",
                        "+I[1015, user_16, Shanghai, 123567891234]",
                        "+I[1016, user_17, Shanghai, 123567891234]",
                        "+I[1017, user_18, Shanghai, 123567891234]",
                        "+I[1018, user_19, Shanghai, 123567891234]",
                        "+I[2000, user_21, Pittsburgh, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]");
        final List<String> actualRows =
                testBackfillWhenWritingEvents(
                        false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.snapshot());
        assertEqualsInAnyOrder(expectedRows, actualRows);
    }

    /**
     * Initial startup with the DML hook installed as the pre-high-watermark action and
     * backfill not skipped: the 21 fetched rows already reflect the hook's insert, update and
     * delete.
     */
    @Test
    public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
        final List<String> expectedRows =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]",
                        "+I[1009, user_10, Shanghai, 123567891234]",
                        "+I[1010, user_11, Shanghai, 123567891234]",
                        "+I[1011, user_12, Shanghai, 123567891234]",
                        "+I[1012, user_13, Shanghai, 123567891234]",
                        "+I[1013, user_14, Shanghai, 123567891234]",
                        "+I[1014, user_15, Shanghai, 123567891234]",
                        "+I[1015, user_16, Shanghai, 123567891234]",
                        "+I[1016, user_17, Shanghai, 123567891234]",
                        "+I[1017, user_18, Shanghai, 123567891234]",
                        "+I[1018, user_19, Shanghai, 123567891234]",
                        "+I[2000, user_21, Pittsburgh, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]");
        final List<String> actualRows =
                testBackfillWhenWritingEvents(
                        false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial());
        assertEqualsInAnyOrder(expectedRows, actualRows);
    }

    /**
     * Initial startup with the DML hook installed as the post-low-watermark action and
     * backfill not skipped: the 21 fetched rows already reflect the hook's insert, update and
     * delete.
     */
    @Test
    public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
        final List<String> expectedRows =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]",
                        "+I[1009, user_10, Shanghai, 123567891234]",
                        "+I[1010, user_11, Shanghai, 123567891234]",
                        "+I[1011, user_12, Shanghai, 123567891234]",
                        "+I[1012, user_13, Shanghai, 123567891234]",
                        "+I[1013, user_14, Shanghai, 123567891234]",
                        "+I[1014, user_15, Shanghai, 123567891234]",
                        "+I[1015, user_16, Shanghai, 123567891234]",
                        "+I[1016, user_17, Shanghai, 123567891234]",
                        "+I[1017, user_18, Shanghai, 123567891234]",
                        "+I[1018, user_19, Shanghai, 123567891234]",
                        "+I[2000, user_21, Pittsburgh, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]");
        final List<String> actualRows =
                testBackfillWhenWritingEvents(
                        false, 21, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial());
        assertEqualsInAnyOrder(expectedRows, actualRows);
    }

    /**
     * Initial startup with the DML hook installed as the pre-high-watermark action and backfill
     * skipped: 25 rows are fetched, the 21 original inserts followed by separate change
     * events for the hook's insert, update and delete.
     */
    @Test
    public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
        final List<String> expectedRows =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]",
                        "+I[1009, user_10, Shanghai, 123567891234]",
                        "+I[1010, user_11, Shanghai, 123567891234]",
                        "+I[1011, user_12, Shanghai, 123567891234]",
                        "+I[1012, user_13, Shanghai, 123567891234]",
                        "+I[1013, user_14, Shanghai, 123567891234]",
                        "+I[1014, user_15, Shanghai, 123567891234]",
                        "+I[1015, user_16, Shanghai, 123567891234]",
                        "+I[1016, user_17, Shanghai, 123567891234]",
                        "+I[1017, user_18, Shanghai, 123567891234]",
                        "+I[1018, user_19, Shanghai, 123567891234]",
                        "+I[1019, user_20, Shanghai, 123567891234]",
                        "+I[2000, user_21, Shanghai, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "-U[2000, user_21, Shanghai, 123567891234]",
                        "+U[2000, user_21, Pittsburgh, 123567891234]",
                        "-D[1019, user_20, Shanghai, 123567891234]");
        final List<String> actualRows =
                testBackfillWhenWritingEvents(
                        true, 25, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial());
        assertEqualsInAnyOrder(expectedRows, actualRows);
    }

    /**
     * Initial startup with the DML hook installed as the post-low-watermark action and backfill
     * skipped: 25 rows are fetched. The expected rows contain the post-DML table state and, in
     * addition, the change events for the same DML, so the insert of id 15213 appears twice.
     */
    @Test
    public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
        final List<String> expectedRows =
                Arrays.asList(
                        "+I[101, user_1, Shanghai, 123567891234]",
                        "+I[102, user_2, Shanghai, 123567891234]",
                        "+I[103, user_3, Shanghai, 123567891234]",
                        "+I[109, user_4, Shanghai, 123567891234]",
                        "+I[110, user_5, Shanghai, 123567891234]",
                        "+I[111, user_6, Shanghai, 123567891234]",
                        "+I[118, user_7, Shanghai, 123567891234]",
                        "+I[121, user_8, Shanghai, 123567891234]",
                        "+I[123, user_9, Shanghai, 123567891234]",
                        "+I[1009, user_10, Shanghai, 123567891234]",
                        "+I[1010, user_11, Shanghai, 123567891234]",
                        "+I[1011, user_12, Shanghai, 123567891234]",
                        "+I[1012, user_13, Shanghai, 123567891234]",
                        "+I[1013, user_14, Shanghai, 123567891234]",
                        "+I[1014, user_15, Shanghai, 123567891234]",
                        "+I[1015, user_16, Shanghai, 123567891234]",
                        "+I[1016, user_17, Shanghai, 123567891234]",
                        "+I[1017, user_18, Shanghai, 123567891234]",
                        "+I[1018, user_19, Shanghai, 123567891234]",
                        "+I[2000, user_21, Pittsburgh, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "+I[15213, user_15213, Shanghai, 123567891234]",
                        "-U[2000, user_21, Shanghai, 123567891234]",
                        "+U[2000, user_21, Pittsburgh, 123567891234]",
                        "-D[1019, user_20, Shanghai, 123567891234]");
        final List<String> actualRows =
                testBackfillWhenWritingEvents(
                        true, 25, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial());
        assertEqualsInAnyOrder(expectedRows, actualRows);
    }

    /**
     * Runs a parallelism-1 job over the {@code customers} table while a snapshot-phase hook
     * inserts id 15213, updates the address of id 2000 and deletes id 1019, then returns the
     * first {@code fetchSize} emitted rows in stringified form.
     *
     * @param skipSnapshotBackfill value passed to the builder's {@code skipSnapshotBackfill}
     * @param fetchSize number of rows to fetch from the running job
     * @param hookType one of the {@code USE_*_HOOK} constants selecting where the DML hook is
     *     installed; any other value leaves the hooks empty
     * @param startupOptions startup options passed to the source builder
     * @return the fetched rows, stringified with {@code TestTable#stringify}
     * @throws Exception if the database, the job or the result iterator fails
     */
    private List<String> testBackfillWhenWritingEvents(boolean skipSnapshotBackfill, int fetchSize, int hookType, StartupOptions startupOptions) throws Exception {
        this.customDatabase.createAndInitialize();
        TestTable customers =
                new TestTable(this.customDatabase, "customers", TestTableSchemas.CUSTOMERS);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The explicit <RowData> witness is required: inside a call chain builder() would
        // otherwise be inferred as Object and reject the RowData deserializer.
        MySqlSource<RowData> source =
                MySqlSource.<RowData>builder()
                        .hostname(this.customDatabase.getHost())
                        .port(this.customDatabase.getDatabasePort())
                        .username(this.customDatabase.getUsername())
                        .password(this.customDatabase.getPassword())
                        .serverTimeZone("UTC")
                        .databaseList(new String[] {this.customDatabase.getDatabaseName()})
                        .tableList(new String[] {customers.getTableId()})
                        .deserializer(customers.getDeserializer())
                        .skipSnapshotBackfill(skipSnapshotBackfill)
                        .startupOptions(startupOptions)
                        .build();

        String[] dmlStatements = {
            String.format(
                    "INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')",
                    customers.getTableId()),
            String.format(
                    "UPDATE %s SET address='Pittsburgh' WHERE id=2000", customers.getTableId()),
            String.format("DELETE FROM %s WHERE id=1019", customers.getTableId())
        };

        // The hook runs all three statements in a single transaction.
        SnapshotPhaseHook dmlHook =
                (mySqlConnection, sourceSplit) -> {
                    mySqlConnection.setAutoCommit(false);
                    mySqlConnection.execute(dmlStatements);
                    mySqlConnection.commit();
                };

        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        switch (hookType) {
            case USE_POST_LOWWATERMARK_HOOK:
                hooks.setPostLowWatermarkAction(dmlHook);
                break;
            case USE_PRE_HIGHWATERMARK_HOOK:
                hooks.setPreHighWatermarkAction(dmlHook);
                break;
            case USE_POST_HIGHWATERMARK_HOOK:
                hooks.setPostHighWatermarkAction(dmlHook);
                break;
        }
        source.setSnapshotHooks(hooks);

        // try-with-resources closes the iterator exactly once on both the success and the
        // failure path, and attaches a close() failure as a suppressed exception.
        try (CloseableIterator<RowData> rows =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source")
                        .executeAndCollect()) {
            return fetchRowData(rows, fetchSize, customers::stringify);
        }
    }

    /**
     * Writes the first batch of binlog events, then starts a job with the given startup options
     * and checks that the first {@code expectedChanges.size()} emitted rows match
     * {@code expectedChanges} in any order.
     *
     * @param startupOptions startup options passed to the source builder
     * @param expectedChanges rows expected from the job; its size is the number of rows fetched
     * @throws Exception if the database, the job or the result iterator fails
     */
    private void testStartingOffset(StartupOptions startupOptions, List<String> expectedChanges) throws Exception {
        this.customDatabase.createAndInitialize();
        String tableId = getTableId();
        // These events are written before the job exists.
        makeFirstPartBinlogEvents(getConnection(), tableId);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataType rowDataType =
                DataTypes.ROW(
                        new DataTypes.Field[] {
                            DataTypes.FIELD("id", DataTypes.BIGINT()),
                            DataTypes.FIELD("name", DataTypes.STRING()),
                            DataTypes.FIELD("address", DataTypes.STRING()),
                            DataTypes.FIELD("phone_number", DataTypes.STRING())
                        });
        // The logical type of a ROW data type is handed over as a RowType, hence the cast.
        RowDataDebeziumDeserializeSchema deserializer =
                RowDataDebeziumDeserializeSchema.newBuilder()
                        .setPhysicalRowType((RowType) rowDataType.getLogicalType())
                        .setResultTypeInfo(
                                InternalTypeInfo.of(
                                        TypeConversions.fromDataToLogicalType(rowDataType)))
                        .build();

        // The explicit <RowData> witness is required: inside a call chain builder() would
        // otherwise be inferred as Object and reject the RowData deserializer.
        MySqlSource<RowData> source =
                MySqlSource.<RowData>builder()
                        .hostname(MYSQL_CONTAINER.getHost())
                        .port(MYSQL_CONTAINER.getDatabasePort())
                        .databaseList(new String[] {this.customDatabase.getDatabaseName()})
                        .serverTimeZone("UTC")
                        .tableList(new String[] {tableId})
                        .username(this.customDatabase.getUsername())
                        .password(this.customDatabase.getPassword())
                        .serverId("5401-5404")
                        .deserializer(deserializer)
                        .startupOptions(startupOptions)
                        .chunkKeyColumn(
                                new ObjectPath(this.customDatabase.getDatabaseName(), this.tableName),
                                this.chunkColumnName)
                        .build();

        // try-with-resources closes the iterator exactly once on both the success and the
        // failure path, and attaches a close() failure as a suppressed exception.
        try (CloseableIterator<RowData> rows =
                env.fromSource(source, WatermarkStrategy.noWatermarks(), "MySQL CDC Source")
                        .executeAndCollect()) {
            assertEqualsInAnyOrder(expectedChanges, fetchRowData(rows, expectedChanges.size()));
        }
    }

    /**
     * Builds a source over the parameterized table whose deserializer sleeps for 1000 ms after
     * every record, so that reading the table spans a noticeable amount of time.
     *
     * @return the configured source, starting with {@code StartupOptions.initial()}
     */
    private MySqlSource<RowData> buildSleepingSource() {
        ResolvedSchema schema =
                new ResolvedSchema(
                        Arrays.asList(
                                Column.physical("id", DataTypes.BIGINT().notNull()),
                                Column.physical("name", DataTypes.STRING()),
                                Column.physical("address", DataTypes.STRING()),
                                Column.physical("phone_number", DataTypes.STRING())),
                        new ArrayList<>(),
                        UniqueConstraint.primaryKey("pk", Collections.singletonList("id")));
        // The logical type of the physical row data type is handed over as a RowType, hence
        // the cast.
        RowType rowType = (RowType) schema.toPhysicalRowDataType().getLogicalType();

        RowDataDebeziumDeserializeSchema delegate =
                RowDataDebeziumDeserializeSchema.newBuilder()
                        .setPhysicalRowType(rowType)
                        .setMetadataConverters(new MetadataConverter[0])
                        .setResultTypeInfo(InternalTypeInfo.of(rowType))
                        .setServerTimeZone(ZoneId.of("UTC"))
                        .setUserDefinedConverterFactory(
                                MySqlDeserializationConverterFactory.instance())
                        .build();

        // The explicit <RowData> witness is required: inside a call chain builder() would
        // otherwise be inferred as Object, which neither accepts the RowData deserializer nor
        // matches this method's return type.
        return MySqlSource.<RowData>builder()
                .hostname(MYSQL_CONTAINER.getHost())
                .port(MYSQL_CONTAINER.getDatabasePort())
                .databaseList(new String[] {this.customDatabase.getDatabaseName()})
                .tableList(new String[] {getTableId()})
                .username(this.customDatabase.getUsername())
                .password(this.customDatabase.getPassword())
                .serverTimeZone("UTC")
                .serverId(getServerId())
                .splitSize(8096)
                .splitMetaGroupSize(1000)
                .distributionFactorUpper(1000.0d)
                .distributionFactorLower(0.05d)
                .fetchSize(1024)
                .connectTimeout(Duration.ofSeconds(30L))
                .connectMaxRetries(3)
                .connectionPoolSize(20)
                .debeziumProperties(new Properties())
                .startupOptions(StartupOptions.initial())
                .deserializer(new SleepingRowDataDebeziumDeserializeSchema(delegate, 1000L))
                .scanNewlyAddedTableEnabled(false)
                .jdbcProperties(new Properties())
                .heartbeatInterval(Duration.ofSeconds(30L))
                .chunkKeyColumn(
                        new ObjectPath(this.customDatabase.getDatabaseName(), this.tableName),
                        this.chunkColumnName)
                .build();
    }

    /**
     * Convenience overload that runs the parallel-source scenario with parallelism 4.
     *
     * @param failoverType kind of failover to request
     * @param failoverPhase phase in which the failover is requested
     * @param strArr names of the tables to capture
     */
    private void testMySqlParallelSource(MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] strArr) throws Exception {
        final int defaultParallelism = 4;
        testMySqlParallelSource(defaultParallelism, failoverType, failoverPhase, strArr);
    }

    /**
     * Convenience overload that uses the default startup mode and a fixed-delay restart
     * strategy with one attempt and no delay.
     *
     * @param i source parallelism
     * @param failoverType kind of failover to request
     * @param failoverPhase phase in which the failover is requested
     * @param strArr names of the tables to capture
     */
    private void testMySqlParallelSource(int i, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] strArr) throws Exception {
        final RestartStrategies.RestartStrategyConfiguration restartOnce =
                RestartStrategies.fixedDelayRestart(1, 0L);
        testMySqlParallelSource(
                i, DEFAULT_SCAN_STARTUP_MODE, failoverType, failoverPhase, strArr, restartOnce);
    }

    /**
     * Convenience overload that keeps the snapshot backfill enabled (skip flag {@code false}).
     *
     * @param i source parallelism
     * @param str value for the connector's {@code scan.startup.mode} option
     * @param failoverType kind of failover to request
     * @param failoverPhase phase in which the failover is requested
     * @param strArr names of the tables to capture
     * @param restartStrategyConfiguration restart strategy installed on the environment
     */
    private void testMySqlParallelSource(int i, String str, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] strArr, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration) throws Exception {
        final boolean skipBackfill = false;
        testMySqlParallelSource(
                i,
                str,
                failoverType,
                failoverPhase,
                strArr,
                restartStrategyConfiguration,
                skipBackfill);
    }

    /**
     * Registers a {@code mysql-cdc} table named {@code customers} through SQL, runs
     * {@code select * from customers}, verifies the snapshot data (default startup mode only)
     * and the binlog data, and finally cancels the job.
     *
     * @param i source parallelism
     * @param str value for the connector's {@code scan.startup.mode} option
     * @param failoverType kind of failover forwarded to the check helpers
     * @param failoverPhase phase forwarded to the check helpers
     * @param strArr names of the tables to capture
     * @param restartStrategyConfiguration restart strategy installed on the environment
     * @param z value for {@code scan.incremental.snapshot.backfill.skip}
     */
    private void testMySqlParallelSource(int i, String str, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] strArr, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration, boolean z) throws Exception {
        this.customDatabase.createAndInitialize();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        env.setParallelism(i);
        env.enableCheckpointing(200L);
        env.setRestartStrategy(restartStrategyConfiguration);

        // The table without a primary key must not declare one in the DDL either.
        final String primaryKeyClause;
        if ("customers_no_pk".equals(this.tableName)) {
            primaryKeyClause = "";
        } else {
            primaryKeyClause = ", primary key (id) not enforced";
        }
        final String ddlTemplate =
                "CREATE TABLE customers ( id BIGINT NOT NULL, name STRING, address STRING, phone_number STRING"
                        + primaryKeyClause
                        + ") WITH ("
                        + " 'connector' = 'mysql-cdc',"
                        + " 'scan.incremental.snapshot.enabled' = 'true',"
                        + " 'hostname' = '%s',"
                        + " 'port' = '%s',"
                        + " 'username' = '%s',"
                        + " 'password' = '%s',"
                        + " 'database-name' = '%s',"
                        + " 'table-name' = '%s',"
                        + " 'scan.startup.mode' = '%s',"
                        + " 'scan.incremental.snapshot.chunk.size' = '100',"
                        + " 'scan.incremental.snapshot.backfill.skip' = '%s',"
                        + " 'server-time-zone' = 'UTC',"
                        + " 'server-id' = '%s'"
                        + " %s)";

        // The chunk key option is appended only when a chunk column is configured.
        final String chunkKeyOption;
        if (this.chunkColumnName == null) {
            chunkKeyOption = "";
        } else {
            chunkKeyOption =
                    String.format(
                            ", 'scan.incremental.snapshot.chunk.key-column' = '%s'",
                            this.chunkColumnName);
        }

        final String ddl =
                String.format(
                        ddlTemplate,
                        MYSQL_CONTAINER.getHost(),
                        MYSQL_CONTAINER.getDatabasePort(),
                        this.customDatabase.getUsername(),
                        this.customDatabase.getPassword(),
                        this.customDatabase.getDatabaseName(),
                        getTableNameRegex(strArr),
                        str,
                        z,
                        getServerId(),
                        chunkKeyOption);
        tableEnv.executeSql(ddl);

        TableResult queryResult = tableEnv.executeSql("select * from customers");
        // Snapshot rows are only checked for the default startup mode.
        if (DEFAULT_SCAN_STARTUP_MODE.equals(str)) {
            checkSnapshotData(queryResult, failoverType, failoverPhase, strArr);
        }
        checkBinlogData(queryResult, failoverType, failoverPhase, strArr);
        queryResult.getJobClient().get().cancel().get();
    }

    /**
     * Verifies that the query result starts with the expected snapshot rows, once per captured
     * table, in any order. When the failover phase is SNAPSHOT, a failover is triggered as soon as
     * the first row is available.
     *
     * @param tableResult result of the running {@code select} query
     * @param failoverType kind of failover to inject
     * @param failoverPhase phase in which the failover is injected
     * @param strArr names of the captured tables; each contributes one copy of the expected rows
     * @throws Exception if triggering the failover or the comparison fails
     */
    private void checkSnapshotData(TableResult tableResult, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] strArr) throws Exception {
        String[] snapshotRows = {
            "+I[101, user_1, Shanghai, 123567891234]",
            "+I[102, user_2, Shanghai, 123567891234]",
            "+I[103, user_3, Shanghai, 123567891234]",
            "+I[109, user_4, Shanghai, 123567891234]",
            "+I[110, user_5, Shanghai, 123567891234]",
            "+I[111, user_6, Shanghai, 123567891234]",
            "+I[118, user_7, Shanghai, 123567891234]",
            "+I[121, user_8, Shanghai, 123567891234]",
            "+I[123, user_9, Shanghai, 123567891234]",
            "+I[1009, user_10, Shanghai, 123567891234]",
            "+I[1010, user_11, Shanghai, 123567891234]",
            "+I[1011, user_12, Shanghai, 123567891234]",
            "+I[1012, user_13, Shanghai, 123567891234]",
            "+I[1013, user_14, Shanghai, 123567891234]",
            "+I[1014, user_15, Shanghai, 123567891234]",
            "+I[1015, user_16, Shanghai, 123567891234]",
            "+I[1016, user_17, Shanghai, 123567891234]",
            "+I[1017, user_18, Shanghai, 123567891234]",
            "+I[1018, user_19, Shanghai, 123567891234]",
            "+I[1019, user_20, Shanghai, 123567891234]",
            "+I[2000, user_21, Shanghai, 123567891234]"
        };

        // Every captured table holds the same rows, so expect one copy per table.
        List<String> expected = new ArrayList<>();
        for (int table = 0; table < strArr.length; table++) {
            expected.addAll(Arrays.asList(snapshotRows));
        }

        // NOTE(review): the iterator is not closed here; the caller goes on to read binlog rows
        // from the same TableResult afterwards.
        CloseableIterator<Row> rows = tableResult.collect();
        JobID jobId = tableResult.getJobClient().get().getJobID();

        // Only fail over once at least one snapshot row has been produced.
        if (failoverPhase == MySqlSourceTestBase.FailoverPhase.SNAPSHOT && rows.hasNext()) {
            triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> sleepMs(100L));
        }
        assertEqualsInAnyOrder(expected, fetchRows(rows, expected.size()));
    }

    /**
     * Applies two batches of changes to every captured table and verifies that the query emits
     * the matching change rows, in any order, and nothing more. When the failover phase is BINLOG,
     * a failover is triggered between the two batches.
     *
     * @param tableResult result of the running {@code select} query
     * @param failoverType kind of failover to inject
     * @param failoverPhase phase in which the failover is injected
     * @param strArr names of the captured tables
     * @throws Exception if a database change, the failover or the comparison fails
     */
    private void checkBinlogData(TableResult tableResult, MySqlSourceTestBase.FailoverType failoverType, MySqlSourceTestBase.FailoverPhase failoverPhase, String[] strArr) throws Exception {
        waitUntilJobRunning(tableResult);
        CloseableIterator<Row> rows = tableResult.collect();
        JobID jobId = tableResult.getJobClient().get().getJobID();

        for (String table : strArr) {
            makeFirstPartBinlogEvents(getConnection(), this.customDatabase.getDatabaseName() + '.' + table);
        }

        // Presumably gives the source time to read the first batch before a failover is injected.
        Thread.sleep(2000L);

        if (failoverPhase == MySqlSourceTestBase.FailoverPhase.BINLOG) {
            triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> sleepMs(200L));
            waitUntilJobRunning(tableResult);
        }

        for (String table : strArr) {
            makeSecondPartBinlogEvents(getConnection(), this.customDatabase.getDatabaseName() + '.' + table);
        }

        // Each captured table contributes both batches of expected change rows.
        List<String> expected = new ArrayList<>();
        for (int table = 0; table < strArr.length; table++) {
            expected.addAll(this.firstPartBinlogEvents);
            expected.addAll(this.secondPartBinlogEvents);
        }
        assertEqualsInAnyOrder(expected, fetchRows(rows, expected.size()));

        // No further rows may show up once all expected changes were consumed.
        Assert.assertFalse(hasNextData(rows));
    }

    /**
     * Renders each {@code RowData} as the string form of a row with the named fields
     * {@code id}, {@code name}, {@code address} and {@code phone_number} (positions 0 to 3).
     *
     * @param list rows to render; position 0 is read as a long, positions 1 to 3 as strings
     * @return one string per input row, in input order
     */
    private static List<String> convertRowDataToRowString(List<RowData> list) {
        // Field name -> position, in declaration order.
        LinkedHashMap<String, Integer> fieldPositions = new LinkedHashMap<>();
        fieldPositions.put("id", 0);
        fieldPositions.put("name", 1);
        fieldPositions.put("address", 2);
        fieldPositions.put("phone_number", 3);

        return list.stream()
                .map(rowData -> RowUtils.createRowWithNamedPositions(
                                rowData.getRowKind(),
                                new Object[] {
                                    rowData.getLong(0),
                                    rowData.getString(1),
                                    rowData.getString(2),
                                    rowData.getString(3)
                                },
                                fieldPositions)
                        .toString())
                .collect(Collectors.toList());
    }

    /**
     * Reads up to {@code i} rows from the iterator and returns their string forms.
     *
     * @param it source of rows
     * @param i maximum number of rows to read
     * @return the string form of each row read, in iteration order; shorter than {@code i} if
     *     the iterator runs out first
     */
    private static List<String> fetchRows(Iterator<Row> it, int i) {
        List<String> rows = new ArrayList<>(i);
        // The remaining count is checked first so that hasNext() is never called once enough
        // rows were read.
        for (int remaining = i; remaining > 0 && it.hasNext(); remaining--) {
            rows.add(it.next().toString());
        }
        return rows;
    }

    /**
     * Reads up to {@code i} rows from the iterator and then maps each of them to a string.
     *
     * @param it source of rows
     * @param i maximum number of rows to read
     * @param function conversion applied to every row after all rows were read
     * @return the converted rows, in iteration order
     */
    private List<String> fetchRowData(Iterator<RowData> it, int i, Function<RowData, String> function) {
        List<RowData> rows = new ArrayList<>(i);
        for (int remaining = i; remaining > 0 && it.hasNext(); remaining--) {
            rows.add(it.next());
        }
        return rows.stream().map(function).collect(Collectors.toList());
    }

    /**
     * Reads up to {@code i} rows from the iterator and renders them with
     * {@link #convertRowDataToRowString(List)}.
     *
     * @param it source of rows
     * @param i maximum number of rows to read
     * @return the rendered rows, in iteration order
     */
    private static List<String> fetchRowData(Iterator<RowData> it, int i) {
        List<RowData> rows = new ArrayList<>(i);
        for (int remaining = i; remaining > 0 && it.hasNext(); remaining--) {
            rows.add(it.next());
        }
        return convertRowDataToRowString(rows);
    }

    /**
     * Builds the value of the {@code table-name} option for the captured tables.
     *
     * @param strArr names of the captured tables; must contain at least one entry
     * @return the single name as-is, or the names joined with {@code |} and wrapped in
     *     parentheses when there are several
     */
    private String getTableNameRegex(String[] strArr) {
        Preconditions.checkState(strArr.length > 0);
        if (strArr.length == 1) {
            return strArr[0];
        }
        return String.format("(%s)", StringUtils.join(strArr, "|"));
    }

    /**
     * Produces a random server-id range of the form {@code start-end}, where {@code start} lies
     * in [5400, 5499] and {@code end} is {@code start + 4}.
     *
     * @return the range as a string
     */
    private String getServerId() {
        final Random random = new Random();
        final int rangeStart = 5400 + random.nextInt(100);
        final int rangeEnd = rangeStart + 4;
        return new StringBuilder().append(rangeStart).append('-').append(rangeEnd).toString();
    }

    /**
     * Sleeps for the given number of milliseconds without propagating interruption as an
     * exception. If the thread is interrupted, the sleep ends early and the interrupt flag is
     * restored so that callers can still observe the interruption.
     *
     * @param j sleep duration in milliseconds
     */
    private void sleepMs(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: hand it back to the thread.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Applies the first batch of changes to the given table in one transaction: an update of row
     * 103, a delete and re-insert of row 102, and a second update of row 103. The connection is
     * closed afterwards, whether or not the statements succeed.
     *
     * @param jdbcConnection connection to run the statements on; closed by this method
     * @param str name of the table to modify, as used directly in the SQL text
     * @throws SQLException if a statement, the commit or the close fails
     */
    private void makeFirstPartBinlogEvents(JdbcConnection jdbcConnection, String str) throws SQLException {
        final String moveToHangzhou = "UPDATE " + str + " SET address = 'Hangzhou' where id = 103";
        final String deleteRow = "DELETE FROM " + str + " where id = 102";
        final String reinsertRow = "INSERT INTO " + str + " VALUES(102, 'user_2','Shanghai','123567891234')";
        final String moveBackToShanghai = "UPDATE " + str + " SET address = 'Shanghai' where id = 103";
        final String[] statements = {moveToHangzhou, deleteRow, reinsertRow, moveBackToShanghai};

        try {
            jdbcConnection.setAutoCommit(false);
            jdbcConnection.execute(statements);
            jdbcConnection.commit();
        } finally {
            jdbcConnection.close();
        }
    }

    /**
     * Applies the second batch of changes to the given table in two separately committed steps:
     * an update of row 1010, then an insert of rows 2001 to 2003. The connection is closed
     * afterwards, whether or not the statements succeed.
     *
     * @param jdbcConnection connection to run the statements on; closed by this method
     * @param str name of the table to modify, as used directly in the SQL text
     * @throws SQLException if a statement, a commit or the close fails
     */
    private void makeSecondPartBinlogEvents(JdbcConnection jdbcConnection, String str) throws SQLException {
        final String[] updateStep = {"UPDATE " + str + " SET address = 'Hangzhou' where id = 1010"};
        final String[] insertStep = {"INSERT INTO " + str + " VALUES(2001, 'user_22','Shanghai','123567891234'), (2002, 'user_23','Shanghai','123567891234'),(2003, 'user_24','Shanghai','123567891234')"};

        try {
            jdbcConnection.setAutoCommit(false);

            jdbcConnection.execute(updateStep);
            jdbcConnection.commit();

            jdbcConnection.execute(insertStep);
            jdbcConnection.commit();
        } finally {
            jdbcConnection.close();
        }
    }

    /**
     * Creates a MySQL connection to the test container using the credentials of the custom
     * database and the UTC server time zone.
     *
     * @return a new connection; the caller is responsible for closing it
     */
    private MySqlConnection getConnection() {
        HashMap<String, String> properties = new HashMap<>();
        properties.put("database.hostname", MYSQL_CONTAINER.getHost());
        properties.put("database.port", String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
        properties.put("database.user", this.customDatabase.getUsername());
        properties.put("database.password", this.customDatabase.getPassword());
        properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
        return DebeziumUtils.createMySqlConnection(Configuration.from(properties), new Properties());
    }

    /**
     * Returns the identifier of the table under test.
     *
     * @return the database name and the table name, separated by a dot
     */
    private String getTableId() {
        final String databaseName = this.customDatabase.getDatabaseName();
        final String table = this.tableName;
        return databaseName + "." + table;
    }

    /**
     * Blocks until the job behind the given result reports {@code RUNNING}, polling its status
     * every five seconds. The first check happens only after the first sleep.
     *
     * <p>If the job reaches a globally terminal state it can never become {@code RUNNING} again,
     * so the wait is aborted instead of polling forever.
     *
     * @param tableResult result whose job is polled
     * @throws InterruptedException if the thread is interrupted while sleeping or waiting
     * @throws ExecutionException if the job status cannot be retrieved
     * @throws IllegalStateException if the job reaches a globally terminal state
     */
    private void waitUntilJobRunning(TableResult tableResult) throws InterruptedException, ExecutionException {
        JobStatus status;
        do {
            Thread.sleep(5000L);
            status = tableResult.getJobClient().get().getJobStatus().get();
            if (status.isGloballyTerminalState()) {
                throw new IllegalStateException("Job reached terminal state " + status + " while waiting for it to be RUNNING");
            }
        } while (status != JobStatus.RUNNING);
    }

    /**
     * Checks whether the iterator reports another element within three seconds. The
     * {@code hasNext()} call runs on a separate single-thread executor so that a blocking call
     * cannot stall the caller for longer than the timeout.
     *
     * @param closeableIterator iterator to probe
     * @return the result of {@code hasNext()}, or {@code false} if it did not answer in time
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException if {@code hasNext()} itself throws
     */
    private boolean hasNextData(CloseableIterator<?> closeableIterator) throws InterruptedException, ExecutionException {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            FutureTask<Boolean> probe = new FutureTask<Boolean>(closeableIterator::hasNext);
            executor.execute(probe);
            return probe.get(3L, TimeUnit.SECONDS);
        } catch (TimeoutException ignored) {
            // No answer within the timeout is treated as "no more data".
            return false;
        } finally {
            // shutdown() stops accepting new tasks; it does not interrupt a probe still running.
            executor.shutdown();
        }
    }

    /**
     * Recreates a serialized lambda from its {@link SerializedLambda} description.
     *
     * <p>Only the lambda whose implementation method is named
     * {@code lambda$testBackfillWhenWritingEvents$51703d1b$1} is recognised. It is rebuilt as a
     * hook that runs the captured SQL statements in a single transaction.
     *
     * <p>NOTE(review): the {@code synthetic} marker and constructs such as
     * {@code boolean z = -1} / {@code switch (z)} are not valid hand-written Java. This looks
     * like decompiler output of compiler-generated code and presumably should not be maintained
     * by hand. Confirm before editing.
     *
     * @param serializedLambda description of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the description does not match the known lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation method name (via its hash code) to a selector.
        switch (implMethodName.hashCode()) {
            case 2058058662:
                if (implMethodName.equals("lambda$testBackfillWhenWritingEvents$51703d1b$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: check the remaining lambda metadata before rebuilding the lambda.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/cdc/connectors/mysql/source/utils/hooks/SnapshotPhaseHook") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/flink/cdc/connectors/mysql/source/MySqlSourceITCase") && serializedLambda.getImplMethodSignature().equals("([Ljava/lang/String;Lio/debezium/connector/mysql/MySqlConnection;Lorg/apache/flink/api/connector/source/SourceSplit;)V")) {
                    // The only captured argument is the array of SQL statements to execute.
                    String[] strArr = (String[]) serializedLambda.getCapturedArg(0);
                    return (mySqlConnection, sourceSplit) -> {
                        mySqlConnection.setAutoCommit(false);
                        mySqlConnection.execute(strArr);
                        mySqlConnection.commit();
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
