package org.apache.flink.cdc.connectors.db2.source;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.connectors.db2.Db2TestBase;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.highavailability.nonha.embedded.HaLeadershipControl;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;

/**
 * Integration tests for the incremental-snapshot DB2 CDC table source.
 *
 * <p>Each test declares a {@code db2-cdc} table over {@code DB2INST1.CUSTOMERS}, reads the
 * snapshot rows, applies a fixed set of DML statements and checks the resulting change rows.
 * Depending on the requested {@link FailoverType} and {@link FailoverPhase}, a TaskManager or
 * JobManager failover is injected on the shared mini cluster while the job is running.
 */
public class Db2SourceITCase extends Db2TestBase {

    /** Parallelism used by tests that do not request a specific one; also the slot count. */
    protected static final int DEFAULT_PARALLELISM = 4;

    /** Rows expected from the snapshot read of one captured table. */
    private static final String[] SNAPSHOT_ROWS = {
        "+I[101, user_1, Shanghai, 123567891234]",
        "+I[102, user_2, Shanghai, 123567891234]",
        "+I[103, user_3, Shanghai, 123567891234]",
        "+I[109, user_4, Shanghai, 123567891234]",
        "+I[110, user_5, Shanghai, 123567891234]",
        "+I[111, user_6, Shanghai, 123567891234]",
        "+I[118, user_7, Shanghai, 123567891234]",
        "+I[121, user_8, Shanghai, 123567891234]",
        "+I[123, user_9, Shanghai, 123567891234]",
        "+I[1009, user_10, Shanghai, 123567891234]",
        "+I[1010, user_11, Shanghai, 123567891234]",
        "+I[1011, user_12, Shanghai, 123567891234]",
        "+I[1012, user_13, Shanghai, 123567891234]",
        "+I[1013, user_14, Shanghai, 123567891234]",
        "+I[1014, user_15, Shanghai, 123567891234]",
        "+I[1015, user_16, Shanghai, 123567891234]",
        "+I[1016, user_17, Shanghai, 123567891234]",
        "+I[1017, user_18, Shanghai, 123567891234]",
        "+I[1018, user_19, Shanghai, 123567891234]",
        "+I[1019, user_20, Shanghai, 123567891234]",
        "+I[2000, user_21, Shanghai, 123567891234]"
    };

    /**
     * Change rows expected for one captured table after {@link #makeFirstPartChangeStreamEvents}
     * and {@link #makeSecondPartRedoLogsEvents} have been applied to it.
     */
    private static final String[] STREAM_ROWS = {
        "-U[103, user_3, Shanghai, 123567891234]",
        "+U[103, user_3, Hangzhou, 123567891234]",
        "-D[102, user_2, Shanghai, 123567891234]",
        "+I[102, user_2, Shanghai, 123567891234]",
        "-U[103, user_3, Hangzhou, 123567891234]",
        "+U[103, user_3, Shanghai, 123567891234]",
        "-U[1010, user_11, Shanghai, 123567891234]",
        "+U[1010, user_11, Hangzhou, 123567891234]",
        "+I[2001, user_22, Shanghai, 123567891234]",
        "+I[2002, user_23, Shanghai, 123567891234]",
        "+I[2003, user_24, Shanghai, 123567891234]"
    };

    /** Fails any single test that runs longer than 300 seconds. */
    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300);

    /**
     * Mini cluster with one TaskManager offering {@link #DEFAULT_PARALLELISM} slots. HA leadership
     * control is enabled because {@link #triggerJobManagerFailover} relies on it.
     */
    @Rule
    public final MiniClusterWithClientResource miniClusterResource =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(DEFAULT_PARALLELISM)
                            .setRpcServiceSharing(RpcServiceSharing.DEDICATED)
                            .withHaLeadershipControl()
                            .build());

    /** The phase of the read in which a failover is injected. */
    public enum FailoverPhase {
        SNAPSHOT,
        STREAM,
        NEVER
    }

    /** The component that is failed over. */
    public enum FailoverType {
        TM,
        JM,
        NONE
    }

    @Test
    public void testReadSingleTableWithSingleParallelism() throws Exception {
        testDb2ParallelSource(
                1, FailoverType.NONE, FailoverPhase.NEVER, new String[] {"DB2INST1.CUSTOMERS"});
    }

    @Test
    public void testReadSingleTableWithMultipleParallelism() throws Exception {
        testDb2ParallelSource(
                DEFAULT_PARALLELISM,
                FailoverType.NONE,
                FailoverPhase.NEVER,
                new String[] {"DB2INST1.CUSTOMERS"});
    }

    @Test
    public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
        testDb2ParallelSource(
                FailoverType.TM, FailoverPhase.SNAPSHOT, new String[] {"DB2INST1.CUSTOMERS"});
    }

    @Test
    public void testTaskManagerFailoverInRedoLogsPhase() throws Exception {
        testDb2ParallelSource(
                FailoverType.TM, FailoverPhase.STREAM, new String[] {"DB2INST1.CUSTOMERS"});
    }

    @Test
    public void testJobManagerFailoverInSnapshotPhase() throws Exception {
        testDb2ParallelSource(
                FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"DB2INST1.CUSTOMERS"});
    }

    @Test
    public void testJobManagerFailoverInRedoLogsPhase() throws Exception {
        testDb2ParallelSource(
                FailoverType.JM, FailoverPhase.STREAM, new String[] {"DB2INST1.CUSTOMERS"});
    }

    @Test
    public void testJobManagerFailoverSingleParallelism() throws Exception {
        testDb2ParallelSource(
                1, FailoverType.JM, FailoverPhase.SNAPSHOT, new String[] {"DB2INST1.CUSTOMERS"});
    }

    // NOTE(review): despite "SingleParallelism" in the name, this test runs with
    // DEFAULT_PARALLELISM and a TaskManager failover in the snapshot phase. The name is kept
    // because test reports and filters may reference it.
    @Test
    public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception {
        testDb2ParallelSource(
                DEFAULT_PARALLELISM,
                FailoverType.TM,
                FailoverPhase.SNAPSHOT,
                new String[] {"DB2INST1.CUSTOMERS"},
                true);
    }

    /** Runs the scenario with {@link #DEFAULT_PARALLELISM} and backfill enabled. */
    private void testDb2ParallelSource(
            FailoverType failoverType, FailoverPhase failoverPhase, String[] captureTables)
            throws Exception {
        testDb2ParallelSource(DEFAULT_PARALLELISM, failoverType, failoverPhase, captureTables);
    }

    /** Runs the scenario with the given parallelism and backfill enabled. */
    private void testDb2ParallelSource(
            int parallelism,
            FailoverType failoverType,
            FailoverPhase failoverPhase,
            String[] captureTables)
            throws Exception {
        testDb2ParallelSource(parallelism, failoverType, failoverPhase, captureTables, false);
    }

    /**
     * Runs the full read scenario: snapshot rows first, then the change rows produced by the two
     * batches of DML statements, with an optional failover in between.
     *
     * @param parallelism job parallelism
     * @param failoverType which component to fail over
     * @param failoverPhase when to inject the failover
     * @param captureTables fully qualified names of the tables to capture and modify
     * @param skipBackfill value for {@code scan.incremental.snapshot.backfill.skip}
     * @throws Exception if the job, the failover or an assertion fails
     */
    private void testDb2ParallelSource(
            int parallelism,
            FailoverType failoverType,
            FailoverPhase failoverPhase,
            String[] captureTables,
            boolean skipBackfill)
            throws Exception {
        initializeDb2Table("customers", "CUSTOMERS");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
        env.setParallelism(parallelism);
        env.enableCheckpointing(1000L);
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, 0L));

        String sourceDdl =
                String.format(
                        "CREATE TABLE CUSTOMERS ("
                                + " ID INT NOT NULL,"
                                + " NAME STRING,"
                                + " ADDRESS STRING,"
                                + " PHONE_NUMBER STRING,"
                                + " primary key (ID) not enforced"
                                + ") WITH ("
                                + " 'connector' = 'db2-cdc',"
                                + " 'hostname' = '%s',"
                                + " 'port' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database-name' = '%s',"
                                + " 'table-name' = '%s',"
                                + " 'scan.incremental.snapshot.enabled' = 'true',"
                                + " 'scan.incremental.snapshot.chunk.size' = '4',"
                                + " 'scan.incremental.snapshot.backfill.skip' = '%s'"
                                + ")",
                        DB2_CONTAINER.getHost(),
                        DB2_CONTAINER.getMappedPort(50000),
                        DB2_CONTAINER.getUsername(),
                        DB2_CONTAINER.getPassword(),
                        DB2_CONTAINER.getDatabaseName(),
                        getTableNameRegex(captureTables),
                        skipBackfill);
        tEnv.executeSql(sourceDdl);

        TableResult tableResult = tEnv.executeSql("select * from CUSTOMERS");

        // Close the result iterator even when an assertion fails, so that the collecting job
        // and its resources do not outlive the test.
        try (CloseableIterator<Row> iterator = tableResult.collect()) {
            JobClient jobClient = tableResult.getJobClient().get();
            JobID jobId = jobClient.getJobID();

            List<String> expectedSnapshotData =
                    repeatPerTable(SNAPSHOT_ROWS, captureTables.length);

            // Inject the failover only once the first snapshot record is available.
            if (failoverPhase == FailoverPhase.SNAPSHOT && iterator.hasNext()) {
                triggerFailover(
                        failoverType,
                        jobId,
                        miniClusterResource.getMiniCluster(),
                        () -> sleepMs(100L));
            }

            assertEqualsInAnyOrder(
                    expectedSnapshotData, fetchRows(iterator, expectedSnapshotData.size()));

            for (String tableId : captureTables) {
                makeFirstPartChangeStreamEvents(tableId);
            }

            if (failoverPhase == FailoverPhase.STREAM) {
                triggerFailover(
                        failoverType,
                        jobId,
                        miniClusterResource.getMiniCluster(),
                        () -> sleepMs(200L));
            }

            for (String tableId : captureTables) {
                makeSecondPartRedoLogsEvents(tableId);
            }

            List<String> expectedStreamData = repeatPerTable(STREAM_ROWS, captureTables.length);
            assertEqualsInAnyOrder(
                    expectedStreamData, fetchRows(iterator, expectedStreamData.size()));

            jobClient.cancel().get();
        }
    }

    /** Returns a mutable list holding {@code rows} repeated {@code tableCount} times. */
    private static List<String> repeatPerTable(String[] rows, int tableCount) {
        List<String> repeated = new ArrayList<>(rows.length * Math.max(tableCount, 0));
        for (int table = 0; table < tableCount; table++) {
            repeated.addAll(Arrays.asList(rows));
        }
        return repeated;
    }

    /** Applies the first batch of DML statements (update, delete, re-insert, update back). */
    private void makeFirstPartChangeStreamEvents(String tableId) {
        executeSql("UPDATE " + tableId + " SET ADDRESS = 'Hangzhou' where ID = 103");
        executeSql("DELETE FROM " + tableId + " where ID = 102");
        executeSql("INSERT INTO " + tableId + " VALUES(102, 'user_2','Shanghai','123567891234')");
        executeSql("UPDATE " + tableId + " SET ADDRESS = 'Shanghai' where ID = 103");
    }

    /** Applies the second batch of DML statements (one update and three inserts). */
    private void makeSecondPartRedoLogsEvents(String tableId) {
        executeSql("UPDATE " + tableId + " SET ADDRESS = 'Hangzhou' where ID = 1010");
        executeSql("INSERT INTO " + tableId + " VALUES(2001, 'user_22','Shanghai','123567891234')");
        executeSql("INSERT INTO " + tableId + " VALUES(2002, 'user_23','Shanghai','123567891234')");
        executeSql("INSERT INTO " + tableId + " VALUES(2003, 'user_24','Shanghai','123567891234')");
    }

    /**
     * Sleeps for the given number of milliseconds. If the thread is interrupted, the sleep ends
     * early and the interrupt status is restored so that callers can still observe it.
     */
    private static void sleepMs(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reads up to {@code size} rows from the iterator and returns their string forms.
     *
     * <p>The remaining count is checked before {@code hasNext()} so that the iterator is not
     * queried again once enough rows have been collected.
     */
    private static List<String> fetchRows(Iterator<Row> iter, int size) {
        List<String> rows = new ArrayList<>(Math.max(size, 0));
        int remaining = size;
        while (remaining > 0 && iter.hasNext()) {
            rows.add(iter.next().toString());
            remaining--;
        }
        return rows;
    }

    /**
     * Triggers the requested kind of failover; {@link FailoverType#NONE} does nothing.
     *
     * @param failoverType which component to fail over
     * @param jobId the job whose JobMaster leadership is revoked for {@link FailoverType#JM}
     * @param miniCluster the cluster running the job
     * @param afterFailAction action run between taking the component down and bringing it back
     * @throws Exception if the failover cannot be performed
     */
    protected static void triggerFailover(
            FailoverType failoverType,
            JobID jobId,
            MiniCluster miniCluster,
            Runnable afterFailAction)
            throws Exception {
        switch (failoverType) {
            case TM:
                restartTaskManager(miniCluster, afterFailAction);
                break;
            case JM:
                triggerJobManagerFailover(jobId, miniCluster, afterFailAction);
                break;
            case NONE:
                break;
            default:
                throw new IllegalStateException("Unexpected value: " + failoverType);
        }
    }

    /**
     * Revokes the JobMaster leadership of the job, runs {@code afterFailAction}, then grants the
     * leadership again.
     */
    protected static void triggerJobManagerFailover(
            JobID jobId, MiniCluster miniCluster, Runnable afterFailAction) throws Exception {
        HaLeadershipControl haLeadershipControl = miniCluster.getHaLeadershipControl().get();
        haLeadershipControl.revokeJobMasterLeadership(jobId).get();
        afterFailAction.run();
        haLeadershipControl.grantJobMasterLeadership(jobId).get();
    }

    /**
     * Terminates the TaskManager with index 0, runs {@code afterFailAction}, then starts a new
     * TaskManager.
     */
    protected static void restartTaskManager(MiniCluster miniCluster, Runnable afterFailAction)
            throws Exception {
        miniCluster.terminateTaskManager(0).get();
        afterFailAction.run();
        miniCluster.startTaskManager();
    }
}
