package org.apache.flink.cdc.connectors.mongodb.source;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.connectors.base.options.StartupOptions;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHook;
import org.apache.flink.cdc.connectors.base.source.utils.hooks.SnapshotPhaseHooks;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfig;
import org.apache.flink.cdc.connectors.mongodb.source.config.MongoDBSourceConfigFactory;
import org.apache.flink.cdc.connectors.mongodb.source.utils.MongoUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.TestTable;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Preconditions;
import org.bson.Document;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase.class */
public class MongoDBFullChangelogITCase extends MongoDBSourceTestBase {
    private static final int USE_POST_LOWWATERMARK_HOOK = 1;
    private static final int USE_PRE_HIGHWATERMARK_HOOK = 2;
    private static final int USE_POST_HIGHWATERMARK_HOOK = 3;

    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(300);
    private final boolean parallelismSnapshot;

    public MongoDBFullChangelogITCase(boolean z) {
        this.parallelismSnapshot = z;
    }

    @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
    public static Object[] parameters() {
        return new Object[]{new Object[]{false}, new Object[]{true}};
    }

    @Test
    public void testGetMongoDBVersion() {
        Assert.assertEquals(MongoUtils.getMongoVersion(new MongoDBSourceConfigFactory().hosts(CONTAINER.getHostAndPort()).splitSizeMB(USE_POST_LOWWATERMARK_HOOK).samplesPerChunk(10).pollAwaitTimeMillis(500).create(0)), "6.0.9");
    }

    @Test
    public void testReadSingleCollectionWithSingleParallelism() throws Exception {
        testMongoDBParallelSource(USE_POST_LOWWATERMARK_HOOK, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, new String[]{"customers"});
    }

    @Test
    public void testReadSingleCollectionWithMultipleParallelism() throws Exception {
        testMongoDBParallelSource(4, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, new String[]{"customers"});
    }

    @Test
    public void testReadMultipleCollectionWithSingleParallelism() throws Exception {
        testMongoDBParallelSource(USE_POST_LOWWATERMARK_HOOK, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testReadMultipleCollectionWithMultipleParallelism() throws Exception {
        testMongoDBParallelSource(4, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, new String[]{"customers", "customers_1"});
    }

    @Test
    public void testTaskManagerFailoverInSnapshotPhase() throws Exception {
        if (this.parallelismSnapshot) {
            testMongoDBParallelSource(MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, new String[]{"customers", "customers_1"});
        }
    }

    @Test
    public void testTaskManagerFailoverInStreamPhase() throws Exception {
        if (this.parallelismSnapshot) {
            testMongoDBParallelSource(MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.STREAM, new String[]{"customers", "customers_1"});
        }
    }

    @Test
    public void testJobManagerFailoverInSnapshotPhase() throws Exception {
        if (this.parallelismSnapshot) {
            testMongoDBParallelSource(MongoDBTestUtils.FailoverType.JM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, new String[]{"customers", "customers_1"});
        }
    }

    @Test
    public void testJobManagerFailoverInStreamPhase() throws Exception {
        if (this.parallelismSnapshot) {
            testMongoDBParallelSource(MongoDBTestUtils.FailoverType.JM, MongoDBTestUtils.FailoverPhase.STREAM, new String[]{"customers", "customers_1"});
        }
    }

    @Test
    public void testTaskManagerFailoverSingleParallelism() throws Exception {
        if (this.parallelismSnapshot) {
            testMongoDBParallelSource(USE_POST_LOWWATERMARK_HOOK, MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, new String[]{"customers"});
        }
    }

    @Test
    public void testJobManagerFailoverSingleParallelism() throws Exception {
        if (this.parallelismSnapshot) {
            testMongoDBParallelSource(USE_POST_LOWWATERMARK_HOOK, MongoDBTestUtils.FailoverType.JM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, new String[]{"customers"});
        }
    }

    @Test
    public void testReadSingleTableWithSingleParallelismAndSkipBackfill() throws Exception {
        if (this.parallelismSnapshot) {
            testMongoDBParallelSource(4, MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, new String[]{"customers"}, true);
        }
    }

    @Test
    public void testSnapshotOnlyModeWithDMLPostHighWaterMark() throws Exception {
        if (this.parallelismSnapshot) {
            MongoDBAssertUtils.assertEqualsInAnyOrder(Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]"), testBackfillWhenWritingEvents(false, 22, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.snapshot()));
        }
    }

    @Test
    public void testSnapshotOnlyModeWithDMLPreHighWaterMark() throws Exception {
        if (this.parallelismSnapshot) {
            MongoDBAssertUtils.assertEqualsInAnyOrder(Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]"), testBackfillWhenWritingEvents(false, 22, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.snapshot()));
        }
    }

    @Test
    public void testEnableBackfillWithDMLPreHighWaterMark() throws Exception {
        if (this.parallelismSnapshot) {
            MongoDBAssertUtils.assertEqualsInAnyOrder(Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]"), testBackfillWhenWritingEvents(false, 21, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial()));
        }
    }

    @Test
    public void testEnableBackfillWithDMLPostLowWaterMark() throws Exception {
        if (this.parallelismSnapshot) {
            MongoDBAssertUtils.assertEqualsInAnyOrder(Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]"), testBackfillWhenWritingEvents(false, 21, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial()));
        }
    }

    @Test
    public void testEnableBackfillWithDMLPostHighWaterMark() throws Exception {
        if (this.parallelismSnapshot) {
            MongoDBAssertUtils.assertEqualsInAnyOrder(Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "-U[2000, user_21, Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[1019, user_20, Shanghai, 123567891234]"), testBackfillWhenWritingEvents(false, 25, USE_POST_HIGHWATERMARK_HOOK, StartupOptions.initial()));
        }
    }

    @Test
    public void testSkipBackfillWithDMLPreHighWaterMark() throws Exception {
        if (this.parallelismSnapshot) {
            MongoDBAssertUtils.assertEqualsInAnyOrder(Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "-U[2000, user_21, Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[1019, user_20, Shanghai, 123567891234]"), testBackfillWhenWritingEvents(true, 25, USE_PRE_HIGHWATERMARK_HOOK, StartupOptions.initial()));
        }
    }

    @Test
    public void testSkipBackfillWithDMLPostLowWaterMark() throws Exception {
        if (this.parallelismSnapshot) {
            MongoDBAssertUtils.assertEqualsInAnyOrder(Arrays.asList("+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[2000, user_21, Pittsburgh, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "+I[15213, user_15213, Shanghai, 123567891234]", "-U[2000, user_21, Shanghai, 123567891234]", "+U[2000, user_21, Pittsburgh, 123567891234]", "-D[1019, user_20, Shanghai, 123567891234]"), testBackfillWhenWritingEvents(true, 25, USE_POST_LOWWATERMARK_HOOK, StartupOptions.initial()));
        }
    }

    private List<String> testBackfillWhenWritingEvents(boolean z, int i, int i2, StartupOptions startupOptions) throws Exception {
        String str = "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
        CONTAINER.executeCommand("use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
        CONTAINER.executeCommandInDatabase(String.format("db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })", "customers", "customers"), str);
        CONTAINER.executeCommandFileInDatabase("customer", str);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(1000L);
        executionEnvironment.setParallelism(USE_POST_LOWWATERMARK_HOOK);
        TestTable testTable = new TestTable(str, "customers", new ResolvedSchema(Arrays.asList(Column.physical("cid", DataTypes.BIGINT().notNull()), Column.physical("name", DataTypes.STRING()), Column.physical("address", DataTypes.STRING()), Column.physical("phone_number", DataTypes.STRING())), new ArrayList(), UniqueConstraint.primaryKey("pk", Collections.singletonList("cid"))));
        MongoDBSource build = new MongoDBSourceBuilder().hosts(CONTAINER.getHostAndPort()).databaseList(new String[]{str}).username("flinkuser").password("a1?~!@#$%^&*(){}[]<>.,+_-=/|:;").startupOptions(startupOptions).scanFullChangelog(true).collectionList(new String[]{getCollectionNameRegex(str, new String[]{"customers"})}).deserializer(testTable.getDeserializer(true)).skipSnapshotBackfill(z).build();
        SnapshotPhaseHooks snapshotPhaseHooks = new SnapshotPhaseHooks();
        SnapshotPhaseHook snapshotPhaseHook = (sourceConfig, sourceSplit) -> {
            MongoDBSourceConfig mongoDBSourceConfig = (MongoDBSourceConfig) sourceConfig;
            MongoCollection collection = MongoUtils.clientFor(mongoDBSourceConfig).getDatabase((String) mongoDBSourceConfig.getDatabaseList().get(0)).getCollection("customers");
            Document document = new Document();
            document.put("cid", 15213L);
            document.put("name", "user_15213");
            document.put("address", "Shanghai");
            document.put("phone_number", "123567891234");
            collection.insertOne(document);
            collection.updateOne(Filters.eq("cid", 2000L), Updates.set("address", "Pittsburgh"));
            collection.deleteOne(Filters.eq("cid", 1019L));
        };
        switch (i2) {
            case USE_POST_LOWWATERMARK_HOOK /* 1 */:
                snapshotPhaseHooks.setPostLowWatermarkAction(snapshotPhaseHook);
                break;
            case USE_PRE_HIGHWATERMARK_HOOK /* 2 */:
                snapshotPhaseHooks.setPreHighWatermarkAction(snapshotPhaseHook);
                break;
            case USE_POST_HIGHWATERMARK_HOOK /* 3 */:
                snapshotPhaseHooks.setPostHighWatermarkAction(snapshotPhaseHook);
                break;
        }
        build.setSnapshotHooks(snapshotPhaseHooks);
        new ArrayList();
        CloseableIterator executeAndCollect = executionEnvironment.fromSource(build, WatermarkStrategy.noWatermarks(), "Backfill Skipped Source").executeAndCollect();
        Throwable th = null;
        try {
            try {
                testTable.getClass();
                List<String> fetchRowData = MongoDBTestUtils.fetchRowData(executeAndCollect, i, testTable::stringify);
                executionEnvironment.close();
                if (executeAndCollect != null) {
                    if (0 != 0) {
                        try {
                            executeAndCollect.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        executeAndCollect.close();
                    }
                }
                return fetchRowData;
            } finally {
            }
        } catch (Throwable th3) {
            if (executeAndCollect != null) {
                if (th != null) {
                    try {
                        executeAndCollect.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    executeAndCollect.close();
                }
            }
            throw th3;
        }
    }

    private void testMongoDBParallelSource(MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, String[] strArr) throws Exception {
        testMongoDBParallelSource(4, failoverType, failoverPhase, strArr);
    }

    private void testMongoDBParallelSource(int i, MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, String[] strArr) throws Exception {
        testMongoDBParallelSource(i, failoverType, failoverPhase, strArr, false);
    }

    private void testMongoDBParallelSource(int i, MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, String[] strArr, boolean z) throws Exception {
        String str = "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
        CONTAINER.executeCommand("use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
        int length = strArr.length;
        for (int i2 = 0; i2 < length; i2 += USE_POST_LOWWATERMARK_HOOK) {
            String str2 = strArr[i2];
            CONTAINER.executeCommandInDatabase(String.format("db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })", str2, str2), str);
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment);
        executionEnvironment.setParallelism(i);
        executionEnvironment.enableCheckpointing(200L);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(USE_POST_LOWWATERMARK_HOOK, 0L));
        Object[] objArr = new Object[7];
        objArr[0] = this.parallelismSnapshot ? "true" : "false";
        objArr[USE_POST_LOWWATERMARK_HOOK] = CONTAINER.getHostAndPort();
        objArr[USE_PRE_HIGHWATERMARK_HOOK] = "flinkuser";
        objArr[USE_POST_HIGHWATERMARK_HOOK] = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
        objArr[4] = str;
        objArr[5] = getCollectionNameRegex(str, strArr);
        objArr[6] = Boolean.valueOf(z);
        String format = String.format("CREATE TABLE customers ( _id STRING NOT NULL, cid BIGINT NOT NULL, name STRING, address STRING, phone_number STRING, primary key (_id) not enforced) WITH ( 'connector' = 'mongodb-cdc', 'scan.incremental.snapshot.enabled' = '%s', 'hosts' = '%s', 'username' = '%s', 'password' = '%s', 'database' = '%s', 'collection' = '%s', 'heartbeat.interval.ms' = '500', 'scan.full-changelog' = 'true', 'scan.incremental.snapshot.backfill.skip' = '%s')", objArr);
        CONTAINER.executeCommandFileInDatabase("customer", str);
        String[] strArr2 = {"+I[101, user_1, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "+I[103, user_3, Shanghai, 123567891234]", "+I[109, user_4, Shanghai, 123567891234]", "+I[110, user_5, Shanghai, 123567891234]", "+I[111, user_6, Shanghai, 123567891234]", "+I[118, user_7, Shanghai, 123567891234]", "+I[121, user_8, Shanghai, 123567891234]", "+I[123, user_9, Shanghai, 123567891234]", "+I[1009, user_10, Shanghai, 123567891234]", "+I[1010, user_11, Shanghai, 123567891234]", "+I[1011, user_12, Shanghai, 123567891234]", "+I[1012, user_13, Shanghai, 123567891234]", "+I[1013, user_14, Shanghai, 123567891234]", "+I[1014, user_15, Shanghai, 123567891234]", "+I[1015, user_16, Shanghai, 123567891234]", "+I[1016, user_17, Shanghai, 123567891234]", "+I[1017, user_18, Shanghai, 123567891234]", "+I[1018, user_19, Shanghai, 123567891234]", "+I[1019, user_20, Shanghai, 123567891234]", "+I[2000, user_21, Shanghai, 123567891234]"};
        create.executeSql(format);
        TableResult executeSql = create.executeSql("select cid, name, address, phone_number from customers");
        CloseableIterator collect = executeSql.collect();
        JobID jobID = ((JobClient) executeSql.getJobClient().get()).getJobID();
        ArrayList arrayList = new ArrayList();
        for (int i3 = 0; i3 < strArr.length; i3 += USE_POST_LOWWATERMARK_HOOK) {
            arrayList.addAll(Arrays.asList(strArr2));
        }
        if (failoverPhase == MongoDBTestUtils.FailoverPhase.SNAPSHOT && collect.hasNext()) {
            MongoDBTestUtils.triggerFailover(failoverType, jobID, this.miniClusterResource.getMiniCluster(), () -> {
                sleepMs(100L);
            });
        }
        MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList, MongoDBTestUtils.fetchRows(collect, arrayList.size()));
        int length2 = strArr.length;
        for (int i4 = 0; i4 < length2; i4 += USE_POST_LOWWATERMARK_HOOK) {
            makeFirstPartChangeStreamEvents(mongodbClient.getDatabase(str), strArr[i4]);
        }
        if (failoverPhase == MongoDBTestUtils.FailoverPhase.STREAM) {
            MongoDBTestUtils.triggerFailover(failoverType, jobID, this.miniClusterResource.getMiniCluster(), () -> {
                sleepMs(200L);
            });
        }
        int length3 = strArr.length;
        for (int i5 = 0; i5 < length3; i5 += USE_POST_LOWWATERMARK_HOOK) {
            makeSecondPartChangeStreamEvents(mongodbClient.getDatabase(str), strArr[i5]);
        }
        String[] strArr3 = {"-U[101, user_1, Shanghai, 123567891234]", "+U[101, user_1, Hangzhou, 123567891234]", "-D[102, user_2, Shanghai, 123567891234]", "+I[102, user_2, Shanghai, 123567891234]", "-U[103, user_3, Shanghai, 123567891234]", "+U[103, user_3, Hangzhou, 123567891234]", "-U[1010, user_11, Shanghai, 123567891234]", "+U[1010, user_11, Hangzhou, 123567891234]", "+I[2001, user_22, Shanghai, 123567891234]", "+I[2002, user_23, Shanghai, 123567891234]", "+I[2003, user_24, Shanghai, 123567891234]"};
        ArrayList arrayList2 = new ArrayList();
        for (int i6 = 0; i6 < strArr.length; i6 += USE_POST_LOWWATERMARK_HOOK) {
            arrayList2.addAll(Arrays.asList(strArr3));
        }
        MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList2, MongoDBTestUtils.fetchRows(collect, arrayList2.size()));
        ((JobClient) executeSql.getJobClient().get()).cancel().get();
    }

    private String getCollectionNameRegex(String str, String[] strArr) {
        Preconditions.checkState(strArr.length > 0);
        return strArr.length == USE_POST_LOWWATERMARK_HOOK ? str + "." + strArr[0] : (String) Arrays.stream(strArr).map(str2 -> {
            return "^(" + str + "." + str2 + ")$";
        }).collect(Collectors.joining("|"));
    }

    private void sleepMs(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }

    private void makeFirstPartChangeStreamEvents(MongoDatabase mongoDatabase, String str) {
        MongoCollection collection = mongoDatabase.getCollection(str);
        collection.updateOne(Filters.eq("cid", 101L), Updates.set("address", "Hangzhou"));
        collection.deleteOne(Filters.eq("cid", 102L));
        collection.insertOne(customerDocOf(102L, "user_2", "Shanghai", "123567891234"));
        collection.updateOne(Filters.eq("cid", 103L), Updates.set("address", "Hangzhou"));
    }

    private void makeSecondPartChangeStreamEvents(MongoDatabase mongoDatabase, String str) {
        MongoCollection collection = mongoDatabase.getCollection(str);
        collection.updateOne(Filters.eq("cid", 1010L), Updates.set("address", "Hangzhou"));
        collection.insertMany(Arrays.asList(customerDocOf(2001L, "user_22", "Shanghai", "123567891234"), customerDocOf(2002L, "user_23", "Shanghai", "123567891234"), customerDocOf(2003L, "user_24", "Shanghai", "123567891234")));
    }

    private Document customerDocOf(Long l, String str, String str2, String str3) {
        Document document = new Document();
        document.put("cid", l);
        document.put("name", str);
        document.put("address", str2);
        document.put("phone_number", str3);
        return document;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 2070691530:
                if (implMethodName.equals("lambda$testBackfillWhenWritingEvents$5bc7e2d1$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/cdc/connectors/base/source/utils/hooks/SnapshotPhaseHook") && serializedLambda.getFunctionalInterfaceMethodName().equals("accept") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/flink/cdc/connectors/mongodb/source/MongoDBFullChangelogITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/cdc/connectors/base/config/SourceConfig;Lorg/apache/flink/api/connector/source/SourceSplit;)V")) {
                    return (sourceConfig, sourceSplit) -> {
                        MongoDBSourceConfig mongoDBSourceConfig = (MongoDBSourceConfig) sourceConfig;
                        MongoCollection collection = MongoUtils.clientFor(mongoDBSourceConfig).getDatabase((String) mongoDBSourceConfig.getDatabaseList().get(0)).getCollection("customers");
                        Document document = new Document();
                        document.put("cid", 15213L);
                        document.put("name", "user_15213");
                        document.put("address", "Shanghai");
                        document.put("phone_number", "123567891234");
                        collection.insertOne(document);
                        collection.updateOne(Filters.eq("cid", 2000L), Updates.set("address", "Pittsburgh"));
                        collection.deleteOne(Filters.eq("cid", 1019L));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
