package org.apache.flink.connector.mongodb.sink;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.testutils.junit.SharedObjectsExtension;
import org.apache.flink.testutils.junit.SharedReference;
import org.assertj.core.api.Assertions;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* loaded from: input_file:org/apache/flink/connector/mongodb/sink/MongoSinkITCase.class */
public class MongoSinkITCase {
    private static final Logger LOG = LoggerFactory.getLogger(MongoSinkITCase.class);

    @Container
    private static final MongoDBContainer MONGO_CONTAINER = MongoTestUtil.createMongoDBContainer(LOG);

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(1).build());

    @RegisterExtension
    final SharedObjectsExtension sharedObjects = SharedObjectsExtension.create();
    private static final String TEST_DATABASE = "test_sink";
    private static MongoClient mongoClient;

    /* loaded from: input_file:org/apache/flink/connector/mongodb/sink/MongoSinkITCase$FailingMapper.class */
    private static class FailingMapper implements MapFunction<Long, Long>, CheckpointListener {
        private final SharedReference<AtomicBoolean> failed;
        private int emittedRecords;

        private FailingMapper(SharedReference<AtomicBoolean> sharedReference) {
            this.emittedRecords = 0;
            this.failed = sharedReference;
        }

        public Long map(Long l) throws Exception {
            Thread.sleep(50L);
            this.emittedRecords++;
            return l;
        }

        public void notifyCheckpointComplete(long j) throws Exception {
            if (((AtomicBoolean) this.failed.get()).get() || this.emittedRecords == 0) {
                return;
            }
            ((AtomicBoolean) this.failed.get()).set(true);
            throw new Exception("Expected failure");
        }
    }

    /* loaded from: input_file:org/apache/flink/connector/mongodb/sink/MongoSinkITCase$TestMapFunction.class */
    private static class TestMapFunction implements MapFunction<Long, Document> {
        private TestMapFunction() {
        }

        public Document map(Long l) {
            return MongoSinkITCase.buildMessage(l.intValue());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/mongodb/sink/MongoSinkITCase$UpsertSerializationSchema.class */
    public static class UpsertSerializationSchema implements MongoSerializationSchema<Document> {
        private UpsertSerializationSchema() {
        }

        public WriteModel<BsonDocument> serialize(Document document, MongoSinkContext mongoSinkContext) {
            BsonDocument bsonDocument = document.toBsonDocument();
            BsonDocument bsonDocument2 = new BsonDocument("_id", bsonDocument.get("_id"));
            bsonDocument.remove("_id");
            return new UpdateOneModel(bsonDocument2, new BsonDocument("$set", bsonDocument), new UpdateOptions().upsert(true));
        }
    }

    @BeforeAll
    static void setUp() {
        mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
    }

    @AfterAll
    static void tearDown() {
        if (mongoClient != null) {
            mongoClient.close();
        }
    }

    @EnumSource(value = DeliveryGuarantee.class, mode = EnumSource.Mode.EXCLUDE, names = {"EXACTLY_ONCE"})
    @ParameterizedTest
    void testWriteToMongoWithDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) throws Exception {
        String str = "test-sink-with-delivery-" + deliveryGuarantee;
        MongoSink<Document> createSink = createSink(str, deliveryGuarantee);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.fromSequence(1L, 5L).map(new TestMapFunction()).sinkTo(createSink);
        executionEnvironment.execute();
        MongoTestUtil.assertThatIdsAreWritten(collectionOf(str), 1, 2, 3, 4, 5);
    }

    @Test
    void testRecovery() throws Exception {
        MongoSink<Document> createSink = createSink("test-recovery-mongo-sink", DeliveryGuarantee.AT_LEAST_ONCE);
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(100L);
        SharedReference add = this.sharedObjects.add(new AtomicBoolean(false));
        executionEnvironment.fromSequence(1L, 5L).map(new FailingMapper(add)).map(new TestMapFunction()).sinkTo(createSink);
        executionEnvironment.execute();
        MongoTestUtil.assertThatIdsAreWritten(collectionOf("test-recovery-mongo-sink"), 1, 2, 3, 4, 5);
        Assertions.assertThat((AtomicBoolean) add.get()).isTrue();
    }

    private static MongoSink<Document> createSink(String str, DeliveryGuarantee deliveryGuarantee) {
        return MongoSink.builder().setUri(MONGO_CONTAINER.getConnectionString()).setDatabase(TEST_DATABASE).setCollection(str).setBatchSize(5).setDeliveryGuarantee(deliveryGuarantee).setSerializationSchema(new UpsertSerializationSchema()).build();
    }

    private static MongoCollection<Document> collectionOf(String str) {
        return mongoClient.getDatabase(TEST_DATABASE).getCollection(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Document buildMessage(int i) {
        return new Document("_id", Integer.valueOf(i)).append("f1", "d_" + i);
    }
}
