package org.apache.beam.sdk.io.jdbc;

import com.google.cloud.Timestamp;
import java.lang.invoke.SerializedLambda;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.function.Function;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.PostgresIOTestPipelineOptions;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.io.jdbc.JdbcTestHelper;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.testutils.NamedTestResult;
import org.apache.beam.sdk.testutils.metrics.IOITMetrics;
import org.apache.beam.sdk.testutils.metrics.MetricsReader;
import org.apache.beam.sdk.testutils.metrics.TimeMonitor;
import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.postgresql.ds.PGSimpleDataSource;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/jdbc/JdbcIOIT.class */
public class JdbcIOIT {
    private static final int EXPECTED_ROW_COUNT = 1000;
    private static final String NAMESPACE = JdbcIOIT.class.getName();
    private static int numberOfRows;
    private static PGSimpleDataSource dataSource;
    private static String tableName;
    private static Long tableSize;
    private static InfluxDBSettings settings;

    @Rule
    public TestPipeline pipelineWrite = TestPipeline.create();

    @Rule
    public TestPipeline pipelineRead = TestPipeline.create();

    @BeforeClass
    public static void setup() throws Exception {
        PostgresIOTestPipelineOptions postgresIOTestPipelineOptions;
        try {
            postgresIOTestPipelineOptions = (PostgresIOTestPipelineOptions) IOITHelper.readIOTestPipelineOptions(PostgresIOTestPipelineOptions.class);
        } catch (IllegalArgumentException e) {
            postgresIOTestPipelineOptions = null;
        }
        Assume.assumeNotNull(new Object[]{postgresIOTestPipelineOptions});
        numberOfRows = postgresIOTestPipelineOptions.getNumberOfRecords().intValue();
        dataSource = DatabaseTestHelper.getPostgresDataSource(postgresIOTestPipelineOptions);
        tableName = DatabaseTestHelper.getTestTableName("IT");
        IOITHelper.executeWithRetry(JdbcIOIT::createTable);
        tableSize = (Long) DatabaseTestHelper.getPostgresTableSize(dataSource, tableName).orElse(0L);
        settings = InfluxDBSettings.builder().withHost(postgresIOTestPipelineOptions.getInfluxHost()).withDatabase(postgresIOTestPipelineOptions.getInfluxDatabase()).withMeasurement(postgresIOTestPipelineOptions.getInfluxMeasurement()).get();
    }

    private static void createTable() throws SQLException {
        DatabaseTestHelper.createTable(dataSource, tableName);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        IOITHelper.executeWithRetry(JdbcIOIT::deleteTable);
    }

    private static void deleteTable() throws SQLException {
        DatabaseTestHelper.deleteTable(dataSource, tableName);
    }

    @Test
    public void testWriteThenRead() {
        PipelineResult runWrite = runWrite();
        runWrite.waitUntilFinish();
        PipelineResult runRead = runRead();
        runRead.waitUntilFinish();
        gatherAndPublishMetrics(runWrite, runRead);
    }

    private void gatherAndPublishMetrics(PipelineResult pipelineResult, PipelineResult pipelineResult2) {
        String uuid = UUID.randomUUID().toString();
        String timestamp = Timestamp.now().toString();
        new IOITMetrics(getWriteMetricSuppliers(uuid, timestamp), pipelineResult, NAMESPACE, uuid, timestamp).publishToInflux(settings);
        new IOITMetrics(getReadMetricSuppliers(uuid, timestamp), pipelineResult2, NAMESPACE, uuid, timestamp).publishToInflux(settings);
    }

    private Set<Function<MetricsReader, NamedTestResult>> getWriteMetricSuppliers(String str, String str2) {
        HashSet hashSet = new HashSet();
        Optional postgresTableSize = DatabaseTestHelper.getPostgresTableSize(dataSource, tableName);
        hashSet.add(metricsReader -> {
            return NamedTestResult.create(str, str2, "write_time", (metricsReader.getEndTimeMetric("write_time") - metricsReader.getStartTimeMetric("write_time")) / 1000.0d);
        });
        postgresTableSize.ifPresent(l -> {
            hashSet.add(metricsReader2 -> {
                return NamedTestResult.create(str, str2, "total_size", l.longValue() - tableSize.longValue());
            });
        });
        return hashSet;
    }

    private Set<Function<MetricsReader, NamedTestResult>> getReadMetricSuppliers(String str, String str2) {
        HashSet hashSet = new HashSet();
        hashSet.add(metricsReader -> {
            return NamedTestResult.create(str, str2, "read_time", (metricsReader.getEndTimeMetric("read_time") - metricsReader.getStartTimeMetric("read_time")) / 1000.0d);
        });
        return hashSet;
    }

    private PipelineResult runWrite() {
        this.pipelineWrite.apply(GenerateSequence.from(0L).to(numberOfRows)).apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())).apply(ParDo.of(new TimeMonitor(NAMESPACE, "write_time"))).apply(JdbcIO.write().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)).withStatement(String.format("insert into %s values(?, ?)", tableName)).withPreparedStatementSetter(new JdbcTestHelper.PrepareStatementFromTestRow()));
        return this.pipelineWrite.run();
    }

    private PipelineResult runRead() {
        PCollection apply = this.pipelineRead.apply(JdbcIO.read().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create(dataSource)).withQuery(String.format("select name,id from %s;", tableName)).withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())).apply(ParDo.of(new TimeMonitor(NAMESPACE, "read_time")));
        PAssert.thatSingleton(apply.apply("Count All", Count.globally())).isEqualTo(Long.valueOf(numberOfRows));
        PAssert.that(apply.apply(ParDo.of(new TestRow.SelectNameFn())).apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{TestRow.getExpectedHashForRowCount(numberOfRows)});
        PAssert.thatSingletonIterable(apply.apply(Top.smallest(500))).containsInAnyOrder(TestRow.getExpectedValues(0, 500));
        PAssert.thatSingletonIterable(apply.apply(Top.largest(500))).containsInAnyOrder(TestRow.getExpectedValues(numberOfRows - 500, numberOfRows));
        return this.pipelineRead.run();
    }

    @Test
    public void testWriteWithAutosharding() throws Exception {
        String testTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
        DatabaseTestHelper.createTable(dataSource, testTableName);
        try {
            ArrayList testDataToWrite = DatabaseTestHelper.getTestDataToWrite(1000L);
            TestStream.Builder advanceWatermarkTo = TestStream.create(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())).advanceWatermarkTo(Instant.now());
            Iterator it = testDataToWrite.iterator();
            while (it.hasNext()) {
                advanceWatermarkTo.addElements((KV) it.next(), new KV[0]);
            }
            this.pipelineWrite.apply(advanceWatermarkTo.advanceWatermarkToInfinity()).apply(JdbcIO.write().withDataSourceProviderFn(r2 -> {
                return dataSource;
            }).withStatement(String.format("insert into %s values(?, ?) returning *", tableName)).withAutoSharding().withPreparedStatementSetter((kv, preparedStatement) -> {
                preparedStatement.setInt(1, ((Integer) kv.getKey()).intValue());
                preparedStatement.setString(2, (String) kv.getValue());
            }));
            this.pipelineWrite.run().waitUntilFinish();
            runRead();
            DatabaseTestHelper.deleteTable(dataSource, testTableName);
        } catch (Throwable th) {
            DatabaseTestHelper.deleteTable(dataSource, testTableName);
            throw th;
        }
    }

    @Test
    public void testWriteWithWriteResults() throws Exception {
        String testTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
        DatabaseTestHelper.createTable(dataSource, testTableName);
        try {
            PCollection apply = this.pipelineWrite.apply(Create.of(DatabaseTestHelper.getTestDataToWrite(1000L))).apply(getJdbcWriteWithReturning(testTableName).withWriteResults(resultSet -> {
                return (resultSet == null || !resultSet.next()) ? new JdbcTestHelper.TestDto(0) : new JdbcTestHelper.TestDto(resultSet.getInt(1));
            }));
            apply.setCoder(JdbcTestHelper.TEST_DTO_CODER);
            ArrayList arrayList = new ArrayList();
            for (int i = 0; i < EXPECTED_ROW_COUNT; i++) {
                arrayList.add(new JdbcTestHelper.TestDto(i));
            }
            PAssert.that(apply).containsInAnyOrder(arrayList);
            this.pipelineWrite.run();
            DatabaseTestHelper.assertRowCount(dataSource, testTableName, EXPECTED_ROW_COUNT);
            DatabaseTestHelper.deleteTable(dataSource, testTableName);
        } catch (Throwable th) {
            DatabaseTestHelper.deleteTable(dataSource, testTableName);
            throw th;
        }
    }

    private static JdbcIO.Write<KV<Integer, String>> getJdbcWriteWithReturning(String str) {
        return JdbcIO.write().withDataSourceProviderFn(r2 -> {
            return dataSource;
        }).withStatement(String.format("insert into %s values(?, ?) returning *", str)).withPreparedStatementSetter((kv, preparedStatement) -> {
            preparedStatement.setInt(1, ((Integer) kv.getKey()).intValue());
            preparedStatement.setString(2, (String) kv.getValue());
        });
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1991987247:
                if (implMethodName.equals("lambda$testWriteWithAutosharding$967c4bc5$1")) {
                    z = false;
                    break;
                }
                break;
            case -1477452027:
                if (implMethodName.equals("lambda$getJdbcWriteWithReturning$b89d903$1")) {
                    z = 3;
                    break;
                }
                break;
            case -1171754905:
                if (implMethodName.equals("lambda$testWriteWithAutosharding$43268ee4$1")) {
                    z = 4;
                    break;
                }
                break;
            case -874091088:
                if (implMethodName.equals("lambda$testWriteWithWriteResults$14189935$1")) {
                    z = true;
                    break;
                }
                break;
            case -311717780:
                if (implMethodName.equals("lambda$getJdbcWriteWithReturning$b2864b66$1")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            case JdbcTestHelper.TestDto.EMPTY_RESULT /* 0 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/jdbc/JdbcIO$PreparedStatementSetter") && serializedLambda.getFunctionalInterfaceMethodName().equals("setParameters") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/sql/PreparedStatement;)V") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/jdbc/JdbcIOIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/KV;Ljava/sql/PreparedStatement;)V")) {
                    return (kv, preparedStatement) -> {
                        preparedStatement.setInt(1, ((Integer) kv.getKey()).intValue());
                        preparedStatement.setString(2, (String) kv.getValue());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/jdbc/JdbcIO$RowMapper") && serializedLambda.getFunctionalInterfaceMethodName().equals("mapRow") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/sql/ResultSet;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/jdbc/JdbcIOIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/sql/ResultSet;)Lorg/apache/beam/sdk/io/jdbc/JdbcTestHelper$TestDto;")) {
                    return resultSet -> {
                        return (resultSet == null || !resultSet.next()) ? new JdbcTestHelper.TestDto(0) : new JdbcTestHelper.TestDto(resultSet.getInt(1));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/jdbc/JdbcIOIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Void;)Ljavax/sql/DataSource;")) {
                    return r2 -> {
                        return dataSource;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/io/jdbc/JdbcIO$PreparedStatementSetter") && serializedLambda.getFunctionalInterfaceMethodName().equals("setParameters") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;Ljava/sql/PreparedStatement;)V") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/jdbc/JdbcIOIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/KV;Ljava/sql/PreparedStatement;)V")) {
                    return (kv2, preparedStatement2) -> {
                        preparedStatement2.setInt(1, ((Integer) kv2.getKey()).intValue());
                        preparedStatement2.setString(2, (String) kv2.getValue());
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/jdbc/JdbcIOIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Void;)Ljavax/sql/DataSource;")) {
                    return r22 -> {
                        return dataSource;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
