package org.apache.beam.sdk.extensions.sql.impl;

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.TimeZone;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.meta.provider.ReadOnlyTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestBoundedTable;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestUnboundedTable;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.util.ReleaseInfo;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.jdbc.CalciteSchema;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/impl/JdbcDriverTest.class */
public class JdbcDriverTest {
    public static final DateTime FIRST_DATE = new DateTime(1);
    private static final Schema BASIC_SCHEMA = Schema.builder().addNullableField("id", Schema.FieldType.INT64).addNullableField("name", Schema.FieldType.STRING).build();
    private static final Schema COMPLEX_SCHEMA = Schema.builder().addNullableField("description", Schema.FieldType.STRING).addNullableField("nestedRow", Schema.FieldType.row(BASIC_SCHEMA)).build();
    private static final ReadOnlyTableProvider BOUNDED_TABLE = new ReadOnlyTableProvider("test", ImmutableMap.of("test", TestBoundedTable.of(new Object[]{Schema.FieldType.INT32, "id", Schema.FieldType.STRING, "name"}).addRows(new Object[]{1, "first"})));

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void before() throws Exception {
        Class.forName("org.apache.beam.sdk.extensions.sql.impl.JdbcDriver");
    }

    @Test
    public void testDriverManager_getDriver() throws Exception {
        Assert.assertTrue(DriverManager.getDriver("jdbc:beam:") instanceof JdbcDriver);
    }

    @Test
    public void testDriverManager_simple() throws Exception {
        Assert.assertTrue(DriverManager.getConnection("jdbc:beam:").createStatement().execute("SELECT 1"));
    }

    @Test
    public void testDriverManager_defaultUserAgent() throws Exception {
        MatcherAssert.assertThat((String) CalciteSchema.from(DriverManager.getConnection("jdbc:beam:").getRootSchema().getSubSchema("beam")).schema.getPipelineOptions().get("userAgent"), Matchers.containsString("BeamSQL"));
    }

    @Test
    public void testDriverManager_hasUserAgent() throws Exception {
        MatcherAssert.assertThat((String) ((BeamCalciteSchema) DriverManager.getConnection("jdbc:beam:").getCurrentBeamSchema()).getPipelineOptions().get("userAgent"), Matchers.equalTo("BeamSQL/" + ReleaseInfo.getReleaseInfo().getVersion()));
    }

    @Test
    public void testDriverManager_setUserAgent() throws Exception {
        MatcherAssert.assertThat((String) CalciteSchema.from(DriverManager.getConnection("jdbc:beam:beam.userAgent=Secret Agent").getRootSchema().getSubSchema("beam")).schema.getPipelineOptions().get("userAgent"), Matchers.equalTo("Secret Agent"));
    }

    @Test
    public void testDriverManager_pipelineOptionsPlumbing() throws Exception {
        Map pipelineOptions = CalciteSchema.from(DriverManager.getConnection("jdbc:beam:beam.foo=baz;beam.foobizzle=mahshizzle;other=smother").getRootSchema().getSubSchema("beam")).schema.getPipelineOptions();
        MatcherAssert.assertThat((String) pipelineOptions.get("foo"), Matchers.equalTo("baz"));
        MatcherAssert.assertThat((String) pipelineOptions.get("foobizzle"), Matchers.equalTo("mahshizzle"));
        MatcherAssert.assertThat((String) pipelineOptions.get("other"), Matchers.nullValue());
    }

    @Test
    public void testDriverManager_parse() throws Exception {
        Assert.assertTrue(DriverManager.getConnection("jdbc:beam:").createStatement().execute("SELECT 'beam'"));
    }

    @Test
    public void testDriverManager_ddl() throws Exception {
        Connection connection = DriverManager.getConnection("jdbc:beam:");
        DatabaseMetaData metaData = connection.getMetaData();
        Assert.assertFalse(metaData.getTables(null, null, null, new String[]{"TABLE"}).next());
        Statement createStatement = connection.createStatement();
        Assert.assertEquals(0L, createStatement.executeUpdate("CREATE EXTERNAL TABLE test (id INTEGER) TYPE 'text'"));
        ResultSet tables = metaData.getTables(null, null, null, new String[]{"TABLE"});
        Assert.assertTrue(tables.next());
        Assert.assertEquals("test", tables.getString("TABLE_NAME"));
        Assert.assertFalse(tables.next());
        Assert.assertEquals(0L, createStatement.executeUpdate("DROP TABLE test"));
        Assert.assertFalse(metaData.getTables(null, null, null, new String[]{"TABLE"}).next());
    }

    @Test
    public void testSelectsFromExistingTable() throws Exception {
        TestTableProvider testTableProvider = new TestTableProvider();
        JdbcConnection connect = JdbcDriver.connect(testTableProvider, PipelineOptionsFactory.create());
        connect.createStatement().executeUpdate("CREATE EXTERNAL TABLE person (id BIGINT, name VARCHAR) TYPE 'test'");
        testTableProvider.addRows("person", new Row[]{row(1L, "aaa"), row(2L, "bbb")});
        MatcherAssert.assertThat((List) readResultSet(connect.createStatement().executeQuery("SELECT id, name FROM person")).stream().map(list -> {
            return (Row) list.stream().collect(Row.toRow(BASIC_SCHEMA));
        }).collect(Collectors.toList()), Matchers.containsInAnyOrder(new Row[]{row(1L, "aaa"), row(2L, "bbb")}));
    }

    @Test
    public void testTimestampWithDefaultTimezone() throws Exception {
        TestTableProvider testTableProvider = new TestTableProvider();
        JdbcConnection connect = JdbcDriver.connect(testTableProvider, PipelineOptionsFactory.create());
        Schema build = Schema.builder().addDateTimeField("ts").build();
        connect.createStatement().executeUpdate("CREATE EXTERNAL TABLE test (ts TIMESTAMP) TYPE 'test'");
        DateTime parseDateTime = ISODateTimeFormat.dateTimeParser().parseDateTime("2018-07-01T01:02:03Z");
        testTableProvider.addRows("test", new Row[]{Row.withSchema(build).addValue(parseDateTime).build()});
        ResultSet executeQuery = connect.createStatement().executeQuery(String.format("SELECT ts FROM test", new Object[0]));
        executeQuery.next();
        Timestamp timestamp = executeQuery.getTimestamp(1);
        MatcherAssert.assertThat(String.format("Wrote %s to a table, but got back %s", ISODateTimeFormat.basicDateTime().print(parseDateTime), ISODateTimeFormat.basicDateTime().print(timestamp.getTime())), Long.valueOf(timestamp.getTime()), Matchers.equalTo(Long.valueOf(parseDateTime.getMillis())));
    }

    @Test
    @Ignore("https://issues.apache.org/jira/browse/CALCITE-2394")
    public void testTimestampWithNonzeroTimezone() throws Exception {
        Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("Asia/Tokyo"), Locale.ROOT);
        TestTableProvider testTableProvider = new TestTableProvider();
        JdbcConnection connect = JdbcDriver.connect(testTableProvider, PipelineOptionsFactory.create());
        Schema build = Schema.builder().addDateTimeField("ts").build();
        connect.createStatement().executeUpdate("CREATE EXTERNAL TABLE test (ts TIMESTAMP) TYPE 'test'");
        DateTime parseDateTime = ISODateTimeFormat.dateTimeParser().parseDateTime("2018-07-01T01:02:03Z");
        testTableProvider.addRows("test", new Row[]{Row.withSchema(build).addValue(parseDateTime).build()});
        ResultSet executeQuery = connect.createStatement().executeQuery(String.format("SELECT ts FROM test", new Object[0]));
        executeQuery.next();
        Timestamp timestamp = executeQuery.getTimestamp(1, calendar);
        MatcherAssert.assertThat(String.format("Wrote %s to a table, but got back %s", ISODateTimeFormat.basicDateTime().print(parseDateTime), ISODateTimeFormat.basicDateTime().print(timestamp.getTime())), Long.valueOf(timestamp.getTime()), Matchers.equalTo(Long.valueOf(parseDateTime.getMillis())));
    }

    @Test
    public void testTimestampWithZeroTimezone() throws Exception {
        Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"), Locale.ROOT);
        TestTableProvider testTableProvider = new TestTableProvider();
        JdbcConnection connect = JdbcDriver.connect(testTableProvider, PipelineOptionsFactory.create());
        Schema build = Schema.builder().addDateTimeField("ts").build();
        connect.createStatement().executeUpdate("CREATE EXTERNAL TABLE test (ts TIMESTAMP) TYPE 'test'");
        DateTime parseDateTime = ISODateTimeFormat.dateTimeParser().parseDateTime("2018-07-01T01:02:03Z");
        testTableProvider.addRows("test", new Row[]{Row.withSchema(build).addValue(parseDateTime).build()});
        ResultSet executeQuery = connect.createStatement().executeQuery(String.format("SELECT ts FROM test", new Object[0]));
        executeQuery.next();
        Timestamp timestamp = executeQuery.getTimestamp(1, calendar);
        MatcherAssert.assertThat(String.format("Wrote %s to a table, but got back %s", ISODateTimeFormat.basicDateTime().print(parseDateTime), ISODateTimeFormat.basicDateTime().print(timestamp.getTime())), Long.valueOf(timestamp.getTime()), Matchers.equalTo(Long.valueOf(parseDateTime.getMillis())));
    }

    @Test
    public void testSelectsFromExistingComplexTable() throws Exception {
        TestTableProvider testTableProvider = new TestTableProvider();
        JdbcConnection connect = JdbcDriver.connect(testTableProvider, PipelineOptionsFactory.create());
        connect.createStatement().executeUpdate("CREATE EXTERNAL TABLE person ( \ndescription VARCHAR, \nnestedRow ROW< \n              id BIGINT, \n              name VARCHAR> \n) \nTYPE 'test'");
        testTableProvider.addRows("person", new Row[]{row(COMPLEX_SCHEMA, "description1", row(1L, "aaa")), row(COMPLEX_SCHEMA, "description2", row(2L, "bbb"))});
        MatcherAssert.assertThat((List) readResultSet(connect.createStatement().executeQuery("SELECT person.nestedRow.id, person.nestedRow.name FROM person")).stream().map(list -> {
            return (Row) list.stream().collect(Row.toRow(BASIC_SCHEMA));
        }).collect(Collectors.toList()), Matchers.containsInAnyOrder(new Row[]{row(1L, "aaa"), row(2L, "bbb")}));
    }

    @Test
    public void testInsertIntoCreatedTable() throws Exception {
        TestTableProvider testTableProvider = new TestTableProvider();
        JdbcConnection connect = JdbcDriver.connect(testTableProvider, PipelineOptionsFactory.create());
        connect.createStatement().executeUpdate("CREATE EXTERNAL TABLE person (id BIGINT, name VARCHAR) TYPE 'test'");
        connect.createStatement().executeUpdate("CREATE EXTERNAL TABLE person_src (id BIGINT, name VARCHAR) TYPE 'test'");
        testTableProvider.addRows("person_src", new Row[]{row(1L, "aaa"), row(2L, "bbb")});
        connect.createStatement().execute("INSERT INTO person SELECT id, name FROM person_src");
        MatcherAssert.assertThat((List) readResultSet(connect.createStatement().executeQuery("SELECT id, name FROM person")).stream().map(list -> {
            return (Row) list.stream().collect(Row.toRow(BASIC_SCHEMA));
        }).collect(Collectors.toList()), Matchers.containsInAnyOrder(new Row[]{row(1L, "aaa"), row(2L, "bbb")}));
    }

    @Test
    public void testInternalConnect_boundedTable() throws Exception {
        ResultSet executeQuery = JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()).createStatement().executeQuery("SELECT * FROM test");
        Assert.assertTrue(executeQuery.next());
        Assert.assertEquals(1L, executeQuery.getInt("id"));
        Assert.assertEquals("first", executeQuery.getString("name"));
        Assert.assertFalse(executeQuery.next());
    }

    @Test
    public void testInternalConnect_bounded_limit() throws Exception {
        Statement createStatement = JdbcDriver.connect(new ReadOnlyTableProvider("test", ImmutableMap.of("test", TestBoundedTable.of(new Object[]{Schema.FieldType.INT32, "id", Schema.FieldType.STRING, "name"}).addRows(new Object[]{1, "first"}).addRows(new Object[]{1, "second first"}).addRows(new Object[]{2, "second"}))), PipelineOptionsFactory.create()).createStatement();
        ResultSet executeQuery = createStatement.executeQuery("SELECT * FROM test LIMIT 5");
        Assert.assertTrue(executeQuery.next());
        Assert.assertTrue(executeQuery.next());
        Assert.assertTrue(executeQuery.next());
        Assert.assertFalse(executeQuery.next());
        Assert.assertFalse(executeQuery.next());
        ResultSet executeQuery2 = createStatement.executeQuery("SELECT * FROM test LIMIT 1");
        Assert.assertTrue(executeQuery2.next());
        Assert.assertFalse(executeQuery2.next());
        ResultSet executeQuery3 = createStatement.executeQuery("SELECT * FROM test LIMIT 2");
        Assert.assertTrue(executeQuery3.next());
        Assert.assertTrue(executeQuery3.next());
        Assert.assertFalse(executeQuery3.next());
        ResultSet executeQuery4 = createStatement.executeQuery("SELECT * FROM test LIMIT 3");
        Assert.assertTrue(executeQuery4.next());
        Assert.assertTrue(executeQuery4.next());
        Assert.assertTrue(executeQuery4.next());
        Assert.assertFalse(executeQuery4.next());
    }

    @Test
    public void testInternalConnect_unbounded_limit() throws Exception {
        Statement createStatement = JdbcDriver.connect(new ReadOnlyTableProvider("test", ImmutableMap.of("test", TestUnboundedTable.of(new Object[]{Schema.FieldType.INT32, "order_id", Schema.FieldType.INT32, "site_id", Schema.FieldType.INT32, "price", Schema.FieldType.DATETIME, "order_time"}).timestampColumnIndex(3).addRows(Duration.ZERO, new Object[]{1, 1, 1, FIRST_DATE, 1, 2, 6, FIRST_DATE}))), PipelineOptionsFactory.create()).createStatement();
        ResultSet executeQuery = createStatement.executeQuery("SELECT * FROM test LIMIT 1");
        Assert.assertTrue(executeQuery.next());
        Assert.assertFalse(executeQuery.next());
        ResultSet executeQuery2 = createStatement.executeQuery("SELECT * FROM test LIMIT 2");
        Assert.assertTrue(executeQuery2.next());
        Assert.assertTrue(executeQuery2.next());
        Assert.assertFalse(executeQuery2.next());
    }

    private List<List<Object>> readResultSet(ResultSet resultSet) throws Exception {
        ArrayList arrayList = new ArrayList();
        while (resultSet.next()) {
            ArrayList arrayList2 = new ArrayList();
            for (int i = 0; i < resultSet.getMetaData().getColumnCount(); i++) {
                arrayList2.add(resultSet.getObject(i + 1));
            }
            arrayList.add(arrayList2);
        }
        return arrayList;
    }

    private Row row(Object... objArr) {
        return row(BASIC_SCHEMA, objArr);
    }

    private Row row(Schema schema, Object... objArr) {
        return Row.withSchema(schema).addValues(objArr).build();
    }

    @Test
    public void testInternalConnect_setDirectRunner() throws Exception {
        Statement createStatement = JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()).createStatement();
        Assert.assertEquals(0L, createStatement.executeUpdate("SET runner = direct"));
        Assert.assertTrue(createStatement.execute("SELECT * FROM test"));
    }

    @Test
    public void testInternalConnect_setBogusRunner() throws Exception {
        this.thrown.expectMessage("Unknown 'runner' specified 'bogus'");
        Statement createStatement = JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()).createStatement();
        Assert.assertEquals(0L, createStatement.executeUpdate("SET runner = bogus"));
        Assert.assertTrue(createStatement.execute("SELECT * FROM test"));
    }

    @Test
    public void testInternalConnect_resetAll() throws Exception {
        Statement createStatement = JdbcDriver.connect(BOUNDED_TABLE, PipelineOptionsFactory.create()).createStatement();
        Assert.assertEquals(0L, createStatement.executeUpdate("SET runner = bogus"));
        Assert.assertEquals(0L, createStatement.executeUpdate("RESET ALL"));
        Assert.assertTrue(createStatement.execute("SELECT * FROM test"));
    }

    @Test
    public void testInternalConnect_driverManagerDifferentProtocol() throws Exception {
        this.thrown.expect(SQLException.class);
        this.thrown.expectMessage("No suitable driver found");
        DriverManager.getConnection("jdbc:baaaaaad");
    }
}
