package org.apache.beam.sdk.io.singlestore;

import java.math.BigDecimal;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT.class */
public class SingleStoreIODefaultMapperIT {
    private static final String DATABASE_NAME = "SingleStoreIOIT";
    private static int numberOfRows;
    private static String tableName;
    private static String serverName;
    private static String username;
    private static String password;
    private static Integer port;
    private static SingleStoreIO.DataSourceConfiguration dataSourceConfiguration;
    private static Schema schema;

    @Rule
    public TestPipeline pipelineWrite = TestPipeline.create();

    @Rule
    public TestPipeline pipelineRead = TestPipeline.create();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT$CheckDoFn.class */
    public static class CheckDoFn extends DoFn<Row, Void> {
        private CheckDoFn() {
        }

        @DoFn.ProcessElement
        public void process(DoFn<Row, Void>.ProcessContext processContext) {
            Row row = (Row) processContext.element();
            Assert.assertNotNull(row);
            Assert.assertEquals(Boolean.TRUE, row.getBoolean(0));
            Assert.assertEquals((byte) 10, row.getByte(1));
            Assert.assertEquals((short) 10, row.getInt16(2));
            Assert.assertEquals(10, row.getInt32(3));
            Assert.assertEquals(10L, row.getInt64(4));
            Assert.assertEquals(Float.valueOf(10.1f), row.getFloat(5));
            Assert.assertEquals(Double.valueOf(10.1d), row.getDouble(6));
            Assert.assertEquals(new BigDecimal("10.10000"), row.getDecimal(7));
            Assert.assertEquals(0L, new DateTime("2022-01-01T10:10:10Z").compareTo(row.getDateTime(8)));
            Assert.assertEquals(0L, new DateTime("2022-01-01T00:00:00Z").compareTo(row.getDateTime(9)));
            Assert.assertEquals(0L, new DateTime("1970-01-01T10:10:10Z").compareTo(row.getDateTime(10)));
            Assert.assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), row.getBytes(11));
            Assert.assertArrayEquals("asd".getBytes(StandardCharsets.UTF_8), row.getBytes(12));
            Assert.assertArrayEquals("asd��������������".getBytes(StandardCharsets.UTF_8), row.getBytes(13));
            Assert.assertEquals("asd", row.getString(14));
            Assert.assertEquals("asd", row.getString(15));
            Assert.assertEquals("asd", row.getString(16));
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/singlestore/SingleStoreIODefaultMapperIT$ConstructRowFn.class */
    public static class ConstructRowFn extends DoFn<Long, Row> {
        @DoFn.ProcessElement
        public void processElement(DoFn<Long, Row>.ProcessContext processContext) {
            Row.Builder withSchema = Row.withSchema(SingleStoreIODefaultMapperIT.schema);
            withSchema.addValue(Boolean.TRUE);
            withSchema.addValue((byte) 10);
            withSchema.addValue((short) 10);
            withSchema.addValue(10);
            withSchema.addValue(10L);
            withSchema.addValue(Float.valueOf(10.1f));
            withSchema.addValue(Double.valueOf(10.1d));
            withSchema.addValue(new BigDecimal("10.1"));
            withSchema.addValue(new DateTime("2022-01-01T10:10:10Z"));
            withSchema.addValue(new DateTime("2022-01-01T10:10:10Z"));
            withSchema.addValue(new DateTime("2022-01-01T10:10:10Z"));
            withSchema.addValue("asd".getBytes(StandardCharsets.UTF_8));
            withSchema.addValue("asd".getBytes(StandardCharsets.UTF_8));
            withSchema.addValue("asd".getBytes(StandardCharsets.UTF_8));
            withSchema.addValue("asd");
            withSchema.addValue("asd");
            withSchema.addValue("asd");
            processContext.output(withSchema.build());
        }
    }

    @BeforeClass
    public static void setup() {
        SingleStoreIOTestPipelineOptions singleStoreIOTestPipelineOptions;
        try {
            singleStoreIOTestPipelineOptions = (SingleStoreIOTestPipelineOptions) IOITHelper.readIOTestPipelineOptions(SingleStoreIOTestPipelineOptions.class);
        } catch (IllegalArgumentException e) {
            singleStoreIOTestPipelineOptions = null;
        }
        Assume.assumeNotNull(new Object[]{singleStoreIOTestPipelineOptions});
        numberOfRows = singleStoreIOTestPipelineOptions.getNumberOfRecords().intValue();
        serverName = singleStoreIOTestPipelineOptions.getSingleStoreServerName();
        username = singleStoreIOTestPipelineOptions.getSingleStoreUsername();
        password = singleStoreIOTestPipelineOptions.getSingleStorePassword();
        port = singleStoreIOTestPipelineOptions.getSingleStorePort();
        tableName = DatabaseTestHelper.getTestTableName("IT");
        dataSourceConfiguration = SingleStoreIO.DataSourceConfiguration.create(serverName + ":" + port).withDatabase(DATABASE_NAME).withPassword(password).withUsername(username);
        generateSchema();
    }

    private static void generateSchema() {
        Schema.Builder builder = new Schema.Builder();
        builder.addField("c1", Schema.FieldType.BOOLEAN);
        builder.addField("c2", Schema.FieldType.BYTE);
        builder.addField("c3", Schema.FieldType.INT16);
        builder.addField("c4", Schema.FieldType.INT32);
        builder.addField("c5", Schema.FieldType.INT64);
        builder.addField("c6", Schema.FieldType.FLOAT);
        builder.addField("c7", Schema.FieldType.DOUBLE);
        builder.addField("c8", Schema.FieldType.DECIMAL);
        builder.addField("c9", Schema.FieldType.DATETIME);
        builder.addField("c10", Schema.FieldType.DATETIME);
        builder.addField("c11", Schema.FieldType.DATETIME);
        builder.addField("c12", Schema.FieldType.BYTES);
        builder.addField("c13", Schema.FieldType.BYTES);
        builder.addField("c14", Schema.FieldType.BYTES);
        builder.addField("c15", Schema.FieldType.STRING);
        builder.addField("c16", Schema.FieldType.STRING);
        builder.addField("c17", Schema.FieldType.STRING);
        schema = builder.build();
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteThenRead() throws Exception {
        TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
        createTable();
        try {
            Assert.assertEquals(PipelineResult.State.DONE, runWrite().waitUntilFinish());
            Assert.assertEquals(PipelineResult.State.DONE, runRead().waitUntilFinish());
            DatabaseTestHelper.deleteTable(dataSourceConfiguration.getDataSource(), tableName);
        } catch (Throwable th) {
            DatabaseTestHelper.deleteTable(dataSourceConfiguration.getDataSource(), tableName);
            throw th;
        }
    }

    private void createTable() throws SQLException {
        Connection connection = dataSourceConfiguration.getDataSource().getConnection();
        try {
            Statement createStatement = connection.createStatement();
            createStatement.executeUpdate("DROP TABLE IF EXISTS " + tableName);
            createStatement.executeUpdate("CREATE TABLE " + tableName + "(c1 BIT, c2 TINYINT, c3 SMALLINT, c4 INTEGER, c5 BIGINT, c6 FLOAT, c7 DOUBLE, c8 DECIMAL(10, 5), c9 TIMESTAMP, c10 DATE, c11 TIME, c12 BLOB, c13 TINYBLOB, c14 BINARY(10), c15 MEDIUMTEXT, c16 TINYTEXT, c17 CHAR(10) )");
            connection.close();
        } catch (Throwable th) {
            connection.close();
            throw th;
        }
    }

    private PipelineResult runWrite() {
        this.pipelineWrite.apply(GenerateSequence.from(0L).to(numberOfRows)).apply(ParDo.of(new ConstructRowFn())).setRowSchema(schema).apply(SingleStoreIO.writeRows().withDataSourceConfiguration(dataSourceConfiguration).withTable(tableName));
        return this.pipelineWrite.run();
    }

    private PipelineResult runRead() {
        PCollection apply = this.pipelineRead.apply(SingleStoreIO.readRows().withDataSourceConfiguration(dataSourceConfiguration).withTable(tableName));
        PAssert.thatSingleton(apply.apply("Count All", Count.globally())).isEqualTo(Long.valueOf(numberOfRows));
        apply.apply(ParDo.of(new CheckDoFn())).setCoder(VoidCoder.of());
        return this.pipelineRead.run();
    }
}
