package org.apache.paimon.flink.action.cdc.mongodb;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.flink.core.execution.JobClient;
import org.apache.paimon.catalog.FileSystemCatalogOptions;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/paimon/flink/action/cdc/mongodb/MongoDBSyncDatabaseActionITCase.class */
public class MongoDBSyncDatabaseActionITCase extends MongoDBActionITCaseBase {
    @Timeout(60)
    @Test
    public void testSchemaEvolution() throws Exception {
        writeRecordsToMongoDB("test-data-1", this.database, "database");
        writeRecordsToMongoDB("test-data-2", this.database, "database");
        Map<String, String> basicMongoDBConfig = getBasicMongoDBConfig();
        basicMongoDBConfig.put("database", this.database);
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicMongoDBConfig).withTableConfig(getBasicTableConfig()).build());
        testSchemaEvolutionImpl("t1", "t2", this.database);
    }

    private void testSchemaEvolutionImpl(String str, String str2, String str3) throws Exception {
        waitingTables(str, str2);
        FileStoreTable fileStoreTable = getFileStoreTable(str);
        FileStoreTable fileStoreTable2 = getFileStoreTable(str2);
        RowType of = RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"_id", "name", "description", "weight"});
        List<String> singletonList = Collections.singletonList("_id");
        waitForResult(Arrays.asList("+I[100000000000000000000101, scooter, Small 2-wheel scooter, 3.14]", "+I[100000000000000000000102, car battery, 12V car battery, 8.1]", "+I[100000000000000000000103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8]"), fileStoreTable, of, singletonList);
        RowType of2 = RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"_id", "name", "address", "phone_number"});
        List<String> singletonList2 = Collections.singletonList("_id");
        waitForResult(Arrays.asList("+I[100000000000000000000101, user_1, Shanghai, 123563291234]", "+I[100000000000000000000102, user_2, Beijing, 1234347891234]", "+I[100000000000000000000103, user_3, Hangzhou, 1235567891234]"), fileStoreTable2, of2, singletonList2);
        writeRecordsToMongoDB("test-data-3", str3, "database");
        waitForResult(Arrays.asList("+I[100000000000000000000101, scooter, Small 2-wheel scooter, 350]", "+I[100000000000000000000102, car battery, High-performance car battery, 8.1]", "+I[100000000000000000000103, 12-pack drill bits, Set of 12 professional-grade drill bits, 0.8]"), fileStoreTable, of, singletonList);
        writeRecordsToMongoDB("test-data-4", str3, "database");
        waitForResult(Arrays.asList("+I[100000000000000000000101, user_1, Guangzhou, 123563291234]", "+I[100000000000000000000102, user_2, Beijing, 1234546591234]", "+I[100000000000000000000103, user_3, Nanjing, 1235567891234]"), fileStoreTable2, of2, singletonList2);
    }

    @Test
    public void testCatalogAndTableConfig() {
        MongoDBSyncDatabaseAction build = syncDatabaseActionBuilder(getBasicMongoDBConfig()).withCatalogConfig(Collections.singletonMap("catalog-key", "catalog-value")).withTableConfig(Collections.singletonMap("table-key", "table-value")).build();
        Assertions.assertThat(build.catalogConfig()).containsEntry("catalog-key", "catalog-value");
        Assertions.assertThat(build.tableConfig()).containsExactlyEntriesOf(Collections.singletonMap("table-key", "table-value"));
    }

    @Timeout(60)
    @Test
    public void testMongoDBNestedDataSynchronizationAndVerification() throws Exception {
        writeRecordsToMongoDB("test-data-5", this.database, "database");
        writeRecordsToMongoDB("test-data-6", this.database, "database");
        Map<String, String> basicMongoDBConfig = getBasicMongoDBConfig();
        basicMongoDBConfig.put("database", this.database);
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicMongoDBConfig).withTableConfig(getBasicTableConfig()).build());
        waitingTables("t3", "t4");
        FileStoreTable fileStoreTable = getFileStoreTable("t3");
        FileStoreTable fileStoreTable2 = getFileStoreTable("t4");
        waitForResult(Arrays.asList("+I[610000000000000000000101, Switzerland, Italian, {\"f\":\"v\",\"n\":null}]", "+I[610000000000000000000102, Switzerland, Italian, ]", "+I[610000000000000000000103, Switzerland, [\"Italian\"], ]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"_id", "country", "languages", "religions"}), Collections.singletonList("_id"));
        waitForResult(Arrays.asList("+I[610000000000000000000101, youtube#videoListResponse, \\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", {\"totalResults\":1,\"resultsPerPage\":1}, [{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\"/m/02mjmr\"],\"relevantTopicIds\":[\"/m/0cnfvd\",\"/m/01jdpf\"]}}]]", "+I[610000000000000000000102, youtube#videoListResponse, \\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", page, [{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\"/m/02mjmr\"],\"relevantTopicIds\":[\"/m/0cnfvd\",\"/m/01jdpf\"]}}]]", "+I[610000000000000000000103, youtube#videoListResponse, \\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", {\"pagehit\":{\"kind\":\"youtube#video\"},\"totalResults\":1,\"resultsPerPage\":1}, [{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\"/m/02mjmr\"],\"relevantTopicIds\":[\"/m/0cnfvd\",\"/m/01jdpf\"]}}]]"), fileStoreTable2, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"_id", "kind", "etag", "pageInfo", "items"}), Collections.singletonList("_id"));
    }

    @Timeout(60)
    @Test
    public void testDynamicTableCreationInMongoDB() throws Exception {
        String str = this.database + UUID.randomUUID();
        writeRecordsToMongoDB("test-data-5", str, "database");
        Map<String, String> basicMongoDBConfig = getBasicMongoDBConfig();
        basicMongoDBConfig.put("database", str);
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicMongoDBConfig).withTableConfig(getBasicTableConfig()).withCatalogConfig(Collections.singletonMap(FileSystemCatalogOptions.CASE_SENSITIVE.key(), "false")).build());
        waitingTables("t3");
        FileStoreTable fileStoreTable = getFileStoreTable("t3");
        writeRecordsToMongoDB("test-data-6", str, "database");
        waitingTables("t4");
        FileStoreTable fileStoreTable2 = getFileStoreTable("t4");
        waitForResult(Arrays.asList("+I[610000000000000000000101, Switzerland, Italian, {\"f\":\"v\",\"n\":null}]", "+I[610000000000000000000102, Switzerland, Italian, ]", "+I[610000000000000000000103, Switzerland, [\"Italian\"], ]"), fileStoreTable, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"_id", "country", "languages", "religions"}), Collections.singletonList("_id"));
        waitForResult(Arrays.asList("+I[610000000000000000000101, youtube#videoListResponse, \\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", {\"totalResults\":1,\"resultsPerPage\":1}, [{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\"/m/02mjmr\"],\"relevantTopicIds\":[\"/m/0cnfvd\",\"/m/01jdpf\"]}}]]", "+I[610000000000000000000102, youtube#videoListResponse, \\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", page, [{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\"/m/02mjmr\"],\"relevantTopicIds\":[\"/m/0cnfvd\",\"/m/01jdpf\"]}}]]", "+I[610000000000000000000103, youtube#videoListResponse, \\\"79S54kzisD_9SOTfQLu_0TVQSpY/mYlS4-ghMGhc1wTFCwoQl3IYDZc\\\", {\"pagehit\":{\"kind\":\"youtube#video\"},\"totalResults\":1,\"resultsPerPage\":1}, [{\"kind\":\"youtube#video\",\"etag\":\"\\\\\\\"79S54kzisD_9SOTfQLu_0TVQSpY/A4foLs-VO317Po_ulY6b5mSimZA\\\\\\\"\",\"id\":\"wHkPb68dxEw\",\"statistics\":{\"viewCount\":\"9211\",\"likeCount\":\"79\",\"dislikeCount\":\"11\",\"favoriteCount\":\"0\",\"commentCount\":\"29\"},\"topicDetails\":{\"topicIds\":[\"/m/02mjmr\"],\"relevantTopicIds\":[\"/m/0cnfvd\",\"/m/01jdpf\"]}}]]"), fileStoreTable2, RowType.of(new DataType[]{DataTypes.STRING().notNull(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"_id", "kind", "etag", "pageinfo", "items"}), Collections.singletonList("_id"));
    }

    @Timeout(60)
    @Test
    public void testTableAffix() throws Exception {
        createFileStoreTable("test_prefix_t1_test_suffix", RowType.of(new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()}, new String[]{"_id", "name", "description", "weight"}), Collections.emptyList(), Collections.singletonList("_id"), Collections.emptyMap());
        String str = this.database + UUID.randomUUID();
        writeRecordsToMongoDB("test-data-1", str, "database");
        writeRecordsToMongoDB("test-data-2", str, "database");
        Map<String, String> basicMongoDBConfig = getBasicMongoDBConfig();
        basicMongoDBConfig.put("database", str);
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicMongoDBConfig).withTableConfig(getBasicTableConfig()).withTablePrefix("test_prefix_").withTableSuffix("_test_suffix").includingTables(ThreadLocalRandom.current().nextBoolean() ? "t1|t2" : ".*").build());
        testSchemaEvolutionImpl("test_prefix_t1_test_suffix", "test_prefix_t2_test_suffix", str);
    }

    @Timeout(60)
    @Test
    public void testNewlyAddedTablesOptionsChange() throws Exception {
        String str = this.database + UUID.randomUUID();
        writeRecordsToMongoDB("test-data-5", str, "database");
        Map<String, String> basicMongoDBConfig = getBasicMongoDBConfig();
        basicMongoDBConfig.put("database", str);
        HashMap hashMap = new HashMap();
        hashMap.put("bucket", "1");
        hashMap.put("sink.parallelism", "1");
        JobClient runActionWithDefaultEnv = runActionWithDefaultEnv(syncDatabaseActionBuilder(basicMongoDBConfig).withTableConfig(hashMap).build());
        waitingTables("t3");
        runActionWithDefaultEnv.cancel();
        hashMap.put("sink.savepoint.auto-tag", "true");
        hashMap.put("tag.num-retained-max", "5");
        hashMap.put("tag.automatic-creation", "process-time");
        hashMap.put("tag.creation-period", "hourly");
        hashMap.put("tag.creation-delay", "600000");
        hashMap.put("snapshot.time-retained", "1h");
        hashMap.put("snapshot.num-retained.min", "5");
        hashMap.put("snapshot.num-retained.max", "10");
        hashMap.put("changelog-producer", "input");
        writeRecordsToMongoDB("test-data-6", str, "database");
        runActionWithDefaultEnv(syncDatabaseActionBuilder(basicMongoDBConfig).withTableConfig(hashMap).build());
        waitingTables("t4");
        Assertions.assertThat(getFileStoreTable("t4").options()).containsAllEntriesOf(hashMap);
    }
}
