package org.apache.flink.connector.mongodb.table;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.flink.connector.mongodb.testutils.MongoShardedContainers;
import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.assertj.core.api.Assertions;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.testcontainers.containers.Network;

/* loaded from: input_file:org/apache/flink/connector/mongodb/table/MongoPartitionedTableSinkITCase.class */
class MongoPartitionedTableSinkITCase {

    @RegisterExtension
    private static final MongoShardedContainers MONGO_SHARDED_CONTAINER = MongoTestUtil.createMongoDBShardedContainers(Network.newNetwork());

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).build());
    private MongoClient mongoClient;

    MongoPartitionedTableSinkITCase() {
    }

    @BeforeEach
    void setUp() {
        this.mongoClient = MongoClients.create(MONGO_SHARDED_CONTAINER.getConnectionString());
    }

    @AfterEach
    void tearDown() {
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }

    @Test
    void testSinkIntoPartitionedTable() throws Exception {
        BsonDocument parse = BsonDocument.parse("{ b: 1, c: 1 }");
        MongoTestUtil.createIndex(this.mongoClient, "test", "sink_into_sharded_collection", parse, new IndexOptions().unique(true));
        MongoTestUtil.shardCollection(this.mongoClient, "test", "sink_into_sharded_collection", parse);
        List asList = Arrays.asList(Expressions.row(1L, new Object[]{Expressions.nullOf(DataTypes.BOOLEAN()), "ABCDEF", Double.valueOf(12.12d), 4}), Expressions.row(1L, new Object[]{Expressions.nullOf(DataTypes.BOOLEAN()), "ABCDEF", Double.valueOf(12.123d), 5}));
        List singletonList = Collections.singletonList("a");
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        createPartitionedTable(create, "test", "sink_into_sharded_collection", singletonList, Arrays.asList("b", "c"));
        create.fromValues(asList).executeInsert("mongo_sink").await();
        MongoCollection collection = this.mongoClient.getDatabase("test").getCollection("sink_into_sharded_collection");
        Document document = new Document();
        document.put("_id", 1L);
        document.put("a", 1L);
        document.put("b", (Object) null);
        document.put("c", "ABCDEF");
        document.put("d", Double.valueOf(12.123d));
        document.put("e", 5);
        Assertions.assertThat(collection.find(Filters.eq("_id", 1L))).containsExactly(new Document[]{document});
    }

    @Test
    void testSinkIntoPartitionedTableWithMutableShardKey() {
        BsonDocument parse = BsonDocument.parse("{ b: 1, c: 1 }");
        MongoTestUtil.createIndex(this.mongoClient, "test", "sink_into_mutable_sharded_collection", parse, new IndexOptions().unique(true));
        MongoTestUtil.shardCollection(this.mongoClient, "test", "sink_into_mutable_sharded_collection", parse);
        List asList = Arrays.asList(Expressions.row(1L, new Object[]{false, "ABCDEF", Double.valueOf(12.12d), 4}), Expressions.row(1L, new Object[]{true, "ABCDEF", Double.valueOf(12.123d), 5}));
        List singletonList = Collections.singletonList("a");
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        createPartitionedTable(create, "test", "sink_into_mutable_sharded_collection", singletonList, Arrays.asList("b", "c"));
        Assertions.assertThatThrownBy(() -> {
            create.fromValues(asList).executeInsert("mongo_sink").await();
        }).hasStackTraceContaining("Writing records to MongoDB failed");
    }

    @Test
    void testSinkIntoHashedPartitionedTable() throws Exception {
        BsonDocument parse = BsonDocument.parse("{ c: 'hashed' }");
        MongoTestUtil.createIndex(this.mongoClient, "test", "sink_into_hashed_sharded_collection", parse, new IndexOptions());
        MongoTestUtil.shardCollection(this.mongoClient, "test", "sink_into_hashed_sharded_collection", parse);
        List asList = Arrays.asList(Expressions.row(2L, new Object[]{true, "ABCDEF", Double.valueOf(12.12d), 4}), Expressions.row(2L, new Object[]{false, "ABCDEF", Double.valueOf(12.123d), 5}));
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        createPartitionedTable(create, "test", "sink_into_hashed_sharded_collection", Collections.singletonList("a"), Collections.singletonList("c"));
        create.fromValues(asList).executeInsert("mongo_sink").await();
        MongoCollection collection = this.mongoClient.getDatabase("test").getCollection("sink_into_hashed_sharded_collection");
        Document document = new Document();
        document.put("_id", 2L);
        document.put("a", 2L);
        document.put("b", false);
        document.put("c", "ABCDEF");
        document.put("d", Double.valueOf(12.123d));
        document.put("e", 5);
        Assertions.assertThat(collection.find(Filters.eq("_id", 2L))).containsExactly(new Document[]{document});
    }

    @Test
    void testSinkIntoPartitionedTableAll() throws Exception {
        BsonDocument parse = BsonDocument.parse("{ b: 1, c: 1 }");
        MongoTestUtil.createIndex(this.mongoClient, "test", "sink_into_sharded_collection_all", parse, new IndexOptions().unique(true));
        MongoTestUtil.shardCollection(this.mongoClient, "test", "sink_into_sharded_collection_all", parse);
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        createPartitionedTable(create, "test", "sink_into_sharded_collection_all", Collections.singletonList("a"), Arrays.asList("b", "c"));
        create.executeSql("INSERT INTO mongo_sink PARTITION (b='true', c='ABCDEF') SELECT 3, 12.1234, 5").await();
        create.executeSql("INSERT INTO mongo_sink PARTITION (b='true', c='ABCDEF') SELECT 3, 12.12345, 6").await();
        MongoCollection collection = this.mongoClient.getDatabase("test").getCollection("sink_into_sharded_collection_all");
        Document document = new Document();
        document.put("_id", 3L);
        document.put("a", 3L);
        document.put("b", true);
        document.put("c", "ABCDEF");
        document.put("d", Double.valueOf(12.12345d));
        document.put("e", 6);
        Assertions.assertThat(collection.find(Filters.eq("_id", 3L))).containsExactly(new Document[]{document});
    }

    @Test
    void testSinkIntoPartitionedTablePart() throws Exception {
        BsonDocument parse = BsonDocument.parse("{ c: 1, b: 1 }");
        MongoTestUtil.createIndex(this.mongoClient, "test", "sink_into_sharded_collection_part", parse, new IndexOptions().unique(true));
        MongoTestUtil.shardCollection(this.mongoClient, "test", "sink_into_sharded_collection_part", parse);
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        createPartitionedTable(create, "test", "sink_into_sharded_collection_part", Collections.singletonList("a"), Arrays.asList("c", "b"));
        create.executeSql("INSERT INTO mongo_sink PARTITION (c='ABCDEFG') SELECT 4, false, 12.12345, 6").await();
        create.executeSql("INSERT INTO mongo_sink PARTITION (c='ABCDEFG') SELECT 4, false, 12.123456, 7").await();
        MongoCollection collection = this.mongoClient.getDatabase("test").getCollection("sink_into_sharded_collection_part");
        Document document = new Document();
        document.put("_id", 4L);
        document.put("a", 4L);
        document.put("b", false);
        document.put("c", "ABCDEFG");
        document.put("d", Double.valueOf(12.123456d));
        document.put("e", 7);
        Assertions.assertThat(collection.find(Filters.eq("_id", 4L))).containsExactly(new Document[]{document});
    }

    private static void createPartitionedTable(TableEnvironment tableEnvironment, String str, String str2, List<String> list, Collection<String> collection) {
        tableEnvironment.executeSql(String.format("CREATE TABLE mongo_sink (a BIGINT NOT NULL,\nb BOOLEAN,\nc STRING NOT NULL,\nd DOUBLE,\ne INT NOT NULL,\nPRIMARY KEY (%s) NOT ENFORCED\n) PARTITIONED BY (%s)\nWITH (%s)", formatKeys(list), formatKeys(collection), MongoTestUtil.getConnectorSql(str, str2, MONGO_SHARDED_CONTAINER.getConnectionString())));
    }

    private static String formatKeys(Collection<String> collection) {
        return String.join(",", collection);
    }
}
