package org.apache.beam.sdk.io.cdap;

import com.google.cloud.Timestamp;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.cdap.CdapIO;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.KV;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.postgresql.ds.PGSimpleDataSource;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.utility.DockerImageName;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/cdap/CdapIOIT.class */
public class CdapIOIT {
    private static final String NAMESPACE = CdapIOIT.class.getName();
    private static final String[] TEST_FIELD_NAMES = {"id", "name"};
    private static final String TEST_ORDER_BY = "id ASC";
    private static PGSimpleDataSource dataSource;
    private static Integer numberOfRows;
    private static String tableName;
    private static InfluxDBSettings settings;
    private static CdapIOITOptions options;
    private static PostgreSQLContainer postgreSQLContainer;

    @Rule
    public TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public TestPipeline readPipeline = TestPipeline.create();

    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/beam/sdk/io/cdap/CdapIOIT$CdapIOITOptions.class */
    public interface CdapIOITOptions extends PostgresIOTestPipelineOptions {
        @Description("Whether to use testcontainers")
        @Default.Boolean(false)
        Boolean isWithTestcontainers();

        void setWithTestcontainers(Boolean bool);
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/cdap/CdapIOIT$ConstructDBOutputFormatRowFn.class */
    static class ConstructDBOutputFormatRowFn extends DoFn<TestRow, KV<TestRowDBWritable, NullWritable>> {
        ConstructDBOutputFormatRowFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<TestRow, KV<TestRowDBWritable, NullWritable>>.ProcessContext processContext) {
            processContext.output(KV.of(new TestRowDBWritable(((TestRow) processContext.element()).id(), ((TestRow) processContext.element()).name()), NullWritable.get()));
        }
    }

    @BeforeClass
    public static void setup() throws Exception {
        options = IOITHelper.readIOTestPipelineOptions(CdapIOITOptions.class);
        if (options.isWithTestcontainers().booleanValue()) {
            setPostgresContainer();
        }
        dataSource = DatabaseTestHelper.getPostgresDataSource(options);
        numberOfRows = options.getNumberOfRecords();
        tableName = DatabaseTestHelper.getTestTableName("CdapIOIT");
        if (!options.isWithTestcontainers().booleanValue()) {
            settings = InfluxDBSettings.builder().withHost(options.getInfluxHost()).withDatabase(options.getInfluxDatabase()).withMeasurement(options.getInfluxMeasurement()).get();
        }
        IOITHelper.executeWithRetry(CdapIOIT::createTable);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        IOITHelper.executeWithRetry(CdapIOIT::deleteTable);
        if (postgreSQLContainer != null) {
            postgreSQLContainer.stop();
        }
    }

    @Test
    public void testCdapIOReadsAndWritesCorrectlyInBatch() {
        this.writePipeline.apply("Generate sequence", GenerateSequence.from(0L).to(numberOfRows.intValue())).apply("Produce db rows", ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())).apply("Prevent fusion before writing", Reshuffle.viaRandomKey()).apply("Collect write time", ParDo.of(new TimeMonitor(NAMESPACE, "write_time"))).apply("Construct rows for DBOutputFormat", ParDo.of(new ConstructDBOutputFormatRowFn())).apply("Write using CdapIO", writeToDB(getWriteTestParamsFromOptions(options)));
        PipelineResult run = this.writePipeline.run();
        run.waitUntilFinish();
        PAssert.thatSingleton(this.readPipeline.apply("Read using CdapIO", readFromDB(getReadTestParamsFromOptions(options))).apply("Collect read time", ParDo.of(new TimeMonitor(NAMESPACE, "read_time"))).apply("Get values only", Values.create()).apply("Values as string", ParDo.of(new TestRow.SelectNameFn())).apply("Calculate hashcode", Combine.globally(new HashingFn()))).isEqualTo(TestRow.getExpectedHashForRowCount(numberOfRows.intValue()));
        PipelineResult run2 = this.readPipeline.run();
        PipelineResult.State waitUntilFinish = run2.waitUntilFinish();
        if (!options.isWithTestcontainers().booleanValue()) {
            collectAndPublishMetrics(run, run2);
        }
        Assert.assertNotEquals(waitUntilFinish, PipelineResult.State.FAILED);
    }

    private CdapIO.Write<TestRowDBWritable, NullWritable> writeToDB(Map<String, Object> map) {
        return CdapIO.write().withCdapPlugin(Plugin.createBatch(DBBatchSink.class, DBOutputFormat.class, DBOutputFormatProvider.class)).withPluginConfig(new ConfigWrapper(DBConfig.class).withParams(map).build()).withKeyClass(TestRowDBWritable.class).withValueClass(NullWritable.class).withLocksDirPath(this.tmpFolder.getRoot().getAbsolutePath());
    }

    private CdapIO.Read<LongWritable, TestRowDBWritable> readFromDB(Map<String, Object> map) {
        return CdapIO.read().withCdapPlugin(Plugin.createBatch(DBBatchSource.class, DBInputFormat.class, DBInputFormatProvider.class)).withPluginConfig(new ConfigWrapper(DBConfig.class).withParams(map).build()).withKeyClass(LongWritable.class).withValueClass(TestRowDBWritable.class);
    }

    private Map<String, Object> getTestParamsFromOptions(CdapIOITOptions cdapIOITOptions) {
        HashMap hashMap = new HashMap();
        hashMap.put(DBConfig.DB_URL, DatabaseTestHelper.getPostgresDBUrl(cdapIOITOptions));
        hashMap.put(DBConfig.POSTGRES_USERNAME, cdapIOITOptions.getPostgresUsername());
        hashMap.put(DBConfig.POSTGRES_PASSWORD, cdapIOITOptions.getPostgresPassword());
        hashMap.put(DBConfig.FIELD_NAMES, StringUtils.arrayToString(TEST_FIELD_NAMES));
        hashMap.put(DBConfig.TABLE_NAME, tableName);
        hashMap.put(ConfigWrapperTest.REFERENCE_NAME_PARAM_NAME, ConfigWrapperTest.REFERENCE_NAME_PARAM_NAME);
        return hashMap;
    }

    private Map<String, Object> getReadTestParamsFromOptions(CdapIOITOptions cdapIOITOptions) {
        Map<String, Object> testParamsFromOptions = getTestParamsFromOptions(cdapIOITOptions);
        testParamsFromOptions.put(DBConfig.ORDER_BY, TEST_ORDER_BY);
        testParamsFromOptions.put(DBConfig.VALUE_CLASS_NAME, TestRowDBWritable.class.getName());
        return testParamsFromOptions;
    }

    private Map<String, Object> getWriteTestParamsFromOptions(CdapIOITOptions cdapIOITOptions) {
        Map<String, Object> testParamsFromOptions = getTestParamsFromOptions(cdapIOITOptions);
        testParamsFromOptions.put(DBConfig.FIELD_COUNT, String.valueOf(TEST_FIELD_NAMES.length));
        return testParamsFromOptions;
    }

    private static void setPostgresContainer() {
        postgreSQLContainer = new PostgreSQLContainer(DockerImageName.parse("postgres").withTag("latest")).withDatabaseName(options.getPostgresDatabaseName()).withUsername(options.getPostgresUsername()).withPassword(options.getPostgresPassword());
        postgreSQLContainer.start();
        options.setPostgresServerName(postgreSQLContainer.getContainerIpAddress());
        options.setPostgresPort(postgreSQLContainer.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT.intValue()));
        options.setPostgresSsl(false);
    }

    private static void createTable() throws SQLException {
        DatabaseTestHelper.createTable(dataSource, tableName);
    }

    private static void deleteTable() throws SQLException {
        DatabaseTestHelper.deleteTable(dataSource, tableName);
    }

    private void collectAndPublishMetrics(PipelineResult pipelineResult, PipelineResult pipelineResult2) {
        String uuid = UUID.randomUUID().toString();
        String timestamp = Timestamp.now().toString();
        Set<Function<MetricsReader, NamedTestResult>> readSuppliers = getReadSuppliers(uuid, timestamp);
        Set<Function<MetricsReader, NamedTestResult>> writeSuppliers = getWriteSuppliers(uuid, timestamp);
        IOITMetrics iOITMetrics = new IOITMetrics(readSuppliers, pipelineResult2, NAMESPACE, uuid, timestamp);
        IOITMetrics iOITMetrics2 = new IOITMetrics(writeSuppliers, pipelineResult, NAMESPACE, uuid, timestamp);
        iOITMetrics.publishToInflux(settings);
        iOITMetrics2.publishToInflux(settings);
    }

    private Set<Function<MetricsReader, NamedTestResult>> getReadSuppliers(String str, String str2) {
        HashSet hashSet = new HashSet();
        hashSet.add(getTimeMetric(str, str2, "read_time"));
        return hashSet;
    }

    private Set<Function<MetricsReader, NamedTestResult>> getWriteSuppliers(String str, String str2) {
        HashSet hashSet = new HashSet();
        hashSet.add(getTimeMetric(str, str2, "write_time"));
        hashSet.add(metricsReader -> {
            return NamedTestResult.create(str, str2, "data_size", ((Long) DatabaseTestHelper.getPostgresTableSize(dataSource, tableName).orElseThrow(() -> {
                return new IllegalStateException("Unable to fetch table size");
            })).longValue());
        });
        return hashSet;
    }

    private Function<MetricsReader, NamedTestResult> getTimeMetric(String str, String str2, String str3) {
        return metricsReader -> {
            return NamedTestResult.create(str, str2, str3, (metricsReader.getEndTimeMetric(str3) - metricsReader.getStartTimeMetric(str3)) / 1000.0d);
        };
    }
}
