package org.apache.flink.cdc.connectors.mongodb.table;

import com.mongodb.client.MongoDatabase;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.bson.Document;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests for the {@code database} and {@code collection} options of the
 * {@code mongodb-cdc} SQL connector when they are given as regular expressions.
 *
 * <p>Every test creates a {@code mongodb_source} table with the filter under test, pipes it into a
 * {@code values} sink table and compares the rows collected by the sink with the expected
 * changelog. The whole suite runs twice: once with {@code scan.incremental.snapshot.enabled} set to
 * {@code false} (parallelism 1) and once set to {@code true} (parallelism 4, checkpointing on).
 *
 * <p>The streaming job submitted by a test is cancelled in {@link #cancelSubmittedJob()}, so it is
 * stopped even when the test fails half-way through.
 */
@RunWith(Parameterized.class)
public class MongoDBRegexFilterITCase extends MongoDBSourceTestBase {

    /** Name of the sink table every test writes to and reads its results from. */
    private static final String SINK_TABLE = "mongodb_sink";

    /** Credentials passed to the connector through the {@code username}/{@code password} options. */
    private static final String USERNAME = "flinkuser";

    private static final String PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";

    /** Parallelism used when the incremental snapshot is enabled. */
    private static final int INCREMENTAL_SNAPSHOT_PARALLELISM = 4;

    /** Checkpoint interval, in milliseconds, used when the incremental snapshot is enabled. */
    private static final long CHECKPOINT_INTERVAL_MS = 200L;

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    this.env, EnvironmentSettings.newInstance().inStreamingMode().build());
    private final boolean parallelismSnapshot;

    /** Result of the INSERT job submitted by the current test, or {@code null} if none yet. */
    private TableResult submittedJob;

    /**
     * @param parallelismSnapshot value used for {@code scan.incremental.snapshot.enabled}; also
     *     selects the parallelism/checkpointing setup in {@link #before()}
     */
    public MongoDBRegexFilterITCase(boolean parallelismSnapshot) {
        this.parallelismSnapshot = parallelismSnapshot;
    }

    /** Runs the suite with the incremental snapshot disabled and enabled. */
    @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
    public static Object[] parameters() {
        return new Object[] {new Object[] {false}, new Object[] {true}};
    }

    /** Clears the data recorded by the values connector and configures the environment. */
    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        if (this.parallelismSnapshot) {
            this.env.setParallelism(INCREMENTAL_SNAPSHOT_PARALLELISM);
            this.env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        } else {
            this.env.setParallelism(1);
        }
    }

    /**
     * Cancels the job submitted by the test, if any, and waits for the cancellation to complete.
     *
     * <p>Doing this here instead of at the end of each test guarantees that the unbounded job does
     * not keep running when an assertion or a wait fails.
     *
     * @throws Exception if the job exposes no client or the cancellation fails
     */
    @After
    public void cancelSubmittedJob() throws Exception {
        if (this.submittedJob == null) {
            return;
        }
        TableResult job = this.submittedJob;
        this.submittedJob = null;
        JobClient jobClient =
                job.getJobClient()
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "The submitted INSERT job exposes no JobClient"));
        jobClient.cancel().get();
    }

    /**
     * No {@code database} option; the {@code collection} option is a fully qualified pattern that
     * selects the {@code coll_a*} collections of two databases.
     */
    @Test
    public void testMatchMultipleDatabasesAndCollections() throws Exception {
        String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");

        submitTestCase(null, String.format("^(%s|%s)\\.coll_a\\d?$", db0, db1));

        // Rows that exist before the job starts.
        MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 4);

        // Rows inserted while the job is running.
        insertRecordsInDatabase(db0);
        insertRecordsInDatabase(db1);
        MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 8);

        assertSinkContains(
                row(db0, "coll_a1", "A101"),
                row(db0, "coll_a2", "A201"),
                row(db1, "coll_a1", "A101"),
                row(db1, "coll_a2", "A201"),
                row(db0, "coll_a1", "A102"),
                row(db0, "coll_a2", "A202"),
                row(db1, "coll_a1", "A102"),
                row(db1, "coll_a2", "A202"));
    }

    /**
     * The {@code database} option is an alternation of two database names and no {@code collection}
     * option is given; a third database receives inserts but must not show up in the sink.
     */
    @Test
    public void testMatchMultipleDatabases() throws Exception {
        String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String db2 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");

        submitTestCase(String.format("%s|%s", db0, db1), null);

        MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 8);

        insertRecordsInDatabase(db0);
        insertRecordsInDatabase(db1);
        insertRecordsInDatabase(db2);
        MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 16);

        assertSinkContains(
                row(db0, "coll_a1", "A101"),
                row(db0, "coll_a2", "A201"),
                row(db0, "coll_b1", "B101"),
                row(db0, "coll_b2", "B201"),
                row(db1, "coll_a1", "A101"),
                row(db1, "coll_a2", "A201"),
                row(db1, "coll_b1", "B101"),
                row(db1, "coll_b2", "B201"),
                row(db0, "coll_a1", "A102"),
                row(db0, "coll_a2", "A202"),
                row(db0, "coll_b1", "B102"),
                row(db0, "coll_b2", "B202"),
                row(db1, "coll_a1", "A102"),
                row(db1, "coll_a2", "A202"),
                row(db1, "coll_b1", "B102"),
                row(db1, "coll_b2", "B202"));
    }

    /**
     * No {@code database} option; the {@code collection} option is a fully qualified pattern bound
     * to a single database, so the second database must be ignored.
     */
    @Test
    public void testMatchSingleQualifiedCollectionPattern() throws Exception {
        String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");

        submitTestCase(null, String.format("^%s\\.coll_b\\d?$", db0));

        MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 2);

        insertRecordsInDatabase(db0);
        insertRecordsInDatabase(db1);
        MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 4);

        assertSinkContains(
                row(db0, "coll_b1", "B101"),
                row(db0, "coll_b2", "B201"),
                row(db0, "coll_b1", "B102"),
                row(db0, "coll_b2", "B202"));
    }

    /**
     * A plain database name combined with an unqualified collection pattern; the second database
     * must be ignored.
     */
    @Test
    public void testMatchSingleDatabaseWithCollectionPattern() throws Exception {
        String db0 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");
        String db1 = CONTAINER.executeCommandFileInSeparateDatabase("ns_regex");

        submitTestCase(db0, ".*coll_b\\d?");

        MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 2);

        insertRecordsInDatabase(db0);
        insertRecordsInDatabase(db1);
        MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 4);

        assertSinkContains(
                row(db0, "coll_b1", "B101"),
                row(db0, "coll_b2", "B201"),
                row(db0, "coll_b1", "B102"),
                row(db0, "coll_b2", "B202"));
    }

    /** Database and collection names that contain a dash are matched literally. */
    @Test
    public void testMatchDatabaseAndCollectionContainsDash() throws Exception {
        String db = CONTAINER.executeCommandFileInSeparateDatabase("ns-regex");

        submitTestCase(db, "coll-a1");

        MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 1);

        assertSinkContains(row(db, "coll-a1", "A101"));
    }

    /** A collection whose own name contains a dot is matched by a fully qualified pattern. */
    @Test
    public void testMatchCollectionWithDots() throws Exception {
        String db = CONTAINER.executeCommandFileInSeparateDatabase("ns-dotted");

        submitTestCase(db, db + "[.]coll[.]name");

        MongoDBTestUtils.waitForSinkSize(SINK_TABLE, 3);

        assertSinkContains(
                row(db, "coll.name", "A101"),
                row(db, "coll.name", "A102"),
                row(db, "coll.name", "A103"));
    }

    /**
     * Creates the source and sink tables, submits the INSERT job that connects them and waits until
     * {@link MongoDBTestUtils#waitForSnapshotStarted(String)} returns for the sink.
     *
     * <p>The submitted job is remembered before waiting, so {@link #cancelSubmittedJob()} stops it
     * even if the wait itself fails.
     *
     * @param database value of the {@code database} option, or {@code null} to omit the option
     * @param collection value of the {@code collection} option, or {@code null} to omit the option
     * @return the result handle of the submitted INSERT job
     */
    private TableResult submitTestCase(String database, String collection) throws Exception {
        String sourceDdl =
                "CREATE TABLE mongodb_source ("
                        + " _id STRING NOT NULL,"
                        + " seq STRING,"
                        + " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
                        + " coll_name STRING METADATA FROM 'collection_name' VIRTUAL,"
                        + " PRIMARY KEY (_id) NOT ENFORCED"
                        + ") WITH ("
                        + ignoreIfNull("hosts", CONTAINER.getHostAndPort())
                        + ignoreIfNull("username", USERNAME)
                        + ignoreIfNull("password", PASSWORD)
                        + ignoreIfNull("database", database)
                        + ignoreIfNull("collection", collection)
                        + " 'scan.incremental.snapshot.enabled' = '"
                        + this.parallelismSnapshot
                        + "',"
                        + " 'connector' = 'mongodb-cdc'"
                        + ")";
        String sinkDdl =
                "CREATE TABLE "
                        + SINK_TABLE
                        + " ("
                        + " db_name STRING,"
                        + " coll_name STRING,"
                        + " seq STRING,"
                        + " PRIMARY KEY (db_name, coll_name, seq) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        + ")";

        this.tEnv.executeSql(sourceDdl);
        this.tEnv.executeSql(sinkDdl);

        TableResult result =
                this.tEnv.executeSql(
                        "INSERT INTO "
                                + SINK_TABLE
                                + " SELECT db_name, coll_name, seq FROM mongodb_source");
        this.submittedJob = result;

        MongoDBTestUtils.waitForSnapshotStarted(SINK_TABLE);
        return result;
    }

    /**
     * Renders one {@code 'key' = 'value',} entry for a {@code WITH (...)} clause.
     *
     * <p>Single quotes inside the value are doubled so that the value stays one SQL string literal.
     *
     * @param key option key
     * @param value option value, may be {@code null}
     * @return the rendered entry with a leading space and a trailing comma, or an empty string when
     *     {@code value} is {@code null}
     */
    private String ignoreIfNull(String key, String value) {
        if (value == null) {
            return "";
        }
        return String.format(" '%s' = '%s',", key, value.replace("'", "''"));
    }

    /**
     * Inserts one additional document into each of {@code coll_a1}, {@code coll_a2}, {@code coll_b1}
     * and {@code coll_b2} of the given database.
     *
     * @param databaseName name of the database to write to
     */
    private void insertRecordsInDatabase(String databaseName) {
        MongoDatabase database = mongodbClient.getDatabase(databaseName);
        database.getCollection("coll_a1").insertOne(new Document("seq", "A102"));
        database.getCollection("coll_a2").insertOne(new Document("seq", "A202"));
        database.getCollection("coll_b1").insertOne(new Document("seq", "B102"));
        database.getCollection("coll_b2").insertOne(new Document("seq", "B202"));
    }

    /** Formats the insert-row string the sink is expected to hold for one record. */
    private static String row(String database, String collection, String seq) {
        return String.format("+I[%s, %s, %s]", database, collection, seq);
    }

    /** Asserts that the sink holds exactly the given rows, in any order. */
    private static void assertSinkContains(String... expectedRows) {
        Assert.assertThat(
                TestValuesTableFactory.getResults(SINK_TABLE),
                Matchers.containsInAnyOrder(expectedRows));
    }
}
