package org.apache.beam.sdk.io.mongodb;

import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.config.Storage;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.io.file.Files;
import de.flapdoodle.embed.process.runtime.Network;
import java.io.File;
import java.io.Serializable;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.bson.Document;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDbIOTest.class */
/**
 * Tests for {@link MongoDbIO} read and write transforms, run against an embedded MongoDB
 * instance that is started before and stopped after every test method.
 *
 * <p>The class is {@link Serializable} and its non-serializable members are {@code transient}
 * because the anonymous functions declared in the test methods hold an implicit reference to the
 * enclosing test instance.
 */
public class MongoDbIOTest implements Serializable {
    /** Directory used by the embedded MongoDB instance for its data files. */
    private static final String MONGODB_LOCATION = "target/mongodb";
    /** Database holding the documents seeded by {@link #setup()}. */
    private static final String DATABASE = "beam";
    private static final String COLLECTION = "test";
    /**
     * Database targeted by {@link #testWrite()}. It differs from {@link #DATABASE} so the
     * documents seeded by {@link #setup()} are not included in the written-document count.
     */
    private static final String WRITE_DATABASE = "test";

    private static final String HOST = "localhost";
    /** Number of documents inserted by {@link #setup()}. */
    private static final int SEEDED_DOCUMENTS = 1000;
    /** Number of documents written by {@link #testWrite()}. */
    private static final int WRITTEN_DOCUMENTS = 10000;
    /** Scientist names cycled through when seeding; each name is used equally often. */
    private static final String[] SCIENTISTS = {
        "Einstein", "Darwin", "Copernicus", "Pasteur", "Curie",
        "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"
    };

    private transient MongodExecutable mongodExecutable;
    private transient MongodProcess mongodProcess;
    /** Port selected once per class by {@link #availablePort()}. */
    private static int port;

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    private static final Logger LOG = LoggerFactory.getLogger(MongoDbIOTest.class);
    private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance();

    /**
     * Picks a free TCP port by binding an ephemeral server socket and releasing it immediately.
     *
     * @throws Exception if no socket could be opened or closed
     */
    @BeforeClass
    public static void availablePort() throws Exception {
        // Port 0 asks the OS for any free port; the socket is closed right after reading it.
        try (ServerSocket serverSocket = new ServerSocket(0)) {
            port = serverSocket.getLocalPort();
        }
    }

    /**
     * Starts the embedded MongoDB instance on a clean data directory and seeds
     * {@link #DATABASE}/{@link #COLLECTION} with {@link #SEEDED_DOCUMENTS} documents.
     *
     * @throws Exception if the instance cannot be started or the data cannot be inserted
     */
    @Before
    public void setup() throws Exception {
        LOG.info("Starting MongoDB embedded instance on {}", port);
        File storageDir = new File(MONGODB_LOCATION);
        try {
            Files.forceDelete(storageDir);
        } catch (Exception e) {
            // Best-effort cleanup: a failure here must not prevent the test from running.
            LOG.debug("Could not delete {} before starting MongoDB", MONGODB_LOCATION, e);
        }
        storageDir.mkdirs();

        this.mongodExecutable = mongodStarter.prepare(
            new MongodConfigBuilder()
                .version(Version.Main.PRODUCTION)
                .configServer(false)
                .replication(new Storage(MONGODB_LOCATION, null, 0))
                .net(new Net(HOST, port, Network.localhostIsIPv6()))
                .cmdOptions(
                    new MongoCmdOptionsBuilder()
                        .syncDelay(10)
                        .useNoPrealloc(true)
                        .useSmallFiles(true)
                        .useNoJournal(true)
                        .build())
                .build());
        this.mongodProcess = this.mongodExecutable.start();

        LOG.info("Insert test data");
        MongoClient client = new MongoClient(HOST, port);
        try {
            MongoCollection<Document> collection =
                client.getDatabase(DATABASE).getCollection(COLLECTION);
            for (int id = 1; id <= SEEDED_DOCUMENTS; id++) {
                Document document = new Document();
                document.append("_id", id);
                document.append("scientist", SCIENTISTS[id % SCIENTISTS.length]);
                collection.insertOne(document);
            }
        } finally {
            client.close();
        }
    }

    /**
     * Stops the embedded MongoDB instance. Tolerates a partially failed {@link #setup()}, in
     * which case one or both handles may still be {@code null}.
     *
     * @throws Exception if stopping the instance fails
     */
    @After
    public void stop() throws Exception {
        LOG.info("Stopping MongoDB instance");
        try {
            if (this.mongodProcess != null) {
                this.mongodProcess.stop();
            }
        } finally {
            // Always release the executable, even if stopping the process threw.
            if (this.mongodExecutable != null) {
                this.mongodExecutable.stop();
            }
        }
    }

    /**
     * Reads the whole seeded collection and checks both the total document count and the
     * per-scientist count.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testFullRead() throws Exception {
        PCollection<Document> output = this.pipeline.apply(
            MongoDbIO.read()
                .withUri(mongoUri())
                .withDatabase(DATABASE)
                .withCollection(COLLECTION));

        PAssert.thatSingleton(output.apply("Count All", Count.<Document>globally()))
            .isEqualTo((long) SEEDED_DOCUMENTS);

        PAssert.that(
                output
                    .apply("Map Scientist", MapElements.via(
                        new SimpleFunction<Document, KV<String, Void>>() {
                            @Override
                            public KV<String, Void> apply(Document document) {
                                return KV.of(document.getString("scientist"), (Void) null);
                            }
                        }))
                    .apply("Count Scientist", Count.<String, Void>perKey()))
            .satisfies(new SerializableFunction<Iterable<KV<String, Long>>, Void>() {
                @Override
                public Void apply(Iterable<KV<String, Long>> counts) {
                    // 1000 seeded documents spread evenly over 10 names: 100 per scientist.
                    for (KV<String, Long> count : counts) {
                        Assert.assertEquals(100L, count.getValue().longValue());
                    }
                    return null;
                }
            });

        this.pipeline.run();
    }

    /** Reads with a JSON filter on the scientist name and checks the filtered count. */
    @Test
    @Category(NeedsRunner.class)
    public void testReadWithFilter() throws Exception {
        PCollection<Document> output = this.pipeline.apply(
            MongoDbIO.read()
                .withUri(mongoUri())
                .withDatabase(DATABASE)
                .withCollection(COLLECTION)
                .withFilter("{\"scientist\":\"Einstein\"}"));

        PAssert.thatSingleton(output.apply("Count", Count.<Document>globally()))
            .isEqualTo(100L);

        this.pipeline.run();
    }

    /**
     * Writes {@link #WRITTEN_DOCUMENTS} documents through {@link MongoDbIO} and verifies, with a
     * direct client connection, that the target collection contains exactly that many.
     */
    @Test
    @Category(NeedsRunner.class)
    public void testWrite() throws Exception {
        ArrayList<Document> documents = new ArrayList<>(WRITTEN_DOCUMENTS);
        for (int i = 0; i < WRITTEN_DOCUMENTS; i++) {
            documents.add(Document.parse(String.format("{\"scientist\":\"Test %s\"}", i)));
        }

        this.pipeline
            .apply(Create.of(documents))
            .apply(
                MongoDbIO.write()
                    .withUri(mongoUri())
                    .withDatabase(WRITE_DATABASE)
                    .withCollection(COLLECTION));
        this.pipeline.run();

        int count = 0;
        MongoClient client = new MongoClient(HOST, port);
        try {
            MongoCursor<Document> cursor =
                client.getDatabase(WRITE_DATABASE).getCollection(COLLECTION).find().iterator();
            try {
                while (cursor.hasNext()) {
                    cursor.next();
                    count++;
                }
            } finally {
                cursor.close();
            }
        } finally {
            client.close();
        }
        Assert.assertEquals(WRITTEN_DOCUMENTS, count);
    }

    /** Returns the connection URI of the embedded MongoDB instance. */
    private static String mongoUri() {
        return "mongodb://" + HOST + ":" + port;
    }
}
