/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.jdbc;

import java.io.Serializable;
import java.math.BigDecimal;
import java.nio.charset.Charset;
import java.sql.Array;
import java.sql.Connection;
import java.sql.Date;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.TimeZone;
import java.util.logging.LogRecord;
import javax.sql.DataSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.io.jdbc.JdbcTestHelper;
import org.apache.beam.sdk.io.jdbc.JdbcUtil;
import org.apache.beam.sdk.io.jdbc.LogicalTypes;
import org.apache.beam.sdk.io.jdbc.RowWithSchema;
import org.apache.beam.sdk.io.jdbc.SchemaUtil;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.commons.dbcp2.PoolingDataSource;
import org.hamcrest.Description;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.TypeSafeMatcher;
import org.joda.time.Chronology;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.LocalDate;
import org.joda.time.chrono.ISOChronology;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class JdbcIOTest
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcIOTest.class);
    private static final JdbcIO.DataSourceConfiguration DATA_SOURCE_CONFIGURATION = JdbcIO.DataSourceConfiguration.create((String)"org.apache.derby.jdbc.EmbeddedDriver", (String)"jdbc:derby:memory:testDB;create=true");
    private static final DataSource DATA_SOURCE = DATA_SOURCE_CONFIGURATION.buildDatasource();
    private static final int EXPECTED_ROW_COUNT = 1000;
    private static final String READ_TABLE_NAME = DatabaseTestHelper.getTestTableName((String)"UT_READ");
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();
    @Rule
    public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(JdbcIO.class);
    @Rule
    public transient ExpectedException thrown = ExpectedException.none();

    @BeforeClass
    public static void beforeClass() throws Exception {
        System.setProperty("derby.locks.waitTimeout", "2");
        System.setProperty("derby.stream.error.file", "build/derby.log");
        DatabaseTestHelper.createTable((DataSource)DATA_SOURCE, (String)READ_TABLE_NAME);
        JdbcIOTest.addInitialData(DATA_SOURCE, READ_TABLE_NAME);
    }

    @Test
    public void testDataSourceConfigurationDataSourceWithoutPool() {
        MatcherAssert.assertThat((Object)DATA_SOURCE_CONFIGURATION.buildDatasource(), (Matcher)org.hamcrest.Matchers.not((Matcher)org.hamcrest.Matchers.instanceOf(PoolingDataSource.class)));
    }

    @Test
    public void testDataSourceConfigurationDataSourceWithPool() {
        Assert.assertTrue((boolean)(JdbcIO.PoolableDataSourceProvider.of((JdbcIO.DataSourceConfiguration)DATA_SOURCE_CONFIGURATION).apply(null) instanceof PoolingDataSource));
    }

    @Test
    public void testDataSourceConfigurationDriverAndUrl() throws Exception {
        try (Connection conn = DATA_SOURCE_CONFIGURATION.buildDatasource().getConnection();){
            Assert.assertTrue((boolean)conn.isValid(0));
        }
    }

    @Test
    public void testDataSourceConfigurationUsernameAndPassword() throws Exception {
        String username = "sa";
        String password = "sa";
        JdbcIO.DataSourceConfiguration config = DATA_SOURCE_CONFIGURATION.withUsername(username).withPassword(password);
        try (Connection conn = config.buildDatasource().getConnection();){
            Assert.assertTrue((boolean)conn.isValid(0));
        }
    }

    @Test
    public void testDataSourceConfigurationNullPassword() throws Exception {
        String username = "sa";
        String password = null;
        JdbcIO.DataSourceConfiguration config = DATA_SOURCE_CONFIGURATION.withUsername(username).withPassword(password);
        try (Connection conn = config.buildDatasource().getConnection();){
            Assert.assertTrue((boolean)conn.isValid(0));
        }
    }

    @Test
    public void testDataSourceConfigurationNullUsernameAndPassword() throws Exception {
        String username = null;
        String password = null;
        JdbcIO.DataSourceConfiguration config = DATA_SOURCE_CONFIGURATION.withUsername(username).withPassword(password);
        try (Connection conn = config.buildDatasource().getConnection();){
            Assert.assertTrue((boolean)conn.isValid(0));
        }
    }

    @Test
    public void testSetConnectoinInitSqlFailWithDerbyDB() {
        String username = "sa";
        String password = "sa";
        JdbcIO.DataSourceConfiguration config = DATA_SOURCE_CONFIGURATION.withUsername(username).withPassword(password).withConnectionInitSqls((Collection)ImmutableList.of((Object)"SET innodb_lock_wait_timeout = 5"));
        Assert.assertThrows((String)"innodb_lock_wait_timeout", SQLException.class, () -> config.buildDatasource().getConnection());
    }

    private static void addInitialData(DataSource dataSource, String tableName) throws SQLException {
        try (Connection connection = dataSource.getConnection();){
            connection.setAutoCommit(false);
            try (PreparedStatement preparedStatement = connection.prepareStatement(String.format("insert into %s values (?,?)", tableName));){
                for (int i = 0; i < 1000; ++i) {
                    preparedStatement.clearParameters();
                    preparedStatement.setInt(1, i);
                    preparedStatement.setString(2, TestRow.getNameForSeed((Integer)i));
                    preparedStatement.executeUpdate();
                }
            }
            connection.commit();
        }
    }

    @Test
    public void testRead() {
        PCollection rows = (PCollection)this.pipeline.apply((PTransform)JdbcIO.read().withFetchSize(12).withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withQuery("select name,id from " + READ_TABLE_NAME).withRowMapper((JdbcIO.RowMapper)new JdbcTestHelper.CreateTestRowOfNameAndId()).withCoder((Coder)SerializableCoder.of(TestRow.class)));
        PAssert.thatSingleton((PCollection)((PCollection)rows.apply("Count All", Count.globally()))).isEqualTo((Object)1000L);
        Iterable expectedValues = TestRow.getExpectedValues((int)0, (int)1000);
        PAssert.that((PCollection)rows).containsInAnyOrder(expectedValues);
        this.pipeline.run();
    }

    @Test
    public void testReadWithSingleStringParameter() {
        PCollection rows = (PCollection)this.pipeline.apply((PTransform)JdbcIO.read().withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withQuery(String.format("select name,id from %s where name = ?", READ_TABLE_NAME)).withStatementPreparator((JdbcIO.StatementPreparator & Serializable)preparedStatement -> preparedStatement.setString(1, TestRow.getNameForSeed((Integer)1))).withRowMapper((JdbcIO.RowMapper)new JdbcTestHelper.CreateTestRowOfNameAndId()).withCoder((Coder)SerializableCoder.of(TestRow.class)));
        PAssert.thatSingleton((PCollection)((PCollection)rows.apply("Count All", Count.globally()))).isEqualTo((Object)1L);
        List<TestRow> expectedValues = Collections.singletonList(TestRow.fromSeed((Integer)1));
        PAssert.that((PCollection)rows).containsInAnyOrder(expectedValues);
        this.pipeline.run();
    }

    @Test
    public void testReadRowsWithDataSourceConfiguration() {
        PCollection rows = (PCollection)this.pipeline.apply((PTransform)JdbcIO.readRows().withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withQuery(String.format("select name,id from %s where name = ?", READ_TABLE_NAME)).withStatementPreparator((JdbcIO.StatementPreparator & Serializable)preparedStatement -> preparedStatement.setString(1, TestRow.getNameForSeed((Integer)1))));
        Schema expectedSchema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"NAME", (Schema.FieldType)LogicalTypes.variableLengthString((JDBCType)JDBCType.VARCHAR, (int)500)).withNullable(true), Schema.Field.of((String)"ID", (Schema.FieldType)Schema.FieldType.INT32).withNullable(true)});
        Assert.assertEquals((Object)expectedSchema, (Object)rows.getSchema());
        PCollection output = (PCollection)rows.apply((PTransform)Select.fieldNames((String[])new String[]{"NAME", "ID"}));
        PAssert.that((PCollection)output).containsInAnyOrder((Iterable)ImmutableList.of((Object)Row.withSchema((Schema)expectedSchema).addValues(new Object[]{"Testval1", 1}).build()));
        this.pipeline.run();
    }

    @Test
    public void testReadRowsWithoutStatementPreparator() {
        SerializableFunction & Serializable dataSourceProvider = (SerializableFunction & Serializable)ignored -> DATA_SOURCE;
        String name = TestRow.getNameForSeed((Integer)1);
        PCollection rows = (PCollection)this.pipeline.apply((PTransform)JdbcIO.readRows().withDataSourceProviderFn((SerializableFunction)dataSourceProvider).withQuery(String.format("select name,id from %s where name = '%s'", READ_TABLE_NAME, name)));
        Schema expectedSchema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"NAME", (Schema.FieldType)LogicalTypes.variableLengthString((JDBCType)JDBCType.VARCHAR, (int)500)).withNullable(true), Schema.Field.of((String)"ID", (Schema.FieldType)Schema.FieldType.INT32).withNullable(true)});
        Assert.assertEquals((Object)expectedSchema, (Object)rows.getSchema());
        PCollection output = (PCollection)rows.apply((PTransform)Select.fieldNames((String[])new String[]{"NAME", "ID"}));
        PAssert.that((PCollection)output).containsInAnyOrder((Iterable)ImmutableList.of((Object)Row.withSchema((Schema)expectedSchema).addValues(new Object[]{name, 1}).build()));
        this.pipeline.run();
    }

    @Test
    public void testReadWithSchema() {
        SerializableFunction & Serializable dataSourceProvider = (SerializableFunction & Serializable)ignored -> DATA_SOURCE;
        JdbcIO.RowMapper & Serializable rowMapper = (JdbcIO.RowMapper & Serializable)rs -> new RowWithSchema(rs.getString("NAME"), rs.getInt("ID"));
        this.pipeline.getSchemaRegistry().registerJavaBean(RowWithSchema.class);
        PCollection rows = (PCollection)this.pipeline.apply((PTransform)JdbcIO.read().withDataSourceProviderFn((SerializableFunction)dataSourceProvider).withQuery(String.format("select name,id from %s where name = ?", READ_TABLE_NAME)).withRowMapper((JdbcIO.RowMapper)rowMapper).withCoder((Coder)SerializableCoder.of(RowWithSchema.class)).withStatementPreparator((JdbcIO.StatementPreparator & Serializable)preparedStatement -> preparedStatement.setString(1, TestRow.getNameForSeed((Integer)1))));
        Schema expectedSchema = Schema.of((Schema.Field[])new Schema.Field[]{Schema.Field.of((String)"name", (Schema.FieldType)Schema.FieldType.STRING), Schema.Field.of((String)"id", (Schema.FieldType)Schema.FieldType.INT32)});
        Assert.assertEquals((Object)expectedSchema, (Object)rows.getSchema());
        PCollection output = (PCollection)rows.apply((PTransform)Select.fieldNames((String[])new String[]{"name", "id"}));
        PAssert.that((PCollection)output).containsInAnyOrder((Iterable)ImmutableList.of((Object)Row.withSchema((Schema)expectedSchema).addValues(new Object[]{"Testval1", 1}).build()));
        this.pipeline.run();
    }

    @Test
    public void testWrite() throws Exception {
        String tableName = DatabaseTestHelper.getTestTableName((String)"UT_WRITE");
        DatabaseTestHelper.createTable((DataSource)DATA_SOURCE, (String)tableName);
        try {
            ArrayList<KV<Integer, String>> data = JdbcIOTest.getDataToWrite(1000L);
            ((PCollection)this.pipeline.apply((PTransform)Create.of(data))).apply(JdbcIOTest.getJdbcWrite(tableName));
            this.pipeline.run();
            JdbcIOTest.assertRowCount(tableName, 1000);
        }
        finally {
            DatabaseTestHelper.deleteTable((DataSource)DATA_SOURCE, (String)tableName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWriteWithResultsAndWaitOn() throws Exception {
        String firstTableName = DatabaseTestHelper.getTestTableName((String)"UT_WRITE");
        String secondTableName = DatabaseTestHelper.getTestTableName((String)"UT_WRITE_AFTER_WAIT");
        DatabaseTestHelper.createTable((DataSource)DATA_SOURCE, (String)firstTableName);
        DatabaseTestHelper.createTable((DataSource)DATA_SOURCE, (String)secondTableName);
        try {
            ArrayList<KV<Integer, String>> data = JdbcIOTest.getDataToWrite(1000L);
            PCollection dataCollection = (PCollection)this.pipeline.apply((PTransform)Create.of(data));
            PCollection rowsWritten = (PCollection)dataCollection.apply((PTransform)JdbcIOTest.getJdbcWrite(firstTableName).withResults());
            ((PCollection)dataCollection.apply((PTransform)Wait.on((PCollection[])new PCollection[]{rowsWritten}))).apply(JdbcIOTest.getJdbcWrite(secondTableName));
            this.pipeline.run();
            JdbcIOTest.assertRowCount(firstTableName, 1000);
            JdbcIOTest.assertRowCount(secondTableName, 1000);
        }
        finally {
            DatabaseTestHelper.deleteTable((DataSource)DATA_SOURCE, (String)firstTableName);
        }
    }

    private static JdbcIO.Write<KV<Integer, String>> getJdbcWrite(String tableName) {
        return JdbcIO.write().withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withStatement(String.format("insert into %s values(?, ?)", tableName)).withBatchSize(10L).withPreparedStatementSetter((JdbcIO.PreparedStatementSetter & Serializable)(element, statement) -> {
            statement.setInt(1, (Integer)element.getKey());
            statement.setString(2, (String)element.getValue());
        });
    }

    private static ArrayList<KV<Integer, String>> getDataToWrite(long rowsToAdd) {
        ArrayList<KV<Integer, String>> data = new ArrayList<KV<Integer, String>>();
        int i = 0;
        while ((long)i < rowsToAdd) {
            KV kv = KV.of((Object)i, (Object)"Test");
            data.add((KV<Integer, String>)kv);
            ++i;
        }
        return data;
    }

    private static void assertRowCount(String tableName, int expectedRowCount) throws SQLException {
        try (Connection connection = DATA_SOURCE.getConnection();
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName);){
            resultSet.next();
            int count = resultSet.getInt(1);
            Assert.assertEquals((long)expectedRowCount, (long)count);
        }
    }

    @Test
    public void testWriteWithBackoff() throws Exception {
        String tableName = DatabaseTestHelper.getTestTableName((String)"UT_WRITE_BACKOFF");
        DatabaseTestHelper.createTable((DataSource)DATA_SOURCE, (String)tableName);
        Connection connection = DATA_SOURCE.getConnection();
        Statement lockStatement = connection.createStatement();
        lockStatement.execute("ALTER TABLE " + tableName + " LOCKSIZE TABLE");
        lockStatement.execute("LOCK TABLE " + tableName + " IN EXCLUSIVE MODE");
        connection.setAutoCommit(false);
        PreparedStatement insertStatement = connection.prepareStatement("insert into " + tableName + " values(?, ?)");
        insertStatement.setInt(1, 1);
        insertStatement.setString(2, "TEST");
        insertStatement.execute();
        ((PCollection)this.pipeline.apply((PTransform)Create.of(Collections.singletonList(KV.of((Object)1, (Object)"TEST"))))).apply((PTransform)JdbcIO.write().withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withStatement(String.format("insert into %s values(?, ?)", tableName)).withRetryStrategy((JdbcIO.RetryStrategy & Serializable)e -> "40XL1".equals(e.getSQLState())).withPreparedStatementSetter((JdbcIO.PreparedStatementSetter & Serializable)(element, statement) -> {
            statement.setInt(1, (Integer)element.getKey());
            statement.setString(2, (String)element.getValue());
        }));
        Thread commitThread = new Thread(() -> {
            try {
                Thread.sleep(10000L);
                connection.commit();
            }
            catch (Exception exception) {
                // empty catch block
            }
        });
        commitThread.start();
        this.pipeline.run();
        commitThread.join();
        this.expectedLogs.verifyWarn("Deadlock detected, retrying");
        JdbcIOTest.assertRowCount(tableName, 2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWriteWithoutPreparedStatement() throws Exception {
        int rowsToAdd = 10;
        Schema.Builder schemaBuilder = Schema.builder();
        schemaBuilder.addField(Schema.Field.of((String)"column_boolean", (Schema.FieldType)Schema.FieldType.BOOLEAN));
        schemaBuilder.addField(Schema.Field.of((String)"column_string", (Schema.FieldType)Schema.FieldType.STRING));
        schemaBuilder.addField(Schema.Field.of((String)"column_int", (Schema.FieldType)Schema.FieldType.INT32));
        schemaBuilder.addField(Schema.Field.of((String)"column_long", (Schema.FieldType)Schema.FieldType.INT64));
        schemaBuilder.addField(Schema.Field.of((String)"column_float", (Schema.FieldType)Schema.FieldType.FLOAT));
        schemaBuilder.addField(Schema.Field.of((String)"column_double", (Schema.FieldType)Schema.FieldType.DOUBLE));
        schemaBuilder.addField(Schema.Field.of((String)"column_bigdecimal", (Schema.FieldType)Schema.FieldType.DECIMAL));
        schemaBuilder.addField(Schema.Field.of((String)"column_date", (Schema.FieldType)LogicalTypes.JDBC_DATE_TYPE));
        schemaBuilder.addField(Schema.Field.of((String)"column_time", (Schema.FieldType)LogicalTypes.JDBC_TIME_TYPE));
        schemaBuilder.addField(Schema.Field.of((String)"column_timestamptz", (Schema.FieldType)LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE));
        schemaBuilder.addField(Schema.Field.of((String)"column_timestamp", (Schema.FieldType)Schema.FieldType.DATETIME));
        schemaBuilder.addField(Schema.Field.of((String)"column_short", (Schema.FieldType)Schema.FieldType.INT16));
        Schema schema = schemaBuilder.build();
        String tableName = DatabaseTestHelper.getTestTableName((String)"UT_WRITE_PS");
        StringBuilder stmt = new StringBuilder("CREATE TABLE ");
        stmt.append(tableName);
        stmt.append(" (");
        stmt.append("column_boolean       BOOLEAN,");
        stmt.append("column_string        VARCHAR(254),");
        stmt.append("column_int           INTEGER,");
        stmt.append("column_long          BIGINT,");
        stmt.append("column_float         REAL,");
        stmt.append("column_double        DOUBLE PRECISION,");
        stmt.append("column_bigdecimal    DECIMAL(13,0),");
        stmt.append("column_date          DATE,");
        stmt.append("column_time          TIME,");
        stmt.append("column_timestamptz   TIMESTAMP,");
        stmt.append("column_timestamp     TIMESTAMP,");
        stmt.append("column_short         SMALLINT");
        stmt.append(" )");
        DatabaseTestHelper.createTableWithStatement((DataSource)DATA_SOURCE, (String)stmt.toString());
        try {
            ArrayList<Row> data = JdbcIOTest.getRowsToWrite(10L, schema);
            ((PCollection)this.pipeline.apply((PTransform)Create.of(data))).setRowSchema(schema).apply((PTransform)JdbcIO.write().withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withBatchSize(10L).withTable(tableName));
            this.pipeline.run();
            JdbcIOTest.assertRowCount(tableName, 10);
        }
        finally {
            DatabaseTestHelper.deleteTable((DataSource)DATA_SOURCE, (String)tableName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWriteWithoutPreparedStatementWithReadRows() throws Exception {
        SerializableFunction & Serializable dataSourceProvider = (SerializableFunction & Serializable)ignored -> DATA_SOURCE;
        PCollection rows = (PCollection)this.pipeline.apply((PTransform)JdbcIO.readRows().withDataSourceProviderFn((SerializableFunction)dataSourceProvider).withQuery(String.format("select name,id from %s where name = ?", READ_TABLE_NAME)).withStatementPreparator((JdbcIO.StatementPreparator & Serializable)preparedStatement -> preparedStatement.setString(1, TestRow.getNameForSeed((Integer)1))));
        String writeTableName = DatabaseTestHelper.getTestTableName((String)"UT_WRITE_PS_WITH_READ_ROWS");
        DatabaseTestHelper.createTableForRowWithSchema((DataSource)DATA_SOURCE, (String)writeTableName);
        try {
            rows.apply((PTransform)JdbcIO.write().withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withBatchSize(10L).withTable(writeTableName));
            this.pipeline.run();
        }
        finally {
            DatabaseTestHelper.deleteTable((DataSource)DATA_SOURCE, (String)writeTableName);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWriteWithoutPsWithNonNullableTableField() throws Exception {
        int rowsToAdd = 10;
        Schema.Builder schemaBuilder = Schema.builder();
        schemaBuilder.addField(Schema.Field.of((String)"column_boolean", (Schema.FieldType)Schema.FieldType.BOOLEAN));
        schemaBuilder.addField(Schema.Field.of((String)"column_string", (Schema.FieldType)Schema.FieldType.STRING));
        Schema schema = schemaBuilder.build();
        String tableName = DatabaseTestHelper.getTestTableName((String)"UT_WRITE");
        StringBuilder stmt = new StringBuilder("CREATE TABLE ");
        stmt.append(tableName);
        stmt.append(" (");
        stmt.append("column_boolean       BOOLEAN,");
        stmt.append("column_int           INTEGER NOT NULL");
        stmt.append(" )");
        DatabaseTestHelper.createTableWithStatement((DataSource)DATA_SOURCE, (String)stmt.toString());
        try {
            ArrayList<Row> data = JdbcIOTest.getRowsToWrite(10L, schema);
            ((PCollection)this.pipeline.apply((PTransform)Create.of(data))).setRowSchema(schema).apply((PTransform)JdbcIO.write().withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withBatchSize(10L).withTable(tableName));
            this.pipeline.run();
        }
        finally {
            DatabaseTestHelper.deleteTable((DataSource)DATA_SOURCE, (String)tableName);
            this.thrown.expect(RuntimeException.class);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testWriteWithoutPreparedStatementAndNonRowType() throws Exception {
        int rowsToAdd = 10;
        String tableName = DatabaseTestHelper.getTestTableName((String)"UT_WRITE_PS_NON_ROW");
        DatabaseTestHelper.createTableForRowWithSchema((DataSource)DATA_SOURCE, (String)tableName);
        try {
            ArrayList<RowWithSchema> data = JdbcIOTest.getRowsWithSchemaToWrite(10L);
            ((PCollection)this.pipeline.apply((PTransform)Create.of(data))).apply((PTransform)JdbcIO.write().withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withBatchSize(10L).withTable(tableName));
            this.pipeline.run();
            JdbcIOTest.assertRowCount(tableName, 10);
        }
        finally {
            DatabaseTestHelper.deleteTable((DataSource)DATA_SOURCE, (String)tableName);
        }
    }

    @Test
    public void testGetPreparedStatementSetCaller() throws Exception {
        Schema schema = Schema.builder().addField("bigint_col", Schema.FieldType.INT64).addField("binary_col", Schema.FieldType.BYTES).addField("bit_col", Schema.FieldType.BOOLEAN).addField("char_col", Schema.FieldType.STRING).addField("decimal_col", Schema.FieldType.DECIMAL).addField("double_col", Schema.FieldType.DOUBLE).addField("float_col", Schema.FieldType.FLOAT).addField("integer_col", Schema.FieldType.INT32).addField("datetime_col", Schema.FieldType.DATETIME).addField("int16_col", Schema.FieldType.INT16).addField("byte_col", Schema.FieldType.BYTE).build();
        Row row = Row.withSchema((Schema)schema).addValues(new Object[]{42L, "binary".getBytes(Charset.forName("UTF-8")), true, "char", BigDecimal.valueOf(25L), 20.5, Float.valueOf(15.5f), 10, new DateTime(), (short)5, Byte.parseByte("1", 2)}).build();
        PreparedStatement psMocked = (PreparedStatement)Mockito.mock(PreparedStatement.class);
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.INT64).set(row, psMocked, 0, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(0), (Integer)0));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.BYTES).set(row, psMocked, 1, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(1), (Integer)1));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.BOOLEAN).set(row, psMocked, 2, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(2), (Integer)2));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.STRING).set(row, psMocked, 3, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(3), (Integer)3));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.DECIMAL).set(row, psMocked, 4, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(4), (Integer)4));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.DOUBLE).set(row, psMocked, 5, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(5), (Integer)5));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.FLOAT).set(row, psMocked, 6, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(6), (Integer)6));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.INT32).set(row, psMocked, 7, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(7), (Integer)7));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.DATETIME).set(row, psMocked, 8, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(8), (Integer)8));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.INT16).set(row, psMocked, 9, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(9), (Integer)9));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.BYTE).set(row, psMocked, 10, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(10), (Integer)10));
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setLong(1, 42L);
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setBytes(2, "binary".getBytes(Charset.forName("UTF-8")));
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setBoolean(3, true);
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setString(4, "char");
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setBigDecimal(5, BigDecimal.valueOf(25L));
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setDouble(6, 20.5);
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setFloat(7, 15.5f);
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setInt(8, 10);
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setTimestamp(9, new Timestamp(row.getDateTime("datetime_col").getMillis()));
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setInt(10, 5);
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setByte(11, Byte.parseByte("1", 2));
    }

    @Test
    public void testGetPreparedStatementSetCallerForLogicalTypes() throws Exception {
        Schema schema = Schema.builder().addField("logical_date_col", LogicalTypes.JDBC_DATE_TYPE).addField("logical_time_col", LogicalTypes.JDBC_TIME_TYPE).addField("logical_time_with_tz_col", LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE).build();
        long epochMilli = 1558719710000L;
        DateTime dateTime = new DateTime(epochMilli, (Chronology)ISOChronology.getInstanceUTC());
        DateTime time = new DateTime(34567000L, (Chronology)ISOChronology.getInstanceUTC());
        Row row = Row.withSchema((Schema)schema).addValues(new Object[]{dateTime.withTimeAtStartOfDay(), time, dateTime}).build();
        PreparedStatement psMocked = (PreparedStatement)Mockito.mock(PreparedStatement.class);
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)LogicalTypes.JDBC_DATE_TYPE).set(row, psMocked, 0, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(0), (Integer)0));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)LogicalTypes.JDBC_TIME_TYPE).set(row, psMocked, 1, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(1), (Integer)1));
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE).set(row, psMocked, 2, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(2), (Integer)2));
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setDate(1, new Date(row.getDateTime(0).getMillis()));
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setTime(2, new Time(row.getDateTime(1).getMillis()));
        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("UTC"));
        cal.setTimeInMillis(epochMilli);
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setTimestamp(3, new Timestamp(cal.getTime().getTime()), cal);
    }

    @Test
    public void testGetPreparedStatementSetCallerForArray() throws Exception {
        Schema schema = Schema.builder().addField("string_array_col", Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING)).build();
        List<String> stringList = Arrays.asList("string 1", "string 2");
        Row row = Row.withSchema((Schema)schema).addValues(new Object[]{stringList}).build();
        PreparedStatement psMocked = (PreparedStatement)Mockito.mock(PreparedStatement.class);
        Connection connectionMocked = (Connection)Mockito.mock(Connection.class);
        Array arrayMocked = (Array)Mockito.mock(Array.class);
        Mockito.when((Object)psMocked.getConnection()).thenReturn((Object)connectionMocked);
        Mockito.when((Object)connectionMocked.createArrayOf(ArgumentMatchers.anyString(), (Object[])Matchers.any())).thenReturn((Object)arrayMocked);
        JdbcUtil.getPreparedStatementSetCaller((Schema.FieldType)Schema.FieldType.array((Schema.FieldType)Schema.FieldType.STRING)).set(row, psMocked, 0, SchemaUtil.FieldWithIndex.of((Schema.Field)schema.getField(0), (Integer)0));
        ((PreparedStatement)Mockito.verify((Object)psMocked, (VerificationMode)Mockito.times((int)1))).setArray(1, arrayMocked);
    }

    private static ArrayList<Row> getRowsToWrite(long rowsToAdd, Schema schema) {
        ArrayList<Row> data = new ArrayList<Row>();
        int i = 0;
        while ((long)i < rowsToAdd) {
            ArrayList fields = new ArrayList();
            Row row = (Row)schema.getFields().stream().map(field -> JdbcIOTest.dummyFieldValue(field.getType())).collect(Row.toRow((Schema)schema));
            data.add(row);
            ++i;
        }
        return data;
    }

    private static ArrayList<RowWithSchema> getRowsWithSchemaToWrite(long rowsToAdd) {
        ArrayList<RowWithSchema> data = new ArrayList<RowWithSchema>();
        int i = 0;
        while ((long)i < rowsToAdd) {
            data.add(new RowWithSchema("Test", i));
            ++i;
        }
        return data;
    }

    private static Object dummyFieldValue(Schema.FieldType fieldType) {
        long epochMilli = 1558719710000L;
        if (fieldType.equals((Object)Schema.FieldType.STRING)) {
            return "string value";
        }
        if (fieldType.equals((Object)Schema.FieldType.INT32)) {
            return 100;
        }
        if (fieldType.equals((Object)Schema.FieldType.DOUBLE)) {
            return 20.5;
        }
        if (fieldType.equals((Object)Schema.FieldType.BOOLEAN)) {
            return Boolean.TRUE;
        }
        if (fieldType.equals((Object)Schema.FieldType.INT16)) {
            return (short)Short.MAX_VALUE;
        }
        if (fieldType.equals((Object)Schema.FieldType.INT64)) {
            return Long.MAX_VALUE;
        }
        if (fieldType.equals((Object)Schema.FieldType.FLOAT)) {
            return Float.valueOf(15.5f);
        }
        if (fieldType.equals((Object)Schema.FieldType.DECIMAL)) {
            return BigDecimal.ONE;
        }
        if (fieldType.equals((Object)LogicalTypes.JDBC_DATE_TYPE)) {
            return new DateTime(epochMilli, (Chronology)ISOChronology.getInstanceUTC()).withTimeAtStartOfDay();
        }
        if (fieldType.equals((Object)LogicalTypes.JDBC_TIME_TYPE)) {
            return new DateTime(epochMilli, (Chronology)ISOChronology.getInstanceUTC()).withDate(new LocalDate(0L));
        }
        if (fieldType.equals((Object)LogicalTypes.JDBC_TIMESTAMP_WITH_TIMEZONE_TYPE)) {
            return new DateTime(epochMilli, (Chronology)ISOChronology.getInstanceUTC());
        }
        if (fieldType.equals((Object)Schema.FieldType.DATETIME)) {
            return new DateTime(epochMilli, (Chronology)ISOChronology.getInstanceUTC());
        }
        return null;
    }

    @Test
    public void testWriteWithEmptyPCollection() {
        ((PCollection)this.pipeline.apply((PTransform)Create.empty((Coder)KvCoder.of((Coder)VarIntCoder.of(), (Coder)StringUtf8Coder.of())))).apply((PTransform)JdbcIO.write().withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withStatement("insert into BEAM values(?, ?)").withPreparedStatementSetter((JdbcIO.PreparedStatementSetter & Serializable)(element, statement) -> {
            statement.setInt(1, (Integer)element.getKey());
            statement.setString(2, (String)element.getValue());
        }));
        this.pipeline.run();
    }

    @Test
    public void testSerializationAndCachingOfPoolingDataSourceProvider() {
        SerializableFunction provider = JdbcIO.PoolableDataSourceProvider.of((JdbcIO.DataSourceConfiguration)DATA_SOURCE_CONFIGURATION);
        SerializableFunction deserializedProvider = (SerializableFunction)SerializableUtils.ensureSerializable((Serializable)provider);
        Assert.assertSame((Object)provider.apply(null), (Object)deserializedProvider.apply(null));
    }

    @Test
    public void testCustomFluentBackOffConfiguration() throws Exception {
        String tableName = DatabaseTestHelper.getTestTableName((String)"UT_FLUENT_BACKOFF");
        DatabaseTestHelper.createTable((DataSource)DATA_SOURCE, (String)tableName);
        Connection connection = DATA_SOURCE.getConnection();
        Statement lockStatement = connection.createStatement();
        lockStatement.execute("ALTER TABLE " + tableName + " LOCKSIZE TABLE");
        lockStatement.execute("LOCK TABLE " + tableName + " IN EXCLUSIVE MODE");
        connection.setAutoCommit(false);
        PreparedStatement insertStatement = connection.prepareStatement("insert into " + tableName + " values(?, ?)");
        insertStatement.setInt(1, 1);
        insertStatement.setString(2, "TEST");
        insertStatement.execute();
        ((PCollection)this.pipeline.apply((PTransform)Create.of(Collections.singletonList(KV.of((Object)1, (Object)"TEST"))))).apply((PTransform)JdbcIO.write().withDataSourceConfiguration(DATA_SOURCE_CONFIGURATION).withStatement(String.format("insert into %s values(?, ?)", tableName)).withRetryStrategy((JdbcIO.RetryStrategy & Serializable)e -> "40XL1".equals(e.getSQLState())).withRetryConfiguration(JdbcIO.RetryConfiguration.create((int)2, null, (Duration)Duration.standardSeconds((long)1L))).withPreparedStatementSetter((JdbcIO.PreparedStatementSetter & Serializable)(element, statement) -> {
            statement.setInt(1, (Integer)element.getKey());
            statement.setString(2, (String)element.getValue());
        }));
        Pipeline.PipelineExecutionException exception = (Pipeline.PipelineExecutionException)Assert.assertThrows(Pipeline.PipelineExecutionException.class, () -> this.pipeline.run().waitUntilFinish());
        connection.commit();
        MatcherAssert.assertThat((Object)exception.getMessage(), (Matcher)org.hamcrest.Matchers.containsString((String)"java.sql.BatchUpdateException: A lock could not be obtained within the time requested"));
        this.expectedLogs.verifyLogRecords((Matcher)new TypeSafeMatcher<Iterable<LogRecord>>(){

            public void describeTo(Description description) {
            }

            protected boolean matchesSafely(Iterable<LogRecord> logRecords) {
                int count = 0;
                for (LogRecord logRecord : logRecords) {
                    if (!logRecord.getMessage().contains("Deadlock detected, retrying")) continue;
                    ++count;
                }
                return count == 3;
            }
        });
        JdbcIOTest.assertRowCount(tableName, 1);
    }

    @Test
    public void testDefaultRetryStrategy() {
        JdbcIO.DefaultRetryStrategy strategy = new JdbcIO.DefaultRetryStrategy();
        Assert.assertTrue((boolean)strategy.apply(new SQLException("SQL deadlock", "40001")));
        Assert.assertTrue((boolean)strategy.apply(new SQLException("PostgreSQL deadlock", "40P01")));
        Assert.assertFalse((boolean)strategy.apply(new SQLException("Other code", "40X01")));
    }
}

