/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mongodb;

import com.mongodb.DB;
import com.mongodb.Mongo;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import com.mongodb.gridfs.GridFSInputFile;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.IMongodConfig;
import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.config.Storage;
import de.flapdoodle.embed.mongo.distribution.IFeatureAwareVersion;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.config.IExecutableProcessConfig;
import de.flapdoodle.embed.process.runtime.Network;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Scanner;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class MongoDBGridFSIOTest {
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBGridFSIOTest.class);
    @ClassRule
    public static final TemporaryFolder MONGODB_LOCATION = new TemporaryFolder();
    private static final String DATABASE = "gridfs";
    private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance();
    private static MongodExecutable mongodExecutable;
    private static MongodProcess mongodProcess;
    private static int port;
    @Rule
    public final TestPipeline pipeline = TestPipeline.create();

    @BeforeClass
    public static void start() throws Exception {
        int x;
        port = NetworkTestHelper.getAvailableLocalPort();
        LOG.info("Starting MongoDB embedded instance on {}", (Object)port);
        IMongodConfig mongodConfig = new MongodConfigBuilder().version((IFeatureAwareVersion)Version.Main.PRODUCTION).configServer(false).replication(new Storage(MONGODB_LOCATION.getRoot().getPath(), null, 0)).net(new Net("localhost", port, Network.localhostIsIPv6())).cmdOptions(new MongoCmdOptionsBuilder().syncDelay(10).useNoPrealloc(true).useSmallFiles(true).useNoJournal(true).verbose(false).build()).build();
        mongodExecutable = (MongodExecutable)mongodStarter.prepare((IExecutableProcessConfig)mongodConfig);
        mongodProcess = (MongodProcess)mongodExecutable.start();
        LOG.info("Insert test data");
        Mongo client = new Mongo("localhost", port);
        DB database = client.getDB(DATABASE);
        GridFS gridfs = new GridFS(database);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (x = 0; x < 100; ++x) {
            out.write("Einstein\nDarwin\nCopernicus\nPasteur\nCurie\nFaraday\nNewton\nBohr\nGalilei\nMaxwell\n".getBytes(StandardCharsets.UTF_8));
        }
        for (x = 0; x < 5; ++x) {
            gridfs.createFile((InputStream)new ByteArrayInputStream(out.toByteArray()), "file" + x).save();
        }
        gridfs = new GridFS(database, "mapBucket");
        long now = System.currentTimeMillis();
        Random random = new Random();
        String[] scientists = new String[]{"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
        for (int x2 = 0; x2 < 10; ++x2) {
            int y;
            GridFSInputFile file = gridfs.createFile("file_" + x2);
            OutputStream outf = file.getOutputStream();
            OutputStreamWriter writer = new OutputStreamWriter(outf, StandardCharsets.UTF_8);
            for (y = 0; y < 5000; ++y) {
                long time = now - (long)random.nextInt(3600000);
                String name = scientists[y % scientists.length];
                writer.write(Long.toString(time) + "\t");
                writer.write(name + "\t");
                writer.write(Integer.toString(random.nextInt(100)));
                writer.write("\n");
            }
            for (y = 0; y < scientists.length; ++y) {
                String name = scientists[y % scientists.length];
                writer.write(Long.toString(now) + "\t");
                writer.write(name + "\t");
                writer.write("101");
                writer.write("\n");
            }
            writer.flush();
            writer.close();
        }
        client.close();
    }

    @AfterClass
    public static void stop() {
        LOG.info("Stopping MongoDB instance");
        mongodProcess.stop();
        mongodExecutable.stop();
    }

    @Test
    public void testFullRead() {
        PCollection output = (PCollection)this.pipeline.apply((PTransform)MongoDbGridFSIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count All", Count.globally()))).isEqualTo((Object)5000L);
        PAssert.that((PCollection)((PCollection)output.apply("Count PerElement", Count.perElement()))).satisfies((SerializableFunction & Serializable)input -> {
            for (KV element : input) {
                Assert.assertEquals((long)500L, (long)((Long)element.getValue()));
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    public void testReadWithParser() {
        PCollection output = (PCollection)this.pipeline.apply((PTransform)MongoDbGridFSIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE).withBucket("mapBucket").withParser((MongoDbGridFSIO.Parser & Serializable)(input, callback) -> {
            BufferedReader reader = new BufferedReader(new InputStreamReader(input.getInputStream(), StandardCharsets.UTF_8));
            Throwable throwable = null;
            try {
                String line = reader.readLine();
                while (line != null) {
                    Scanner scanner = new Scanner(line.trim());
                    Throwable throwable2 = null;
                    try {
                        scanner.useDelimiter("\\t");
                        long timestamp = scanner.nextLong();
                        String name = scanner.next();
                        int score = scanner.nextInt();
                        callback.output((Object)KV.of((Object)name, (Object)score), new Instant(timestamp));
                    }
                    catch (Throwable throwable3) {
                        throwable2 = throwable3;
                        throw throwable3;
                    }
                    finally {
                        MongoDBGridFSIOTest.$closeResource(throwable2, scanner);
                    }
                    line = reader.readLine();
                }
            }
            catch (Throwable throwable4) {
                throwable = throwable4;
                throw throwable4;
            }
            finally {
                MongoDBGridFSIOTest.$closeResource(throwable, reader);
            }
        }).withSkew(new Duration(3610000L)).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)VarIntCoder.of())));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count All", Count.globally()))).isEqualTo((Object)50100L);
        PAssert.that((PCollection)((PCollection)output.apply("Max PerElement", (PTransform)Max.integersPerKey()))).satisfies((SerializableFunction & Serializable)input -> {
            for (KV element : input) {
                Assert.assertEquals((long)101L, (long)((Integer)element.getValue()).longValue());
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    public void testSplit() throws Exception {
        PipelineOptions options = PipelineOptionsFactory.create();
        MongoDbGridFSIO.Read read = MongoDbGridFSIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE);
        MongoDbGridFSIO.Read.BoundedGridFSSource src = new MongoDbGridFSIO.Read.BoundedGridFSSource(read, null);
        long desiredBundleSizeBytes = src.getEstimatedSizeBytes(options) * 2L / 5L + 1000L;
        List splits = src.split(desiredBundleSizeBytes, options);
        int expectedNbSplits = 3;
        Assert.assertEquals((long)expectedNbSplits, (long)splits.size());
        SourceTestUtils.assertSourcesEqualReferenceSource((BoundedSource)src, (List)splits, (PipelineOptions)options);
        int nonEmptySplits = 0;
        int count = 0;
        for (BoundedSource subSource : splits) {
            List result = SourceTestUtils.readFromSource((BoundedSource)subSource, (PipelineOptions)options);
            if (result.size() > 0) {
                ++nonEmptySplits;
            }
            count += result.size();
        }
        Assert.assertEquals((long)expectedNbSplits, (long)nonEmptySplits);
        Assert.assertEquals((long)5L, (long)count);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWriteMessage() throws Exception {
        int i;
        ArrayList<String> data = new ArrayList<String>(100);
        ArrayList<Integer> intData = new ArrayList<Integer>(100);
        for (i = 0; i < 1000; ++i) {
            data.add("Message " + i);
        }
        for (i = 0; i < 100; ++i) {
            intData.add(i);
        }
        ((PCollection)this.pipeline.apply("String", (PTransform)Create.of(data))).apply("StringInternal", (PTransform)MongoDbGridFSIO.write().withUri("mongodb://localhost:" + port).withDatabase(DATABASE).withChunkSize(Long.valueOf(100L)).withBucket("WriteTest").withFilename("WriteTestData"));
        ((PCollection)this.pipeline.apply("WithWriteFn", (PTransform)Create.of(intData))).apply("WithWriteFnInternal", (PTransform)MongoDbGridFSIO.write((MongoDbGridFSIO.WriteFn & Serializable)(output, outStream) -> outStream.write(output.byteValue())).withUri("mongodb://localhost:" + port).withDatabase(DATABASE).withBucket("WriteTest").withFilename("WriteTestIntData"));
        this.pipeline.run();
        try (Mongo client = null;){
            StringBuilder results = new StringBuilder();
            client = new Mongo("localhost", port);
            DB database = client.getDB(DATABASE);
            GridFS gridfs = new GridFS(database, "WriteTest");
            List files = gridfs.find("WriteTestData");
            Assert.assertTrue((files.size() > 0 ? 1 : 0) != 0);
            for (GridFSDBFile file : files) {
                Assert.assertEquals((long)100L, (long)file.getChunkSize());
                int l = (int)file.getLength();
                InputStream ins = file.getInputStream();
                Throwable throwable = null;
                try {
                    DataInputStream dis = new DataInputStream(ins);
                    byte[] b = new byte[l];
                    dis.readFully(b);
                    results.append(new String(b, StandardCharsets.UTF_8));
                }
                catch (Throwable dis) {
                    throwable = dis;
                    throw dis;
                }
                finally {
                    if (ins == null) continue;
                    MongoDBGridFSIOTest.$closeResource(throwable, ins);
                }
            }
            String dataString = results.toString();
            for (int x = 0; x < 1000; ++x) {
                Assert.assertTrue((boolean)dataString.contains("Message " + x));
            }
            files = gridfs.find("WriteTestIntData");
            boolean[] intResults = new boolean[100];
            for (GridFSDBFile file : files) {
                int l = (int)file.getLength();
                InputStream ins = file.getInputStream();
                Throwable throwable = null;
                try {
                    DataInputStream dis = new DataInputStream(ins);
                    byte[] b = new byte[l];
                    dis.readFully(b);
                    for (byte aB : b) {
                        intResults[aB] = true;
                    }
                }
                catch (Throwable throwable2) {
                    throwable = throwable2;
                    throw throwable2;
                }
                finally {
                    if (ins == null) continue;
                    MongoDBGridFSIOTest.$closeResource(throwable, ins);
                }
            }
            for (int x = 0; x < 100; ++x) {
                Assert.assertTrue((String)("Did not get a result for " + x), (boolean)intResults[x]);
            }
        }
    }

    private static /* synthetic */ /* end resource */ void $closeResource(Throwable x0, AutoCloseable x1) {
        if (x0 != null) {
            try {
                x1.close();
            }
            catch (Throwable throwable) {
                x0.addSuppressed(throwable);
            }
        } else {
            x1.close();
        }
    }
}

