package org.apache.beam.sdk.io.mongodb;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.MongoCmdOptions;
import de.flapdoodle.embed.mongo.config.MongodConfig;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.config.Storage;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.runtime.Network;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDbIOTest.class */
public class MongoDbIOTest {
    private static final String DATABASE_NAME = "beam";
    private static final String COLLECTION_NAME = "test";
    private static final String VIEW_NAME = "test_view";
    private static MongodExecutable mongodExecutable;
    private static MongodProcess mongodProcess;
    private static MongoClient client;
    private static MongoDatabase database;
    private static int port;

    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbIOTest.class);

    @ClassRule
    public static final TemporaryFolder MONGODB_LOCATION = new TemporaryFolder();
    private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance();

    /* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDbIOTest$DocumentToKVFn.class */
    static class DocumentToKVFn extends SimpleFunction<Document, KV<String, Void>> {
        DocumentToKVFn() {
        }

        public KV<String, Void> apply(Document document) {
            return KV.of(document.getString("scientist"), (Object) null);
        }
    }

    @BeforeClass
    public static void beforeClass() throws Exception {
        port = NetworkTestHelper.getAvailableLocalPort();
        LOG.info("Starting MongoDB embedded instance on {}", Integer.valueOf(port));
        mongodExecutable = mongodStarter.prepare(MongodConfig.builder().version(Version.Main.PRODUCTION).isConfigServer(false).replication(new Storage(MONGODB_LOCATION.getRoot().getPath(), (String) null, 0)).net(new Net("localhost", port, Network.localhostIsIPv6())).cmdOptions(MongoCmdOptions.builder().syncDelay(10).useNoPrealloc(true).useSmallFiles(true).useNoJournal(true).isVerbose(false).build()).build());
        mongodProcess = mongodExecutable.start();
        client = new MongoClient("localhost", port);
        database = client.getDatabase(DATABASE_NAME);
        LOG.info("Insert test data");
        database.getCollection(COLLECTION_NAME).insertMany(createDocuments(1000, false));
        database.createView(VIEW_NAME, COLLECTION_NAME, Collections.emptyList());
    }

    @AfterClass
    public static void afterClass() {
        LOG.info("Stopping MongoDB instance");
        client.close();
        mongodProcess.stop();
        mongodExecutable.stop();
    }

    @Test
    public void testSplitIntoFilters() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Document("_id", 56));
        List splitKeysToFilters = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(arrayList);
        Assert.assertEquals(2L, splitKeysToFilters.size());
        Assert.assertEquals("{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", splitKeysToFilters.get(0));
        Assert.assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"56\")}} ]}", splitKeysToFilters.get(1));
        arrayList.add(new Document("_id", 109));
        arrayList.add(new Document("_id", 256));
        List splitKeysToFilters2 = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(arrayList);
        Assert.assertEquals(4L, splitKeysToFilters2.size());
        Assert.assertEquals("{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", splitKeysToFilters2.get(0));
        Assert.assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"56\"),$lte:ObjectId(\"109\")}} ]}", splitKeysToFilters2.get(1));
        Assert.assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"109\"),$lte:ObjectId(\"256\")}} ]}", splitKeysToFilters2.get(2));
        Assert.assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"256\")}} ]}", splitKeysToFilters2.get(3));
    }

    @Test
    public void testSplitIntoBucket() {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new Document("_id", new ObjectId("52cc8f6254c5317943000005")));
        List splitKeysToMatch = MongoDbIO.BoundedMongoDbSource.splitKeysToMatch(arrayList);
        Assert.assertEquals(2L, splitKeysToMatch.size());
        Assert.assertEquals("{\"$match\": {\"_id\": {\"$lte\": {\"$oid\": \"52cc8f6254c5317943000005\"}}}}", ((BsonDocument) splitKeysToMatch.get(0)).toString());
        Assert.assertEquals("{\"$match\": {\"_id\": {\"$gt\": {\"$oid\": \"52cc8f6254c5317943000005\"}}}}", ((BsonDocument) splitKeysToMatch.get(1)).toString());
        arrayList.add(new Document("_id", new ObjectId("52cc8f6254c5317943000007")));
        arrayList.add(new Document("_id", new ObjectId("54242e9e54c531ef8800001f")));
        List splitKeysToMatch2 = MongoDbIO.BoundedMongoDbSource.splitKeysToMatch(arrayList);
        Assert.assertEquals(4L, splitKeysToMatch2.size());
        Assert.assertEquals("{\"$match\": {\"_id\": {\"$lte\": {\"$oid\": \"52cc8f6254c5317943000005\"}}}}", ((BsonDocument) splitKeysToMatch2.get(0)).toString());
        Assert.assertEquals("{\"$match\": {\"_id\": {\"$gt\": {\"$oid\": \"52cc8f6254c5317943000005\"}, \"$lte\": {\"$oid\": \"52cc8f6254c5317943000007\"}}}}", ((BsonDocument) splitKeysToMatch2.get(1)).toString());
        Assert.assertEquals("{\"$match\": {\"_id\": {\"$gt\": {\"$oid\": \"52cc8f6254c5317943000007\"}, \"$lte\": {\"$oid\": \"54242e9e54c531ef8800001f\"}}}}", ((BsonDocument) splitKeysToMatch2.get(2)).toString());
        Assert.assertEquals("{\"$match\": {\"_id\": {\"$gt\": {\"$oid\": \"54242e9e54c531ef8800001f\"}}}}", ((BsonDocument) splitKeysToMatch2.get(3)).toString());
    }

    @Test
    public void testBuildAutoBuckets() {
        new ArrayList().add(new BsonDocument("$match", new BsonDocument("country", new BsonDocument("$eq", new BsonString("England")))));
        Assert.assertEquals(10L, MongoDbIO.BoundedMongoDbSource.buildAutoBuckets(database, MongoDbIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withCollection(COLLECTION_NAME).withQueryFn(AggregationQuery.create().withMongoDbPipeline(r0))).size());
    }

    @Test
    public void testFullRead() {
        PCollection apply = this.pipeline.apply(MongoDbIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withCollection(COLLECTION_NAME));
        PAssert.thatSingleton(apply.apply("Count All", Count.globally())).isEqualTo(1000L);
        PAssert.that(apply.apply("Map Scientist", MapElements.via(new DocumentToKVFn())).apply("Count Scientist", Count.perKey())).satisfies(iterable -> {
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(100L, ((Long) ((KV) it.next()).getValue()).longValue());
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    public void testGetSizeCollection() {
        MatcherAssert.assertThat(Long.valueOf(new MongoDbIO.BoundedMongoDbSource(MongoDbIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withCollection(COLLECTION_NAME)).getEstimatedSizeBytes(this.pipeline.getOptions())), Matchers.greaterThan(0L));
    }

    @Test
    public void testGetSizeView() {
        Assert.assertEquals(0L, new MongoDbIO.BoundedMongoDbSource(MongoDbIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withCollection(VIEW_NAME)).getEstimatedSizeBytes(this.pipeline.getOptions()));
    }

    @Test
    public void testReadWithCustomConnectionOptions() {
        MongoDbIO.Read withCollection = MongoDbIO.read().withUri("mongodb://localhost:" + port).withMaxConnectionIdleTime(10).withDatabase(DATABASE_NAME).withCollection(COLLECTION_NAME);
        Assert.assertEquals(10L, withCollection.maxConnectionIdleTime());
        PCollection apply = this.pipeline.apply(withCollection);
        PAssert.thatSingleton(apply.apply("Count All", Count.globally())).isEqualTo(1000L);
        PAssert.that(apply.apply("Map Scientist", MapElements.via(new DocumentToKVFn())).apply("Count Scientist", Count.perKey())).satisfies(iterable -> {
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(100L, ((Long) ((KV) it.next()).getValue()).longValue());
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    public void testReadWithFilter() {
        PAssert.thatSingleton(this.pipeline.apply(MongoDbIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withCollection(COLLECTION_NAME).withQueryFn(FindQuery.create().withFilters(Filters.eq("scientist", "Einstein")))).apply("Count", Count.globally())).isEqualTo(100L);
        this.pipeline.run();
    }

    @Test
    public void testReadWithFilterAndLimit() throws Exception {
        PAssert.thatSingleton(this.pipeline.apply(MongoDbIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withCollection(COLLECTION_NAME).withNumSplits(10).withQueryFn(FindQuery.create().withFilters(Filters.eq("scientist", "Einstein")).withLimit(5))).apply("Count", Count.globally())).isEqualTo(5L);
        this.pipeline.run();
    }

    @Test
    public void testReadWithAggregate() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new BsonDocument("$match", new BsonDocument("country", new BsonDocument("$eq", new BsonString("England")))));
        PAssert.thatSingleton(this.pipeline.apply(MongoDbIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withCollection(COLLECTION_NAME).withQueryFn(AggregationQuery.create().withMongoDbPipeline(arrayList))).apply("Count", Count.globally())).isEqualTo(300L);
        this.pipeline.run();
    }

    @Test
    public void testReadWithAggregateWithLimit() throws Exception {
        ArrayList arrayList = new ArrayList();
        arrayList.add(new BsonDocument("$match", new BsonDocument("country", new BsonDocument("$eq", new BsonString("England")))));
        arrayList.add(new BsonDocument("$limit", new BsonInt32(10)));
        PAssert.thatSingleton(this.pipeline.apply(MongoDbIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withCollection(COLLECTION_NAME).withQueryFn(AggregationQuery.create().withMongoDbPipeline(arrayList))).apply("Count", Count.globally())).isEqualTo(10L);
        this.pipeline.run();
    }

    @Test
    public void testWrite() {
        this.pipeline.apply(Create.of(createDocuments(1000, false))).apply(MongoDbIO.write().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withCollection("testWrite"));
        this.pipeline.run();
        Assert.assertEquals(1000L, countElements("testWrite"));
    }

    @Test
    public void testWriteUnordered() {
        Document parse = Document.parse("{\"_id\":\"521df3a4300466f1f2b5ae82\",\"scientist\":\"Test %s\"}");
        this.pipeline.apply(Create.of(parse, new Document[]{parse})).apply(MongoDbIO.write().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withOrdered(false).withCollection("testWriteUnordered"));
        this.pipeline.run();
        Assert.assertEquals(1L, countElements("testWriteUnordered"));
    }

    @Test
    public void testUpdate() {
        Document parse = Document.parse("{\"id\":1,\"scientist\":\"Updated\",\"country\":\"India\"}");
        database.getCollection("testUpdate").insertMany(createDocuments(100, true));
        Assert.assertEquals(100L, countElements("testUpdate"));
        ArrayList arrayList = new ArrayList();
        arrayList.add(parse);
        this.pipeline.apply(Create.of(arrayList)).apply(MongoDbIO.write().withUri("mongodb://localhost:" + port).withDatabase(DATABASE_NAME).withCollection("testUpdate").withUpdateConfiguration(UpdateConfiguration.create().withUpdateKey("id").withUpdateFields(new UpdateField[]{UpdateField.fieldUpdate("$set", "scientist", "scientist"), UpdateField.fieldUpdate("$set", "country", "country")})));
        this.pipeline.run();
        Document document = (Document) database.getCollection("testUpdate").find(new Document("_id", 1)).first();
        Assert.assertEquals("Updated", document.get("scientist"));
        Assert.assertEquals("India", document.get("country"));
    }

    private static List<Document> createDocuments(int i, boolean z) {
        String[] strArr = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
        String[] strArr2 = {"Germany", "England", "Poland", "France", "France", "England", "England", "Denmark", "Florence", "Scotland"};
        ArrayList arrayList = new ArrayList();
        for (int i2 = 1; i2 <= i; i2++) {
            int length = i2 % strArr.length;
            Document document = new Document();
            if (z) {
                document.append("_id", Integer.valueOf(i2));
            }
            document.append("scientist", strArr[length]);
            document.append("country", strArr2[length]);
            arrayList.add(document);
        }
        return arrayList;
    }

    private static int countElements(String str) {
        return Iterators.size(database.getCollection(str).find().iterator());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 1355795276:
                if (implMethodName.equals("lambda$testReadWithCustomConnectionOptions$43268ee4$1")) {
                    z = true;
                    break;
                }
                break;
            case 1940887068:
                if (implMethodName.equals("lambda$testFullRead$43268ee4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/mongodb/MongoDbIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable -> {
                        Iterator it = iterable.iterator();
                        while (it.hasNext()) {
                            Assert.assertEquals(100L, ((Long) ((KV) it.next()).getValue()).longValue());
                        }
                        return null;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/mongodb/MongoDbIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable2 -> {
                        Iterator it = iterable2.iterator();
                        while (it.hasNext()) {
                            Assert.assertEquals(100L, ((Long) ((KV) it.next()).getValue()).longValue());
                        }
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
