package org.apache.beam.sdk.io.mongodb;

import com.mongodb.DB;
import com.mongodb.Mongo;
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSDBFile;
import de.flapdoodle.embed.mongo.MongodExecutable;
import de.flapdoodle.embed.mongo.MongodProcess;
import de.flapdoodle.embed.mongo.MongodStarter;
import de.flapdoodle.embed.mongo.config.MongoCmdOptionsBuilder;
import de.flapdoodle.embed.mongo.config.MongodConfigBuilder;
import de.flapdoodle.embed.mongo.config.Net;
import de.flapdoodle.embed.mongo.config.Storage;
import de.flapdoodle.embed.mongo.distribution.Version;
import de.flapdoodle.embed.process.runtime.Network;
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Scanner;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.mongodb.MongoDbGridFSIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest.class */
public class MongoDBGridFSIOTest {
    private static final String DATABASE = "gridfs";
    private static MongodExecutable mongodExecutable;
    private static MongodProcess mongodProcess;
    private static int port;

    @Rule
    public final TestPipeline pipeline = TestPipeline.create();
    private static final Logger LOG = LoggerFactory.getLogger(MongoDBGridFSIOTest.class);

    @ClassRule
    public static final TemporaryFolder MONGODB_LOCATION = new TemporaryFolder();
    private static final MongodStarter mongodStarter = MongodStarter.getDefaultInstance();

    @BeforeClass
    public static void start() throws Exception {
        port = NetworkTestHelper.getAvailableLocalPort();
        LOG.info("Starting MongoDB embedded instance on {}", Integer.valueOf(port));
        mongodExecutable = mongodStarter.prepare(new MongodConfigBuilder().version(Version.Main.PRODUCTION).configServer(false).replication(new Storage(MONGODB_LOCATION.getRoot().getPath(), (String) null, 0)).net(new Net("localhost", port, Network.localhostIsIPv6())).cmdOptions(new MongoCmdOptionsBuilder().syncDelay(10).useNoPrealloc(true).useSmallFiles(true).useNoJournal(true).verbose(false).build()).build());
        mongodProcess = mongodExecutable.start();
        LOG.info("Insert test data");
        Mongo mongo = new Mongo("localhost", port);
        DB db = mongo.getDB(DATABASE);
        GridFS gridFS = new GridFS(db);
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        for (int i = 0; i < 100; i++) {
            byteArrayOutputStream.write("Einstein\nDarwin\nCopernicus\nPasteur\nCurie\nFaraday\nNewton\nBohr\nGalilei\nMaxwell\n".getBytes(StandardCharsets.UTF_8));
        }
        for (int i2 = 0; i2 < 5; i2++) {
            gridFS.createFile(new ByteArrayInputStream(byteArrayOutputStream.toByteArray()), "file" + i2).save();
        }
        GridFS gridFS2 = new GridFS(db, "mapBucket");
        long currentTimeMillis = System.currentTimeMillis();
        Random random = new Random();
        String[] strArr = {"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
        for (int i3 = 0; i3 < 10; i3++) {
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(gridFS2.createFile("file_" + i3).getOutputStream(), StandardCharsets.UTF_8);
            for (int i4 = 0; i4 < 5000; i4++) {
                long nextInt = currentTimeMillis - random.nextInt(3600000);
                String str = strArr[i4 % strArr.length];
                outputStreamWriter.write(Long.toString(nextInt) + "\t");
                outputStreamWriter.write(str + "\t");
                outputStreamWriter.write(Integer.toString(random.nextInt(100)));
                outputStreamWriter.write("\n");
            }
            for (int i5 = 0; i5 < strArr.length; i5++) {
                String str2 = strArr[i5 % strArr.length];
                outputStreamWriter.write(Long.toString(currentTimeMillis) + "\t");
                outputStreamWriter.write(str2 + "\t");
                outputStreamWriter.write("101");
                outputStreamWriter.write("\n");
            }
            outputStreamWriter.flush();
            outputStreamWriter.close();
        }
        mongo.close();
    }

    @AfterClass
    public static void stop() {
        LOG.info("Stopping MongoDB instance");
        mongodProcess.stop();
        mongodExecutable.stop();
    }

    @Test
    public void testFullRead() {
        PCollection apply = this.pipeline.apply(MongoDbGridFSIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE));
        PAssert.thatSingleton(apply.apply("Count All", Count.globally())).isEqualTo(5000L);
        PAssert.that(apply.apply("Count PerElement", Count.perElement())).satisfies(iterable -> {
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(500L, ((Long) ((KV) it.next()).getValue()).longValue());
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    public void testReadWithParser() {
        PCollection apply = this.pipeline.apply(MongoDbGridFSIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE).withBucket("mapBucket").withParser((gridFSDBFile, parserCallback) -> {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gridFSDBFile.getInputStream(), StandardCharsets.UTF_8));
            try {
                for (String readLine = bufferedReader.readLine(); readLine != null; readLine = bufferedReader.readLine()) {
                    Scanner scanner = new Scanner(readLine.trim());
                    Throwable th = null;
                    try {
                        try {
                            scanner.useDelimiter("\\t");
                            parserCallback.output(KV.of(scanner.next(), Integer.valueOf(scanner.nextInt())), new Instant(scanner.nextLong()));
                            $closeResource(null, scanner);
                        } finally {
                        }
                    } catch (Throwable th2) {
                        $closeResource(th, scanner);
                        throw th2;
                    }
                }
            } finally {
                $closeResource(null, bufferedReader);
            }
        }).withSkew(new Duration(3610000L)).withCoder(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of())));
        PAssert.thatSingleton(apply.apply("Count All", Count.globally())).isEqualTo(50100L);
        PAssert.that(apply.apply("Max PerElement", Max.integersPerKey())).satisfies(iterable -> {
            Iterator it = iterable.iterator();
            while (it.hasNext()) {
                Assert.assertEquals(101L, ((Integer) ((KV) it.next()).getValue()).longValue());
            }
            return null;
        });
        this.pipeline.run();
    }

    @Test
    public void testSplit() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.create();
        MongoDbGridFSIO.Read.BoundedGridFSSource boundedGridFSSource = new MongoDbGridFSIO.Read.BoundedGridFSSource(MongoDbGridFSIO.read().withUri("mongodb://localhost:" + port).withDatabase(DATABASE), (List) null);
        List split = boundedGridFSSource.split(((boundedGridFSSource.getEstimatedSizeBytes(create) * 2) / 5) + 1000, create);
        Assert.assertEquals(3, split.size());
        SourceTestUtils.assertSourcesEqualReferenceSource(boundedGridFSSource, split, create);
        int i = 0;
        int i2 = 0;
        Iterator it = split.iterator();
        while (it.hasNext()) {
            List readFromSource = SourceTestUtils.readFromSource((BoundedSource) it.next(), create);
            if (readFromSource.size() > 0) {
                i++;
            }
            i2 += readFromSource.size();
        }
        Assert.assertEquals(3, i);
        Assert.assertEquals(5L, i2);
    }

    @Test
    public void testWriteMessage() throws Exception {
        InputStream inputStream;
        ArrayList arrayList = new ArrayList(100);
        ArrayList arrayList2 = new ArrayList(100);
        for (int i = 0; i < 1000; i++) {
            arrayList.add("Message " + i);
        }
        for (int i2 = 0; i2 < 100; i2++) {
            arrayList2.add(Integer.valueOf(i2));
        }
        this.pipeline.apply("String", Create.of(arrayList)).apply("StringInternal", MongoDbGridFSIO.write().withUri("mongodb://localhost:" + port).withDatabase(DATABASE).withChunkSize(100L).withBucket("WriteTest").withFilename("WriteTestData"));
        this.pipeline.apply("WithWriteFn", Create.of(arrayList2)).apply("WithWriteFnInternal", MongoDbGridFSIO.write((num, outputStream) -> {
            outputStream.write(num.byteValue());
        }).withUri("mongodb://localhost:" + port).withDatabase(DATABASE).withBucket("WriteTest").withFilename("WriteTestIntData"));
        this.pipeline.run();
        Mongo mongo = null;
        try {
            StringBuilder sb = new StringBuilder();
            mongo = new Mongo("localhost", port);
            GridFS gridFS = new GridFS(mongo.getDB(DATABASE), "WriteTest");
            List<GridFSDBFile> find = gridFS.find("WriteTestData");
            Assert.assertTrue(find.size() > 0);
            for (GridFSDBFile gridFSDBFile : find) {
                Assert.assertEquals(100L, gridFSDBFile.getChunkSize());
                int length = (int) gridFSDBFile.getLength();
                inputStream = gridFSDBFile.getInputStream();
                Throwable th = null;
                try {
                    try {
                        DataInputStream dataInputStream = new DataInputStream(inputStream);
                        byte[] bArr = new byte[length];
                        dataInputStream.readFully(bArr);
                        sb.append(new String(bArr, StandardCharsets.UTF_8));
                        if (inputStream != null) {
                            $closeResource(null, inputStream);
                        }
                    } catch (Throwable th2) {
                        th = th2;
                        throw th2;
                    }
                } finally {
                }
            }
            String sb2 = sb.toString();
            for (int i3 = 0; i3 < 1000; i3++) {
                Assert.assertTrue(sb2.contains("Message " + i3));
            }
            List<GridFSDBFile> find2 = gridFS.find("WriteTestIntData");
            boolean[] zArr = new boolean[100];
            for (GridFSDBFile gridFSDBFile2 : find2) {
                int length2 = (int) gridFSDBFile2.getLength();
                inputStream = gridFSDBFile2.getInputStream();
                Throwable th3 = null;
                try {
                    try {
                        DataInputStream dataInputStream2 = new DataInputStream(inputStream);
                        byte[] bArr2 = new byte[length2];
                        dataInputStream2.readFully(bArr2);
                        for (byte b : bArr2) {
                            zArr[b] = true;
                        }
                        if (inputStream != null) {
                            $closeResource(null, inputStream);
                        }
                    } catch (Throwable th4) {
                        th3 = th4;
                        throw th4;
                    }
                } finally {
                }
            }
            for (int i4 = 0; i4 < 100; i4++) {
                Assert.assertTrue("Did not get a result for " + i4, zArr[i4]);
            }
            if (mongo != null) {
                mongo.close();
            }
        } catch (Throwable th5) {
            if (mongo != null) {
                mongo.close();
            }
            throw th5;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -797928862:
                if (implMethodName.equals("lambda$testReadWithParser$3b1c396$1")) {
                    z = true;
                    break;
                }
                break;
            case -347033007:
                if (implMethodName.equals("lambda$testWriteMessage$416bf774$1")) {
                    z = false;
                    break;
                }
                break;
            case 533555622:
                if (implMethodName.equals("lambda$testReadWithParser$43268ee4$1")) {
                    z = 3;
                    break;
                }
                break;
            case 1940887068:
                if (implMethodName.equals("lambda$testFullRead$43268ee4$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO$WriteFn") && serializedLambda.getFunctionalInterfaceMethodName().equals("write") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/io/OutputStream;)V") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Integer;Ljava/io/OutputStream;)V")) {
                    return (num, outputStream) -> {
                        outputStream.write(num.byteValue());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/mongodb/MongoDbGridFSIO$Parser") && serializedLambda.getFunctionalInterfaceMethodName().equals("parse") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lcom/mongodb/gridfs/GridFSDBFile;Lorg/apache/beam/sdk/io/mongodb/MongoDbGridFSIO$ParserCallback;)V") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest") && serializedLambda.getImplMethodSignature().equals("(Lcom/mongodb/gridfs/GridFSDBFile;Lorg/apache/beam/sdk/io/mongodb/MongoDbGridFSIO$ParserCallback;)V")) {
                    return (gridFSDBFile, parserCallback) -> {
                        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(gridFSDBFile.getInputStream(), StandardCharsets.UTF_8));
                        try {
                            for (String readLine = bufferedReader.readLine(); readLine != null; readLine = bufferedReader.readLine()) {
                                Scanner scanner = new Scanner(readLine.trim());
                                Throwable th = null;
                                try {
                                    try {
                                        scanner.useDelimiter("\\t");
                                        parserCallback.output(KV.of(scanner.next(), Integer.valueOf(scanner.nextInt())), new Instant(scanner.nextLong()));
                                        $closeResource(null, scanner);
                                    } finally {
                                    }
                                } catch (Throwable th2) {
                                    $closeResource(th, scanner);
                                    throw th2;
                                }
                            }
                        } finally {
                            $closeResource(null, bufferedReader);
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable -> {
                        Iterator it = iterable.iterator();
                        while (it.hasNext()) {
                            Assert.assertEquals(500L, ((Long) ((KV) it.next()).getValue()).longValue());
                        }
                        return null;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/mongodb/MongoDBGridFSIOTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Iterable;)Ljava/lang/Void;")) {
                    return iterable2 -> {
                        Iterator it = iterable2.iterator();
                        while (it.hasNext()) {
                            Assert.assertEquals(101L, ((Integer) ((KV) it.next()).getValue()).longValue());
                        }
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
