/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.hadoop.format;

import com.google.cloud.Timestamp;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import javax.sql.DataSource;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.io.hadoop.format.ExternalSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HDFSSynchronization;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.io.hadoop.format.TestRowDBWritable;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.postgresql.ds.PGSimpleDataSource;

@RunWith(value=JUnit4.class)
public class HadoopFormatIOIT {
    private static final String NAMESPACE = HadoopFormatIOIT.class.getName();
    private static PGSimpleDataSource dataSource;
    private static Integer numberOfRows;
    private static String tableName;
    private static SerializableConfiguration hadoopConfiguration;
    private static String bigQueryDataset;
    private static String bigQueryTable;
    private static InfluxDBSettings settings;
    @Rule
    public TestPipeline writePipeline = TestPipeline.create();
    @Rule
    public TestPipeline readPipeline = TestPipeline.create();
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    @BeforeClass
    public static void setUp() throws Exception {
        PostgresIOTestPipelineOptions options = (PostgresIOTestPipelineOptions)IOITHelper.readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class);
        dataSource = DatabaseTestHelper.getPostgresDataSource((PostgresIOTestPipelineOptions)options);
        numberOfRows = options.getNumberOfRecords();
        tableName = DatabaseTestHelper.getTestTableName((String)"HadoopFormatIOIT");
        bigQueryDataset = options.getBigQueryDataset();
        bigQueryTable = options.getBigQueryTable();
        settings = InfluxDBSettings.builder().withHost(options.getInfluxHost()).withDatabase(options.getInfluxDatabase()).withMeasurement(options.getInfluxMeasurement()).get();
        IOITHelper.executeWithRetry(HadoopFormatIOIT::createTable);
        HadoopFormatIOIT.setupHadoopConfiguration(options);
    }

    private static void createTable() throws SQLException {
        DatabaseTestHelper.createTable((DataSource)dataSource, (String)tableName);
    }

    private static void setupHadoopConfiguration(PostgresIOTestPipelineOptions options) {
        Configuration conf = new Configuration();
        DBConfiguration.configureDB((Configuration)conf, (String)"org.postgresql.Driver", (String)DatabaseTestHelper.getPostgresDBUrl((PostgresIOTestPipelineOptions)options), (String)options.getPostgresUsername(), (String)options.getPostgresPassword());
        conf.set("mapreduce.jdbc.input.table.name", tableName);
        conf.setStrings("mapreduce.jdbc.input.field.names", new String[]{"id", "name"});
        conf.set("mapreduce.jdbc.input.orderby", "id ASC");
        conf.setClass("mapreduce.jdbc.input.class", TestRowDBWritable.class, DBWritable.class);
        conf.setClass("key.class", LongWritable.class, Object.class);
        conf.setClass("value.class", TestRowDBWritable.class, Object.class);
        conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, InputFormat.class);
        conf.set("mapreduce.jdbc.output.table.name", tableName);
        conf.set("mapreduce.jdbc.output.field.count", "2");
        conf.setStrings("mapreduce.jdbc.output.field.names", new String[]{"id", "name"});
        conf.setClass("mapreduce.job.output.key.class", TestRowDBWritable.class, Object.class);
        conf.setClass("mapreduce.job.output.value.class", NullWritable.class, Object.class);
        conf.setClass("mapreduce.job.outputformat.class", DBOutputFormat.class, OutputFormat.class);
        conf.set("mapreduce.job.id", String.valueOf(1));
        hadoopConfiguration = new SerializableConfiguration(conf);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        IOITHelper.executeWithRetry(HadoopFormatIOIT::deleteTable);
    }

    private static void deleteTable() throws SQLException {
        DatabaseTestHelper.deleteTable((DataSource)dataSource, (String)tableName);
    }

    @Test
    public void writeAndReadUsingHadoopFormat() {
        ((PCollection)((PCollection)((PCollection)((PCollection)((PCollection)this.writePipeline.apply("Generate sequence", (PTransform)GenerateSequence.from((long)0L).to((long)numberOfRows.intValue()))).apply("Produce db rows", (PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply("Prevent fusion before writing", (PTransform)Reshuffle.viaRandomKey())).apply("Collect write time", (PTransform)ParDo.of((DoFn)new TimeMonitor(NAMESPACE, "write_time")))).apply("Construct rows for DBOutputFormat", (PTransform)ParDo.of((DoFn)new ConstructDBOutputFormatRowFn()))).apply("Write using Hadoop OutputFormat", (PTransform)HadoopFormatIO.write().withConfiguration(hadoopConfiguration.get()).withPartitioning().withExternalSynchronization((ExternalSynchronization)new HDFSSynchronization(this.tmpFolder.getRoot().getAbsolutePath())));
        PipelineResult writeResult = this.writePipeline.run();
        writeResult.waitUntilFinish();
        PCollection consolidatedHashcode = (PCollection)((PCollection)((PCollection)((PCollection)((PCollection)this.readPipeline.apply("Read using Hadoop InputFormat", (PTransform)HadoopFormatIO.read().withConfiguration(hadoopConfiguration.get()))).apply("Collect read time", (PTransform)ParDo.of((DoFn)new TimeMonitor(NAMESPACE, "read_time")))).apply("Get values only", (PTransform)Values.create())).apply("Values as string", (PTransform)ParDo.of((DoFn)new TestRow.SelectNameFn()))).apply("Calculate hashcode", (PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()));
        PAssert.thatSingleton((PCollection)consolidatedHashcode).isEqualTo((Object)TestRow.getExpectedHashForRowCount((int)numberOfRows));
        PipelineResult readResult = this.readPipeline.run();
        readResult.waitUntilFinish();
        this.collectAndPublishMetrics(writeResult, readResult);
    }

    private void collectAndPublishMetrics(PipelineResult writeResult, PipelineResult readResult) {
        String uuid = UUID.randomUUID().toString();
        String timestamp = Timestamp.now().toString();
        Set<Function<MetricsReader, NamedTestResult>> readSuppliers = this.getReadSuppliers(uuid, timestamp);
        Set<Function<MetricsReader, NamedTestResult>> writeSuppliers = this.getWriteSuppliers(uuid, timestamp);
        IOITMetrics readMetrics = new IOITMetrics(readSuppliers, readResult, NAMESPACE, uuid, timestamp);
        IOITMetrics writeMetrics = new IOITMetrics(writeSuppliers, writeResult, NAMESPACE, uuid, timestamp);
        readMetrics.publish(bigQueryDataset, bigQueryTable);
        readMetrics.publishToInflux(settings);
        writeMetrics.publish(bigQueryDataset, bigQueryTable);
        writeMetrics.publishToInflux(settings);
    }

    private Set<Function<MetricsReader, NamedTestResult>> getWriteSuppliers(String uuid, String timestamp) {
        HashSet<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<Function<MetricsReader, NamedTestResult>>();
        suppliers.add(this.getTimeMetric(uuid, timestamp, "write_time"));
        suppliers.add(reader -> NamedTestResult.create((String)uuid, (String)timestamp, (String)"data_size", (double)((Long)DatabaseTestHelper.getPostgresTableSize((DataSource)dataSource, (String)tableName).orElseThrow(() -> new IllegalStateException("Unable to fetch table size"))).longValue()));
        return suppliers;
    }

    private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(String uuid, String timestamp) {
        HashSet<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<Function<MetricsReader, NamedTestResult>>();
        suppliers.add(this.getTimeMetric(uuid, timestamp, "read_time"));
        return suppliers;
    }

    private Function<MetricsReader, NamedTestResult> getTimeMetric(String uuid, String timestamp, String metricName) {
        return reader -> {
            long startTime = reader.getStartTimeMetric(metricName);
            long endTime = reader.getEndTimeMetric(metricName);
            return NamedTestResult.create((String)uuid, (String)timestamp, (String)metricName, (double)((double)(endTime - startTime) / 1000.0));
        };
    }

    static class ConstructDBOutputFormatRowFn
    extends DoFn<TestRow, KV<TestRowDBWritable, NullWritable>> {
        ConstructDBOutputFormatRowFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            c.output((Object)KV.of((Object)((Object)new TestRowDBWritable(((TestRow)c.element()).id(), ((TestRow)c.element()).name())), (Object)NullWritable.get()));
        }
    }
}

