/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.jdbc;

import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.net.InetAddress;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.sql.DataSource;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.NetworkTestHelper;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.io.jdbc.JdbcTestHelper;
import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.dbcp2.PoolingDataSource;
import org.apache.derby.drda.NetworkServerControl;
import org.apache.derby.jdbc.ClientDataSource;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link JdbcIO} that run against a Derby network server started once for the class.
 *
 * <p>{@link #beforeClass()} starts the server on a free local port, creates a read table and
 * fills it with {@link #EXPECTED_ROW_COUNT} rows. Write tests create their own tables and drop
 * them again when they finish.
 */
@RunWith(JUnit4.class)
public class JdbcIOTest implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcIOTest.class);

    /** Number of rows loaded into the read table and written by the write tests. */
    private static final int EXPECTED_ROW_COUNT = 1000;

    /** Identifier used to derive the table name of the back-off test. */
    private static final String BACKOFF_TABLE = "UT_WRITE_BACKOFF";

    private static final String DERBY_CLIENT_DRIVER = "org.apache.derby.jdbc.ClientDriver";
    private static final String DATABASE_NAME = "target/beam";

    /** Maximum number of poll rounds while waiting for the Derby server to come up. */
    private static final int STARTUP_MAX_ATTEMPTS = 30;

    /** Pause between two startup poll rounds, in milliseconds. */
    private static final long STARTUP_POLL_MILLIS = 500L;

    /** How long the back-off test keeps its blocking transaction open, in milliseconds. */
    private static final long COMMIT_DELAY_MILLIS = 10000L;

    private static NetworkServerControl derbyServer;
    private static ClientDataSource dataSource;
    private static int port;
    private static String readTableName;

    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Rule
    public final transient ExpectedLogs expectedLogs = ExpectedLogs.none(JdbcIO.class);

    /**
     * Starts the Derby network server, configures the shared data source and populates the read
     * table.
     *
     * @throws Exception if the server cannot be started or the initial data cannot be written
     */
    @BeforeClass
    public static void beforeClass() throws Exception {
        port = NetworkTestHelper.getAvailableLocalPort();
        LOG.info("Starting Derby database on {}", port);

        // Short lock wait timeout so that the back-off test hits a lock failure quickly.
        System.setProperty("derby.locks.waitTimeout", "2");
        System.setProperty("derby.stream.error.file", "target/derby.log");

        derbyServer = new NetworkServerControl(InetAddress.getByName("localhost"), port);
        StringWriter serverOutput = new StringWriter();
        derbyServer.start(new PrintWriter(serverOutput));
        if (!waitForServerStart(serverOutput)) {
            // Best effort: keep going as before, but leave a trace explaining later failures.
            LOG.warn("Derby server on port {} did not report a successful start in time", port);
        }

        dataSource = new ClientDataSource();
        dataSource.setCreateDatabase("create");
        dataSource.setDatabaseName(DATABASE_NAME);
        dataSource.setServerName("localhost");
        dataSource.setPortNumber(port);

        readTableName = DatabaseTestHelper.getTestTableName("UT_READ");
        DatabaseTestHelper.createTable(dataSource, readTableName);
        addInitialData(dataSource, readTableName);
    }

    /**
     * Polls until the server looks started, using two signals: the word "started" in the server
     * console output, or a successful ping.
     *
     * @param serverOutput writer that receives the server console output
     * @return {@code true} if a start was detected within {@link #STARTUP_MAX_ATTEMPTS} rounds
     * @throws InterruptedException if the waiting thread is interrupted
     */
    private static boolean waitForServerStart(StringWriter serverOutput)
            throws InterruptedException {
        for (int attempt = 0; attempt < STARTUP_MAX_ATTEMPTS; attempt++) {
            if (serverOutput.toString().contains("started")) {
                return true;
            }
            Thread.sleep(STARTUP_POLL_MILLIS);
            try {
                derbyServer.ping();
                return true;
            } catch (Exception ignored) {
                // Server is not accepting connections yet; try again.
            }
        }
        return false;
    }

    /**
     * Drops the read table and shuts the Derby server down.
     *
     * @throws Exception if the table cannot be dropped or the server cannot be stopped
     */
    @AfterClass
    public static void afterClass() throws Exception {
        try {
            // Both are null when beforeClass() failed early; skip the drop in that case.
            if (dataSource != null && readTableName != null) {
                DatabaseTestHelper.deleteTable(dataSource, readTableName);
            }
        } finally {
            if (derbyServer != null) {
                derbyServer.shutdown();
            }
        }
    }

    /** Returns a configuration that reaches the test database through driver class and URL. */
    private static JdbcIO.DataSourceConfiguration driverAndUrlConfiguration() {
        return JdbcIO.DataSourceConfiguration.create(
                DERBY_CLIENT_DRIVER, "jdbc:derby://localhost:" + port + "/" + DATABASE_NAME);
    }

    /** Opens a connection from the given configuration and asserts that it is valid. */
    private static void assertValidConnection(JdbcIO.DataSourceConfiguration config)
            throws SQLException {
        try (Connection conn = config.buildDatasource().getConnection()) {
            Assert.assertTrue(conn.isValid(0));
        }
    }

    @Test
    public void testDataSourceConfigurationDataSource() throws Exception {
        assertValidConnection(JdbcIO.DataSourceConfiguration.create(dataSource));
    }

    @Test
    public void testDataSourceConfigurationDataSourceWithoutPool() {
        Assert.assertTrue(
                JdbcIO.DataSourceConfiguration.create(dataSource).buildDatasource()
                        instanceof ClientDataSource);
    }

    @Test
    public void testDataSourceConfigurationDataSourceWithPool() {
        Assert.assertTrue(
                JdbcIO.PoolableDataSourceProvider.of(
                                JdbcIO.DataSourceConfiguration.create(dataSource))
                        .apply(null)
                        instanceof PoolingDataSource);
    }

    @Test
    public void testDataSourceConfigurationDriverAndUrl() throws Exception {
        assertValidConnection(driverAndUrlConfiguration());
    }

    @Test
    public void testDataSourceConfigurationUsernameAndPassword() throws Exception {
        String username = "sa";
        String password = "sa";
        assertValidConnection(
                driverAndUrlConfiguration().withUsername(username).withPassword(password));
    }

    @Test
    public void testDataSourceConfigurationNullPassword() throws Exception {
        String username = "sa";
        // Typed local (not a bare null literal) so the String overload is selected.
        String password = null;
        assertValidConnection(
                driverAndUrlConfiguration().withUsername(username).withPassword(password));
    }

    @Test
    public void testDataSourceConfigurationNullUsernameAndPassword() throws Exception {
        // Typed locals (not bare null literals) so the String overloads are selected.
        String username = null;
        String password = null;
        assertValidConnection(
                driverAndUrlConfiguration().withUsername(username).withPassword(password));
    }

    /**
     * Inserts {@link #EXPECTED_ROW_COUNT} rows (id, name) into the given table in a single
     * transaction.
     *
     * @param dataSource data source to connect through
     * @param tableName table to fill
     * @throws SQLException if any statement fails
     */
    private static void addInitialData(DataSource dataSource, String tableName)
            throws SQLException {
        try (Connection connection = dataSource.getConnection()) {
            connection.setAutoCommit(false);
            String insertSql = String.format("insert into %s values (?,?)", tableName);
            try (PreparedStatement preparedStatement = connection.prepareStatement(insertSql)) {
                for (int i = 0; i < EXPECTED_ROW_COUNT; i++) {
                    preparedStatement.clearParameters();
                    preparedStatement.setInt(1, i);
                    preparedStatement.setString(2, TestRow.getNameForSeed(i));
                    preparedStatement.executeUpdate();
                }
            }
            connection.commit();
        }
    }

    @Test
    public void testRead() {
        PCollection<TestRow> rows =
                pipeline.apply(
                        JdbcIO.<TestRow>read()
                                .withFetchSize(12)
                                .withDataSourceConfiguration(
                                        JdbcIO.DataSourceConfiguration.create(dataSource))
                                .withQuery("select name,id from " + readTableName)
                                .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
                                .withCoder(SerializableCoder.of(TestRow.class)));

        PAssert.thatSingleton(rows.apply("Count All", Count.globally()))
                .isEqualTo((long) EXPECTED_ROW_COUNT);

        Iterable<TestRow> expectedValues = TestRow.getExpectedValues(0, EXPECTED_ROW_COUNT);
        PAssert.that(rows).containsInAnyOrder(expectedValues);

        pipeline.run();
    }

    @Test
    public void testReadWithSingleStringParameter() {
        PCollection<TestRow> rows =
                pipeline.apply(
                        JdbcIO.<TestRow>read()
                                .withDataSourceConfiguration(
                                        JdbcIO.DataSourceConfiguration.create(dataSource))
                                .withQuery(
                                        String.format(
                                                "select name,id from %s where name = ?",
                                                readTableName))
                                .withStatementPreparator(
                                        preparedStatement ->
                                                preparedStatement.setString(
                                                        1, TestRow.getNameForSeed(1)))
                                .withRowMapper(new JdbcTestHelper.CreateTestRowOfNameAndId())
                                .withCoder(SerializableCoder.of(TestRow.class)));

        PAssert.thatSingleton(rows.apply("Count All", Count.globally())).isEqualTo(1L);

        List<TestRow> expectedValues = Collections.singletonList(TestRow.fromSeed(1));
        PAssert.that(rows).containsInAnyOrder(expectedValues);

        pipeline.run();
    }

    /** Writes {@link #EXPECTED_ROW_COUNT} rows into a fresh table and checks the row count. */
    @Test
    public void testWrite() throws Exception {
        String tableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
        DatabaseTestHelper.createTable(dataSource, tableName);
        try {
            ArrayList<KV<Integer, String>> data = getDataToWrite(EXPECTED_ROW_COUNT);
            pipeline.apply(Create.of(data)).apply(getJdbcWrite(tableName));

            pipeline.run();

            assertRowCount(tableName, EXPECTED_ROW_COUNT);
        } finally {
            DatabaseTestHelper.deleteTable(dataSource, tableName);
        }
    }

    /**
     * Writes to a first table with {@code withResults()}, then writes the same data to a second
     * table only after {@link Wait#on} has seen the first write's results.
     */
    @Test
    public void testWriteWithResultsAndWaitOn() throws Exception {
        String firstTableName = DatabaseTestHelper.getTestTableName("UT_WRITE");
        String secondTableName = DatabaseTestHelper.getTestTableName("UT_WRITE_AFTER_WAIT");
        DatabaseTestHelper.createTable(dataSource, firstTableName);
        DatabaseTestHelper.createTable(dataSource, secondTableName);
        try {
            ArrayList<KV<Integer, String>> data = getDataToWrite(EXPECTED_ROW_COUNT);

            PCollection<KV<Integer, String>> dataCollection = pipeline.apply(Create.of(data));
            PCollection<Void> rowsWritten =
                    dataCollection.apply(getJdbcWrite(firstTableName).withResults());
            dataCollection.apply(Wait.on(rowsWritten)).apply(getJdbcWrite(secondTableName));

            pipeline.run();

            assertRowCount(firstTableName, EXPECTED_ROW_COUNT);
            assertRowCount(secondTableName, EXPECTED_ROW_COUNT);
        } finally {
            // Drop both tables; the nested finally keeps the second drop from being skipped
            // when the first one fails.
            try {
                DatabaseTestHelper.deleteTable(dataSource, firstTableName);
            } finally {
                DatabaseTestHelper.deleteTable(dataSource, secondTableName);
            }
        }
    }

    /**
     * Builds a write of (id, name) pairs that runs the given two-parameter insert statement
     * against the test database.
     *
     * @param statement SQL with two {@code ?} placeholders: integer key first, string value second
     */
    private static JdbcIO.Write<KV<Integer, String>> newKeyValueWrite(String statement) {
        return JdbcIO.<KV<Integer, String>>write()
                .withDataSourceConfiguration(driverAndUrlConfiguration())
                .withStatement(statement)
                .withPreparedStatementSetter(
                        (element, preparedStatement) -> {
                            preparedStatement.setInt(1, element.getKey());
                            preparedStatement.setString(2, element.getValue());
                        });
    }

    /** Returns a write into {@code tableName} that uses a batch size of 10. */
    private static JdbcIO.Write<KV<Integer, String>> getJdbcWrite(String tableName) {
        return newKeyValueWrite(String.format("insert into %s values(?, ?)", tableName))
                .withBatchSize(10L);
    }

    /** Creates {@code rowsToAdd} pairs {@code (i, "Test")} for {@code i} counting up from 0. */
    private static ArrayList<KV<Integer, String>> getDataToWrite(long rowsToAdd) {
        ArrayList<KV<Integer, String>> data = new ArrayList<>();
        for (int i = 0; i < rowsToAdd; i++) {
            data.add(KV.of(i, "Test"));
        }
        return data;
    }

    /**
     * Asserts that {@code select count(*)} on the table returns the expected number of rows.
     *
     * @throws SQLException if the count query fails
     */
    private static void assertRowCount(String tableName, int expectedRowCount)
            throws SQLException {
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("select count(*) from " + tableName)) {
            Assert.assertTrue("count query returned no row", resultSet.next());
            Assert.assertEquals(expectedRowCount, resultSet.getInt(1));
        }
    }

    /**
     * Checks that a write is retried while another transaction blocks the table.
     *
     * <p>A second connection inserts a row without committing and commits only after
     * {@link #COMMIT_DELAY_MILLIS}, while the pipeline tries to insert into the same table. The
     * retry strategy treats SQL state {@code XJ208} as retryable. The test passes when JdbcIO
     * has logged the "Deadlock detected, retrying" warning and both rows are present.
     */
    @Test
    public void testWriteWithBackoff() throws Exception {
        String tableName = DatabaseTestHelper.getTestTableName(BACKOFF_TABLE);
        DatabaseTestHelper.createTable(dataSource, tableName);
        try {
            try (Connection connection = dataSource.getConnection();
                 Statement lockStatement = connection.createStatement()) {
                lockStatement.execute("ALTER TABLE " + tableName + " LOCKSIZE TABLE");
                lockStatement.execute("LOCK TABLE " + tableName + " IN EXCLUSIVE MODE");

                // Open a transaction that stays uncommitted while the pipeline runs.
                connection.setAutoCommit(false);
                try (PreparedStatement insertStatement =
                        connection.prepareStatement(
                                "insert into " + tableName + " values(?, ?)")) {
                    insertStatement.setInt(1, 1);
                    insertStatement.setString(2, "TEST");
                    insertStatement.execute();

                    pipeline.apply(Create.of(Collections.singletonList(KV.of(1, "TEST"))))
                            .apply(
                                    newKeyValueWrite(
                                                    String.format(
                                                            "insert into %s values(?, ?)",
                                                            tableName))
                                            .withRetryStrategy(
                                                    (JdbcIO.RetryStrategy)
                                                            e -> "XJ208".equals(e.getSQLState())));

                    // Commit later, while the pipeline is expected to be in its retry loop.
                    Thread commitThread =
                            new Thread(
                                    () -> {
                                        try {
                                            Thread.sleep(COMMIT_DELAY_MILLIS);
                                            connection.commit();
                                        } catch (InterruptedException e) {
                                            Thread.currentThread().interrupt();
                                        } catch (SQLException e) {
                                            LOG.warn("Delayed commit of blocking transaction failed", e);
                                        }
                                    });
                    commitThread.start();
                    try {
                        pipeline.run();
                    } finally {
                        // Always wait for the committer so it never outlives the connection.
                        commitThread.join();
                    }

                    expectedLogs.verifyWarn("Deadlock detected, retrying");
                    assertRowCount(tableName, 2);
                } finally {
                    // Nothing to undo after a successful commit; otherwise this ends the open
                    // transaction so the connection can be closed and the table dropped.
                    connection.rollback();
                }
            }
        } finally {
            DatabaseTestHelper.deleteTable(dataSource, tableName);
        }
    }

    /**
     * Best-effort drop of a table named exactly {@link #BACKOFF_TABLE}; failures are ignored.
     *
     * <p>NOTE(review): the back-off test creates its table under the name returned by
     * {@code DatabaseTestHelper.getTestTableName(BACKOFF_TABLE)}. If that helper decorates the
     * identifier, this drop never matches a real table - confirm and consider removing it.
     * {@link #testWriteWithBackoff()} drops its own table, so this is only a safety net.
     */
    @After
    public void tearDown() {
        try {
            DatabaseTestHelper.deleteTable(dataSource, BACKOFF_TABLE);
        } catch (Exception ignored) {
            // The table usually does not exist; nothing to clean up in that case.
        }
    }

    /** Runs a write over an empty input; the test only requires the pipeline to finish. */
    @Test
    public void testWriteWithEmptyPCollection() {
        pipeline.apply(Create.empty(KvCoder.of(VarIntCoder.of(), StringUtf8Coder.of())))
                .apply(newKeyValueWrite("insert into BEAM values(?, ?)"));

        pipeline.run();
    }
}

