/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.singlestore;

import com.google.cloud.Timestamp;
import com.singlestore.jdbc.SingleStoreDataSource;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import javax.sql.DataSource;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIOTestPipelineOptions;
import org.apache.beam.sdk.io.singlestore.TestHelper;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class SingleStoreIOPerformanceIT {
    private static final String NAMESPACE = SingleStoreIOPerformanceIT.class.getName();
    private static final String DATABASE_NAME = "SingleStoreIOIT";
    private static int numberOfRows;
    private static String tableName;
    private static String serverName;
    private static String username;
    private static String password;
    private static Integer port;
    private static SingleStoreIO.DataSourceConfiguration dataSourceConfiguration;
    private static InfluxDBSettings settings;
    @Rule
    public TestPipeline pipelineWrite = TestPipeline.create();
    @Rule
    public TestPipeline pipelineRead = TestPipeline.create();
    @Rule
    public TestPipeline pipelineReadWithPartitions = TestPipeline.create();

    @BeforeClass
    public static void setup() {
        SingleStoreIOTestPipelineOptions options;
        try {
            options = (SingleStoreIOTestPipelineOptions)IOITHelper.readIOTestPipelineOptions(SingleStoreIOTestPipelineOptions.class);
        }
        catch (IllegalArgumentException e) {
            options = null;
        }
        Assume.assumeNotNull((Object[])new Object[]{options});
        numberOfRows = options.getNumberOfRecords();
        serverName = options.getSingleStoreServerName();
        username = options.getSingleStoreUsername();
        password = options.getSingleStorePassword();
        port = options.getSingleStorePort();
        tableName = DatabaseTestHelper.getTestTableName((String)"IT");
        dataSourceConfiguration = SingleStoreIO.DataSourceConfiguration.create((String)(serverName + ":" + port)).withDatabase(DATABASE_NAME).withPassword(password).withUsername(username);
        settings = InfluxDBSettings.builder().withHost(options.getInfluxHost()).withDatabase(options.getInfluxDatabase()).withMeasurement(options.getInfluxMeasurement()).get();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteThenRead() throws Exception {
        TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
        SingleStoreDataSource dataSource = new SingleStoreDataSource(String.format("jdbc:singlestore://%s:%d/%s?user=%s&password=%s&allowLocalInfile=TRUE", serverName, port, DATABASE_NAME, username, password));
        DatabaseTestHelper.createTable((DataSource)dataSource, (String)tableName);
        try {
            PipelineResult writeResult = this.runWrite();
            Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)writeResult.waitUntilFinish());
            PipelineResult readResult = this.runRead();
            Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)readResult.waitUntilFinish());
            PipelineResult readResultWithPartitions = this.runReadWithPartitions();
            Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)readResultWithPartitions.waitUntilFinish());
            this.gatherAndPublishMetrics(writeResult, readResult, readResultWithPartitions);
        }
        finally {
            DatabaseTestHelper.deleteTable((DataSource)dataSource, (String)tableName);
        }
    }

    private void gatherAndPublishMetrics(PipelineResult writeResult, PipelineResult readResult, PipelineResult readResultWithPartitions) {
        String uuid = UUID.randomUUID().toString();
        String timestamp = Timestamp.now().toString();
        IOITMetrics writeMetrics = new IOITMetrics(this.getMetricSuppliers(uuid, timestamp, "write_time"), writeResult, NAMESPACE, uuid, timestamp);
        writeMetrics.publishToInflux(settings);
        IOITMetrics readMetrics = new IOITMetrics(this.getMetricSuppliers(uuid, timestamp, "read_time"), readResult, NAMESPACE, uuid, timestamp);
        readMetrics.publishToInflux(settings);
        IOITMetrics readMetricsWithPartitions = new IOITMetrics(this.getMetricSuppliers(uuid, timestamp, "read_with_partitions_time"), readResultWithPartitions, NAMESPACE, uuid, timestamp);
        readMetricsWithPartitions.publishToInflux(settings);
    }

    private Set<Function<MetricsReader, NamedTestResult>> getMetricSuppliers(String uuid, String timestamp, String metricName) {
        HashSet<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<Function<MetricsReader, NamedTestResult>>();
        suppliers.add(reader -> {
            long writeStart = reader.getStartTimeMetric(metricName);
            long writeEnd = reader.getEndTimeMetric(metricName);
            return NamedTestResult.create((String)uuid, (String)timestamp, (String)metricName, (double)((double)(writeEnd - writeStart) / 1000.0));
        });
        return suppliers;
    }

    private PipelineResult runWrite() {
        PCollection writtenRows = (PCollection)((PCollection)((PCollection)((PCollection)this.pipelineWrite.apply((PTransform)GenerateSequence.from((long)0L).to((long)numberOfRows))).apply((PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply((PTransform)ParDo.of((DoFn)new TimeMonitor(NAMESPACE, "write_time")))).apply((PTransform)SingleStoreIO.write().withDataSourceConfiguration(dataSourceConfiguration).withTable(tableName).withUserDataMapper((SingleStoreIO.UserDataMapper)new TestHelper.TestUserDataMapper()));
        PAssert.thatSingleton((PCollection)((PCollection)writtenRows.apply("Sum All", (PTransform)Sum.integersGlobally()))).isEqualTo((Object)numberOfRows);
        return this.pipelineWrite.run();
    }

    private PipelineResult runRead() {
        PCollection namesAndIds = (PCollection)((PCollection)this.pipelineRead.apply((PTransform)SingleStoreIO.read().withDataSourceConfiguration(dataSourceConfiguration).withTable(tableName).withRowMapper((SingleStoreIO.RowMapper)new TestHelper.TestRowMapper()))).apply((PTransform)ParDo.of((DoFn)new TimeMonitor(NAMESPACE, "read_time")));
        this.testReadResult((PCollection<TestRow>)namesAndIds);
        return this.pipelineRead.run();
    }

    private PipelineResult runReadWithPartitions() {
        PCollection namesAndIds = (PCollection)((PCollection)this.pipelineReadWithPartitions.apply((PTransform)SingleStoreIO.readWithPartitions().withDataSourceConfiguration(dataSourceConfiguration).withTable(tableName).withRowMapper((SingleStoreIO.RowMapper)new TestHelper.TestRowMapper()))).apply((PTransform)ParDo.of((DoFn)new TimeMonitor(NAMESPACE, "read_with_partitions_time")));
        this.testReadResult((PCollection<TestRow>)namesAndIds);
        return this.pipelineReadWithPartitions.run();
    }

    private void testReadResult(PCollection<TestRow> namesAndIds) {
        PAssert.thatSingleton((PCollection)((PCollection)namesAndIds.apply("Count All", Count.globally()))).isEqualTo((Object)numberOfRows);
        PCollection consolidatedHashcode = (PCollection)((PCollection)namesAndIds.apply((PTransform)ParDo.of((DoFn)new TestRow.SelectNameFn()))).apply("Hash row contents", (PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults());
        PAssert.that((PCollection)consolidatedHashcode).containsInAnyOrder((Object[])new String[]{TestRow.getExpectedHashForRowCount((int)numberOfRows)});
        PCollection frontOfList = (PCollection)namesAndIds.apply((PTransform)Top.smallest((int)500));
        Iterable expectedFrontOfList = TestRow.getExpectedValues((int)0, (int)500);
        PAssert.thatSingletonIterable((PCollection)frontOfList).containsInAnyOrder(expectedFrontOfList);
        PCollection backOfList = (PCollection)namesAndIds.apply((PTransform)Top.largest((int)500));
        Iterable expectedBackOfList = TestRow.getExpectedValues((int)(numberOfRows - 500), (int)numberOfRows);
        PAssert.thatSingletonIterable((PCollection)backOfList).containsInAnyOrder(expectedBackOfList);
    }
}

