package org.apache.flink.connector.mongodb.source;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.IndexOptions;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.common.utils.MongoConstants;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.connector.mongodb.table.serialization.MongoRowDataDeserializationSchema;
import org.apache.flink.connector.mongodb.testutils.MongoShardedContainers;
import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.apache.flink.util.CollectionUtil;
import org.assertj.core.api.Assertions;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.testcontainers.containers.Network;
import org.testcontainers.junit.jupiter.Testcontainers;

/**
 * Integration tests for {@link MongoSource} that run against the sharded MongoDB containers
 * registered below and a Flink mini cluster with {@link #PARALLELISM} task managers.
 *
 * <p>Three collections are populated once per class with {@link #TEST_RECORD_SIZE} documents of
 * the shape {@code {f0: int, f1: string}}: one plain collection, one sharded on the compound key
 * {@code {f0: 1, f1: 1}}, and one sharded on the hashed key {@code {f1: 'hashed'}}.
 */
@Testcontainers
class MongoSourceITCase {

    private static final int PARALLELISM = 2;

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(PARALLELISM)
                            .build());

    @RegisterExtension
    private static final MongoShardedContainers MONGO_SHARDED_CONTAINER =
            MongoTestUtil.createMongoDBShardedContainers(Network.newNetwork());

    /** Holds objects that must stay shared with user functions across (de)serialization. */
    @RegisterExtension
    private final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();

    private static MongoClient mongoClient;

    private static final String TEST_DATABASE = "test_source";
    private static final String TEST_COLLECTION = "test_coll";
    private static final String TEST_SHARDED_COLLECTION = "test_sharded_coll";
    private static final String TEST_HASHED_KEY_SHARDED_COLLECTION = "test_hashed_key_sharded_coll";
    private static final String SOURCE_NAME = "MongoDB-Source";

    /** Number of documents written to every test collection ({@code f0} runs from 1 to this). */
    private static final int TEST_RECORD_SIZE = 30000;

    /** Number of documents sent per {@code insertMany} call while seeding the collections. */
    private static final int TEST_RECORD_BATCH_SIZE = 10000;

    /**
     * Identity mapper that throws exactly once, on the first completed checkpoint that arrives
     * after this instance has seen at least one record.
     */
    private static class FailingMapper implements MapFunction<RowData, RowData>, CheckpointListener {

        /** Flag shared by all subtasks; flips to {@code true} when the failure is injected. */
        private final SharedReference<AtomicBoolean> failed;

        private int emittedRecords;

        private FailingMapper(SharedReference<AtomicBoolean> failed) {
            this.failed = failed;
            this.emittedRecords = 0;
        }

        @Override
        public RowData map(RowData value) {
            emittedRecords++;
            return value;
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (emittedRecords == 0) {
                return;
            }
            // compareAndSet makes the check-and-flip atomic, so at most one parallel subtask
            // can win the race and inject the failure.
            if (failed.get().compareAndSet(false, true)) {
                throw new Exception("Expected failure");
            }
        }
    }

    /** Deserializes each BSON document into its JSON string form ({@code null} stays null). */
    private static class MongoJsonDeserializationSchema
            implements MongoDeserializationSchema<String> {

        @Override
        public String deserialize(BsonDocument document) {
            return Optional.ofNullable(document)
                    .map(doc -> doc.toJson(MongoConstants.DEFAULT_JSON_WRITER_SETTINGS))
                    .orElse(null);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

    /**
     * Opens the shared client, upserts the chunk size setting, seeds all three collections and
     * then creates the indexes and shards the two sharded collections.
     */
    @BeforeAll
    static void beforeAll() {
        mongoClient = MongoClients.create(MONGO_SHARDED_CONTAINER.getConnectionString());

        // Upsert the chunk size entry with value 1.
        // NOTE(review): presumably this shrinks chunks so the test data spans several - confirm.
        MongoCollection<BsonDocument> settings =
                mongoClient
                        .getDatabase(MongoTestUtil.CONFIG_DATABASE)
                        .getCollection(MongoTestUtil.SETTINGS_COLLECTION)
                        .withDocumentClass(BsonDocument.class);
        settings.updateOne(
                Filters.eq("_id", MongoTestUtil.CHUNK_SIZE_FIELD),
                Updates.combine(
                        Updates.set("_id", MongoTestUtil.CHUNK_SIZE_FIELD),
                        Updates.set(MongoTestUtil.VALUE_FIELD, 1)),
                new UpdateOptions().upsert(true));

        initTestData(TEST_COLLECTION);
        initTestData(TEST_SHARDED_COLLECTION);
        initTestData(TEST_HASHED_KEY_SHARDED_COLLECTION);

        BsonDocument compoundShardKey = BsonDocument.parse("{ f0: 1, f1: 1 }");
        MongoTestUtil.createIndex(
                mongoClient,
                TEST_DATABASE,
                TEST_SHARDED_COLLECTION,
                compoundShardKey,
                new IndexOptions().unique(true));
        MongoTestUtil.shardCollection(
                mongoClient, TEST_DATABASE, TEST_SHARDED_COLLECTION, compoundShardKey);

        BsonDocument hashedShardKey = BsonDocument.parse("{ f1: 'hashed' }");
        MongoTestUtil.createIndex(
                mongoClient,
                TEST_DATABASE,
                TEST_HASHED_KEY_SHARDED_COLLECTION,
                hashedShardKey,
                new IndexOptions());
        MongoTestUtil.shardCollection(
                mongoClient, TEST_DATABASE, TEST_HASHED_KEY_SHARDED_COLLECTION, hashedShardKey);
    }

    /** Closes the shared client if {@link #beforeAll()} managed to create it. */
    @AfterAll
    static void afterAll() {
        if (mongoClient != null) {
            mongoClient.close();
        }
    }

    /**
     * Reads a collection with the given partition strategy and the filter {@code f0 > 10000},
     * and checks that every matching document is returned.
     */
    @ParameterizedTest
    @MethodSource("providePartitionStrategyAndCollection")
    void testPartitionStrategy(PartitionStrategy partitionStrategy, String collectionName)
            throws Exception {
        MongoSource<RowData> source =
                defaultSourceBuilder(collectionName)
                        .setPartitionSize(MemorySize.parse("1mb"))
                        .setSamplesPerPartition(3)
                        .setPartitionStrategy(partitionStrategy)
                        .setFilter(Filters.gt("f0", new BsonInt32(TEST_RECORD_BATCH_SIZE)))
                        .build();

        List<RowData> rows = readAll(StreamExecutionEnvironment.getExecutionEnvironment(), source);

        // f0 runs from 1 to TEST_RECORD_SIZE, so the filter drops the first
        // TEST_RECORD_BATCH_SIZE documents and keeps the remaining 20000.
        Assertions.assertThat(rows).hasSize(TEST_RECORD_SIZE - TEST_RECORD_BATCH_SIZE);
    }

    /** Reads the plain collection with a limit of 100 and checks the number of rows returned. */
    @Test
    void testLimit() throws Exception {
        final int limit = 100;
        MongoSource<RowData> source =
                defaultSourceBuilder(TEST_COLLECTION)
                        .setLimit(limit)
                        .setPartitionSize(MemorySize.parse("1mb"))
                        .build();

        List<RowData> rows = readAll(StreamExecutionEnvironment.getExecutionEnvironment(), source);

        // Expected size is 200.
        // NOTE(review): presumably the limit applies per parallel reader - confirm.
        Assertions.assertThat(rows).hasSize(limit * PARALLELISM);
    }

    /** Projects only {@code f0} and checks that the emitted JSON contains no other key. */
    @Test
    void testProject() throws Exception {
        MongoSource<String> source =
                MongoSource.<String>builder()
                        .setUri(MONGO_SHARDED_CONTAINER.getConnectionString())
                        .setDatabase(TEST_DATABASE)
                        .setCollection(TEST_COLLECTION)
                        .setProjectedFields(new String[] {"f0"})
                        .setDeserializationSchema(new MongoJsonDeserializationSchema())
                        .build();

        List<String> jsonRows =
                readAll(StreamExecutionEnvironment.getExecutionEnvironment(), source);

        Assertions.assertThat(jsonRows).hasSize(TEST_RECORD_SIZE);
        Assertions.assertThat(Document.parse(jsonRows.get(0))).containsOnlyKeys("f0");
    }

    /**
     * Runs the source with checkpointing enabled through a {@link FailingMapper} and checks that
     * all {@link #TEST_RECORD_SIZE} rows are still collected after the injected failure.
     */
    @ParameterizedTest
    @MethodSource("providePartitionStrategyAndCollection")
    void testRecovery(PartitionStrategy partitionStrategy, String collectionName)
            throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(200L);

        MongoSource<RowData> source =
                defaultSourceBuilder(collectionName)
                        .setPartitionStrategy(partitionStrategy)
                        .setPartitionSize(MemorySize.parse("6mb"))
                        .setProjectedFields(new String[] {"f0"})
                        .setFetchSize(100)
                        .build();

        SharedReference<AtomicBoolean> failed = sharedObjects.add(new AtomicBoolean(false));
        List<RowData> rows =
                CollectionUtil.iteratorToList(
                        env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_NAME)
                                .map(new FailingMapper(failed))
                                .executeAndCollect());

        Assertions.assertThat(rows).hasSize(TEST_RECORD_SIZE);
    }

    /** Supplies the (partition strategy, collection name) pairs for the parameterized tests. */
    private static Stream<Arguments> providePartitionStrategyAndCollection() {
        return Stream.of(
                Arguments.of(PartitionStrategy.SINGLE, TEST_COLLECTION),
                Arguments.of(PartitionStrategy.SPLIT_VECTOR, TEST_COLLECTION),
                Arguments.of(PartitionStrategy.SAMPLE, TEST_COLLECTION),
                Arguments.of(PartitionStrategy.SHARDED, TEST_SHARDED_COLLECTION),
                Arguments.of(PartitionStrategy.SHARDED, TEST_HASHED_KEY_SHARDED_COLLECTION),
                Arguments.of(PartitionStrategy.PAGINATION, TEST_COLLECTION));
    }

    /**
     * Returns a builder preconfigured with the container URI, the test database, the given
     * collection and a {@link RowData} deserializer for the {@code (f0 INT, f1 STRING)} schema.
     */
    private static MongoSourceBuilder<RowData> defaultSourceBuilder(String collectionName) {
        RowType rowType =
                (RowType) defaultSourceSchema().toPhysicalRowDataType().getLogicalType();
        return MongoSource.<RowData>builder()
                .setUri(MONGO_SHARDED_CONTAINER.getConnectionString())
                .setDatabase(TEST_DATABASE)
                .setCollection(collectionName)
                .setDeserializationSchema(
                        new MongoRowDataDeserializationSchema(
                                rowType, InternalTypeInfo.of(rowType)));
    }

    /** Schema of the test documents: {@code f0 INT, f1 STRING}. */
    private static ResolvedSchema defaultSourceSchema() {
        return ResolvedSchema.of(
                Column.physical("f0", DataTypes.INT()), Column.physical("f1", DataTypes.STRING()));
    }

    /** Executes the source on the given environment and collects every emitted record. */
    private static <T> List<T> readAll(StreamExecutionEnvironment env, MongoSource<T> source)
            throws Exception {
        return CollectionUtil.iteratorToList(
                env.fromSource(source, WatermarkStrategy.noWatermarks(), SOURCE_NAME)
                        .executeAndCollect());
    }

    /**
     * Inserts {@link #TEST_RECORD_SIZE} documents ({@code f0} = 1..TEST_RECORD_SIZE) into the
     * given collection, in batches of at most {@link #TEST_RECORD_BATCH_SIZE}.
     */
    private static void initTestData(String collectionName) {
        MongoCollection<BsonDocument> collection =
                mongoClient
                        .getDatabase(TEST_DATABASE)
                        .getCollection(collectionName)
                        .withDocumentClass(BsonDocument.class);

        List<BsonDocument> batch = new ArrayList<>(TEST_RECORD_BATCH_SIZE);
        for (int i = 1; i <= TEST_RECORD_SIZE; i++) {
            batch.add(createTestData(i));
            if (batch.size() >= TEST_RECORD_BATCH_SIZE) {
                collection.insertMany(batch);
                batch.clear();
            }
        }
        // Flush the trailing partial batch so no documents are lost when the record count is
        // not an exact multiple of the batch size.
        if (!batch.isEmpty()) {
            collection.insertMany(batch);
        }
    }

    /** Builds one test document: {@code f0} is the given id, {@code f1} is 32 random letters. */
    private static BsonDocument createTestData(int id) {
        return new BsonDocument("f0", new BsonInt32(id))
                .append("f1", new BsonString(RandomStringUtils.randomAlphabetic(32)));
    }
}
