/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mongodb;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.IMongodConfig;
import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.config.Storage;
import de.flapdoodle.embed.mongo.distribution.IFeatureAwareVersion;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.config.IExecutableProcessConfig;
import de.flapdoodle.embed.process.runtime.Network;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.mongodb.AggregationQuery;
import org.apache.beam.sdk.io.mongodb.FindQuery;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterators;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link MongoDbIO} executed against an embedded MongoDB instance.
 *
 * <p>A single embedded server is started once for the whole class on a free local port, and the
 * {@code beam.test} collection is seeded with {@value #SEED_DOCUMENT_COUNT} documents produced by
 * {@link #createDocuments(int)}. That generator cycles through ten scientists, so every scientist
 * appears in exactly 100 of the seeded documents, and three of the ten scientists are mapped to
 * the country {@code "England"} (300 documents).
 */
@RunWith(JUnit4.class)
public class MongoDbIOTest {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbIOTest.class);

    /** Temporary directory used as the data directory of the embedded MongoDB instance. */
    @ClassRule
    public static final TemporaryFolder MONGODB_LOCATION = new TemporaryFolder();

    private static final String DATABASE = "beam";
    private static final String COLLECTION = "test";

    /** Number of documents inserted into {@link #COLLECTION} before any test runs. */
    private static final int SEED_DOCUMENT_COUNT = 1000;

    private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance();
    private static MongodExecutable mongodExecutable;
    private static MongodProcess mongodProcess;
    private static MongoClient client;
    private static int port;

    @Rule
    public final TestPipeline pipeline = TestPipeline.create();

    /**
     * Starts the embedded MongoDB instance, connects the shared client and seeds the read
     * collection.
     *
     * @throws Exception if the port lookup, the server start-up or the initial insert fails
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        port = NetworkTestHelper.getAvailableLocalPort();
        LOG.info("Starting MongoDB embedded instance on {}", port);

        IMongodConfig mongodConfig =
                new MongodConfigBuilder()
                        .version(Version.Main.PRODUCTION)
                        .configServer(false)
                        // Keep the database files inside the JUnit-managed temporary folder.
                        .replication(new Storage(MONGODB_LOCATION.getRoot().getPath(), null, 0))
                        .net(new Net("localhost", port, Network.localhostIsIPv6()))
                        .cmdOptions(
                                new MongoCmdOptionsBuilder()
                                        .syncDelay(10)
                                        .useNoPrealloc(true)
                                        .useSmallFiles(true)
                                        .useNoJournal(true)
                                        .verbose(false)
                                        .build())
                        .build();

        mongodExecutable = mongodStarter.prepare(mongodConfig);
        mongodProcess = mongodExecutable.start();
        client = new MongoClient("localhost", port);

        LOG.info("Insert test data");
        getCollection(COLLECTION).insertMany(createDocuments(SEED_DOCUMENT_COUNT));
    }

    /**
     * Closes the client and shuts the embedded server down.
     *
     * <p>JUnit invokes this method even when {@link #beforeClass()} failed part-way, so any of the
     * static handles may still be {@code null}. Each step is guarded and chained with
     * {@code finally} so that a failure while closing one resource does not prevent the remaining
     * ones (in particular the external mongod process) from being stopped.
     */
    @AfterClass
    public static void afterClass() {
        LOG.info("Stopping MongoDB instance");
        try {
            if (client != null) {
                client.close();
            }
        } finally {
            try {
                if (mongodProcess != null) {
                    mongodProcess.stop();
                }
            } finally {
                if (mongodExecutable != null) {
                    mongodExecutable.stop();
                }
            }
        }
    }

    /** Checks that N split keys are turned into N + 1 contiguous {@code _id} range filters. */
    @Test
    public void testSplitIntoFilters() {
        // A single split key yields two filters: everything up to the key, and everything after.
        List<Document> documents = new ArrayList<>();
        documents.add(new Document("_id", 56));
        List<String> filters = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(documents);
        Assert.assertEquals(2, filters.size());
        Assert.assertEquals("{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", filters.get(0));
        Assert.assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"56\")}} ]}", filters.get(1));

        // Three split keys yield four filters, the inner ones being bounded on both sides.
        documents.add(new Document("_id", 109));
        documents.add(new Document("_id", 256));
        filters = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(documents);
        Assert.assertEquals(4, filters.size());
        Assert.assertEquals("{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", filters.get(0));
        Assert.assertEquals(
                "{ $and: [ {\"_id\":{$gt:ObjectId(\"56\"),$lte:ObjectId(\"109\")}} ]}",
                filters.get(1));
        Assert.assertEquals(
                "{ $and: [ {\"_id\":{$gt:ObjectId(\"109\"),$lte:ObjectId(\"256\")}} ]}",
                filters.get(2));
        Assert.assertEquals("{ $and: [ {\"_id\":{$gt:ObjectId(\"256\")}} ]}", filters.get(3));
    }

    /** Checks that N split keys are turned into N + 1 contiguous {@code $match} stages. */
    @Test
    public void testSplitIntoBucket() {
        // A single split key yields two buckets.
        List<Document> documents = new ArrayList<>();
        documents.add(new Document("_id", new ObjectId("52cc8f6254c5317943000005")));
        List<BsonDocument> buckets = MongoDbIO.BoundedMongoDbSource.splitKeysToMatch(documents);
        Assert.assertEquals(2, buckets.size());
        Assert.assertEquals(
                "{ \"$match\" : { \"_id\" : { \"$lte\" : { \"$oid\" : \"52cc8f6254c5317943000005\" } } } }",
                buckets.get(0).toString());
        Assert.assertEquals(
                "{ \"$match\" : { \"_id\" : { \"$gt\" : { \"$oid\" : \"52cc8f6254c5317943000005\" } } } }",
                buckets.get(1).toString());

        // Three split keys yield four buckets.
        documents.add(new Document("_id", new ObjectId("52cc8f6254c5317943000007")));
        documents.add(new Document("_id", new ObjectId("54242e9e54c531ef8800001f")));
        buckets = MongoDbIO.BoundedMongoDbSource.splitKeysToMatch(documents);
        Assert.assertEquals(4, buckets.size());
        Assert.assertEquals(
                "{ \"$match\" : { \"_id\" : { \"$lte\" : { \"$oid\" : \"52cc8f6254c5317943000005\" } } } }",
                buckets.get(0).toString());
        Assert.assertEquals(
                "{ \"$match\" : { \"_id\" : { \"$gt\" : { \"$oid\" : \"52cc8f6254c5317943000005\" }, \"$lte\" : { \"$oid\" : \"52cc8f6254c5317943000007\" } } } }",
                buckets.get(1).toString());
        Assert.assertEquals(
                "{ \"$match\" : { \"_id\" : { \"$gt\" : { \"$oid\" : \"52cc8f6254c5317943000007\" }, \"$lte\" : { \"$oid\" : \"54242e9e54c531ef8800001f\" } } } }",
                buckets.get(2).toString());
        Assert.assertEquals(
                "{ \"$match\" : { \"_id\" : { \"$gt\" : { \"$oid\" : \"54242e9e54c531ef8800001f\" } } } }",
                buckets.get(3).toString());
    }

    /** Checks that automatic bucketing of an aggregation read produces ten buckets. */
    @Test
    public void testBuildAutoBuckets() {
        List<BsonDocument> aggregates = new ArrayList<>();
        aggregates.add(englandMatchStage());

        MongoDbIO.Read spec =
                MongoDbIO.read()
                        .withUri(mongoUri())
                        .withDatabase(DATABASE)
                        .withCollection(COLLECTION)
                        .withQueryFn(AggregationQuery.create().withMongoDbPipeline(aggregates));
        MongoDatabase database = client.getDatabase(DATABASE);

        Assert.assertEquals(
                10, MongoDbIO.BoundedMongoDbSource.buildAutoBuckets(database, spec).size());
    }

    /** Reads the whole seeded collection and checks the total and per-scientist counts. */
    @Test
    public void testFullRead() {
        PCollection<Document> output =
                pipeline.apply(
                        MongoDbIO.read()
                                .withUri(mongoUri())
                                .withDatabase(DATABASE)
                                .withCollection(COLLECTION));

        PAssert.thatSingleton(output.apply("Count All", Count.globally())).isEqualTo(1000L);
        assertHundredDocumentsPerScientist(output);

        pipeline.run();
    }

    /** Same as {@link #testFullRead()} but with a custom maximum connection idle time. */
    @Test
    public void testReadWithCustomConnectionOptions() {
        MongoDbIO.Read read =
                MongoDbIO.read()
                        .withUri(mongoUri())
                        .withMaxConnectionIdleTime(10)
                        .withDatabase(DATABASE)
                        .withCollection(COLLECTION);
        // The option must be retained on the transform itself.
        Assert.assertEquals(10, read.maxConnectionIdleTime());

        PCollection<Document> documents = pipeline.apply(read);

        PAssert.thatSingleton(documents.apply("Count All", Count.globally())).isEqualTo(1000L);
        assertHundredDocumentsPerScientist(documents);

        pipeline.run();
    }

    /** Reads with a find filter on one scientist and expects that scientist's 100 documents. */
    @Test
    public void testReadWithFilter() {
        PCollection<Document> output =
                pipeline.apply(
                        MongoDbIO.read()
                                .withUri(mongoUri())
                                .withDatabase(DATABASE)
                                .withCollection(COLLECTION)
                                .withQueryFn(
                                        FindQuery.create()
                                                .withFilters(Filters.eq("scientist", "Einstein"))));

        PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(100L);

        pipeline.run();
    }

    /** Reads with a find filter plus a limit of 5 across 10 splits and expects 5 documents. */
    @Test
    public void testReadWithFilterAndLimit() throws Exception {
        PCollection<Document> output =
                pipeline.apply(
                        MongoDbIO.read()
                                .withUri(mongoUri())
                                .withDatabase(DATABASE)
                                .withCollection(COLLECTION)
                                .withNumSplits(10)
                                .withQueryFn(
                                        FindQuery.create()
                                                .withFilters(Filters.eq("scientist", "Einstein"))
                                                .withLimit(5)));

        PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(5L);

        pipeline.run();
    }

    /** Reads through an aggregation pipeline matching "England" and expects 300 documents. */
    @Test
    public void testReadWithAggregate() throws Exception {
        List<BsonDocument> aggregates = new ArrayList<>();
        aggregates.add(englandMatchStage());

        PCollection<Document> output =
                pipeline.apply(
                        MongoDbIO.read()
                                .withUri(mongoUri())
                                .withDatabase(DATABASE)
                                .withCollection(COLLECTION)
                                .withQueryFn(
                                        AggregationQuery.create().withMongoDbPipeline(aggregates)));

        PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(300L);

        pipeline.run();
    }

    /** Same as {@link #testReadWithAggregate()} with a trailing {@code $limit} of 10. */
    @Test
    public void testReadWithAggregateWithLimit() throws Exception {
        List<BsonDocument> aggregates = new ArrayList<>();
        aggregates.add(englandMatchStage());
        aggregates.add(new BsonDocument("$limit", new BsonInt32(10)));

        PCollection<Document> output =
                pipeline.apply(
                        MongoDbIO.read()
                                .withUri(mongoUri())
                                .withDatabase(DATABASE)
                                .withCollection(COLLECTION)
                                .withQueryFn(
                                        AggregationQuery.create().withMongoDbPipeline(aggregates)));

        PAssert.thatSingleton(output.apply("Count", Count.globally())).isEqualTo(10L);

        pipeline.run();
    }

    /** Writes 1000 generated documents to a dedicated collection and counts them back. */
    @Test
    public void testWrite() {
        final String collectionName = "testWrite";
        final int numElements = 1000;

        pipeline
                .apply(Create.of(createDocuments(numElements)))
                .apply(
                        MongoDbIO.write()
                                .withUri(mongoUri())
                                .withDatabase(DATABASE)
                                .withCollection(collectionName));

        pipeline.run();

        Assert.assertEquals(numElements, countElements(collectionName));
    }

    /**
     * Writes the same document twice with ordered writes disabled.
     *
     * <p>Both elements carry the same {@code _id}, so at most one of them can be stored; the test
     * expects the pipeline to complete and exactly one document to be present afterwards.
     */
    @Test
    public void testWriteUnordered() {
        final String collectionName = "testWriteUnordered";
        Document doc =
                Document.parse("{\"_id\":\"521df3a4300466f1f2b5ae82\",\"scientist\":\"Test %s\"}");

        pipeline
                .apply(Create.of(doc, doc))
                .apply(
                        MongoDbIO.write()
                                .withUri(mongoUri())
                                .withDatabase(DATABASE)
                                .withOrdered(false)
                                .withCollection(collectionName));

        pipeline.run();

        Assert.assertEquals(1, countElements(collectionName));
    }

    /** Returns the connection URI of the embedded MongoDB instance. */
    private static String mongoUri() {
        return "mongodb://localhost:" + port;
    }

    /** Builds the {@code $match} stage selecting documents whose country equals "England". */
    private static BsonDocument englandMatchStage() {
        return new BsonDocument(
                "$match",
                new BsonDocument("country", new BsonDocument("$eq", new BsonString("England"))));
    }

    /**
     * Registers a {@link PAssert} verifying that, after keying {@code documents} by scientist and
     * counting per key, every key has a count of exactly 100.
     *
     * <p>Must be called before the pipeline is run.
     */
    private static void assertHundredDocumentsPerScientist(PCollection<Document> documents) {
        PAssert.that(
                        documents
                                .apply("Map Scientist", MapElements.via(new DocumentToKVFn()))
                                .apply("Count Scientist", Count.perKey()))
                .satisfies(
                        input -> {
                            for (KV<String, Long> element : input) {
                                Assert.assertEquals(100L, element.getValue().longValue());
                            }
                            return null;
                        });
    }

    /**
     * Generates {@code n} documents with a {@code scientist} and a {@code country} field.
     *
     * <p>Document {@code i} (1-based) uses entry {@code i % 10} of the two parallel arrays, so the
     * ten scientists are distributed evenly when {@code n} is a multiple of ten.
     */
    private static List<Document> createDocuments(int n) {
        String[] scientists = {
            "Einstein", "Darwin", "Copernicus", "Pasteur", "Curie",
            "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"
        };
        String[] country = {
            "Germany", "England", "Poland", "France", "France",
            "England", "England", "Denmark", "Florence", "Scotland"
        };

        List<Document> documents = new ArrayList<>(n);
        for (int i = 1; i <= n; ++i) {
            int index = i % scientists.length;
            documents.add(
                    new Document()
                            .append("scientist", scientists[index])
                            .append("country", country[index]));
        }
        return documents;
    }

    /** Counts the documents of {@code collectionName} by iterating over an unfiltered find. */
    private static int countElements(String collectionName) {
        return Iterators.size(getCollection(collectionName).find().iterator());
    }

    /** Returns the named collection of the {@link #DATABASE} database via the shared client. */
    private static MongoCollection<Document> getCollection(String collectionName) {
        return client.getDatabase(DATABASE).getCollection(collectionName);
    }

    /** Keys each document by its {@code scientist} field; the value is always {@code null}. */
    static class DocumentToKVFn extends SimpleFunction<Document, KV<String, Void>> {
        @Override
        public KV<String, Void> apply(Document input) {
            return KV.of(input.getString("scientist"), null);
        }
    }
}

