/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mongodb;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.IMongodConfig;
import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.config.Storage;
import de.flapdoodle.embed.mongo.distribution.IFeatureAwareVersion;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.config.IExecutableProcessConfig;
import de.flapdoodle.embed.process.io.file.Files;
import de.flapdoodle.embed.process.runtime.Network;
import java.io.File;
import java.io.Serializable;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.bson.Document;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MongoDbIOTest
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbIOTest.class);
    private static final String MONGODB_LOCATION = "target/mongodb";
    private static final String DATABASE = "beam";
    private static final String COLLECTION = "test";
    private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance();
    private transient MongodExecutable mongodExecutable;
    private transient MongodProcess mongodProcess;
    private static int port;
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @BeforeClass
    public static void availablePort() throws Exception {
        try (ServerSocket serverSocket = new ServerSocket(0);){
            port = serverSocket.getLocalPort();
        }
    }

    @Before
    public void setup() throws Exception {
        LOG.info("Starting MongoDB embedded instance on {}", (Object)port);
        try {
            Files.forceDelete((File)new File(MONGODB_LOCATION));
        }
        catch (Exception exception) {
            // empty catch block
        }
        new File(MONGODB_LOCATION).mkdirs();
        IMongodConfig mongodConfig = new MongodConfigBuilder().version((IFeatureAwareVersion)Version.Main.PRODUCTION).configServer(false).replication(new Storage(MONGODB_LOCATION, null, 0)).net(new Net("localhost", port, Network.localhostIsIPv6())).cmdOptions(new MongoCmdOptionsBuilder().syncDelay(10).useNoPrealloc(true).useSmallFiles(true).useNoJournal(true).build()).build();
        this.mongodExecutable = (MongodExecutable)mongodStarter.prepare((IExecutableProcessConfig)mongodConfig);
        this.mongodProcess = (MongodProcess)this.mongodExecutable.start();
        LOG.info("Insert test data");
        MongoClient client = new MongoClient("localhost", port);
        MongoDatabase database = client.getDatabase(DATABASE);
        MongoCollection collection = database.getCollection(COLLECTION);
        String[] scientists = new String[]{"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
        for (int i = 1; i <= 1000; ++i) {
            int index = i % scientists.length;
            Document document = new Document();
            document.append("_id", (Object)i);
            document.append("scientist", (Object)scientists[index]);
            collection.insertOne((Object)document);
        }
    }

    @After
    public void stop() throws Exception {
        LOG.info("Stopping MongoDB instance");
        this.mongodProcess.stop();
        this.mongodExecutable.stop();
    }

    @Test
    public void testSplitIntoFilters() throws Exception {
        ArrayList<Document> documents = new ArrayList<Document>();
        documents.add(new Document("_id", (Object)56));
        documents.add(new Document("_id", (Object)109));
        documents.add(new Document("_id", (Object)256));
        List filters = MongoDbIO.BoundedMongoDbSource.splitKeysToFilters(documents, null);
        Assert.assertEquals((long)4L, (long)filters.size());
        Assert.assertEquals((Object)"{ $and: [ {\"_id\":{$lte:ObjectId(\"56\")}} ]}", filters.get(0));
        Assert.assertEquals((Object)"{ $and: [ {\"_id\":{$gt:ObjectId(\"56\"),$lte:ObjectId(\"109\")}} ]}", filters.get(1));
        Assert.assertEquals((Object)"{ $and: [ {\"_id\":{$gt:ObjectId(\"109\"),$lte:ObjectId(\"256\")}} ]}", filters.get(2));
        Assert.assertEquals((Object)"{ $and: [ {\"_id\":{$gt:ObjectId(\"256\")}} ]}", filters.get(3));
    }

    @Test
    public void testFullRead() throws Exception {
        PCollection output = (PCollection)this.pipeline.apply((PTransform)MongoDbIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE).withCollection(COLLECTION));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count All", Count.globally()))).isEqualTo((Object)1000L);
        PAssert.that((PCollection)((PCollection)((PCollection)output.apply("Map Scientist", (PTransform)MapElements.via((SimpleFunction)new SimpleFunction<Document, KV<String, Void>>(){

            public KV<String, Void> apply(Document input) {
                return KV.of((Object)input.getString((Object)"scientist"), null);
            }
        }))).apply("Count Scientist", Count.perKey()))).satisfies((SerializableFunction)new SerializableFunction<Iterable<KV<String, Long>>, Void>(){

            public Void apply(Iterable<KV<String, Long>> input) {
                for (KV<String, Long> element : input) {
                    Assert.assertEquals((long)100L, (long)((Long)element.getValue()));
                }
                return null;
            }
        });
        this.pipeline.run();
    }

    @Test
    public void testReadWithCustomConnectionOptions() throws Exception {
        MongoDbIO.Read read = MongoDbIO.read().withUri("mongodb://localhost:" + port).withKeepAlive(false).withMaxConnectionIdleTime(10).withDatabase(DATABASE).withCollection(COLLECTION);
        Assert.assertFalse((boolean)read.keepAlive());
        Assert.assertEquals((long)10L, (long)read.maxConnectionIdleTime());
        PCollection documents = (PCollection)this.pipeline.apply((PTransform)read);
        PAssert.thatSingleton((PCollection)((PCollection)documents.apply("Count All", Count.globally()))).isEqualTo((Object)1000L);
        PAssert.that((PCollection)((PCollection)((PCollection)documents.apply("Map Scientist", (PTransform)MapElements.via((SimpleFunction)new SimpleFunction<Document, KV<String, Void>>(){

            public KV<String, Void> apply(Document input) {
                return KV.of((Object)input.getString((Object)"scientist"), null);
            }
        }))).apply("Count Scientist", Count.perKey()))).satisfies((SerializableFunction)new SerializableFunction<Iterable<KV<String, Long>>, Void>(){

            public Void apply(Iterable<KV<String, Long>> input) {
                for (KV<String, Long> element : input) {
                    Assert.assertEquals((long)100L, (long)((Long)element.getValue()));
                }
                return null;
            }
        });
        this.pipeline.run();
    }

    @Test
    public void testReadWithFilter() throws Exception {
        PCollection output = (PCollection)this.pipeline.apply((PTransform)MongoDbIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE).withCollection(COLLECTION).withFilter("{\"scientist\":\"Einstein\"}"));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count", Count.globally()))).isEqualTo((Object)100L);
        this.pipeline.run();
    }

    @Test
    public void testWrite() throws Exception {
        ArrayList<Document> data = new ArrayList<Document>();
        for (int i = 0; i < 10000; ++i) {
            data.add(Document.parse((String)String.format("{\"scientist\":\"Test %s\"}", i)));
        }
        ((PCollection)this.pipeline.apply((PTransform)Create.of(data))).apply((PTransform)MongoDbIO.write().withUri("mongodb://localhost:" + port).withDatabase(COLLECTION).withCollection(COLLECTION));
        this.pipeline.run();
        MongoClient client = new MongoClient("localhost", port);
        MongoDatabase database = client.getDatabase(COLLECTION);
        MongoCollection collection = database.getCollection(COLLECTION);
        MongoCursor cursor = collection.find().iterator();
        int count = 0;
        while (cursor.hasNext()) {
            ++count;
            cursor.next();
        }
        Assert.assertEquals((long)10000L, (long)count);
    }
}

