/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.jdbc;

import java.io.PrintWriter;
import java.io.Serializable;
import java.io.StringWriter;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.ArrayList;
import javax.sql.DataSource;
import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.derby.drda.NetworkServerControl;
import org.apache.derby.jdbc.ClientDataSource;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JdbcIOTest
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(JdbcIOTest.class);
    private static NetworkServerControl derbyServer;
    private static ClientDataSource dataSource;
    private static int port;
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @BeforeClass
    public static void startDatabase() throws Exception {
        ServerSocket socket = new ServerSocket(0);
        port = socket.getLocalPort();
        socket.close();
        LOG.info("Starting Derby database on {}", (Object)port);
        System.setProperty("derby.stream.error.file", "target/derby.log");
        derbyServer = new NetworkServerControl(InetAddress.getByName("localhost"), port);
        StringWriter out = new StringWriter();
        derbyServer.start(new PrintWriter(out));
        boolean started = false;
        int count = 0;
        while (!started && count < 30) {
            if (out.toString().contains("started")) {
                started = true;
                continue;
            }
            ++count;
            Thread.sleep(500L);
            try {
                derbyServer.ping();
                started = true;
            }
            catch (Throwable throwable) {}
        }
        dataSource = new ClientDataSource();
        dataSource.setCreateDatabase("create");
        dataSource.setDatabaseName("target/beam");
        dataSource.setServerName("localhost");
        dataSource.setPortNumber(port);
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement();){
            statement.executeUpdate("create table BEAM(id INT, name VARCHAR(500))");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @AfterClass
    public static void shutDownDatabase() throws Exception {
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement();){
            statement.executeUpdate("drop table BEAM");
        }
        finally {
            if (derbyServer != null) {
                derbyServer.shutdown();
            }
        }
    }

    @Before
    public void initTable() throws Exception {
        try (Connection connection = dataSource.getConnection();){
            try (Statement statement = connection.createStatement();){
                statement.executeUpdate("delete from BEAM");
            }
            String[] scientists = new String[]{"Einstein", "Darwin", "Copernicus", "Pasteur", "Curie", "Faraday", "Newton", "Bohr", "Galilei", "Maxwell"};
            connection.setAutoCommit(false);
            try (PreparedStatement preparedStatement = connection.prepareStatement("insert into BEAM values (?,?)");){
                for (int i = 0; i < 1000; ++i) {
                    int index = i % scientists.length;
                    preparedStatement.clearParameters();
                    preparedStatement.setInt(1, i);
                    preparedStatement.setString(2, scientists[index]);
                    preparedStatement.executeUpdate();
                }
            }
            connection.commit();
        }
    }

    @Test
    public void testDataSourceConfigurationDataSource() throws Exception {
        JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create((DataSource)dataSource);
        try (Connection conn = config.getConnection();){
            Assert.assertTrue((boolean)conn.isValid(0));
        }
    }

    @Test
    public void testDataSourceConfigurationDriverAndUrl() throws Exception {
        JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create((String)"org.apache.derby.jdbc.ClientDriver", (String)("jdbc:derby://localhost:" + port + "/target/beam"));
        try (Connection conn = config.getConnection();){
            Assert.assertTrue((boolean)conn.isValid(0));
        }
    }

    @Test
    public void testDataSourceConfigurationUsernameAndPassword() throws Exception {
        JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create((String)"org.apache.derby.jdbc.ClientDriver", (String)("jdbc:derby://localhost:" + port + "/target/beam")).withUsername("sa").withPassword("sa");
        try (Connection conn = config.getConnection();){
            Assert.assertTrue((boolean)conn.isValid(0));
        }
    }

    @Test
    public void testDataSourceConfigurationNullPassword() throws Exception {
        JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create((String)"org.apache.derby.jdbc.ClientDriver", (String)("jdbc:derby://localhost:" + port + "/target/beam")).withUsername("sa").withPassword(null);
        try (Connection conn = config.getConnection();){
            Assert.assertTrue((boolean)conn.isValid(0));
        }
    }

    @Test
    public void testDataSourceConfigurationNullUsernameAndPassword() throws Exception {
        JdbcIO.DataSourceConfiguration config = JdbcIO.DataSourceConfiguration.create((String)"org.apache.derby.jdbc.ClientDriver", (String)("jdbc:derby://localhost:" + port + "/target/beam")).withUsername(null).withPassword(null);
        try (Connection conn = config.getConnection();){
            Assert.assertTrue((boolean)conn.isValid(0));
        }
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testRead() throws Exception {
        PCollection output = (PCollection)this.pipeline.apply((PTransform)JdbcIO.read().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create((DataSource)dataSource)).withQuery("select name,id from BEAM").withRowMapper((JdbcIO.RowMapper)new JdbcIO.RowMapper<KV<String, Integer>>(){

            public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
                KV kv = KV.of((Object)resultSet.getString("name"), (Object)resultSet.getInt("id"));
                return kv;
            }
        }).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianIntegerCoder.of())));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count All", (PTransform)Count.globally()))).isEqualTo((Object)1000L);
        PAssert.that((PCollection)((PCollection)output.apply("Count Scientist", (PTransform)Count.perKey()))).satisfies((SerializableFunction)new SerializableFunction<Iterable<KV<String, Long>>, Void>(){

            public Void apply(Iterable<KV<String, Long>> input) {
                for (KV<String, Long> element : input) {
                    Assert.assertEquals((String)((String)element.getKey()), (long)100L, (long)((Long)element.getValue()));
                }
                return null;
            }
        });
        this.pipeline.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testReadWithSingleStringParameter() throws Exception {
        PCollection output = (PCollection)this.pipeline.apply((PTransform)JdbcIO.read().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create((DataSource)dataSource)).withQuery("select name,id from BEAM where name = ?").withStatementPrepator(new JdbcIO.StatementPreparator(){

            public void setParameters(PreparedStatement preparedStatement) throws Exception {
                preparedStatement.setString(1, "Darwin");
            }
        }).withRowMapper((JdbcIO.RowMapper)new JdbcIO.RowMapper<KV<String, Integer>>(){

            public KV<String, Integer> mapRow(ResultSet resultSet) throws Exception {
                KV kv = KV.of((Object)resultSet.getString("name"), (Object)resultSet.getInt("id"));
                return kv;
            }
        }).withCoder((Coder)KvCoder.of((Coder)StringUtf8Coder.of(), (Coder)BigEndianIntegerCoder.of())));
        PAssert.thatSingleton((PCollection)((PCollection)output.apply("Count One Scientist", (PTransform)Count.globally()))).isEqualTo((Object)100L);
        this.pipeline.run();
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testWrite() throws Exception {
        ArrayList<KV> data = new ArrayList<KV>();
        for (int i = 0; i < 1000; ++i) {
            KV kv = KV.of((Object)i, (Object)"Test");
            data.add(kv);
        }
        ((PCollection)this.pipeline.apply((PTransform)Create.of(data))).apply((PTransform)JdbcIO.write().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create((String)"org.apache.derby.jdbc.ClientDriver", (String)("jdbc:derby://localhost:" + port + "/target/beam"))).withStatement("insert into BEAM values(?, ?)").withPreparedStatementSetter((JdbcIO.PreparedStatementSetter)new JdbcIO.PreparedStatementSetter<KV<Integer, String>>(){

            public void setParameters(KV<Integer, String> element, PreparedStatement statement) throws Exception {
                statement.setInt(1, (Integer)element.getKey());
                statement.setString(2, (String)element.getValue());
            }
        }));
        this.pipeline.run();
        try (Connection connection = dataSource.getConnection();
             Statement statement = connection.createStatement();
             ResultSet resultSet = statement.executeQuery("select count(*) from BEAM");){
            resultSet.next();
            int count = resultSet.getInt(1);
            Assert.assertEquals((long)2000L, (long)count);
        }
    }

    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteWithEmptyPCollection() throws Exception {
        ((PCollection)this.pipeline.apply((PTransform)Create.of(new ArrayList()))).apply((PTransform)JdbcIO.write().withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create((String)"org.apache.derby.jdbc.ClientDriver", (String)("jdbc:derby://localhost:" + port + "/target/beam"))).withStatement("insert into BEAM values(?, ?)").withPreparedStatementSetter((JdbcIO.PreparedStatementSetter)new JdbcIO.PreparedStatementSetter<KV<Integer, String>>(){

            public void setParameters(KV<Integer, String> element, PreparedStatement statement) throws Exception {
                statement.setInt(1, (Integer)element.getKey());
                statement.setString(2, (String)element.getValue());
            }
        }));
        this.pipeline.run();
    }
}

