package org.apache.flink.cdc.connectors.mongodb.table;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.Updates;
import java.time.ZoneId;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.cdc.connectors.mongodb.source.MongoDBSourceTestBase;
import org.apache.flink.cdc.connectors.mongodb.utils.MongoDBTestUtils;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.table.utils.LegacyRowResource;
import org.bson.BsonDateTime;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.bson.types.ObjectId;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests for the {@code mongodb-cdc} SQL connector.
 *
 * <p>Every test is executed twice by the {@link Parameterized} runner: once with
 * {@code scan.incremental.snapshot.enabled} set to {@code false} (parallelism 1) and once with it
 * set to {@code true} (parallelism 4, checkpointing every 200 ms).
 *
 * <p>Each test submits a streaming {@code INSERT INTO} job, drives changes through the MongoDB
 * client, and compares what arrived in a {@code values} sink. The job is always cancelled in a
 * {@code finally} block so that a failed wait or assertion does not leave it running.
 */
@RunWith(Parameterized.class)
public class MongoDBConnectorITCase extends MongoDBSourceTestBase {

    /** User name passed to the connector's {@code username} option. */
    private static final String CONNECTOR_USERNAME = "flinkuser";

    /** Password passed to the connector's {@code password} option. */
    private static final String CONNECTOR_PASSWORD = "a1?~!@#$%^&*(){}[]<>.,+_-=/|:;";

    /** Name of the collection used by the inventory based tests. */
    private static final String PRODUCTS_COLLECTION = "products";

    /** Sink table receiving the per-name weight aggregation. */
    private static final String AGGREGATE_SINK_DDL =
            "CREATE TABLE sink ("
                    + " name STRING,"
                    + " weightSum DECIMAL(10,3),"
                    + " PRIMARY KEY (name) NOT ENFORCED"
                    + ") WITH ("
                    + " 'connector' = 'values',"
                    + " 'sink-insert-only' = 'false'"
                    + ")";

    /** Aggregation query feeding {@link #AGGREGATE_SINK_DDL}. */
    private static final String AGGREGATE_QUERY =
            "INSERT INTO sink SELECT name, SUM(weight) FROM mongodb_source GROUP BY name";

    /** Column list shared by the {@code full_types} source table and its sink. */
    private static final String FULL_TYPES_COLUMNS =
            "    _id STRING,\n"
                    + "    stringField STRING,\n"
                    + "    uuidField STRING,\n"
                    + "    md5Field STRING,\n"
                    + "    timeField TIME,\n"
                    + "    dateField DATE,\n"
                    + "    dateBefore1970 DATE,\n"
                    + "    dateToTimestampField TIMESTAMP(3),\n"
                    + "    dateToLocalTimestampField TIMESTAMP_LTZ(3),\n"
                    + "    timestampField TIMESTAMP(0),\n"
                    + "    timestampToLocalTimestampField TIMESTAMP_LTZ(0),\n"
                    + "    booleanField BOOLEAN,\n"
                    + "    decimal128Field DECIMAL ,\n"
                    + "    doubleField DOUBLE,\n"
                    + "    int32field INT,\n"
                    + "    int64Field BIGINT,\n"
                    + "    documentField ROW<a STRING,b BIGINT>,\n"
                    + "    mapField MAP<STRING,MAP<STRING,INT>>,\n"
                    + "    arrayField ARRAY<STRING>,\n"
                    + "    doubleArrayField ARRAY<DOUBLE>,\n"
                    + "    documentArrayField ARRAY<ROW<a STRING,b BIGINT>>,\n"
                    + "    minKeyField STRING,\n"
                    + "    maxKeyField STRING,\n"
                    + "    regexField STRING,\n"
                    + "    undefinedField STRING,\n"
                    + "    nullField STRING,\n"
                    + "    binaryField BINARY,\n"
                    + "    javascriptField STRING,\n"
                    + "    dbReferenceField ROW<$ref STRING,$id STRING>";

    /** Temporal columns of the {@code full_types} row before the second update. */
    private static final String INITIAL_TEMPORAL_FIELDS =
            "17:54:14,2019-08-11,1960-08-11,2019-08-11T17:54:14.692,2019-08-11T17:54:14.692Z,"
                    + "2019-08-11T17:47:44,2019-08-11T17:47:44Z";

    /** Temporal columns of the {@code full_types} row after the second update. */
    private static final String UPDATED_TEMPORAL_FIELDS =
            "18:36:04,2021-09-03,1960-08-11,2021-09-03T18:36:04.123,2021-09-03T18:36:04.123Z,"
                    + "2021-09-03T18:36:04,2021-09-03T18:36:04Z";

    private final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    private final StreamTableEnvironment tEnv =
            StreamTableEnvironment.create(
                    this.env, EnvironmentSettings.newInstance().inStreamingMode().build());

    @ClassRule
    public static LegacyRowResource usesLegacyRows = LegacyRowResource.INSTANCE;

    /** Value substituted into the {@code scan.incremental.snapshot.enabled} option. */
    private final boolean parallelismSnapshot;

    /**
     * Creates the test instance for one parameter set.
     *
     * @param parallelismSnapshot whether the incremental snapshot mode is enabled for this run
     */
    public MongoDBConnectorITCase(boolean parallelismSnapshot) {
        this.parallelismSnapshot = parallelismSnapshot;
    }

    /**
     * Supplies the parameter sets for the runner.
     *
     * @return two single-element parameter sets: {@code false} and {@code true}
     */
    @Parameterized.Parameters(name = "parallelismSnapshot: {0}")
    public static Object[] parameters() {
        return new Object[][] {{false}, {true}};
    }

    /**
     * Clears data recorded by the {@code values} connector, pins the session time zone to UTC and
     * configures parallelism (plus checkpointing for the incremental snapshot run).
     */
    @Before
    public void before() {
        TestValuesTableFactory.clearAllData();
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));
        if (parallelismSnapshot) {
            env.setParallelism(4);
            env.enableCheckpointing(200L);
        } else {
            env.setParallelism(1);
        }
    }

    /**
     * Reads the snapshot of the {@code products} collection, applies updates, inserts and a delete,
     * and checks the final per-name weight sums in the sink.
     */
    @Test
    public void testConsumingAllEvents() throws ExecutionException, InterruptedException {
        String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
        String sourceDdl =
                String.format(
                        "CREATE TABLE mongodb_source ("
                                + " _id STRING NOT NULL,"
                                + " name STRING,"
                                + " description STRING,"
                                + " weight DECIMAL(10,3),"
                                + " PRIMARY KEY (_id) NOT ENFORCED"
                                + ") WITH ("
                                + " 'connector' = 'mongodb-cdc',"
                                + " 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000',"
                                + " 'hosts' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database' = '%s',"
                                + " 'collection' = '%s',"
                                + " 'scan.incremental.snapshot.enabled' = '%s',"
                                + " 'heartbeat.interval.ms' = '1000'"
                                + ")",
                        CONTAINER.getHostAndPort(),
                        CONNECTOR_USERNAME,
                        CONNECTOR_PASSWORD,
                        database,
                        PRODUCTS_COLLECTION,
                        parallelismSnapshot);
        tEnv.executeSql(sourceDdl);
        tEnv.executeSql(AGGREGATE_SINK_DDL);

        TableResult result = tEnv.executeSql(AGGREGATE_QUERY);
        try {
            MongoDBTestUtils.waitForSnapshotStarted("sink");

            MongoCollection<Document> products =
                    mongodbClient.getDatabase(database).getCollection(PRODUCTS_COLLECTION);
            applyProductChanges(products);
            MongoDBTestUtils.waitForSinkSize("sink", 19);

            products.deleteOne(byId("100000000000000000000111"));
            MongoDBTestUtils.waitForSinkSize("sink", 20);

            Assert.assertThat(
                    TestValuesTableFactory.getResults("sink"),
                    Matchers.containsInAnyOrder(
                            "scooter,3.140",
                            "car battery,8.100",
                            "12-pack drill bits,0.800",
                            "hammer,2.625",
                            "rocks,5.100",
                            "jacket,0.600",
                            "spare tire,22.200"));
        } finally {
            cancelJob(result);
        }
    }

    /**
     * Starts the source in {@code timestamp} startup mode using the current wall-clock time and
     * checks that only the two documents inserted afterwards reach the sink.
     */
    @Test
    public void testStartupFromTimestamp() throws Exception {
        String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");

        // NOTE(review): presumably this pause separates the initial data load from the startup
        // timestamp taken below - confirm before shortening it.
        Thread.sleep(5000L);

        long startupTimestampMillis = System.currentTimeMillis();
        String sourceDdl =
                String.format(
                        "CREATE TABLE mongodb_source ("
                                + " _id STRING NOT NULL,"
                                + " name STRING,"
                                + " description STRING,"
                                + " weight DECIMAL(10,3),"
                                + " PRIMARY KEY (_id) NOT ENFORCED"
                                + ") WITH ("
                                + " 'connector' = 'mongodb-cdc',"
                                + " 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000',"
                                + " 'hosts' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database' = '%s',"
                                + " 'collection' = '%s',"
                                + " 'scan.incremental.snapshot.enabled' = '%s',"
                                + " 'scan.startup.mode' = 'timestamp',"
                                + " 'scan.startup.timestamp-millis' = '%s',"
                                + " 'heartbeat.interval.ms' = '1000'"
                                + ")",
                        CONTAINER.getHostAndPort(),
                        CONNECTOR_USERNAME,
                        CONNECTOR_PASSWORD,
                        database,
                        PRODUCTS_COLLECTION,
                        parallelismSnapshot,
                        startupTimestampMillis);
        tEnv.executeSql(sourceDdl);
        tEnv.executeSql(AGGREGATE_SINK_DDL);

        TableResult result = tEnv.executeSql(AGGREGATE_QUERY);
        try {
            MongoCollection<Document> products =
                    mongodbClient.getDatabase(database).getCollection(PRODUCTS_COLLECTION);
            products.insertOne(
                    productDocOf(
                            "100000000000000000000110",
                            "jacket",
                            "water resistent white wind breaker",
                            0.2));
            products.insertOne(
                    productDocOf(
                            "100000000000000000000111", "scooter", "Big 2-wheel scooter", 5.18));
            MongoDBTestUtils.waitForSinkSize("sink", 2);

            Assert.assertThat(
                    TestValuesTableFactory.getResults("sink"),
                    Matchers.containsInAnyOrder("jacket,0.200", "scooter,5.180"));
        } finally {
            cancelJob(result);
        }
    }

    /**
     * Copies every column of the {@code full_types} collection into the sink, applies two updates
     * to the same document and checks the exact changelog rows that were emitted.
     */
    @Test
    public void testAllTypes() throws Throwable {
        String database = CONTAINER.executeCommandFileInSeparateDatabase("column_type_test");
        String sourceDdl =
                String.format(
                        "CREATE TABLE full_types (\n"
                                + FULL_TYPES_COLUMNS
                                + ",\n"
                                + "    PRIMARY KEY (_id) NOT ENFORCED"
                                + ") WITH ("
                                + " 'connector' = 'mongodb-cdc',"
                                + " 'hosts' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database' = '%s',"
                                + " 'collection' = '%s'"
                                + ")",
                        CONTAINER.getHostAndPort(),
                        CONNECTOR_USERNAME,
                        CONNECTOR_PASSWORD,
                        database,
                        "full_types");
        String sinkDdl =
                "CREATE TABLE sink (\n"
                        + FULL_TYPES_COLUMNS
                        + "\n"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        + ")";
        tEnv.executeSql(sourceDdl);
        tEnv.executeSql(sinkDdl);

        TableResult result =
                tEnv.executeSql(
                        "INSERT INTO sink SELECT _id,\n"
                                + "stringField,\n"
                                + "uuidField,\n"
                                + "md5Field,\n"
                                + "timeField,\n"
                                + "dateField,\n"
                                + "dateBefore1970,\n"
                                + "dateToTimestampField,\n"
                                + "dateToLocalTimestampField,\n"
                                + "timestampField,\n"
                                + "timestampToLocalTimestampField,\n"
                                + "booleanField,\n"
                                + "decimal128Field,\n"
                                + "doubleField,\n"
                                + "int32field,\n"
                                + "int64Field,\n"
                                + "documentField,\n"
                                + "mapField,\n"
                                + "arrayField,\n"
                                + "doubleArrayField,\n"
                                + "documentArrayField,\n"
                                + "minKeyField,\n"
                                + "maxKeyField,\n"
                                + "regexField,\n"
                                + "undefinedField,\n"
                                + "nullField,\n"
                                + "binaryField,\n"
                                + "javascriptField,\n"
                                + "dbReferenceField\n"
                                + "FROM full_types");
        try {
            MongoDBTestUtils.waitForSnapshotStarted("sink");

            MongoCollection<Document> fullTypes =
                    mongodbClient.getDatabase(database).getCollection("full_types");
            Bson target = byId("5d505646cf6d4fe581014ab2");

            // First update: only int64Field changes (50 -> 510).
            fullTypes.updateOne(target, Updates.set("int64Field", 510L));
            MongoDBTestUtils.waitForSinkSize("sink", 3);

            // Second update: every temporal field is rewritten in a single operation.
            BsonDateTime dateTime = new BsonDateTime(1630694164123L);
            BsonTimestamp timestamp = new BsonTimestamp(1630694164, 0);
            Bson temporalUpdate =
                    Updates.combine(
                            Updates.set("timeField", dateTime),
                            Updates.set("dateField", dateTime),
                            Updates.set("dateToTimestampField", dateTime),
                            Updates.set("dateToLocalTimestampField", dateTime),
                            Updates.set("timestampField", timestamp),
                            Updates.set("timestampToLocalTimestampField", timestamp));
            fullTypes.updateOne(target, temporalUpdate);
            MongoDBTestUtils.waitForSinkSize("sink", 5);

            List<String> expected =
                    Arrays.asList(
                            fullTypesRow("+I", INITIAL_TEMPORAL_FIELDS, 50L),
                            fullTypesRow("-U", INITIAL_TEMPORAL_FIELDS, 50L),
                            fullTypesRow("+U", INITIAL_TEMPORAL_FIELDS, 510L),
                            fullTypesRow("-U", INITIAL_TEMPORAL_FIELDS, 510L),
                            fullTypesRow("+U", UPDATED_TEMPORAL_FIELDS, 510L));
            Assert.assertEquals(expected, TestValuesTableFactory.getRawResults("sink"));
        } finally {
            cancelJob(result);
        }
    }

    /**
     * Reads the {@code products} collection together with the {@code database_name} and
     * {@code collection_name} metadata columns and checks, order-insensitively, every changelog row
     * written to the sink.
     */
    @Test
    public void testMetadataColumns() throws Exception {
        String database = CONTAINER.executeCommandFileInSeparateDatabase("inventory");
        String sourceDdl =
                String.format(
                        "CREATE TABLE mongodb_source ("
                                + " _id STRING NOT NULL,"
                                + " name STRING,"
                                + " description STRING,"
                                + " weight DECIMAL(10,3),"
                                + " db_name STRING METADATA FROM 'database_name' VIRTUAL,"
                                + " collection_name STRING METADATA VIRTUAL,"
                                + " PRIMARY KEY (_id) NOT ENFORCED"
                                + ") WITH ("
                                + " 'connector' = 'mongodb-cdc',"
                                + " 'connection.options' = 'connectTimeoutMS=12000&socketTimeoutMS=13000',"
                                + " 'hosts' = '%s',"
                                + " 'username' = '%s',"
                                + " 'password' = '%s',"
                                + " 'database' = '%s',"
                                + " 'collection' = '%s',"
                                + " 'scan.incremental.snapshot.enabled' = '%s'"
                                + ")",
                        CONTAINER.getHostAndPort(),
                        CONNECTOR_USERNAME,
                        CONNECTOR_PASSWORD,
                        database,
                        PRODUCTS_COLLECTION,
                        parallelismSnapshot);
        String sinkDdl =
                "CREATE TABLE meta_sink ("
                        + " _id STRING NOT NULL,"
                        + " name STRING,"
                        + " description STRING,"
                        + " weight DECIMAL(10,3),"
                        + " database_name STRING,"
                        + " collection_name STRING,"
                        + " PRIMARY KEY (_id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'values',"
                        + " 'sink-insert-only' = 'false'"
                        + ")";
        tEnv.executeSql(sourceDdl);
        tEnv.executeSql(sinkDdl);

        TableResult result = tEnv.executeSql("INSERT INTO meta_sink SELECT * FROM mongodb_source");
        try {
            MongoDBTestUtils.waitForSinkSize("meta_sink", 9);

            MongoCollection<Document> products =
                    mongodbClient.getDatabase(database).getCollection(PRODUCTS_COLLECTION);
            applyProductChanges(products);
            MongoDBTestUtils.waitForSinkSize("meta_sink", 15);

            products.deleteOne(byId("100000000000000000000111"));
            MongoDBTestUtils.waitForSinkSize("meta_sink", 16);

            // Each template's %s placeholder is the database name, which is generated per test.
            List<String> expected =
                    Stream.of(
                                    "+I(100000000000000000000101,scooter,Small 2-wheel scooter,3.140,%s,products)",
                                    "+I(100000000000000000000102,car battery,12V car battery,8.100,%s,products)",
                                    "+I(100000000000000000000103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800,%s,products)",
                                    "+I(100000000000000000000104,hammer,12oz carpenter''s hammer,0.750,%s,products)",
                                    "+I(100000000000000000000105,hammer,12oz carpenter''s hammer,0.875,%s,products)",
                                    "+I(100000000000000000000106,hammer,12oz carpenter''s hammer,1.000,%s,products)",
                                    "+I(100000000000000000000107,rocks,box of assorted rocks,5.300,%s,products)",
                                    "+I(100000000000000000000108,jacket,water resistent black wind breaker,0.100,%s,products)",
                                    "+I(100000000000000000000109,spare tire,24 inch spare tire,22.200,%s,products)",
                                    "+I(100000000000000000000110,jacket,water resistent white wind breaker,0.200,%s,products)",
                                    "+I(100000000000000000000111,scooter,Big 2-wheel scooter,5.180,%s,products)",
                                    "+U(100000000000000000000106,hammer,18oz carpenter hammer,1.000,%s,products)",
                                    "+U(100000000000000000000107,rocks,box of assorted rocks,5.100,%s,products)",
                                    "+U(100000000000000000000110,jacket,new water resistent white wind breaker,0.500,%s,products)",
                                    "+U(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products)",
                                    "-D(100000000000000000000111,scooter,Big 2-wheel scooter,5.170,%s,products)")
                            .map(template -> String.format(template, database))
                            .sorted()
                            .collect(Collectors.toList());

            List<String> actual = TestValuesTableFactory.getRawResults("meta_sink");
            Collections.sort(actual);
            Assert.assertEquals(expected, actual);
        } finally {
            cancelJob(result);
        }
    }

    /**
     * Applies the change sequence shared by the inventory tests: two field updates on existing
     * documents, two inserts, and one update on each of the inserted documents.
     *
     * @param products the {@code products} collection to modify
     */
    private void applyProductChanges(MongoCollection<Document> products) {
        products.updateOne(
                byId("100000000000000000000106"),
                Updates.set("description", "18oz carpenter hammer"));
        products.updateOne(byId("100000000000000000000107"), Updates.set("weight", 5.1));

        products.insertOne(
                productDocOf(
                        "100000000000000000000110",
                        "jacket",
                        "water resistent white wind breaker",
                        0.2));
        products.insertOne(
                productDocOf("100000000000000000000111", "scooter", "Big 2-wheel scooter", 5.18));

        Bson jacketUpdate =
                Updates.combine(
                        Updates.set("description", "new water resistent white wind breaker"),
                        Updates.set("weight", 0.5));
        products.updateOne(byId("100000000000000000000110"), jacketUpdate);
        products.updateOne(byId("100000000000000000000111"), Updates.set("weight", 5.17));
    }

    /**
     * Builds one expected changelog row of the {@code full_types} test. Only the change kind, the
     * temporal columns and {@code int64Field} differ between the expected rows.
     *
     * @param changeKind row kind prefix, e.g. {@code +I}, {@code -U} or {@code +U}
     * @param temporalFields comma separated rendering of the seven temporal columns
     * @param int64Field expected value of {@code int64Field}
     * @return the row rendered the way the {@code values} sink reports raw results
     */
    private static String fullTypesRow(String changeKind, String temporalFields, long int64Field) {
        return changeKind
                + "(5d505646cf6d4fe581014ab2,hello,0bd1e27e-2829-4b47-8e21-dfef93da44e1,"
                + "2078693f4c61ce3073b01be69ab76428,"
                + temporalFields
                + ",true,11,10.5,10,"
                + int64Field
                + ",hello,50,{inner_map={key=234}},[hello, world],[1.0, 1.1, null],"
                + "[hello0,51, hello1,53],MIN_KEY,MAX_KEY,/^H/i,null,null,[1, 2, 3],"
                + "function() { x++; },ref_doc,5d505646cf6d4fe581014ab3)";
    }

    /**
     * Creates a filter matching the document whose {@code _id} is the given ObjectId.
     *
     * @param hexId hexadecimal ObjectId string
     * @return an equality filter on {@code _id}
     */
    private static Bson byId(String hexId) {
        return Filters.eq("_id", new ObjectId(hexId));
    }

    /**
     * Cancels the job behind a submitted statement and waits for the cancellation to complete.
     *
     * @param result result of the {@code INSERT INTO} statement that started the job
     * @throws IllegalStateException if the result carries no job client
     * @throws ExecutionException if the cancellation future completes exceptionally
     * @throws InterruptedException if interrupted while waiting for the cancellation
     */
    private static void cancelJob(TableResult result)
            throws ExecutionException, InterruptedException {
        JobClient jobClient =
                result.getJobClient()
                        .orElseThrow(
                                () ->
                                        new IllegalStateException(
                                                "The submitted statement has no job client"));
        jobClient.cancel().get();
    }

    /**
     * Builds a product document.
     *
     * @param id hexadecimal ObjectId for {@code _id}; when {@code null} no {@code _id} is set
     * @param name value of the {@code name} field
     * @param description value of the {@code description} field
     * @param weight value of the {@code weight} field
     * @return the populated document
     */
    private Document productDocOf(String id, String name, String description, Double weight) {
        Document product = new Document();
        if (id != null) {
            product.put("_id", new ObjectId(id));
        }
        product.put("name", name);
        product.put("description", description);
        product.put("weight", weight);
        return product;
    }
}
