package org.apache.flink.cdc.connectors.oracle.source;

import io.debezium.connector.oracle.OracleConnection;
import io.debezium.jdbc.JdbcConfiguration;
import java.lang.invoke.SerializedLambda;
import java.sql.Connection;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.connectors.base.config.JdbcSourceConfig;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.oracle.source.OracleSourceBuilder;
import org.apache.flink.cdc.connectors.oracle.source.utils.OracleConnectionUtils;
import org.apache.flink.cdc.connectors.oracle.testutils.TestTable;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase.class */
public class OracleSourceITCase extends OracleSourceTestBase {
    // Hook selectors understood by testBackfillWhenWritingEvents: each value picks the
    // snapshot-phase hook (post-low-watermark, pre-high-watermark, post-high-watermark)
    // to which the DML-injecting action is attached.
    private static final int USE_POST_LOWWATERMARK_HOOK = 1;
    private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
    private static final int USE_POST_HIGHWATERMARK_HOOK = 3;
    private static final Logger LOG = LoggerFactory.getLogger(OracleSourceITCase.class);

    /** JUnit rule that applies a 300-second timeout to every test in this class. */
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300);

    /**
     * Decompiler rendering of the compiler-generated switch map for {@link FailoverType}.
     *
     * <p>The array is indexed by {@code FailoverType.ordinal()} and stores the 1-based case
     * number of each constant: TM = 1, JM = 2, NONE = 3. A slot whose constant cannot be
     * resolved keeps the default value 0.
     *
     * <p>The case numbers are written as plain literals on purpose: they are positions in a
     * switch statement and have nothing to do with the snapshot-hook selector constants of the
     * enclosing class, even though those happen to share the same numeric values.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$cdc$connectors$oracle$source$OracleSourceITCase$FailoverType = new int[FailoverType.values().length];

        static {
            // Each constant is registered in its own try block so that one constant missing at
            // run time (NoSuchFieldError) does not prevent the others from being mapped.
            try {
                $SwitchMap$org$apache$flink$cdc$connectors$oracle$source$OracleSourceITCase$FailoverType[FailoverType.TM.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not present at run time: leave its slot at 0
            }
            try {
                $SwitchMap$org$apache$flink$cdc$connectors$oracle$source$OracleSourceITCase$FailoverType[FailoverType.JM.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not present at run time: leave its slot at 0
            }
            try {
                $SwitchMap$org$apache$flink$cdc$connectors$oracle$source$OracleSourceITCase$FailoverType[FailoverType.NONE.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant not present at run time: leave its slot at 0
            }
        }
    }

    /**
     * The point of a test run at which testOracleParallelSource injects a failover.
     *
     * <p>NOTE(review): the decompiler reported that this type was originally {@code private};
     * confirm before relying on its public visibility.
     */
    /* loaded from: input_file:org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase$FailoverPhase.class */
    public enum FailoverPhase {
        /** Failover is triggered before the snapshot rows are fetched, once the result iterator reports a first row. */
        SNAPSHOT,
        /** Failover is triggered between the first and the second batch of redo-log DML statements. */
        REDO_LOG,
        /** No failover is triggered at any point. */
        NEVER
    }

    /**
     * The kind of failure that triggerFailover injects into the running job.
     *
     * <p>NOTE(review): the decompiler reported that this type was originally {@code private};
     * confirm before relying on its public visibility.
     */
    /* loaded from: input_file:org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase$FailoverType.class */
    public enum FailoverType {
        /** Terminate a task manager and start a new one. */
        TM,
        /** Revoke and then re-grant the job master leadership. */
        JM,
        /** Inject nothing. */
        NONE
    }

    /** Reads the CUSTOMERS table with an environment parallelism of 1 and no injected failover. */
    @Test
    public void testReadSingleTableWithSingleParallelism() throws Exception {
        // The first argument is a parallelism, not a snapshot-hook selector, so it is spelled
        // as the literal 1 rather than borrowing USE_POST_LOWWATERMARK_HOOK.
        testOracleParallelSource(1, FailoverType.NONE, FailoverPhase.NEVER, new String[] {"CUSTOMERS"});
    }

    /** Reads the CUSTOMERS table with an environment parallelism of 4 and no injected failover. */
    @Test
    public void testReadSingleTableWithMultipleParallelism() throws Exception {
        final int parallelism = 4;
        final String[] capturedTables = {"CUSTOMERS"};
        testOracleParallelSource(parallelism, FailoverType.NONE, FailoverPhase.NEVER, capturedTables);
    }

    /** Reads CUSTOMERS while a task-manager failover is injected during the snapshot phase. */
    @Test
    public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
        final String[] capturedTables = {"CUSTOMERS"};
        testOracleParallelSource(FailoverType.TM, FailoverPhase.SNAPSHOT, capturedTables);
    }

    /** Reads CUSTOMERS while a task-manager failover is injected during the redo-log phase. */
    @Test
    public void testTaskManagerFailoverInRedoLogPhase() throws Exception {
        final String[] capturedTables = {"CUSTOMERS"};
        testOracleParallelSource(FailoverType.TM, FailoverPhase.REDO_LOG, capturedTables);
    }

    /** Reads CUSTOMERS while a job-manager failover is injected during the snapshot phase. */
    @Test
    public void testJobManagerFailoverInSnapshotPhase() throws Exception {
        final String[] capturedTables = {"CUSTOMERS"};
        testOracleParallelSource(FailoverType.JM, FailoverPhase.SNAPSHOT, capturedTables);
    }

    /** Reads CUSTOMERS while a job-manager failover is injected during the redo-log phase. */
    @Test
    public void testJobManagerFailoverInRedoLogPhase() throws Exception {
        final String[] capturedTables = {"CUSTOMERS"};
        testOracleParallelSource(FailoverType.JM, FailoverPhase.REDO_LOG, capturedTables);
    }

    /**
     * Reads CUSTOMERS with an environment parallelism of 1 while a task-manager failover is
     * injected during the snapshot phase.
     */
    @Test
    public void testTaskManagerFailoverSingleParallelism() throws Exception {
        // Literal parallelism: this argument is unrelated to the snapshot-hook selectors.
        testOracleParallelSource(1, FailoverType.TM, FailoverPhase.SNAPSHOT, new String[] {"CUSTOMERS"});
    }

    /**
     * Reads CUSTOMERS with an environment parallelism of 1 while a job-manager failover is
     * injected during the snapshot phase.
     */
    @Test
    public void testJobManagerFailoverSingleParallelism() throws Exception {
        // Literal parallelism: this argument is unrelated to the snapshot-hook selectors.
        testOracleParallelSource(1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"CUSTOMERS"});
    }

    /**
     * Reads CUSTOMERS with snapshot backfill skipped while a task-manager failover is injected
     * during the snapshot phase; the job may restart once with no delay.
     *
     * <p>NOTE(review): the method name says "single parallelism" but the job runs with a
     * parallelism of 4 — confirm which one is intended.
     */
    @Test
    public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception {
        // One restart attempt, zero delay. The attempt count is a plain number and must not be
        // expressed through a snapshot-hook selector constant.
        testOracleParallelSource(4, FailoverType.TM, FailoverPhase.SNAPSHOT, new String[] {"CUSTOMERS"}, true, RestartStrategies.fixedDelayRestart(1, 0L), null);
    }

    /**
     * Snapshot-only startup with DML injected by the post-high-watermark hook: the expected
     * output is 21 insert rows showing no trace of the injected insert, update or delete.
     */
    @Test
    public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
        List<String> expectedRecords = Arrays.asList(
                "+I[101, user_1, Shanghai, 123567891234]",
                "+I[102, user_2, Shanghai, 123567891234]",
                "+I[103, user_3, Shanghai, 123567891234]",
                "+I[109, user_4, Shanghai, 123567891234]",
                "+I[110, user_5, Shanghai, 123567891234]",
                "+I[111, user_6, Shanghai, 123567891234]",
                "+I[118, user_7, Shanghai, 123567891234]",
                "+I[121, user_8, Shanghai, 123567891234]",
                "+I[123, user_9, Shanghai, 123567891234]",
                "+I[1009, user_10, Shanghai, 123567891234]",
                "+I[1010, user_11, Shanghai, 123567891234]",
                "+I[1011, user_12, Shanghai, 123567891234]",
                "+I[1012, user_13, Shanghai, 123567891234]",
                "+I[1013, user_14, Shanghai, 123567891234]",
                "+I[1014, user_15, Shanghai, 123567891234]",
                "+I[1015, user_16, Shanghai, 123567891234]",
                "+I[1016, user_17, Shanghai, 123567891234]",
                "+I[1017, user_18, Shanghai, 123567891234]",
                "+I[1018, user_19, Shanghai, 123567891234]",
                "+I[1019, user_20, Shanghai, 123567891234]",
                "+I[2000, user_21, Shanghai, 123567891234]");
        // 22 rows are requested although 21 are expected: the fetch loop stops early only when
        // the result iterator is exhausted, so this presumably doubles as a check that the
        // snapshot-only job terminates.
        List<String> actualRecords = testBackfillWhenWritingEvents(false, 22, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.snapshot());
        assertEqualsInAnyOrder(expectedRecords, actualRecords);
    }

    /**
     * Snapshot-only startup with DML injected by the pre-high-watermark hook: the expected
     * output already reflects the injected changes (row 1019 gone, row 2000 moved to
     * Pittsburgh, row 15213 present), all as plain insert rows.
     */
    @Test
    public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
        List<String> expectedRecords = Arrays.asList(
                "+I[101, user_1, Shanghai, 123567891234]",
                "+I[102, user_2, Shanghai, 123567891234]",
                "+I[103, user_3, Shanghai, 123567891234]",
                "+I[109, user_4, Shanghai, 123567891234]",
                "+I[110, user_5, Shanghai, 123567891234]",
                "+I[111, user_6, Shanghai, 123567891234]",
                "+I[118, user_7, Shanghai, 123567891234]",
                "+I[121, user_8, Shanghai, 123567891234]",
                "+I[123, user_9, Shanghai, 123567891234]",
                "+I[1009, user_10, Shanghai, 123567891234]",
                "+I[1010, user_11, Shanghai, 123567891234]",
                "+I[1011, user_12, Shanghai, 123567891234]",
                "+I[1012, user_13, Shanghai, 123567891234]",
                "+I[1013, user_14, Shanghai, 123567891234]",
                "+I[1014, user_15, Shanghai, 123567891234]",
                "+I[1015, user_16, Shanghai, 123567891234]",
                "+I[1016, user_17, Shanghai, 123567891234]",
                "+I[1017, user_18, Shanghai, 123567891234]",
                "+I[1018, user_19, Shanghai, 123567891234]",
                "+I[2000, user_21, Pittsburgh, 123567891234]",
                "+I[15213, user_15213, Shanghai, 123567891234]");
        // 22 rows are requested although 21 are expected: the fetch loop stops early only when
        // the result iterator is exhausted, so this presumably doubles as a check that the
        // snapshot-only job terminates.
        List<String> actualRecords = testBackfillWhenWritingEvents(false, 22, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.snapshot());
        assertEqualsInAnyOrder(expectedRecords, actualRecords);
    }

    /**
     * Initial startup with backfill enabled and DML injected by the pre-high-watermark hook:
     * the first 21 rows are expected to be insert rows that already reflect the injected
     * changes (row 1019 gone, row 2000 moved to Pittsburgh, row 15213 present).
     */
    @Test
    public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
        List<String> expectedRecords = Arrays.asList(
                "+I[101, user_1, Shanghai, 123567891234]",
                "+I[102, user_2, Shanghai, 123567891234]",
                "+I[103, user_3, Shanghai, 123567891234]",
                "+I[109, user_4, Shanghai, 123567891234]",
                "+I[110, user_5, Shanghai, 123567891234]",
                "+I[111, user_6, Shanghai, 123567891234]",
                "+I[118, user_7, Shanghai, 123567891234]",
                "+I[121, user_8, Shanghai, 123567891234]",
                "+I[123, user_9, Shanghai, 123567891234]",
                "+I[1009, user_10, Shanghai, 123567891234]",
                "+I[1010, user_11, Shanghai, 123567891234]",
                "+I[1011, user_12, Shanghai, 123567891234]",
                "+I[1012, user_13, Shanghai, 123567891234]",
                "+I[1013, user_14, Shanghai, 123567891234]",
                "+I[1014, user_15, Shanghai, 123567891234]",
                "+I[1015, user_16, Shanghai, 123567891234]",
                "+I[1016, user_17, Shanghai, 123567891234]",
                "+I[1017, user_18, Shanghai, 123567891234]",
                "+I[1018, user_19, Shanghai, 123567891234]",
                "+I[2000, user_21, Pittsburgh, 123567891234]",
                "+I[15213, user_15213, Shanghai, 123567891234]");
        List<String> actualRecords = testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial());
        assertEqualsInAnyOrder(expectedRecords, actualRecords);
    }

    /**
     * Initial startup with backfill enabled and DML injected by the post-low-watermark hook:
     * the first 21 rows are expected to be insert rows that already reflect the injected
     * changes (row 1019 gone, row 2000 moved to Pittsburgh, row 15213 present).
     */
    @Test
    public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
        List<String> expectedRecords = Arrays.asList(
                "+I[101, user_1, Shanghai, 123567891234]",
                "+I[102, user_2, Shanghai, 123567891234]",
                "+I[103, user_3, Shanghai, 123567891234]",
                "+I[109, user_4, Shanghai, 123567891234]",
                "+I[110, user_5, Shanghai, 123567891234]",
                "+I[111, user_6, Shanghai, 123567891234]",
                "+I[118, user_7, Shanghai, 123567891234]",
                "+I[121, user_8, Shanghai, 123567891234]",
                "+I[123, user_9, Shanghai, 123567891234]",
                "+I[1009, user_10, Shanghai, 123567891234]",
                "+I[1010, user_11, Shanghai, 123567891234]",
                "+I[1011, user_12, Shanghai, 123567891234]",
                "+I[1012, user_13, Shanghai, 123567891234]",
                "+I[1013, user_14, Shanghai, 123567891234]",
                "+I[1014, user_15, Shanghai, 123567891234]",
                "+I[1015, user_16, Shanghai, 123567891234]",
                "+I[1016, user_17, Shanghai, 123567891234]",
                "+I[1017, user_18, Shanghai, 123567891234]",
                "+I[1018, user_19, Shanghai, 123567891234]",
                "+I[2000, user_21, Pittsburgh, 123567891234]",
                "+I[15213, user_15213, Shanghai, 123567891234]");
        List<String> actualRecords = testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial());
        assertEqualsInAnyOrder(expectedRecords, actualRecords);
    }

    /**
     * Initial startup with backfill enabled and DML injected by the post-high-watermark hook:
     * the first 21 rows are expected to be insert rows showing no trace of the injected
     * insert, update or delete.
     */
    @Test
    public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {
        List<String> expectedRecords = Arrays.asList(
                "+I[101, user_1, Shanghai, 123567891234]",
                "+I[102, user_2, Shanghai, 123567891234]",
                "+I[103, user_3, Shanghai, 123567891234]",
                "+I[109, user_4, Shanghai, 123567891234]",
                "+I[110, user_5, Shanghai, 123567891234]",
                "+I[111, user_6, Shanghai, 123567891234]",
                "+I[118, user_7, Shanghai, 123567891234]",
                "+I[121, user_8, Shanghai, 123567891234]",
                "+I[123, user_9, Shanghai, 123567891234]",
                "+I[1009, user_10, Shanghai, 123567891234]",
                "+I[1010, user_11, Shanghai, 123567891234]",
                "+I[1011, user_12, Shanghai, 123567891234]",
                "+I[1012, user_13, Shanghai, 123567891234]",
                "+I[1013, user_14, Shanghai, 123567891234]",
                "+I[1014, user_15, Shanghai, 123567891234]",
                "+I[1015, user_16, Shanghai, 123567891234]",
                "+I[1016, user_17, Shanghai, 123567891234]",
                "+I[1017, user_18, Shanghai, 123567891234]",
                "+I[1018, user_19, Shanghai, 123567891234]",
                "+I[1019, user_20, Shanghai, 123567891234]",
                "+I[2000, user_21, Shanghai, 123567891234]");
        List<String> actualRecords = testBackfillWhenWritingEvents(false, 21, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.initial());
        assertEqualsInAnyOrder(expectedRecords, actualRecords);
    }

    /**
     * Initial startup with backfill skipped and DML injected by the pre-high-watermark hook:
     * the first 25 rows are expected to be 21 insert rows without the injected changes,
     * followed (in any order) by the change events for the injected insert, update and delete.
     */
    @Test
    public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
        List<String> expectedRecords = Arrays.asList(
                "+I[101, user_1, Shanghai, 123567891234]",
                "+I[102, user_2, Shanghai, 123567891234]",
                "+I[103, user_3, Shanghai, 123567891234]",
                "+I[109, user_4, Shanghai, 123567891234]",
                "+I[110, user_5, Shanghai, 123567891234]",
                "+I[111, user_6, Shanghai, 123567891234]",
                "+I[118, user_7, Shanghai, 123567891234]",
                "+I[121, user_8, Shanghai, 123567891234]",
                "+I[123, user_9, Shanghai, 123567891234]",
                "+I[1009, user_10, Shanghai, 123567891234]",
                "+I[1010, user_11, Shanghai, 123567891234]",
                "+I[1011, user_12, Shanghai, 123567891234]",
                "+I[1012, user_13, Shanghai, 123567891234]",
                "+I[1013, user_14, Shanghai, 123567891234]",
                "+I[1014, user_15, Shanghai, 123567891234]",
                "+I[1015, user_16, Shanghai, 123567891234]",
                "+I[1016, user_17, Shanghai, 123567891234]",
                "+I[1017, user_18, Shanghai, 123567891234]",
                "+I[1018, user_19, Shanghai, 123567891234]",
                "+I[1019, user_20, Shanghai, 123567891234]",
                "+I[2000, user_21, Shanghai, 123567891234]",
                "+I[15213, user_15213, Shanghai, 123567891234]",
                "-U[2000, user_21, Shanghai, 123567891234]",
                "+U[2000, user_21, Pittsburgh, 123567891234]",
                "-D[1019, user_20, Shanghai, 123567891234]");
        List<String> actualRecords = testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial());
        assertEqualsInAnyOrder(expectedRecords, actualRecords);
    }

    /**
     * Initial startup with backfill skipped and DML injected by the post-low-watermark hook:
     * the first 25 rows are expected to contain insert rows that already reflect the injected
     * changes plus the change events for the same insert, update and delete.
     *
     * <p>Row 15213 is therefore expected twice as an insert — presumably once from the
     * snapshot read and once from the change stream.
     */
    @Test
    public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
        List<String> expectedRecords = Arrays.asList(
                "+I[101, user_1, Shanghai, 123567891234]",
                "+I[102, user_2, Shanghai, 123567891234]",
                "+I[103, user_3, Shanghai, 123567891234]",
                "+I[109, user_4, Shanghai, 123567891234]",
                "+I[110, user_5, Shanghai, 123567891234]",
                "+I[111, user_6, Shanghai, 123567891234]",
                "+I[118, user_7, Shanghai, 123567891234]",
                "+I[121, user_8, Shanghai, 123567891234]",
                "+I[123, user_9, Shanghai, 123567891234]",
                "+I[1009, user_10, Shanghai, 123567891234]",
                "+I[1010, user_11, Shanghai, 123567891234]",
                "+I[1011, user_12, Shanghai, 123567891234]",
                "+I[1012, user_13, Shanghai, 123567891234]",
                "+I[1013, user_14, Shanghai, 123567891234]",
                "+I[1014, user_15, Shanghai, 123567891234]",
                "+I[1015, user_16, Shanghai, 123567891234]",
                "+I[1016, user_17, Shanghai, 123567891234]",
                "+I[1017, user_18, Shanghai, 123567891234]",
                "+I[1018, user_19, Shanghai, 123567891234]",
                "+I[2000, user_21, Pittsburgh, 123567891234]",
                "+I[15213, user_15213, Shanghai, 123567891234]",
                "+I[15213, user_15213, Shanghai, 123567891234]",
                "-U[2000, user_21, Shanghai, 123567891234]",
                "+U[2000, user_21, Pittsburgh, 123567891234]",
                "-D[1019, user_20, Shanghai, 123567891234]");
        List<String> actualRecords = testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial());
        assertEqualsInAnyOrder(expectedRecords, actualRecords);
    }

    /**
     * Configures the non-primary-key column NAME as the chunk key column and, if the run fails,
     * asserts that the failure carries the "Chunk key column ... doesn't exist in the primary
     * key" message.
     *
     * <p>NOTE(review): when no exception is thrown at all the test passes without asserting
     * anything about the chunk key. Confirm whether an exception is actually expected here
     * before tightening this into a hard failure.
     */
    @Test
    public void testTableWithChunkColumnOfNoPrimaryKey() {
        final String chunkColumn = "NAME";
        try {
            // Parallelism 1 is written as a literal; it is not a snapshot-hook selector.
            testOracleParallelSource(1, FailoverType.NONE, FailoverPhase.NEVER, new String[] {"CUSTOMERS"}, false, RestartStrategies.noRestart(), chunkColumn);
        } catch (Exception e) {
            String expectedMessage = String.format("Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.", chunkColumn, "ID", "customer.DEBEZIUM.CUSTOMERS");
            Assert.assertTrue(ExceptionUtils.findThrowableWithMessage(e, expectedMessage).isPresent());
        }
    }

    /**
     * Runs a DataStream job over DEBEZIUM.CUSTOMERS with parallelism 1 while a snapshot-phase
     * hook executes three DML statements against the table (insert row 15213, move row 2000 to
     * Pittsburgh, delete row 1019), and returns the collected rows rendered as strings.
     *
     * @param skipSnapshotBackfill value passed to the source builder's {@code skipSnapshotBackfill}
     * @param fetchSize maximum number of rows to pull from the job result
     * @param hookType one of {@code USE_POST_LOWWATERMARK_HOOK}, {@code USE_PRE_HIGHWATERMARK_HOOK}
     *     or {@code USE_POST_HIGHWATERMARK_HOOK}; any other value leaves the DML hook unregistered
     * @param startupOptions startup mode handed to the source builder
     * @return up to {@code fetchSize} rows, stringified by the test table
     * @throws Exception if database initialisation, job execution or row collection fails
     */
    private List<String> testBackfillWhenWritingEvents(boolean skipSnapshotBackfill, int fetchSize, int hookType, StartupOptions startupOptions) throws Exception {
        createAndInitialize("customer.sql");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(200L);
        env.setParallelism(1);

        ResolvedSchema customersSchema = new ResolvedSchema(
                Arrays.asList(
                        Column.physical("ID", DataTypes.BIGINT().notNull()),
                        Column.physical("NAME", DataTypes.STRING()),
                        Column.physical("ADDRESS", DataTypes.STRING()),
                        Column.physical("PHONE_NUMBER", DataTypes.STRING())),
                new ArrayList<>(),
                UniqueConstraint.primaryKey("pk", Collections.singletonList("ID")));
        TestTable customerTable = new TestTable(OracleSourceTestBase.ORACLE_DATABASE, OracleSourceTestBase.ORACLE_SCHEMA, "CUSTOMERS", customersSchema);
        String tableId = customerTable.getTableId();

        // NOTE(review): the source is kept as a raw type, exactly as in the decompiled original,
        // because the builder's generic signature is not visible from this file.
        OracleSourceBuilder.OracleIncrementalSource source = OracleSourceBuilder.OracleIncrementalSource.builder()
                .hostname(ORACLE_CONTAINER.getHost())
                .port(ORACLE_CONTAINER.getOraclePort().intValue())
                .username(OracleSourceTestBase.CONNECTOR_USER)
                .password("dbz")
                .databaseList(new String[] {OracleSourceTestBase.ORACLE_DATABASE})
                .schemaList(new String[] {OracleSourceTestBase.ORACLE_SCHEMA})
                .tableList(new String[] {"DEBEZIUM.CUSTOMERS"})
                .skipSnapshotBackfill(skipSnapshotBackfill)
                .startupOptions(startupOptions)
                .deserializer(customerTable.getDeserializer())
                .build();

        String[] statements = {
            String.format("INSERT INTO %s VALUES (15213, 'user_15213', 'Shanghai', '123567891234')", tableId),
            String.format("UPDATE %s SET address='Pittsburgh' WHERE id=2000", tableId),
            String.format("DELETE FROM %s WHERE id=1019", tableId)
        };
        // The hook opens its own connection as the test user (not the connector user) and runs
        // the DML; try-with-resources closes the connection and keeps the primary exception if
        // both the statements and close() fail.
        SnapshotPhaseHook dmlHook = (sourceConfig, sourceSplit) -> {
            try (OracleConnection connection = OracleConnectionUtils.createOracleConnection(JdbcConfiguration.copy(((JdbcSourceConfig) sourceConfig).getDbzConnectorConfig().getJdbcConfig()).withUser(OracleSourceTestBase.TEST_USER).withPassword("dbz").build())) {
                connection.execute(statements);
            }
        };

        SnapshotPhaseHooks hooks = new SnapshotPhaseHooks();
        switch (hookType) {
            case USE_POST_LOWWATERMARK_HOOK:
                hooks.setPostLowWatermarkAction(dmlHook);
                break;
            case USE_PRE_HIGHWATERMARK_HOOK:
                hooks.setPreHighWatermarkAction(dmlHook);
                break;
            case USE_POST_HIGHWATERMARK_HOOK:
                hooks.setPostHighWatermarkAction(dmlHook);
                break;
        }
        source.setSnapshotHooks(hooks);

        try (CloseableIterator<RowData> iterator = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source").executeAndCollect()) {
            List<String> records = fetchRowData(iterator, fetchSize, customerTable::stringify);
            // As before, the environment is closed only after a successful fetch and before the
            // iterator itself is closed.
            env.close();
            return records;
        }
    }

    /**
     * Convenience overload that runs the parallel-source scenario with a parallelism of 4.
     *
     * @param failoverType kind of failure to inject
     * @param failoverPhase phase in which the failure is injected
     * @param captureCustomerTables names of the tables to capture
     * @throws Exception if the scenario fails
     */
    private void testOracleParallelSource(FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables) throws Exception {
        final int parallelism = 4;
        testOracleParallelSource(parallelism, failoverType, failoverPhase, captureCustomerTables);
    }

    /**
     * Convenience overload that runs the parallel-source scenario with backfill enabled, a
     * single zero-delay restart attempt and no explicit chunk key column.
     *
     * @param parallelism environment parallelism
     * @param failoverType kind of failure to inject
     * @param failoverPhase phase in which the failure is injected
     * @param captureCustomerTables names of the tables to capture
     * @throws Exception if the scenario fails
     */
    private void testOracleParallelSource(int parallelism, FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables) throws Exception {
        // One restart attempt with zero delay; the count is a plain number, not a hook selector.
        testOracleParallelSource(parallelism, failoverType, failoverPhase, captureCustomerTables, false, RestartStrategies.fixedDelayRestart(1, 0L), null);
    }

    /**
     * End-to-end scenario for the {@code oracle-cdc} SQL connector: initialises the database,
     * registers a {@code products} table over the captured tables, verifies the snapshot rows,
     * applies two batches of DML, verifies the resulting change events and cancels the job.
     * A failover may be injected either once the first snapshot row is available or between
     * the two DML batches.
     *
     * @param parallelism environment parallelism
     * @param failoverType kind of failure to inject
     * @param failoverPhase phase in which the failure is injected
     * @param captureCustomerTables table names (inside the DEBEZIUM schema) to capture; must not be empty
     * @param skipSnapshotBackfill value of {@code scan.incremental.snapshot.backfill.skip}
     * @param restartStrategyConfiguration restart strategy for the environment
     * @param chunkColumn value of {@code scan.incremental.snapshot.chunk.key-column}, or
     *     {@code null} to leave that option out of the DDL
     * @throws Exception if any step of the scenario fails
     */
    private void testOracleParallelSource(int parallelism, FailoverType failoverType, FailoverPhase failoverPhase, String[] captureCustomerTables, boolean skipSnapshotBackfill, RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration, String chunkColumn) throws Exception {
        createAndInitialize("customer.sql");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(parallelism);
        env.enableCheckpointing(200L);
        env.setRestartStrategy(restartStrategyConfiguration);

        // The chunk-key option is appended to the WITH clause only when a column was requested.
        String chunkKeyOption = chunkColumn == null ? "" : ",'scan.incremental.snapshot.chunk.key-column'='" + chunkColumn + "'";
        String sourceDDL = String.format(
                "CREATE TABLE products ("
                        + " ID INT NOT NULL,"
                        + " NAME STRING,"
                        + " ADDRESS STRING,"
                        + " PHONE_NUMBER STRING,"
                        + " primary key (ID) not enforced"
                        + ") WITH ("
                        + " 'connector' = 'oracle-cdc',"
                        + " 'hostname' = '%s',"
                        + " 'port' = '%s',"
                        + " 'username' = '%s',"
                        + " 'password' = '%s',"
                        + " 'database-name' = '%s',"
                        + " 'schema-name' = '%s',"
                        + " 'table-name' = '%s',"
                        + " 'scan.incremental.snapshot.enabled' = 'false',"
                        + " 'debezium.log.mining.strategy' = 'online_catalog',"
                        + " 'debezium.database.history.store.only.captured.tables.ddl' = 'true',"
                        + " 'scan.incremental.snapshot.backfill.skip' = '%s'"
                        + "%s"
                        + ")",
                ORACLE_CONTAINER.getHost(),
                ORACLE_CONTAINER.getOraclePort(),
                ORACLE_CONTAINER.getUsername(),
                ORACLE_CONTAINER.getPassword(),
                OracleSourceTestBase.ORACLE_DATABASE,
                OracleSourceTestBase.ORACLE_SCHEMA,
                getTableNameRegex(captureCustomerTables),
                skipSnapshotBackfill,
                chunkKeyOption);

        String[] snapshotForSingleTable = {
            "+I[101, user_1, Shanghai, 123567891234]",
            "+I[102, user_2, Shanghai, 123567891234]",
            "+I[103, user_3, Shanghai, 123567891234]",
            "+I[109, user_4, Shanghai, 123567891234]",
            "+I[110, user_5, Shanghai, 123567891234]",
            "+I[111, user_6, Shanghai, 123567891234]",
            "+I[118, user_7, Shanghai, 123567891234]",
            "+I[121, user_8, Shanghai, 123567891234]",
            "+I[123, user_9, Shanghai, 123567891234]",
            "+I[1009, user_10, Shanghai, 123567891234]",
            "+I[1010, user_11, Shanghai, 123567891234]",
            "+I[1011, user_12, Shanghai, 123567891234]",
            "+I[1012, user_13, Shanghai, 123567891234]",
            "+I[1013, user_14, Shanghai, 123567891234]",
            "+I[1014, user_15, Shanghai, 123567891234]",
            "+I[1015, user_16, Shanghai, 123567891234]",
            "+I[1016, user_17, Shanghai, 123567891234]",
            "+I[1017, user_18, Shanghai, 123567891234]",
            "+I[1018, user_19, Shanghai, 123567891234]",
            "+I[1019, user_20, Shanghai, 123567891234]",
            "+I[2000, user_21, Shanghai, 123567891234]"
        };

        tEnv.executeSql(sourceDDL);
        TableResult tableResult = tEnv.executeSql("select * from products");
        CloseableIterator<Row> iterator = tableResult.collect();
        JobClient jobClient = tableResult.getJobClient().get();
        JobID jobId = jobClient.getJobID();

        // Every captured table is expected to contribute the same set of snapshot rows.
        List<String> expectedSnapshotData = new ArrayList<>();
        for (int table = 0; table < captureCustomerTables.length; table++) {
            expectedSnapshotData.addAll(Arrays.asList(snapshotForSingleTable));
        }

        // Snapshot-phase failover is injected only once the first row is available.
        if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
            triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> sleepMs(100L));
        }

        LOG.info("snapshot data start");
        assertEqualsInAnyOrder(expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));

        for (String tableName : captureCustomerTables) {
            makeFirstPartRedoLogEvents("DEBEZIUM." + tableName);
        }
        // Redo-log-phase failover happens between the two DML batches.
        if (failoverPhase == FailoverPhase.REDO_LOG) {
            triggerFailover(failoverType, jobId, this.miniClusterResource.getMiniCluster(), () -> sleepMs(200L));
        }
        for (String tableName : captureCustomerTables) {
            makeSecondPartRedoLogEvents("DEBEZIUM." + tableName);
        }

        String[] redoLogForSingleTable = {
            "-U[103, user_3, Shanghai, 123567891234]",
            "+U[103, user_3, Hangzhou, 123567891234]",
            "-D[102, user_2, Shanghai, 123567891234]",
            "+I[102, user_2, Shanghai, 123567891234]",
            "-U[103, user_3, Hangzhou, 123567891234]",
            "+U[103, user_3, Shanghai, 123567891234]",
            "-U[1010, user_11, Shanghai, 123567891234]",
            "+U[1010, user_11, Hangzhou, 123567891234]",
            "+I[2001, user_22, Shanghai, 123567891234]",
            "+I[2002, user_23, Shanghai, 123567891234]",
            "+I[2003, user_24, Shanghai, 123567891234]"
        };
        List<String> expectedRedoLogData = new ArrayList<>();
        for (int table = 0; table < captureCustomerTables.length; table++) {
            expectedRedoLogData.addAll(Arrays.asList(redoLogForSingleTable));
        }
        assertEqualsInAnyOrder(expectedRedoLogData, fetchRows(iterator, expectedRedoLogData.size()));

        // Wait for the cancellation to complete before the test returns.
        jobClient.cancel().get();
    }

    /**
     * Applies the first batch of DML to the given table: moves row 103 to Hangzhou, deletes and
     * re-inserts row 102, then moves row 103 back to Shanghai.
     *
     * @param tableId table name to run the statements against
     * @throws Exception if any statement fails
     */
    private void makeFirstPartRedoLogEvents(String tableId) throws Exception {
        String moveToHangzhou = String.format("UPDATE %s SET address = 'Hangzhou' where id = 103", tableId);
        String deleteUser2 = String.format("DELETE FROM %s where id = 102", tableId);
        String reinsertUser2 = String.format("INSERT INTO %s VALUES(102, 'user_2','Shanghai','123567891234')", tableId);
        String moveBackToShanghai = String.format("UPDATE %s SET address = 'Shanghai' where id = 103", tableId);

        executeSql(moveToHangzhou);
        executeSql(deleteUser2);
        executeSql(reinsertUser2);
        executeSql(moveBackToShanghai);
    }

    /**
     * Applies the second batch of DML to the given table: moves row 1010 to Hangzhou and inserts
     * rows 2001, 2002 and 2003.
     *
     * @param tableId table name to run the statements against
     * @throws Exception if any statement fails
     */
    private void makeSecondPartRedoLogEvents(String tableId) throws Exception {
        String moveToHangzhou = String.format("UPDATE %s SET address = 'Hangzhou' where id = 1010", tableId);
        String insertUser22 = String.format("INSERT INTO %s VALUES(2001, 'user_22','Shanghai','123567891234')", tableId);
        String insertUser23 = String.format("INSERT INTO %s VALUES(2002, 'user_23','Shanghai','123567891234')", tableId);
        String insertUser24 = String.format("INSERT INTO %s VALUES(2003, 'user_24','Shanghai','123567891234')", tableId);

        executeSql(moveToHangzhou);
        executeSql(insertUser22);
        executeSql(insertUser23);
        executeSql(insertUser24);
    }

    /**
     * Sleeps for the given number of milliseconds without propagating interruption as an
     * exception.
     *
     * <p>If the thread is interrupted the sleep ends early and the interrupt flag is restored,
     * so callers further up (for example the JUnit timeout handling) can still observe it.
     *
     * @param millis time to sleep in milliseconds
     */
    private void sleepMs(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; put it back instead of losing it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Pulls at most {@code size} rows from the iterator and converts them to strings.
     *
     * <p>All rows are collected first and stringified afterwards. Fewer than {@code size}
     * entries are returned when the iterator runs out early.
     *
     * @param iter source of rows
     * @param size maximum number of rows to take
     * @param stringifier conversion applied to every collected row
     * @return the stringified rows in iteration order
     */
    private static List<String> fetchRowData(Iterator<RowData> iter, int size, Function<RowData, String> stringifier) {
        List<RowData> rows = new ArrayList<>(size);
        for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
            rows.add(iter.next());
        }
        return rows.stream().map(stringifier).collect(Collectors.toList());
    }

    /**
     * Pulls at most {@code size} rows from the iterator and returns their {@code toString()}
     * forms. Fewer entries are returned when the iterator runs out early.
     *
     * @param iter source of rows
     * @param size maximum number of rows to take
     * @return the string form of each fetched row, in iteration order
     */
    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        List<String> rows = new ArrayList<>(size);
        for (int remaining = size; remaining > 0 && iter.hasNext(); remaining--) {
            rows.add(iter.next().toString());
        }
        return rows;
    }

    /**
     * Builds the {@code table-name} option value for the given tables.
     *
     * @param captureCustomerTables table names; must contain at least one entry
     * @return the single name as-is, or {@code (a|b|...)} when several names are given
     * @throws IllegalStateException if the array is empty
     */
    private String getTableNameRegex(String[] captureCustomerTables) {
        Preconditions.checkState(captureCustomerTables.length > 0);
        if (captureCustomerTables.length == 1) {
            return captureCustomerTables[0];
        }
        // Several tables: an alternation group that matches any of the names.
        return String.format("(%s)", StringUtils.join(captureCustomerTables, "|"));
    }

    /**
     * Executes a single SQL statement on a fresh JDBC connection.
     *
     * <p>The statement and the connection are closed in that order, whether or not execution
     * succeeds. If closing fails after execution has already failed, the close failure is
     * attached as a suppressed exception rather than replacing the original error.
     *
     * @param sql statement text to execute
     * @throws Exception if the connection cannot be obtained, the statement fails, or a
     *     resource cannot be closed
     */
    private void executeSql(String sql) throws Exception {
        try (Connection connection = getJdbcConnection();
                Statement statement = connection.createStatement()) {
            statement.execute(sql);
        }
    }

    /**
     * Injects the requested kind of failure into the running job.
     *
     * @param failoverType TM restarts a task manager, JM bounces the job master leadership,
     *     NONE does nothing
     * @param jobID job whose job master leadership is affected by a JM failover
     * @param miniCluster cluster the job runs on
     * @param runnable action run in the middle of the failover (after the component is taken
     *     down and before it is brought back)
     * @throws Exception if the failover operation fails
     * @throws IllegalStateException if {@code failoverType} is not a known constant
     */
    private static void triggerFailover(FailoverType failoverType, JobID jobID, MiniCluster miniCluster, Runnable runnable) throws Exception {
        // Switch on the enum itself rather than on an ordinal-indexed lookup table.
        switch (failoverType) {
            case TM:
                restartTaskManager(miniCluster, runnable);
                break;
            case JM:
                triggerJobManagerFailover(jobID, miniCluster, runnable);
                break;
            case NONE:
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + failoverType);
        }
    }

    /**
     * Simulates a job-manager failover by revoking the job master leadership for the job,
     * running the given action, and then granting the leadership again. Both leadership
     * changes are awaited before continuing.
     *
     * @param jobID job whose job master leadership is bounced
     * @param miniCluster cluster providing the leadership control
     * @param runnable action executed while leadership is revoked
     * @throws Exception if the leadership control is unavailable or a leadership change fails
     */
    private static void triggerJobManagerFailover(JobID jobID, MiniCluster miniCluster, Runnable runnable) throws Exception {
        final HaLeadershipControl leadershipControl = miniCluster.getHaLeadershipControl().get();

        // Step 1: take leadership away and wait until that has completed.
        leadershipControl.revokeJobMasterLeadership(jobID).get();

        // Step 2: let the caller act while there is no leader.
        runnable.run();

        // Step 3: hand leadership back and wait until that has completed.
        leadershipControl.grantJobMasterLeadership(jobID).get();
    }

    /**
     * Simulates a task-manager failure: terminates the task manager at index 0 and waits for
     * the termination, runs the given action, then starts a new task manager.
     *
     * @param miniCluster cluster whose task manager is replaced
     * @param runnable action executed while the task manager is down
     * @throws Exception if terminating or starting the task manager fails
     */
    private static void restartTaskManager(MiniCluster miniCluster, Runnable runnable) throws Exception {
        final int taskManagerIndex = 0;
        miniCluster.terminateTaskManager(taskManagerIndex).get();

        // Caller-supplied action runs in the gap between shutdown and restart.
        runnable.run();

        miniCluster.startTaskManager();
    }

    /**
     * Decompiler rendering of the compiler-synthesised hook used to deserialise the
     * serialisable lambda created in testBackfillWhenWritingEvents.
     *
     * <p>It accepts only a {@link SerializedLambda} whose implementation method is
     * {@code lambda$testBackfillWhenWritingEvents$f47821dc$1} and whose functional interface,
     * method name and signatures match the checks below; it then rebuilds the lambda around the
     * captured {@code String[]} of SQL statements. Anything else is rejected with
     * {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): as rendered this is not valid Java source — {@code boolean z = -1},
     * the {@code switch} over a boolean, and a lambda returned as {@code Object} without a
     * functional-interface target type do not compile. Presumably javac regenerates this method
     * on its own and the block should be removed from hand-maintained source — confirm.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Decompiled form of a string switch: first map the method name (via its hash code) to
        // a case selector, then switch on that selector.
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1672678148:
                if (implMethodName.equals("lambda$testBackfillWhenWritingEvents$f47821dc$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                // Every descriptor of the serialised form must match before the lambda is rebuilt.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/cdc/connectors/base/source/utils/hooks/SnapshotPhaseHook") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/flink/cdc/connectors/oracle/source/OracleSourceITCase") && serializedLambda.getImplMethodSignature().equals("([Ljava/lang/String;Lorg/apache/flink/cdc/connectors/base/config/SourceConfig;Lorg/apache/flink/api/connector/source/SourceSplit;)V")) {
                    // Captured argument 0 is the array of SQL statements the hook executes.
                    String[] strArr = (String[]) serializedLambda.getCapturedArg(0);
                    return (sourceConfig, sourceSplit) -> {
                        // Same body as the hook lambda in testBackfillWhenWritingEvents: open a
                        // connection as the test user, run the statements, close the connection.
                        OracleConnection createOracleConnection = OracleConnectionUtils.createOracleConnection(JdbcConfiguration.copy(((JdbcSourceConfig) sourceConfig).getDbzConnectorConfig().getJdbcConfig()).withUser(OracleSourceTestBase.TEST_USER).withPassword("dbz").build());
                        Throwable th = null;
                        // The nested try/catch below is a decompiled try-with-resources.
                        try {
                            try {
                                createOracleConnection.execute(strArr);
                                if (createOracleConnection != null) {
                                    if (0 == 0) {
                                        createOracleConnection.close();
                                        return;
                                    }
                                    try {
                                        createOracleConnection.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                }
                            } catch (Throwable th3) {
                                th = th3;
                                throw th3;
                            }
                        } catch (Throwable th4) {
                            if (createOracleConnection != null) {
                                if (th != null) {
                                    try {
                                        createOracleConnection.close();
                                    } catch (Throwable th5) {
                                        th.addSuppressed(th5);
                                    }
                                } else {
                                    createOracleConnection.close();
                                }
                            }
                            throw th4;
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
