package org.apache.flink.connector.mongodb.table;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.regex.Pattern;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.bson.BsonArray;
import org.bson.BsonBinary;
import org.bson.BsonBoolean;
import org.bson.BsonDateTime;
import org.bson.BsonDbPointer;
import org.bson.BsonDecimal128;
import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
import org.bson.BsonJavaScript;
import org.bson.BsonJavaScriptWithScope;
import org.bson.BsonRegularExpression;
import org.bson.BsonString;
import org.bson.BsonSymbol;
import org.bson.BsonTimestamp;
import org.bson.Document;
import org.bson.types.Binary;
import org.bson.types.Decimal128;
import org.bson.types.ObjectId;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* loaded from: input_file:org/apache/flink/connector/mongodb/table/MongoDynamicTableSinkITCase.class */
class MongoDynamicTableSinkITCase {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDynamicTableSinkITCase.class);

    @Container
    private static final MongoDBContainer MONGO_CONTAINER = MongoTestUtil.createMongoDBContainer().withLogConsumer(new Slf4jLogConsumer(LOG));

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).build());
    private MongoClient mongoClient;

    MongoDynamicTableSinkITCase() {
    }

    @BeforeEach
    void setUp() {
        this.mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
    }

    @AfterEach
    void tearDown() {
        if (this.mongoClient != null) {
            this.mongoClient.close();
        }
    }

    @Test
    void testSinkWithAllSupportedTypes() throws ExecutionException, InterruptedException {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.executeSql(String.join("\n", Arrays.asList("CREATE TABLE mongo_sink", "(", "  _id BIGINT,", "  f1 STRING,", "  f2 BOOLEAN,", "  f3 BINARY,", "  f4 INTEGER,", "  f5 TIMESTAMP_LTZ(6),", "  f6 TIMESTAMP(3),", "  f7 DOUBLE,", "  f8 DECIMAL(10, 2),", "  f9 MAP<STRING, INTEGER>,", "  f10 ROW<k INTEGER>,", "  f11 ARRAY<STRING>,", "  f12 ARRAY<ROW<k STRING>>,", "  PRIMARY KEY (_id) NOT ENFORCED", ") WITH (", getConnectorSql("test", "sink_with_all_supported_types"), ")")));
        Instant now = Instant.now();
        create.fromValues(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("_id", DataTypes.BIGINT()), DataTypes.FIELD("f1", DataTypes.STRING()), DataTypes.FIELD("f2", DataTypes.BOOLEAN()), DataTypes.FIELD("f3", DataTypes.BINARY(1)), DataTypes.FIELD("f4", DataTypes.INT()), DataTypes.FIELD("f5", DataTypes.TIMESTAMP_LTZ(6)), DataTypes.FIELD("f6", DataTypes.TIMESTAMP(3)), DataTypes.FIELD("f7", DataTypes.DOUBLE()), DataTypes.FIELD("f8", DataTypes.DECIMAL(10, 2)), DataTypes.FIELD("f9", DataTypes.MAP(DataTypes.STRING(), DataTypes.INT())), DataTypes.FIELD("f10", DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("k", DataTypes.INT())})), DataTypes.FIELD("f11", DataTypes.ARRAY(DataTypes.STRING())), DataTypes.FIELD("f12", DataTypes.ARRAY(DataTypes.ROW(new DataTypes.Field[]{DataTypes.FIELD("K", DataTypes.STRING())})))}), new Object[]{Row.of(new Object[]{1L, "ABCDE", true, new byte[]{3}, 6, now, Timestamp.from(now), Double.valueOf(10.1d), new BigDecimal("11.11"), Collections.singletonMap("k", 12), Row.of(new Object[]{13}), Arrays.asList("14_1", "14_2"), Arrays.asList(Row.of(new Object[]{"15_1"}), Row.of(new Object[]{"15_2"}))})}).executeInsert("mongo_sink").await();
        Assertions.assertThat((Document) this.mongoClient.getDatabase("test").getCollection("sink_with_all_supported_types").find(Filters.eq("_id", 1L)).first()).isEqualTo(new Document("_id", 1L).append("f1", "ABCDE").append("f2", true).append("f3", new Binary(new byte[]{3})).append("f4", 6).append("f5", Date.from(now)).append("f6", Date.from(now)).append("f7", Double.valueOf(10.1d)).append("f8", new Decimal128(new BigDecimal("11.11"))).append("f9", new Document("k", 12)).append("f10", new Document("k", 13)).append("f11", Arrays.asList("14_1", "14_2")).append("f12", Arrays.asList(new Document("k", "15_1"), new Document("k", "15_2"))));
    }

    @Test
    void testRoundTripReadAndSink() throws ExecutionException, InterruptedException {
        BsonDocument append = new BsonDocument("f1", new BsonString("ABCDE")).append("f2", new BsonBoolean(true)).append("f3", new BsonBinary(new byte[]{3})).append("f4", new BsonInt32(32)).append("f5", new BsonInt64(64L)).append("f6", new BsonDouble(128.128d)).append("f7", new BsonDecimal128(new Decimal128(new BigDecimal("256.256")))).append("f8", new BsonDateTime(Instant.now().toEpochMilli())).append("f9", new BsonTimestamp((int) Instant.now().getEpochSecond(), 100)).append("f10", new BsonRegularExpression(Pattern.compile("^9$").pattern(), "i")).append("f11", new BsonJavaScript("function() { return 10; }")).append("f12", new BsonJavaScriptWithScope("function() { return 11; }", new BsonDocument())).append("f13", new BsonDbPointer("test.test", new ObjectId())).append("f14", new BsonSymbol("symbol")).append("f15", new BsonArray(Arrays.asList(new BsonInt32(1), new BsonInt32(2)))).append("f16", new BsonDocument("k", new BsonInt32(32)));
        this.mongoClient.getDatabase("test").getCollection("test_round_trip_source").withDocumentClass(BsonDocument.class).insertOne(append);
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.executeSql(String.format("CREATE TABLE mongo_source (\n`_id` STRING,\n`f1` STRING,\n`f2` BOOLEAN,\n`f3` BINARY,\n`f4` INTEGER,\n`f5` BIGINT,\n`f6` DOUBLE,\n`f7` DECIMAL(10, 3),\n`f8` TIMESTAMP_LTZ(3),\n`f9` STRING,\n`f10` STRING,\n`f11` STRING,\n`f12` STRING,\n`f13` STRING,\n`f14` STRING,\n`f15` ARRAY<INTEGER>,\n`f16` ROW<k INTEGER>,\n PRIMARY KEY (_id) NOT ENFORCED\n) WITH ( %s )", getConnectorSql("test", "test_round_trip_source")));
        create.executeSql(String.format("CREATE TABLE mongo_sink WITH ( %s ) LIKE mongo_source", getConnectorSql("test", "test_round_trip_sink")));
        create.executeSql("insert into mongo_sink select * from mongo_source").await();
        Assertions.assertThat((BsonDocument) this.mongoClient.getDatabase("test").getCollection("test_round_trip_sink").withDocumentClass(BsonDocument.class).find().first()).isEqualTo(append);
    }

    @Test
    void testSinkWithAllRowKind() throws ExecutionException, InterruptedException {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment create = StreamTableEnvironment.create(executionEnvironment);
        create.createTemporaryView("value_source", create.fromChangelogStream(executionEnvironment.fromCollection(Arrays.asList(Row.ofKind(RowKind.INSERT, new Object[]{1L, "Alice"}), Row.ofKind(RowKind.DELETE, new Object[]{1L, "Alice"}), Row.ofKind(RowKind.INSERT, new Object[]{2L, "Bob"}), Row.ofKind(RowKind.UPDATE_BEFORE, new Object[]{2L, "Bob"}), Row.ofKind(RowKind.UPDATE_AFTER, new Object[]{2L, "Tom"}))).returns(new RowTypeInfo(new TypeInformation[]{Types.LONG, Types.STRING}, new String[]{"id", "name"})), Schema.newBuilder().column("id", DataTypes.BIGINT()).column("name", DataTypes.STRING()).build()));
        create.executeSql(String.format("CREATE TABLE mongo_sink (\n`_id` BIGINT,\n`name` STRING,\n PRIMARY KEY (_id) NOT ENFORCED\n) WITH ( %s )", getConnectorSql("test", "test_sink_with_all_row_kind")));
        create.executeSql("insert into mongo_sink select * from value_source").await();
        Assertions.assertThat((List) this.mongoClient.getDatabase("test").getCollection("test_sink_with_all_row_kind").find().into(new ArrayList())).isEqualTo(Collections.singletonList(new Document("_id", 2L).append("name", "Tom")));
    }

    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testSinkWithReservedId(boolean z) throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.executeSql(String.format("CREATE TABLE mongo_sink (_id STRING NOT NULL,\nf1 STRING NOT NULL,\nPRIMARY KEY (_id) NOT ENFORCED\n)\nWITH (%s)", getConnectorSql("test", "sink_with_reserved_id")));
        Serializable objectId = new ObjectId();
        create.fromValues(new Expression[]{Expressions.row(objectId.toHexString(), new Object[]{"r1"}), Expressions.row("str", new Object[]{"r2"})}).executeInsert("mongo_sink", z).await();
        MongoCollection collection = this.mongoClient.getDatabase("test").getCollection("sink_with_reserved_id");
        ArrayList arrayList = new ArrayList();
        collection.find(Filters.in("_id", new Serializable[]{objectId, "str"})).into(arrayList);
        Assertions.assertThat(arrayList).containsExactlyInAnyOrder(new Document[]{new Document("_id", objectId).append("f1", "r1"), new Document("_id", "str").append("f1", "r2")});
    }

    @Test
    void testOverwriteSinkWithoutPrimaryKey() {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.executeSql(String.format("CREATE TABLE mongo_sink (f1 STRING NOT NULL\n)\nWITH (%s)", getConnectorSql("test", "overwrite_sink_without_primary_key")));
        Assertions.assertThatThrownBy(() -> {
            create.fromValues(new Expression[]{Expressions.row("d1", new Object[0]), Expressions.row("d1", new Object[0])}).executeInsert("mongo_sink", true).await();
        }).isInstanceOf(IllegalStateException.class).hasMessageContaining("Overwrite sink requires specifying the table's primary key");
    }

    @Test
    void testSinkWithoutPrimaryKey() throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.executeSql(String.format("CREATE TABLE mongo_sink (f1 STRING NOT NULL\n)\nWITH (%s)", getConnectorSql("test", "sink_without_primary_key")));
        create.fromValues(new Expression[]{Expressions.row("d1", new Object[0]), Expressions.row("d1", new Object[0])}).executeInsert("mongo_sink").await();
        MongoCollection collection = this.mongoClient.getDatabase("test").getCollection("sink_without_primary_key");
        ArrayList arrayList = new ArrayList();
        collection.find().into(arrayList);
        Assertions.assertThat(arrayList).hasSize(2);
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assertions.assertThat(((Document) it.next()).get("f1")).isEqualTo("d1");
        }
    }

    @Test
    void testSinkWithNonCompositePrimaryKey() throws Exception {
        Instant now = Instant.now();
        testSinkWithoutReservedId("test", "sink_with_non_composite_pk", Collections.singletonList("a"), Collections.singletonList(Expressions.row(2L, new Object[]{true, "ABCDE", Double.valueOf(12.12d), 4, Timestamp.from(now), now})));
        Document document = (Document) this.mongoClient.getDatabase("test").getCollection("sink_with_non_composite_pk").find(Filters.eq("_id", 2L)).first();
        Document document2 = new Document();
        document2.put("_id", 2L);
        document2.put("a", 2L);
        document2.put("b", true);
        document2.put("c", "ABCDE");
        document2.put("d", Double.valueOf(12.12d));
        document2.put("e", 4);
        document2.put("f", Date.from(now));
        document2.put("g", Date.from(now));
        Assertions.assertThat(document).isEqualTo(document2);
    }

    @Test
    void testSinkWithCompositePrimaryKey() throws Exception {
        Instant now = Instant.now();
        testSinkWithoutReservedId("test", "sink_with_composite_pk", Arrays.asList("a", "c"), Collections.singletonList(Expressions.row(1L, new Object[]{true, "ABCDE", Double.valueOf(12.12d), 4, Timestamp.from(now), now})));
        MongoCollection collection = this.mongoClient.getDatabase("test").getCollection("sink_with_composite_pk");
        Document document = new Document();
        document.put("a", 1L);
        document.put("c", "ABCDE");
        Document document2 = (Document) collection.find(Filters.eq("_id", document)).first();
        Document document3 = new Document();
        document3.put("_id", new Document(document));
        document3.put("a", 1L);
        document3.put("b", true);
        document3.put("c", "ABCDE");
        document3.put("d", Double.valueOf(12.12d));
        document3.put("e", 4);
        document3.put("f", Date.from(now));
        document3.put("g", Date.from(now));
        Assertions.assertThat(document2).isEqualTo(document3);
    }

    private void testSinkWithoutReservedId(String str, String str2, List<String> list, List<Expression> list2) throws Exception {
        TableEnvironment create = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        create.executeSql(String.format("CREATE TABLE mongo_sink (a BIGINT NOT NULL,\nb BOOLEAN,\nc STRING NOT NULL,\nd DOUBLE,\ne INT NOT NULL,\nf TIMESTAMP NOT NULL,\ng TIMESTAMP_LTZ NOT NULL,\nPRIMARY KEY (%s) NOT ENFORCED\n)\nWITH (%s)", getPrimaryKeys(list), getConnectorSql(str, str2)));
        create.fromValues(list2).executeInsert("mongo_sink").await();
    }

    private static String getPrimaryKeys(List<String> list) {
        return String.join(",", list);
    }

    private static String getConnectorSql(String str, String str2) {
        return String.format("'%s'='%s',\n", FactoryUtil.CONNECTOR.key(), MongoTestUtil.MONGODB_HOSTNAME) + String.format("'%s'='%s',\n", MongoConnectorOptions.URI.key(), MONGO_CONTAINER.getConnectionString()) + String.format("'%s'='%s',\n", MongoConnectorOptions.DATABASE.key(), str) + String.format("'%s'='%s'\n", MongoConnectorOptions.COLLECTION.key(), str2);
    }
}
