package org.apache.flink.connector.mongodb.sink.writer;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.DeleteOneModel;
import com.mongodb.client.model.InsertOneModel;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Optional;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.connector.mongodb.sink.config.MongoWriteOptions;
import org.apache.flink.connector.mongodb.sink.writer.context.MongoSinkContext;
import org.apache.flink.connector.mongodb.sink.writer.serializer.MongoSerializationSchema;
import org.apache.flink.connector.mongodb.testutils.MongoTestUtil;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.assertj.core.api.Assertions;
import org.bson.BsonDocument;
import org.bson.Document;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.MongoDBContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

@Testcontainers
/* loaded from: input_file:org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase.class */
public class MongoWriterITCase {
    private static final String TEST_DATABASE = "test_writer";
    private static MongoClient mongoClient;
    private static TestSinkInitContext sinkInitContext;
    private static final Logger LOG = LoggerFactory.getLogger(MongoWriterITCase.class);

    @RegisterExtension
    static final MiniClusterExtension MINI_CLUSTER_RESOURCE = new MiniClusterExtension(new MiniClusterResourceConfiguration.Builder().setNumberTaskManagers(2).build());

    @Container
    private static final MongoDBContainer MONGO_CONTAINER = MongoTestUtil.createMongoDBContainer(LOG);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase$UpsertSerializationSchema.class */
    public static class UpsertSerializationSchema implements MongoSerializationSchema<Document> {
        private UpsertSerializationSchema() {
        }

        public WriteModel<BsonDocument> serialize(Document document, MongoSinkContext mongoSinkContext) {
            String string = document.getString("op");
            boolean z = -1;
            switch (string.hashCode()) {
                case 100:
                    if (string.equals("d")) {
                        z = 2;
                        break;
                    }
                    break;
                case 105:
                    if (string.equals("i")) {
                        z = false;
                        break;
                    }
                    break;
                case 117:
                    if (string.equals("u")) {
                        z = true;
                        break;
                    }
                    break;
            }
            switch (z) {
                case false:
                    return new InsertOneModel(document.toBsonDocument());
                case true:
                    BsonDocument bsonDocument = document.toBsonDocument();
                    BsonDocument bsonDocument2 = new BsonDocument("_id", bsonDocument.getInt32("_id"));
                    bsonDocument.remove("_id");
                    return new UpdateOneModel(bsonDocument2, new BsonDocument("$set", bsonDocument), new UpdateOptions().upsert(true));
                case true:
                    return new DeleteOneModel(new BsonDocument("_id", document.toBsonDocument().getInt32("_id")));
                default:
                    throw new UnsupportedOperationException("op is not supported " + string);
            }
        }
    }

    @BeforeAll
    static void beforeAll() {
        mongoClient = MongoClients.create(MONGO_CONTAINER.getConnectionString());
    }

    @AfterAll
    static void afterAll() {
        if (mongoClient != null) {
            mongoClient.close();
        }
    }

    @BeforeEach
    void setUp() {
        sinkInitContext = new TestSinkInitContext();
    }

    @Test
    void testWriteOnBulkFlush() throws Exception {
        MongoWriter<Document> createWriter = createWriter("test-bulk-flush-without-checkpoint", 5, -1L, false);
        Throwable th = null;
        try {
            try {
                createWriter.write(buildMessage(1), (SinkWriter.Context) null);
                createWriter.write(buildMessage(2), (SinkWriter.Context) null);
                createWriter.write(buildMessage(3), (SinkWriter.Context) null);
                createWriter.write(buildMessage(4), (SinkWriter.Context) null);
                createWriter.flush(false);
                MongoTestUtil.assertThatIdsAreNotWritten(collectionOf("test-bulk-flush-without-checkpoint"), 1, 2, 3, 4);
                createWriter.write(buildMessage(5), (SinkWriter.Context) null);
                MongoTestUtil.assertThatIdsAreWritten(collectionOf("test-bulk-flush-without-checkpoint"), 1, 2, 3, 4, 5);
                createWriter.write(buildMessage(6), (SinkWriter.Context) null);
                MongoTestUtil.assertThatIdsAreNotWritten(collectionOf("test-bulk-flush-without-checkpoint"), 6);
                createWriter.doBulkWrite();
                MongoTestUtil.assertThatIdsAreWritten(collectionOf("test-bulk-flush-without-checkpoint"), 1, 2, 3, 4, 5, 6);
                if (createWriter != null) {
                    if (0 == 0) {
                        createWriter.close();
                        return;
                    }
                    try {
                        createWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createWriter != null) {
                if (th != null) {
                    try {
                        createWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testWriteOnBatchIntervalFlush() throws Exception {
        MongoWriter<Document> createWriter = createWriter("test-bulk-flush-with-interval", -1, 1000L, false);
        Throwable th = null;
        try {
            createWriter.write(buildMessage(1), (SinkWriter.Context) null);
            createWriter.write(buildMessage(2), (SinkWriter.Context) null);
            createWriter.doBulkWrite();
            createWriter.write(buildMessage(3), (SinkWriter.Context) null);
            createWriter.write(buildMessage(4), (SinkWriter.Context) null);
            MongoTestUtil.assertThatIdsAreWrittenWithMaxWaitTime(collectionOf("test-bulk-flush-with-interval"), 10000L, 1, 2, 3, 4);
            if (createWriter != null) {
                if (0 == 0) {
                    createWriter.close();
                    return;
                }
                try {
                    createWriter.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (createWriter != null) {
                if (0 != 0) {
                    try {
                        createWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th3;
        }
    }

    @Test
    void testWriteOnCheckpoint() throws Exception {
        MongoWriter<Document> createWriter = createWriter("test-bulk-flush-with-checkpoint", -1, -1L, true);
        Throwable th = null;
        try {
            try {
                createWriter.write(buildMessage(1), (SinkWriter.Context) null);
                createWriter.write(buildMessage(2), (SinkWriter.Context) null);
                createWriter.write(buildMessage(3), (SinkWriter.Context) null);
                MongoTestUtil.assertThatIdsAreNotWritten(collectionOf("test-bulk-flush-with-checkpoint"), 1, 2, 3);
                createWriter.flush(false);
                MongoTestUtil.assertThatIdsAreWritten(collectionOf("test-bulk-flush-with-checkpoint"), 1, 2, 3);
                if (createWriter != null) {
                    if (0 == 0) {
                        createWriter.close();
                        return;
                    }
                    try {
                        createWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createWriter != null) {
                if (th != null) {
                    try {
                        createWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testIncrementRecordsSendMetric() throws Exception {
        MongoWriter<Document> createWriter = createWriter("test-inc-records-send", 2, -1L, false);
        Throwable th = null;
        try {
            try {
                Counter numRecordsOutCounter = sinkInitContext.getNumRecordsOutCounter();
                createWriter.write(buildMessage(1), (SinkWriter.Context) null);
                createWriter.write(buildMessage(2, "u"), (SinkWriter.Context) null);
                createWriter.write(buildMessage(3, "d"), (SinkWriter.Context) null);
                createWriter.doBulkWrite();
                Assertions.assertThat(numRecordsOutCounter.getCount()).isEqualTo(3L);
                if (createWriter != null) {
                    if (0 == 0) {
                        createWriter.close();
                        return;
                    }
                    try {
                        createWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createWriter != null) {
                if (th != null) {
                    try {
                        createWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th4;
        }
    }

    @Test
    void testCurrentSendTime() throws Exception {
        MongoWriter<Document> createWriter = createWriter("test-current-send-time", 1, -1L, false);
        Throwable th = null;
        try {
            try {
                Optional currentSendTimeGauge = sinkInitContext.getCurrentSendTimeGauge();
                Assertions.assertThat(currentSendTimeGauge.isPresent()).isTrue();
                Assertions.assertThat((Long) ((Gauge) currentSendTimeGauge.get()).getValue()).isEqualTo(Long.MAX_VALUE);
                for (int i = 0; i < 5; i++) {
                    createWriter.write(buildMessage(i), (SinkWriter.Context) null);
                    createWriter.doBulkWrite();
                    if (((Long) ((Gauge) currentSendTimeGauge.get()).getValue()).longValue() > 0) {
                        if (createWriter != null) {
                            if (0 == 0) {
                                createWriter.close();
                                return;
                            }
                            try {
                                createWriter.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    }
                }
                Assertions.fail("Test currentSendTime should be larger than 0 failed over max retry times.");
                if (createWriter != null) {
                    if (0 == 0) {
                        createWriter.close();
                        return;
                    }
                    try {
                        createWriter.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            } catch (Throwable th4) {
                th = th4;
                throw th4;
            }
        } catch (Throwable th5) {
            if (createWriter != null) {
                if (th != null) {
                    try {
                        createWriter.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th5;
        }
    }

    @Test
    void testSinkContext() throws Exception {
        MongoWriteOptions build = MongoWriteOptions.builder().setBatchSize(2).setBatchIntervalMs(-1L).setDeliveryGuarantee(DeliveryGuarantee.NONE).build();
        MongoWriter<Document> createWriter = createWriter("test-sink-context", 2, -1L, false, (document, mongoSinkContext) -> {
            Assertions.assertThat(mongoSinkContext.getInitContext().getSubtaskId()).isEqualTo(0);
            Assertions.assertThat(mongoSinkContext.getWriteOptions()).isEqualTo(build);
            Assertions.assertThat(mongoSinkContext.processTime()).isEqualTo(sinkInitContext.getProcessingTimeService().getCurrentProcessingTime());
            return new InsertOneModel(document.toBsonDocument());
        });
        Throwable th = null;
        try {
            try {
                createWriter.write(buildMessage(1), (SinkWriter.Context) null);
                createWriter.write(buildMessage(2), (SinkWriter.Context) null);
                createWriter.doBulkWrite();
                if (createWriter != null) {
                    if (0 == 0) {
                        createWriter.close();
                        return;
                    }
                    try {
                        createWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (createWriter != null) {
                if (th != null) {
                    try {
                        createWriter.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    createWriter.close();
                }
            }
            throw th4;
        }
    }

    private static MongoCollection<Document> collectionOf(String str) {
        return mongoClient.getDatabase(TEST_DATABASE).getCollection(str);
    }

    private static MongoWriter<Document> createWriter(String str, int i, long j, boolean z) throws IOException {
        return createWriter(str, i, j, z, new UpsertSerializationSchema());
    }

    private static MongoWriter<Document> createWriter(String str, int i, long j, boolean z, MongoSerializationSchema<Document> mongoSerializationSchema) throws IOException {
        return MongoSink.builder().setUri(MONGO_CONTAINER.getConnectionString()).setDatabase(TEST_DATABASE).setCollection(str).setBatchSize(i).setBatchIntervalMs(j).setDeliveryGuarantee(z ? DeliveryGuarantee.AT_LEAST_ONCE : DeliveryGuarantee.NONE).setSerializationSchema(mongoSerializationSchema).build().createWriter(sinkInitContext);
    }

    private static Document buildMessage(int i) {
        return buildMessage(i, "i");
    }

    private static Document buildMessage(int i, String str) {
        return new Document("_id", Integer.valueOf(i)).append("op", str);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1604886806:
                if (implMethodName.equals("lambda$testSinkContext$5fa0a135$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/flink/connector/mongodb/sink/writer/serializer/MongoSerializationSchema") && serializedLambda.getFunctionalInterfaceMethodName().equals("serialize") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Lorg/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext;)Lcom/mongodb/client/model/WriteModel;") && serializedLambda.getImplClass().equals("org/apache/flink/connector/mongodb/sink/writer/MongoWriterITCase") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/flink/connector/mongodb/sink/config/MongoWriteOptions;Lorg/bson/Document;Lorg/apache/flink/connector/mongodb/sink/writer/context/MongoSinkContext;)Lcom/mongodb/client/model/WriteModel;")) {
                    MongoWriteOptions mongoWriteOptions = (MongoWriteOptions) serializedLambda.getCapturedArg(0);
                    return (document, mongoSinkContext) -> {
                        Assertions.assertThat(mongoSinkContext.getInitContext().getSubtaskId()).isEqualTo(0);
                        Assertions.assertThat(mongoSinkContext.getWriteOptions()).isEqualTo(mongoWriteOptions);
                        Assertions.assertThat(mongoSinkContext.processTime()).isEqualTo(sinkInitContext.getProcessingTimeService().getCurrentProcessingTime());
                        return new InsertOneModel(document.toBsonDocument());
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
