package org.apache.flink.cdc.connectors.mongodb.source;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBAssertUtils;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.bson.Document;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;

/* loaded from: input_file:org/apache/flink/cdc/connectors/mongodb/source/NewlyAddedTableITCase.class */
public class NewlyAddedTableITCase extends MongoDBSourceTestBase {
    private String customerDatabase;
    protected static final int DEFAULT_PARALLELISM = 4;

    @Rule
    public final Timeout timeoutPerTest = Timeout.seconds(500);
    private final ScheduledExecutorService mockChangelogExecutor = Executors.newScheduledThreadPool(1);

    @Before
    public void before() throws SQLException {
        this.customerDatabase = "customer_" + Integer.toUnsignedString(new Random().nextInt(), 36);
        TestValuesTableFactory.clearAllData();
        CONTAINER.executeCommand("use admin; db.runCommand({ setClusterParameter: { changeStreamOptions: { preAndPostImages: { expireAfterSeconds: 'off' } } } })");
        MongoCollection collection = mongodbClient.getDatabase(this.customerDatabase).getCollection("produce_changelog");
        this.mockChangelogExecutor.schedule(() -> {
            Document document = new Document();
            document.put("cid", 1);
            document.put("cnt", 103L);
            collection.insertOne(document);
            collection.deleteOne(Filters.eq("cid", 1));
        }, 500L, TimeUnit.MICROSECONDS);
    }

    @After
    public void after() {
        this.mockChangelogExecutor.shutdown();
        mongodbClient.getDatabase(this.customerDatabase).drop();
    }

    @Test
    public void testNewlyAddedCollectionForExistsPipelineOnce() throws Exception {
        testNewlyAddedCollectionOneByOne(1, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testNewlyAddedCollectionForExistsPipelineOnceWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(1, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testNewlyAddedCollectionForExistsPipelineTwice() throws Exception {
        testNewlyAddedCollectionOneByOne(DEFAULT_PARALLELISM, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testNewlyAddedCollectionForExistsPipelineTwiceWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(DEFAULT_PARALLELISM, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testNewlyAddedCollectionForExistsPipelineTwiceWithAheadOplogAndAutoCloseReader() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("scan.incremental.close-idle-reader.enabled", "true");
        testNewlyAddedCollectionOneByOne(DEFAULT_PARALLELISM, hashMap, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testNewlyAddedCollectionForExistsPipelineThrice() throws Exception {
        testNewlyAddedCollectionOneByOne(DEFAULT_PARALLELISM, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing", "address_shanghai", "address_shenzhen");
    }

    @Test
    public void testNewlyAddedCollectionForExistsPipelineThriceWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(DEFAULT_PARALLELISM, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing", "address_shanghai", "address_shenzhen");
    }

    @Test
    public void testNewlyAddedCollectionForExistsPipelineSingleParallelism() throws Exception {
        testNewlyAddedCollectionOneByOne(1, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testNewlyAddedCollectionForExistsPipelineSingleParallelismWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(1, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, true, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testJobManagerFailoverForNewlyAddedCollection() throws Exception {
        testNewlyAddedCollectionOneByOne(DEFAULT_PARALLELISM, MongoDBTestUtils.FailoverType.JM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testJobManagerFailoverForNewlyAddedCollectionWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(DEFAULT_PARALLELISM, MongoDBTestUtils.FailoverType.JM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, true, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testTaskManagerFailoverForNewlyAddedCollection() throws Exception {
        testNewlyAddedCollectionOneByOne(1, MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.STREAM, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testTaskManagerFailoverForNewlyAddedCollectionWithAheadOplog() throws Exception {
        testNewlyAddedCollectionOneByOne(1, MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.STREAM, false, "address_hangzhou", "address_beijing");
    }

    @Test
    public void testJobManagerFailoverForRemoveCollectionSingleParallelism() throws Exception {
        testRemoveCollectionsOneByOne(1, MongoDBTestUtils.FailoverType.JM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testJobManagerFailoverForRemoveCollection() throws Exception {
        testRemoveCollectionsOneByOne(DEFAULT_PARALLELISM, MongoDBTestUtils.FailoverType.JM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testTaskManagerFailoverForRemoveCollectionSingleParallelism() throws Exception {
        testRemoveCollectionsOneByOne(1, MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testTaskManagerFailoverForRemoveCollection() throws Exception {
        testRemoveCollectionsOneByOne(DEFAULT_PARALLELISM, MongoDBTestUtils.FailoverType.TM, MongoDBTestUtils.FailoverPhase.SNAPSHOT, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testRemoveCollectionSingleParallelism() throws Exception {
        testRemoveCollectionsOneByOne(1, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testRemoveCollection() throws Exception {
        testRemoveCollectionsOneByOne(DEFAULT_PARALLELISM, MongoDBTestUtils.FailoverType.NONE, MongoDBTestUtils.FailoverPhase.NEVER, "address_hangzhou", "address_beijing", "address_shanghai");
    }

    @Test
    public void testRemoveAndAddCollectionsOneByOne() throws Exception {
        testRemoveAndAddCollectionsOneByOne(1, "address_hangzhou", "address_beijing", "address_shanghai", "address_shenzhen");
    }

    private void testRemoveAndAddCollectionsOneByOne(int i, String... strArr) throws Exception {
        MongoDatabase database = mongodbClient.getDatabase(this.customerDatabase);
        initialAddressCollections(database, strArr);
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        String uri = temporaryFolder.newFolder().toURI().toString();
        ArrayList arrayList = new ArrayList();
        StreamTableEnvironment create = StreamTableEnvironment.create(getStreamExecutionEnvironmentFromSavePoint(null, i));
        String str = strArr[0];
        String str2 = str.split("_")[1];
        create.executeSql(getCreateTableStatement(new HashMap(), str));
        create.executeSql("CREATE TABLE sink ( collection_name STRING, cid BIGINT, country STRING, city STRING, detail_address STRING, primary key (collection_name,cid) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
        JobClient jobClient = (JobClient) create.executeSql("insert into sink select collection_name, cid, country, city, detail_address from address").getJobClient().get();
        arrayList.addAll(Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", str, str2, str2), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", str, str2, str2), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", str, str2, str2)));
        MongoDBTestUtils.waitForSinkSize("sink", arrayList.size());
        MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
        makeOplogForAddressTableInRound(database, str, 0);
        arrayList.addAll(Arrays.asList(String.format("-U[%s, 416874195632735147, China, %s, %s West Town address 1]", str, str2, str2), String.format("+U[%s, 416874195632735147, China_0, %s, %s West Town address 1]", str, str2, str2), String.format("+I[%s, %d, China, %s, %s West Town address 4]", str, 417022095255614380L, str2, str2)));
        MongoDBTestUtils.waitForSinkSize("sink", arrayList.size());
        MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
        String triggerSavepointWithRetry = triggerSavepointWithRetry(jobClient, uri);
        jobClient.cancel().get();
        for (int i2 = 1; i2 < strArr.length; i2++) {
            String str3 = strArr[i2];
            String str4 = str3.split("_")[1];
            StreamTableEnvironment create2 = StreamTableEnvironment.create(getStreamExecutionEnvironmentFromSavePoint(triggerSavepointWithRetry, i));
            create2.executeSql(getCreateTableStatement(new HashMap(), strArr[0], str3));
            create2.executeSql("CREATE TABLE sink ( collection_name STRING, cid BIGINT, country STRING, city STRING, detail_address STRING, primary key (collection_name,cid) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
            JobClient jobClient2 = (JobClient) create2.executeSql("insert into sink select collection_name, cid, country, city, detail_address from address").getJobClient().get();
            arrayList.addAll(Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", str3, str4, str4), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", str3, str4, str4), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", str3, str4, str4)));
            MongoDBTestUtils.waitForSinkSize("sink", arrayList.size());
            MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
            for (int i3 = 0; i3 <= i2; i3++) {
                makeOplogForAddressTableInRound(database, strArr[i3], i2);
            }
            String str5 = strArr[0];
            String str6 = str5.split("_")[1];
            arrayList.addAll(Arrays.asList(String.format("-U[%s, 416874195632735147, China_%s, %s, %s West Town address 1]", str5, Integer.valueOf(i2 - 1), str6, str6), String.format("+U[%s, 416874195632735147, China_%s, %s, %s West Town address 1]", str5, Integer.valueOf(i2), str6, str6), String.format("+I[%s, %d, China, %s, %s West Town address 4]", str5, Long.valueOf(417022095255614380L + i2), str6, str6)));
            arrayList.addAll(Arrays.asList(String.format("-U[%s, 416874195632735147, China, %s, %s West Town address 1]", str3, str4, str4), String.format("+U[%s, 416874195632735147, China_%s, %s, %s West Town address 1]", str3, Integer.valueOf(i2), str4, str4), String.format("+I[%s, %d, China, %s, %s West Town address 4]", str3, Long.valueOf(417022095255614380L + i2), str4, str4)));
            MongoDBTestUtils.waitForSinkSize("sink", arrayList.size());
            MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
            if (i2 != strArr.length - 1) {
                triggerSavepointWithRetry = triggerSavepointWithRetry(jobClient2, uri);
            }
            jobClient2.cancel().get();
        }
    }

    private void testRemoveCollectionsOneByOne(int i, MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, String... strArr) throws Exception {
        initialAddressCollections(mongodbClient.getDatabase(this.customerDatabase), strArr);
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        String uri = temporaryFolder.newFolder().toURI().toString();
        ArrayList arrayList = new ArrayList();
        for (String str : strArr) {
            String str2 = str.split("_")[1];
            arrayList.addAll(Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", str, str2, str2), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", str, str2, str2), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", str, str2, str2)));
        }
        StreamTableEnvironment create = StreamTableEnvironment.create(getStreamExecutionEnvironmentFromSavePoint(null, i));
        create.executeSql(getCreateTableStatement(new HashMap(), strArr));
        create.executeSql("CREATE TABLE sink ( collection_name STRING, cid BIGINT, country STRING, city STRING, detail_address STRING, primary key (collection_name,cid) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
        JobClient jobClient = (JobClient) create.executeSql("insert into sink select collection_name, cid, country, city, detail_address from address").getJobClient().get();
        if (failoverPhase == MongoDBTestUtils.FailoverPhase.SNAPSHOT) {
            MongoDBTestUtils.triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> {
                sleepMs(100L);
            });
        }
        MongoDBTestUtils.waitForSinkSize("sink", arrayList.size());
        MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
        String triggerSavepointWithRetry = triggerSavepointWithRetry(jobClient, uri);
        jobClient.cancel().get();
        int i2 = 0;
        while (i2 < strArr.length - 1) {
            String[] strArr2 = (String[]) Arrays.asList(strArr).subList(i2 + 1, strArr.length).toArray(new String[0]);
            StreamTableEnvironment create2 = StreamTableEnvironment.create(getStreamExecutionEnvironmentFromSavePoint(triggerSavepointWithRetry, i));
            create2.executeSql(getCreateTableStatement(new HashMap(), strArr2));
            create2.executeSql("CREATE TABLE sink ( collection_name STRING, cid BIGINT, country STRING, city STRING, detail_address STRING, primary key (collection_name,cid) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
            JobClient jobClient2 = (JobClient) create2.executeSql("insert into sink select collection_name, cid, country, city, detail_address from address").getJobClient().get();
            MongoDBTestUtils.waitForSinkSize("sink", arrayList.size());
            MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
            ArrayList arrayList2 = new ArrayList();
            int length = strArr.length;
            for (int i3 = 0; i3 < length; i3++) {
                String str3 = strArr[i3];
                makeOplogForAddressTableInRound(mongodbClient.getDatabase(this.customerDatabase), str3, i2);
                if (i3 > i2) {
                    String str4 = str3.split("_")[1];
                    String[] strArr3 = new String[3];
                    Object[] objArr = new Object[DEFAULT_PARALLELISM];
                    objArr[0] = str3;
                    objArr[1] = i2 == 0 ? "" : "_" + (i2 - 1);
                    objArr[2] = str4;
                    objArr[3] = str4;
                    strArr3[0] = String.format("-U[%s, 416874195632735147, China%s, %s, %s West Town address 1]", objArr);
                    strArr3[1] = String.format("+U[%s, 416874195632735147, China_%s, %s, %s West Town address 1]", str3, Integer.valueOf(i2), str4, str4);
                    strArr3[2] = String.format("+I[%s, %d, China, %s, %s West Town address 4]", str3, Long.valueOf(417022095255614380L + i2), str4, str4);
                    arrayList2.addAll(Arrays.asList(strArr3));
                }
            }
            if (failoverPhase == MongoDBTestUtils.FailoverPhase.STREAM && TestValuesTableFactory.getRawResults("sink").size() > arrayList.size()) {
                MongoDBTestUtils.triggerFailover(failoverType, jobClient2.getJobID(), this.miniClusterResource.getMiniCluster(), () -> {
                    sleepMs(100L);
                });
            }
            arrayList.addAll(arrayList2);
            MongoDBTestUtils.waitForSinkSize("sink", arrayList.size());
            MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getRawResults("sink"));
            triggerSavepointWithRetry = triggerSavepointWithRetry(jobClient2, uri);
            jobClient2.cancel().get();
            i2++;
        }
    }

    private void testNewlyAddedCollectionOneByOne(int i, MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, boolean z, String... strArr) throws Exception {
        testNewlyAddedCollectionOneByOne(i, new HashMap(), failoverType, failoverPhase, z, strArr);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v54, types: [java.util.List] */
    private void testNewlyAddedCollectionOneByOne(int i, Map<String, String> map, MongoDBTestUtils.FailoverType failoverType, MongoDBTestUtils.FailoverPhase failoverPhase, boolean z, String... strArr) throws Exception {
        initialAddressCollections(mongodbClient.getDatabase(this.customerDatabase), strArr);
        TemporaryFolder temporaryFolder = new TemporaryFolder();
        temporaryFolder.create();
        String uri = temporaryFolder.newFolder().toURI().toString();
        String str = null;
        ArrayList arrayList = new ArrayList();
        for (int i2 = 0; i2 < strArr.length; i2++) {
            String[] strArr2 = (String[]) Arrays.asList(strArr).subList(0, i2 + 1).toArray(new String[0]);
            String str2 = strArr[i2];
            if (z) {
                makeOplogBeforeCaptureForAddressCollection(mongodbClient.getDatabase(this.customerDatabase), str2);
            }
            StreamTableEnvironment create = StreamTableEnvironment.create(getStreamExecutionEnvironmentFromSavePoint(str, i));
            create.executeSql(getCreateTableStatement(map, strArr2));
            create.executeSql("CREATE TABLE sink ( collection_name STRING, cid BIGINT, country STRING, city STRING, detail_address STRING, primary key (collection_name,cid) not enforced) WITH ( 'connector' = 'values', 'sink-insert-only' = 'false')");
            JobClient jobClient = (JobClient) create.executeSql("insert into sink select collection_name, cid, country, city, detail_address from address").getJobClient().get();
            String str3 = str2.split("_")[1];
            List asList = Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", str2, str3, str3), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", str2, str3, str3), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", str2, str3, str3));
            if (z) {
                asList = Arrays.asList(String.format("+I[%s, 416874195632735147, China, %s, %s West Town address 1]", str2, str3, str3), String.format("+I[%s, 416927583791428523, China, %s, %s West Town address 2]", str2, str3, str3), String.format("+I[%s, 417022095255614379, China, %s, %s West Town address 3]", str2, str3, str3), String.format("+I[%s, 417022095255614381, China, %s, %s West Town address 5]", str2, str3, str3));
            }
            if (failoverPhase == MongoDBTestUtils.FailoverPhase.SNAPSHOT) {
                MongoDBTestUtils.triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> {
                    sleepMs(100L);
                });
            }
            arrayList.addAll(asList);
            waitForUpsertSinkSize("sink", arrayList.size());
            MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getResults("sink"));
            makeFirstPartOplogForAddressCollection(mongodbClient.getDatabase(this.customerDatabase), str2);
            if (failoverPhase == MongoDBTestUtils.FailoverPhase.STREAM) {
                MongoDBTestUtils.triggerFailover(failoverType, jobClient.getJobID(), this.miniClusterResource.getMiniCluster(), () -> {
                    sleepMs(100L);
                });
            }
            makeSecondPartOplogForAddressCollections(mongodbClient.getDatabase(this.customerDatabase), str2);
            arrayList = (List) arrayList.stream().filter(str4 -> {
                return !str4.contains(String.format("%s, 416874195632735147", str2));
            }).collect(Collectors.toList());
            arrayList.addAll(Arrays.asList(String.format("+I[%s, 416874195632735147, CHINA, %s, %s West Town address 1]", str2, str3, str3), String.format("+I[%s, 417022095255614380, China, %s, %s West Town address 4]", str2, str3, str3)));
            waitForUpsertSinkSize("sink", arrayList.size());
            Thread.sleep(1000L);
            MongoDBAssertUtils.assertEqualsInAnyOrder(arrayList, TestValuesTableFactory.getResults("sink"));
            if (i2 != strArr.length - 1) {
                str = triggerSavepointWithRetry(jobClient, uri);
            }
            jobClient.cancel().get();
        }
    }

    private void initialAddressCollections(MongoDatabase mongoDatabase, String[] strArr) {
        for (String str : strArr) {
            String str2 = str.split("_")[1];
            CONTAINER.executeCommandInDatabase(String.format("db.createCollection('%s'); db.runCommand({ collMod: '%s', changeStreamPreAndPostImages: { enabled: true } })", str, str), mongoDatabase.getName());
            MongoCollection collection = mongoDatabase.getCollection(str);
            collection.insertOne(addressDocOf(416874195632735147L, "China", str2, str2 + " West Town address 1"));
            collection.insertOne(addressDocOf(416927583791428523L, "China", str2, str2 + " West Town address 2"));
            collection.insertOne(addressDocOf(417022095255614379L, "China", str2, str2 + " West Town address 3"));
        }
    }

    private void makeFirstPartOplogForAddressCollection(MongoDatabase mongoDatabase, String str) {
        mongoDatabase.getCollection(str).updateOne(Filters.eq("cid", 416874195632735147L), Updates.set("country", "CHINA"));
    }

    private void makeSecondPartOplogForAddressCollections(MongoDatabase mongoDatabase, String str) {
        String str2 = str.split("_")[1];
        mongoDatabase.getCollection(str).insertOne(addressDocOf(417022095255614380L, "China", str2, str2 + " West Town address 4"));
    }

    private void makeOplogBeforeCaptureForAddressCollection(MongoDatabase mongoDatabase, String str) {
        String str2 = str.split("_")[1];
        mongoDatabase.getCollection(str).insertOne(addressDocOf(417022095255614381L, "China", str2, str2 + " West Town address 5"));
    }

    private void makeOplogForAddressTableInRound(MongoDatabase mongoDatabase, String str, int i) {
        MongoCollection collection = mongoDatabase.getCollection(str);
        String str2 = str.split("_")[1];
        collection.updateOne(Filters.eq("cid", 416874195632735147L), Updates.set("country", "China_" + i));
        collection.insertOne(addressDocOf(Long.valueOf(417022095255614380L + i), "China", str2, str2 + " West Town address 4"));
    }

    private void sleepMs(long j) {
        try {
            Thread.sleep(j);
        } catch (InterruptedException e) {
        }
    }

    private String triggerSavepointWithRetry(JobClient jobClient, String str) throws ExecutionException, InterruptedException {
        for (int i = 0; i < 600; i++) {
            try {
                return (String) jobClient.triggerSavepoint(str).get();
            } catch (Exception e) {
                Optional findThrowable = ExceptionUtils.findThrowable(e, CheckpointException.class);
                if (!findThrowable.isPresent() || !((CheckpointException) findThrowable.get()).getMessage().contains("Checkpoint triggering task")) {
                    throw e;
                }
                Thread.sleep(100L);
            }
        }
        return null;
    }

    private StreamExecutionEnvironment getStreamExecutionEnvironmentFromSavePoint(String str, int i) throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString("table.exec.sink.upsert-materialize", "none");
        if (str != null) {
            configuration.setString(SavepointConfigOptions.SAVEPOINT_PATH, str);
        }
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment(configuration);
        executionEnvironment.setParallelism(i);
        executionEnvironment.enableCheckpointing(200L);
        executionEnvironment.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 100L));
        return executionEnvironment;
    }

    private String getCreateTableStatement(Map<String, String> map, String... strArr) {
        Object[] objArr = new Object[6];
        objArr[0] = CONTAINER.getHostAndPort();
        objArr[1] = "flinkuser";
        objArr[2] = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";
        objArr[3] = this.customerDatabase;
        objArr[DEFAULT_PARALLELISM] = getCollectionNameRegex(this.customerDatabase, strArr);
        objArr[5] = map.isEmpty() ? "" : "," + ((String) map.entrySet().stream().map(entry -> {
            return String.format("'%s'='%s'", entry.getKey(), entry.getValue());
        }).collect(Collectors.joining(",")));
        return String.format("CREATE TABLE address ( _id STRING NOT NULL, collection_name STRING METADATA VIRTUAL, cid BIGINT NOT NULL, country STRING, city STRING, detail_address STRING, primary key (_id) not enforced) WITH ( 'connector' = 'mongodb-cdc', 'scan.incremental.snapshot.enabled' = 'true', 'hosts' = '%s', 'username' = '%s', 'password' = '%s', 'database' = '%s', 'collection' = '%s', 'chunk-meta.group.size' = '2', 'heartbeat.interval.ms' = '100', 'scan.full-changelog' = 'true', 'scan.newly-added-table.enabled' = 'true' %s)", objArr);
    }

    protected static void waitForUpsertSinkSize(String str, int i) throws InterruptedException {
        while (upsertSinkSize(str) < i) {
            Thread.sleep(100L);
        }
    }

    protected static int upsertSinkSize(String str) {
        int size;
        synchronized (TestValuesTableFactory.class) {
            try {
                size = TestValuesTableFactory.getResults(str).size();
            } catch (IllegalArgumentException e) {
                return 0;
            }
        }
        return size;
    }

    private String getCollectionNameRegex(String str, String[] strArr) {
        Preconditions.checkState(strArr.length > 0);
        return strArr.length == 1 ? strArr[0] : (String) Arrays.stream(strArr).map(str2 -> {
            return "^(" + str + "." + str2 + ")$";
        }).collect(Collectors.joining("|"));
    }

    private Document addressDocOf(Long l, String str, String str2, String str3) {
        Document document = new Document();
        document.put("cid", l);
        document.put("country", str);
        document.put("city", str2);
        document.put("detail_address", str3);
        return document;
    }
}
