package org.apache.beam.sdk.io.mongodb;

import com.google.cloud.Timestamp;
import com.mongodb.MongoClient;
import java.util.Date;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.IOTestPipelineOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.bson.Document;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDBIOIT.class */
public class MongoDBIOIT {
    private static String bigQueryDataset;
    private static String bigQueryTable;
    private static MongoDBPipelineOptions options;
    private static String collection;

    @Rule
    public final TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public final TestPipeline readPipeline = TestPipeline.create();
    private static final String NAMESPACE = MongoDBIOIT.class.getName();
    private static final Map<Integer, String> EXPECTED_HASHES = ImmutableMap.of(1000, "75a0d5803418444e76ae5b421662764c", 100000, "3bc762dc1c291904e3c7f577774c6276", 10000000, "e5e0503902018c83e8c8977ef437feba");

    /* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDBIOIT$DocumentToStringFn.class */
    private static class DocumentToStringFn extends SimpleFunction<Document, String> {
        private DocumentToStringFn() {
        }

        public String apply(Document document) {
            return document.getString("scientist");
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDBIOIT$LongToDocumentFn.class */
    private static class LongToDocumentFn extends SimpleFunction<Long, Document> {
        private LongToDocumentFn() {
        }

        public Document apply(Long l) {
            return Document.parse(String.format("{\"scientist\":\"Test %s\"}", l));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/mongodb/MongoDBIOIT$MongoDBPipelineOptions.class */
    public interface MongoDBPipelineOptions extends IOTestPipelineOptions {
        @Default.String("mongodb-host")
        @Description("MongoDB host (host name/ip address)")
        String getMongoDBHostName();

        void setMongoDBHostName(String str);

        @Description("Port for MongoDB")
        @Default.Integer(27017)
        Integer getMongoDBPort();

        void setMongoDBPort(Integer num);

        @Default.String("beam")
        @Description("Mongo database name")
        String getMongoDBDatabaseName();

        void setMongoDBDatabaseName(String str);
    }

    @BeforeClass
    public static void setUp() {
        PipelineOptionsFactory.register(MongoDBPipelineOptions.class);
        options = TestPipeline.testingPipelineOptions().as(MongoDBPipelineOptions.class);
        collection = String.format("test_%s", Long.valueOf(new Date().getTime()));
        bigQueryDataset = options.getBigQueryDataset();
        bigQueryTable = options.getBigQueryTable();
    }

    @AfterClass
    public static void tearDown() throws Exception {
        IOITHelper.executeWithRetry(MongoDBIOIT::dropDatabase);
    }

    public static void dropDatabase() throws Exception {
        new MongoClient(options.getMongoDBHostName()).getDatabase(options.getMongoDBDatabaseName()).drop();
    }

    @Test
    public void testWriteAndRead() {
        String format = String.format("mongodb://%s:%s", options.getMongoDBHostName(), options.getMongoDBPort());
        this.writePipeline.apply("Generate sequence", GenerateSequence.from(0L).to(options.getNumberOfRecords().intValue())).apply("Produce documents", MapElements.via(new LongToDocumentFn())).apply("Collect write time metric", ParDo.of(new TimeMonitor(NAMESPACE, "write_time"))).apply("Write documents to MongoDB", MongoDbIO.write().withUri(format).withDatabase(options.getMongoDBDatabaseName()).withCollection(collection));
        PipelineResult run = this.writePipeline.run();
        run.waitUntilFinish();
        PAssert.thatSingleton(this.readPipeline.apply("Read all documents", MongoDbIO.read().withUri(format).withDatabase(options.getMongoDBDatabaseName()).withCollection(collection)).apply("Collect read time metrics", ParDo.of(new TimeMonitor(NAMESPACE, "read_time"))).apply("Map documents to Strings", MapElements.via(new DocumentToStringFn())).apply("Calculate hashcode", Combine.globally(new HashingFn()))).isEqualTo(IOITHelper.getHashForRecordCount(options.getNumberOfRecords().intValue(), EXPECTED_HASHES));
        PipelineResult run2 = this.readPipeline.run();
        run2.waitUntilFinish();
        collectAndPublishMetrics(run, run2);
    }

    private void collectAndPublishMetrics(PipelineResult pipelineResult, PipelineResult pipelineResult2) {
        String uuid = UUID.randomUUID().toString();
        String timestamp = Timestamp.now().toString();
        Set<Function<MetricsReader, NamedTestResult>> readSuppliers = getReadSuppliers(uuid, timestamp);
        Set<Function<MetricsReader, NamedTestResult>> writeSuppliers = getWriteSuppliers(uuid, timestamp);
        IOITMetrics iOITMetrics = new IOITMetrics(readSuppliers, pipelineResult2, NAMESPACE, uuid, timestamp);
        IOITMetrics iOITMetrics2 = new IOITMetrics(writeSuppliers, pipelineResult, NAMESPACE, uuid, timestamp);
        iOITMetrics.publish(bigQueryDataset, bigQueryTable);
        iOITMetrics2.publish(bigQueryDataset, bigQueryTable);
    }

    private Set<Function<MetricsReader, NamedTestResult>> getWriteSuppliers(String str, String str2) {
        HashSet hashSet = new HashSet();
        hashSet.add(metricsReader -> {
            return NamedTestResult.create(str, str2, "write_time", (metricsReader.getEndTimeMetric("write_time") - metricsReader.getStartTimeMetric("write_time")) / 1000.0d);
        });
        return hashSet;
    }

    private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(String str, String str2) {
        HashSet hashSet = new HashSet();
        hashSet.add(metricsReader -> {
            return NamedTestResult.create(str, str2, "read_time", (metricsReader.getEndTimeMetric("read_time") - metricsReader.getStartTimeMetric("read_time")) / 1000.0d);
        });
        return hashSet;
    }
}
