package org.apache.beam.sdk.io.hadoop.format;

import com.google.cloud.Timestamp;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.ByteMonitor;
import org.apache.beam.sdk.testutils.metrics.CountMonitor;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.postgresql.ds.PGSimpleDataSource;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/hadoop/format/HadoopFormatIOIT.class */
public class HadoopFormatIOIT {
    // Metrics namespace: the fully-qualified name of this class. It is handed to the
    // TimeMonitor/ByteMonitor/CountMonitor DoFns and to IOITMetrics.
    private static final String NAMESPACE = HadoopFormatIOIT.class.getName();
    // Postgres data source obtained in setUp(); used to create and delete the test table.
    private static PGSimpleDataSource dataSource;
    // Number of records to generate and verify; read from the pipeline options in setUp().
    private static Integer numberOfRows;
    // Name of the test table, produced by DatabaseTestHelper.getTestTableName in setUp().
    private static String tableName;
    // Hadoop configuration for both DBInputFormat and DBOutputFormat; built in setupHadoopConfiguration().
    private static SerializableConfiguration hadoopConfiguration;
    // Dataset and table passed to IOITMetrics.publish; both read from the pipeline options in setUp().
    private static String bigQueryDataset;
    private static String bigQueryTable;

    // Pipeline used for the write phase of the test.
    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    // Separate pipeline used for the read phase of the test.
    @Rule
    public TestPipeline readPipeline = TestPipeline.create();

    // Temporary folder whose root path is passed to HDFSSynchronization during the write.
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /**
     * Converts each {@link TestRow} into the key/value pair handed to the Hadoop output format: a
     * {@link TestRowDBWritable} key built from the row's id and name, paired with the
     * {@link NullWritable} singleton as the value.
     */
    static class ConstructDBOutputFormatRowFn extends DoFn<TestRow, KV<TestRowDBWritable, NullWritable>> {

        /**
         * Emits exactly one {@code KV<TestRowDBWritable, NullWritable>} for the current element.
         *
         * @param processContext context providing the input row and receiving the output pair
         */
        @DoFn.ProcessElement
        public void processElement(DoFn<TestRow, KV<TestRowDBWritable, NullWritable>>.ProcessContext processContext) {
            TestRow row = processContext.element();
            TestRowDBWritable key = new TestRowDBWritable(row.id(), row.name());
            processContext.output(KV.of(key, NullWritable.get()));
        }
    }

    /**
     * One-time fixture initialization for the whole class.
     *
     * <p>Reads the {@link PostgresIOTestPipelineOptions}, stores the data source, record count,
     * table name and BigQuery destination in the static fields, creates the test table (with
     * retry), and finally builds the shared Hadoop configuration.
     *
     * @throws Exception whatever the option parsing or the retried table creation propagates
     */
    @BeforeClass
    public static void setUp() throws Exception {
        PostgresIOTestPipelineOptions options =
                IOITHelper.readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class);

        // Values taken from the options or derived by the test helpers.
        dataSource = DatabaseTestHelper.getPostgresDataSource(options);
        numberOfRows = options.getNumberOfRecords();
        tableName = DatabaseTestHelper.getTestTableName("HadoopFormatIOIT");
        bigQueryDataset = options.getBigQueryDataset();
        bigQueryTable = options.getBigQueryTable();

        // The table must exist before any pipeline touches it.
        IOITHelper.executeWithRetry(HadoopFormatIOIT::createTable);

        // Needs tableName, so it runs after the assignments above.
        setupHadoopConfiguration(options);
    }

    /**
     * Creates the test table named by {@code tableName} on {@code dataSource}, delegating to
     * {@code DatabaseTestHelper.createTable}.
     *
     * <p>Referenced as a method reference from {@code setUp()}, where it is run through
     * {@code IOITHelper.executeWithRetry}.
     *
     * @throws SQLException if the delegated helper call raises one
     */
    private static void createTable() throws SQLException {
        DatabaseTestHelper.createTable(dataSource, tableName);
    }

    /**
     * Builds the Hadoop {@link Configuration} shared by the read and the write side of the test
     * and stores it, wrapped in a {@link SerializableConfiguration}, in
     * {@code hadoopConfiguration}.
     *
     * <p>The same table ({@code tableName}) and the same two columns ({@code id}, {@code name})
     * are configured for both {@link DBInputFormat} and {@link DBOutputFormat}.
     *
     * @param postgresIOTestPipelineOptions options supplying the JDBC URL, user name and password
     */
    private static void setupHadoopConfiguration(PostgresIOTestPipelineOptions postgresIOTestPipelineOptions) {
        Configuration conf = new Configuration();

        // JDBC connection settings.
        String jdbcUrl = DatabaseTestHelper.getPostgresDBUrl(postgresIOTestPipelineOptions);
        String user = postgresIOTestPipelineOptions.getPostgresUsername();
        String password = postgresIOTestPipelineOptions.getPostgresPassword();
        DBConfiguration.configureDB(conf, "org.postgresql.Driver", jdbcUrl, user, password);

        // Read side: DBInputFormat over the test table, ordered by id.
        conf.set("mapreduce.jdbc.input.table.name", tableName);
        conf.setStrings("mapreduce.jdbc.input.field.names", "id", "name");
        conf.set("mapreduce.jdbc.input.orderby", "id ASC");
        conf.setClass("mapreduce.jdbc.input.class", TestRowDBWritable.class, DBWritable.class);
        conf.setClass("key.class", LongWritable.class, Object.class);
        conf.setClass("value.class", TestRowDBWritable.class, Object.class);
        conf.setClass("mapreduce.job.inputformat.class", DBInputFormat.class, InputFormat.class);

        // Write side: DBOutputFormat into the same table and columns.
        conf.set("mapreduce.jdbc.output.table.name", tableName);
        conf.set("mapreduce.jdbc.output.field.count", "2");
        conf.setStrings("mapreduce.jdbc.output.field.names", "id", "name");
        conf.setClass("mapreduce.job.output.key.class", TestRowDBWritable.class, Object.class);
        conf.setClass("mapreduce.job.output.value.class", NullWritable.class, Object.class);
        conf.setClass("mapreduce.job.outputformat.class", DBOutputFormat.class, OutputFormat.class);
        conf.set("mapreduce.job.id", String.valueOf(1));

        hadoopConfiguration = new SerializableConfiguration(conf);
    }

    /**
     * Class-level cleanup: deletes the test table by running {@code deleteTable} through
     * {@code IOITHelper.executeWithRetry}.
     *
     * @throws Exception whatever {@code IOITHelper.executeWithRetry} propagates
     */
    @AfterClass
    public static void tearDown() throws Exception {
        IOITHelper.executeWithRetry(HadoopFormatIOIT::deleteTable);
    }

    /**
     * Deletes the test table named by {@code tableName} on {@code dataSource}, delegating to
     * {@code DatabaseTestHelper.deleteTable}.
     *
     * <p>Referenced as a method reference from {@code tearDown()}.
     *
     * @throws SQLException if the delegated helper call raises one
     */
    private static void deleteTable() throws SQLException {
        DatabaseTestHelper.deleteTable(dataSource, tableName);
    }

    /**
     * End-to-end round trip through {@code HadoopFormatIO}.
     *
     * <p>The write pipeline generates {@code numberOfRows} rows, records time/byte/item metrics
     * and writes the rows with the configured Hadoop output format. After it finishes, the read
     * pipeline reads the table back with the configured input format, hashes the row names and
     * asserts the hash equals the expected hash for that row count. Metrics from both runs are
     * then published.
     */
    // NOTE(review): generic type arguments (e.g. on the monitor DoFns and on
    // HadoopFormatIO.write()/read()) look erased by decompilation - confirm against the
    // helper signatures and restore them if the build requires it.
    @Test
    public void writeAndReadUsingHadoopFormat() {
        final int rowCount = numberOfRows;

        // Write phase.
        writePipeline
                .apply("Generate sequence", GenerateSequence.from(0L).to(rowCount))
                .apply("Produce db rows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn()))
                .apply("Prevent fusion before writing", Reshuffle.viaRandomKey())
                .apply("Collect write time", ParDo.of(new TimeMonitor(NAMESPACE, "write_time")))
                .apply("Count bytes", ParDo.of(new ByteMonitor(NAMESPACE, "byte_count")))
                .apply("Count items", ParDo.of(new CountMonitor(NAMESPACE, "item_count")))
                .apply("Construct rows for DBOutputFormat", ParDo.of(new ConstructDBOutputFormatRowFn()))
                .apply(
                        "Write using Hadoop OutputFormat",
                        HadoopFormatIO.write()
                                .withConfiguration(hadoopConfiguration.get())
                                .withPartitioning()
                                .withExternalSynchronization(
                                        new HDFSSynchronization(tmpFolder.getRoot().getAbsolutePath())));
        PipelineResult writeResult = writePipeline.run();
        writeResult.waitUntilFinish();

        // Read phase: the hash of all names read back must match the expected hash.
        PAssert.thatSingleton(
                        readPipeline
                                .apply(
                                        "Read using Hadoop InputFormat",
                                        HadoopFormatIO.read().withConfiguration(hadoopConfiguration.get()))
                                .apply("Collect read time", ParDo.of(new TimeMonitor(NAMESPACE, "read_time")))
                                .apply("Get values only", Values.create())
                                .apply("Values as string", ParDo.of(new TestRow.SelectNameFn()))
                                .apply("Calculate hashcode", Combine.globally(new HashingFn())))
                .isEqualTo(TestRow.getExpectedHashForRowCount(rowCount));
        PipelineResult readResult = readPipeline.run();
        readResult.waitUntilFinish();

        collectAndPublishMetrics(writeResult, readResult);
    }

    /**
     * Publishes the metrics collected by the two pipeline runs.
     *
     * <p>One random UUID string and one timestamp string are generated here and shared by the
     * read and the write results. Read metrics are published first, then write metrics, both to
     * {@code bigQueryDataset} / {@code bigQueryTable}.
     *
     * @param pipelineResult result of the write pipeline
     * @param pipelineResult2 result of the read pipeline
     */
    private void collectAndPublishMetrics(PipelineResult pipelineResult, PipelineResult pipelineResult2) {
        final String testId = UUID.randomUUID().toString();
        final String runTimestamp = Timestamp.now().toString();

        final Set<Function<MetricsReader, NamedTestResult>> readSuppliers =
                getReadSuppliers(testId, runTimestamp);
        final Set<Function<MetricsReader, NamedTestResult>> writeSuppliers =
                getWriteSuppliers(testId, runTimestamp);

        final IOITMetrics readMetrics =
                new IOITMetrics(readSuppliers, pipelineResult2, NAMESPACE, testId, runTimestamp);
        final IOITMetrics writeMetrics =
                new IOITMetrics(writeSuppliers, pipelineResult, NAMESPACE, testId, runTimestamp);

        readMetrics.publish(bigQueryDataset, bigQueryTable);
        writeMetrics.publish(bigQueryDataset, bigQueryTable);
    }

    /**
     * Builds the functions that turn the write pipeline's metrics into named test results.
     *
     * <p>Three results are produced: {@code write_time} (end time minus start time, divided by
     * 1000.0), {@code byte_count} and {@code item_count} (both read as counters).
     *
     * @param str identifier attached to every result (the caller passes a UUID string)
     * @param str2 timestamp string attached to every result
     * @return a new mutable set holding the three result functions
     */
    private Set<Function<MetricsReader, NamedTestResult>> getWriteSuppliers(String str, String str2) {
        // The set must be parameterized: with a raw HashSet, add() takes Object, which gives the
        // lambdas no functional-interface target type and the method does not compile.
        Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
        // Dividing by 1000.0 presumably converts milliseconds to seconds - confirm against MetricsReader.
        suppliers.add(reader -> NamedTestResult.create(
                str,
                str2,
                "write_time",
                (reader.getEndTimeMetric("write_time") - reader.getStartTimeMetric("write_time")) / 1000.0d));
        suppliers.add(reader -> NamedTestResult.create(
                str, str2, "byte_count", reader.getCounterMetric("byte_count")));
        suppliers.add(reader -> NamedTestResult.create(
                str, str2, "item_count", reader.getCounterMetric("item_count")));
        return suppliers;
    }

    /**
     * Builds the functions that turn the read pipeline's metrics into named test results.
     *
     * <p>A single result is produced: {@code read_time} (end time minus start time, divided by
     * 1000.0).
     *
     * @param str identifier attached to the result (the caller passes a UUID string)
     * @param str2 timestamp string attached to the result
     * @return a new mutable set holding the single result function
     */
    private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(String str, String str2) {
        // The set must be parameterized: with a raw HashSet, add() takes Object, which gives the
        // lambda no functional-interface target type and the method does not compile.
        Set<Function<MetricsReader, NamedTestResult>> suppliers = new HashSet<>();
        // Dividing by 1000.0 presumably converts milliseconds to seconds - confirm against MetricsReader.
        suppliers.add(reader -> NamedTestResult.create(
                str,
                str2,
                "read_time",
                (reader.getEndTimeMetric("read_time") - reader.getStartTimeMetric("read_time")) / 1000.0d));
        return suppliers;
    }
}
