/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.mongodb;

import com.google.cloud.Timestamp;
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Integration test for {@link MongoDbIO}.
 *
 * <p>The test writes a generated sequence of documents to a MongoDB collection, reads the
 * collection back, compares a combined hash of the read values against a precomputed value, and
 * publishes read/write timing and written-size metrics to InfluxDB.
 *
 * <p>Connection details come from {@link MongoDBPipelineOptions}.
 */
@RunWith(JUnit4.class)
public class MongoDBIOIT {
    /** Metrics namespace shared by the time monitors and the metrics readers. */
    private static final String NAMESPACE = MongoDBIOIT.class.getName();

    /** Sentinel stored in the size fields while no collection size has been fetched. */
    private static final double SIZE_NOT_FETCHED = -1.0;

    /** Expected combined hash of the values read back, keyed by the number of records written. */
    private static final Map<Integer, String> EXPECTED_HASHES =
        ImmutableMap.of(
            1000, "75a0d5803418444e76ae5b421662764c",
            100000, "3bc762dc1c291904e3c7f577774c6276",
            10000000, "e5e0503902018c83e8c8977ef437feba");

    private static String mongoUrl;
    private static MongoClient mongoClient;
    private static InfluxDBSettings settings;
    private static MongoDBPipelineOptions options;
    private static String collection;

    // Start at the sentinel so getWrittenDataSize() can detect sizes that were never fetched.
    private double initialCollectionSize = SIZE_NOT_FETCHED;
    private double finalCollectionSize = SIZE_NOT_FETCHED;

    @Rule
    public final TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public final TestPipeline readPipeline = TestPipeline.create();

    /**
     * Reads the pipeline options, picks a timestamp-suffixed collection name, opens the MongoDB
     * client and builds the InfluxDB settings used for publishing metrics.
     */
    @BeforeClass
    public static void setUp() {
        PipelineOptionsFactory.register(MongoDBPipelineOptions.class);
        options = TestPipeline.testingPipelineOptions().as(MongoDBPipelineOptions.class);
        collection = String.format("test_%s", new Date().getTime());
        mongoUrl =
            String.format(
                "mongodb://%s:%s", options.getMongoDBHostName(), options.getMongoDBPort());
        mongoClient = MongoClients.create(mongoUrl);
        settings =
            InfluxDBSettings.builder()
                .withHost(options.getInfluxHost())
                .withDatabase(options.getInfluxDatabase())
                .withMeasurement(options.getInfluxMeasurement())
                .get();
    }

    /** Resets the recorded collection sizes to the "not fetched" sentinel after each test. */
    @After
    public void cleanUp() {
        this.initialCollectionSize = SIZE_NOT_FETCHED;
        this.finalCollectionSize = SIZE_NOT_FETCHED;
    }

    /**
     * Drops the test database (with retries) and then closes the MongoDB client.
     *
     * @throws Exception if dropping the database still fails after the retries
     */
    @AfterClass
    public static void tearDown() throws Exception {
        if (mongoClient == null) {
            // setUp() failed before the client was created; there is nothing to clean up.
            return;
        }
        try {
            IOITHelper.executeWithRetry(MongoDBIOIT::dropDatabase);
        } finally {
            // Always release the client's connections, even when the drop failed.
            mongoClient.close();
        }
    }

    /** Drops the database named by {@link MongoDBPipelineOptions#getMongoDBDatabaseName()}. */
    public static void dropDatabase() {
        mongoClient.getDatabase(options.getMongoDBDatabaseName()).drop();
    }

    /**
     * Writes the configured number of documents, reads them back, asserts the combined hash of
     * the read values and publishes the collected metrics.
     */
    @Test
    public void testWriteAndRead() {
        this.initialCollectionSize = getCollectionSizeInBytes(collection);

        this.writePipeline
            .apply("Generate sequence", GenerateSequence.from(0L).to(options.getNumberOfRecords()))
            .apply("Produce documents", MapElements.via(new LongToDocumentFn()))
            .apply("Collect write time metric", ParDo.of(new TimeMonitor<>(NAMESPACE, "write_time")))
            .apply(
                "Write documents to MongoDB",
                MongoDbIO.write()
                    .withUri(mongoUrl)
                    .withDatabase(options.getMongoDBDatabaseName())
                    .withCollection(collection));
        PipelineResult writeResult = this.writePipeline.run();
        writeResult.waitUntilFinish();

        this.finalCollectionSize = getCollectionSizeInBytes(collection);

        PCollection<String> consolidatedHashcode =
            this.readPipeline
                .apply(
                    "Read all documents",
                    MongoDbIO.read()
                        .withUri(mongoUrl)
                        .withDatabase(options.getMongoDBDatabaseName())
                        .withCollection(collection))
                .apply("Collect read time metrics", ParDo.of(new TimeMonitor<>(NAMESPACE, "read_time")))
                .apply("Map documents to Strings", MapElements.via(new DocumentToStringFn()))
                .apply("Calculate hashcode", Combine.globally(new HashingFn()));

        String expectedHash =
            IOITHelper.getHashForRecordCount(options.getNumberOfRecords(), EXPECTED_HASHES);
        PAssert.thatSingleton(consolidatedHashcode).isEqualTo(expectedHash);

        PipelineResult readResult = this.readPipeline.run();
        readResult.waitUntilFinish();

        collectAndPublishMetrics(writeResult, readResult);
    }

    /**
     * Runs the {@code collStats} command for the given collection and returns its {@code size}
     * field.
     *
     * @param collectionName name of the collection to inspect
     * @return the {@code size} value of the command result, parsed as a double
     * @throws IllegalStateException if the command result has no {@code size} value
     */
    private double getCollectionSizeInBytes(String collectionName) {
        Document stats =
            mongoClient
                .getDatabase(options.getMongoDBDatabaseName())
                .runCommand(new Document("collStats", collectionName));
        Object size = stats.get("size");
        if (size == null) {
            throw new IllegalStateException("Unable to retrieve collection stats");
        }
        // The field may come back as any numeric type; go through its string form.
        return Double.parseDouble(String.valueOf(size));
    }

    /**
     * Builds the read and write metric sets for one test run and publishes both to InfluxDB.
     *
     * @param writeResult result of the write pipeline
     * @param readResult result of the read pipeline
     */
    private void collectAndPublishMetrics(PipelineResult writeResult, PipelineResult readResult) {
        // Both metric sets share one run id and timestamp.
        String uuid = UUID.randomUUID().toString();
        String timestamp = Timestamp.now().toString();

        Set<Function<MetricsReader, NamedTestResult>> readSuppliers =
            getReadSuppliers(uuid, timestamp);
        Set<Function<MetricsReader, NamedTestResult>> writeSuppliers =
            getWriteSuppliers(uuid, timestamp);

        IOITMetrics readMetrics =
            new IOITMetrics(readSuppliers, readResult, NAMESPACE, uuid, timestamp);
        IOITMetrics writeMetrics =
            new IOITMetrics(writeSuppliers, writeResult, NAMESPACE, uuid, timestamp);

        readMetrics.publishToInflux(settings);
        writeMetrics.publishToInflux(settings);
    }

    /**
     * Returns the metric suppliers for the write pipeline: {@code write_time} and
     * {@code data_size}.
     */
    private Set<Function<MetricsReader, NamedTestResult>> getWriteSuppliers(
            String uuid, String timestamp) {
        Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
        suppliers.add(getTimeMetric(uuid, timestamp, "write_time"));
        suppliers.add(
            reader -> NamedTestResult.create(uuid, timestamp, "data_size", getWrittenDataSize()));
        return suppliers;
    }

    /**
     * Returns the growth of the collection between the two size measurements.
     *
     * @throws IllegalStateException if either size has not been fetched
     */
    private double getWrittenDataSize() {
        if (this.initialCollectionSize == SIZE_NOT_FETCHED
                || this.finalCollectionSize == SIZE_NOT_FETCHED) {
            throw new IllegalStateException("Collection size not fetched");
        }
        return this.finalCollectionSize - this.initialCollectionSize;
    }

    /** Returns the metric suppliers for the read pipeline: {@code read_time}. */
    private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(
            String uuid, String timestamp) {
        Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
        suppliers.add(getTimeMetric(uuid, timestamp, "read_time"));
        return suppliers;
    }

    /**
     * Returns a supplier that reports the span between the start and end time metrics named
     * {@code metricName}, divided by 1000 (presumably milliseconds to seconds; confirm against
     * {@link MetricsReader}).
     */
    private Function<MetricsReader, NamedTestResult> getTimeMetric(
            String uuid, String timestamp, String metricName) {
        return reader -> {
            long start = reader.getStartTimeMetric(metricName);
            long end = reader.getEndTimeMetric(metricName);
            return NamedTestResult.create(uuid, timestamp, metricName, (end - start) / 1000.0);
        };
    }

    /** Extracts the {@code scientist} field of a document as a String. */
    private static class DocumentToStringFn extends SimpleFunction<Document, String> {
        private DocumentToStringFn() {
        }

        @Override
        public String apply(Document input) {
            return input.getString("scientist");
        }
    }

    /** Turns a sequence number into a document of the form {@code {"scientist":"Test <n>"}}. */
    private static class LongToDocumentFn extends SimpleFunction<Long, Document> {
        private LongToDocumentFn() {
        }

        @Override
        public Document apply(Long input) {
            return Document.parse(String.format("{\"scientist\":\"Test %s\"}", input));
        }
    }

    /** Pipeline options describing the MongoDB instance the test runs against. */
    public static interface MongoDBPipelineOptions extends IOTestPipelineOptions {
        @Description("MongoDB host (host name/ip address)")
        @Default.String("mongodb-host")
        public String getMongoDBHostName();

        public void setMongoDBHostName(String value);

        @Description("Port for MongoDB")
        @Default.Integer(27017)
        public Integer getMongoDBPort();

        public void setMongoDBPort(Integer value);

        @Description("Mongo database name")
        @Default.String("beam")
        public String getMongoDBDatabaseName();

        public void setMongoDBDatabaseName(String value);
    }
}

