/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.singlestore;

import com.singlestore.jdbc.SingleStoreDataSource;
import java.io.Serializable;
import javax.sql.DataSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.SingleStoreIOTestPipelineOptions;
import org.apache.beam.sdk.io.singlestore.TestHelper;
import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformReadConfiguration;
import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformReadProvider;
import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformWriteConfiguration;
import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformWriteProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.CombineFnBase;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class SingleStoreIOSchemaTransformIT {
    private static final String DATABASE_NAME = "SingleStoreIOIT";
    private static int numberOfRows;
    private static String tableName;
    private static String serverName;
    private static String username;
    private static String password;
    private static Integer port;
    private static SingleStoreIO.DataSourceConfiguration dataSourceConfiguration;
    @Rule
    public TestPipeline pipelineWrite = TestPipeline.create();
    @Rule
    public TestPipeline pipelineRead = TestPipeline.create();
    @Rule
    public TestPipeline pipelineReadWithPartitions = TestPipeline.create();

    @BeforeClass
    public static void setup() {
        SingleStoreIOTestPipelineOptions options;
        try {
            options = (SingleStoreIOTestPipelineOptions)IOITHelper.readIOTestPipelineOptions(SingleStoreIOTestPipelineOptions.class);
        }
        catch (IllegalArgumentException e) {
            options = null;
        }
        Assume.assumeNotNull((Object[])new Object[]{options});
        numberOfRows = options.getNumberOfRecords();
        serverName = options.getSingleStoreServerName();
        username = options.getSingleStoreUsername();
        password = options.getSingleStorePassword();
        port = options.getSingleStorePort();
        tableName = DatabaseTestHelper.getTestTableName((String)"IT");
        dataSourceConfiguration = SingleStoreIO.DataSourceConfiguration.create((String)(serverName + ":" + port)).withDatabase(DATABASE_NAME).withPassword(password).withUsername(username);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    @Category(value={NeedsRunner.class})
    public void testWriteThenRead() throws Exception {
        TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
        SingleStoreDataSource dataSource = new SingleStoreDataSource(String.format("jdbc:singlestore://%s:%d/%s?user=%s&password=%s&allowLocalInfile=TRUE", serverName, port, DATABASE_NAME, username, password));
        DatabaseTestHelper.createTable((DataSource)dataSource, (String)tableName);
        try {
            PipelineResult writeResult = this.runWrite();
            Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)writeResult.waitUntilFinish());
            PipelineResult readResult = this.runRead();
            Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)readResult.waitUntilFinish());
            PipelineResult readResultWithPartitions = this.runReadWithPartitions();
            Assert.assertEquals((Object)PipelineResult.State.DONE, (Object)readResultWithPartitions.waitUntilFinish());
        }
        finally {
            DatabaseTestHelper.deleteTable((DataSource)dataSource, (String)tableName);
        }
    }

    private PipelineResult runWrite() {
        SingleStoreSchemaTransformWriteProvider provider = new SingleStoreSchemaTransformWriteProvider();
        SingleStoreSchemaTransformWriteConfiguration configuration = SingleStoreSchemaTransformWriteConfiguration.builder().setDataSourceConfiguration(dataSourceConfiguration).setTable(tableName).setBatchSize(Integer.valueOf(100)).build();
        Row configurationRow = configuration.toBeamRow();
        SchemaTransform schemaTransform = provider.from(configurationRow);
        PTransform pCollectionRowTupleTransform = schemaTransform.buildTransform();
        Schema.Builder schemaBuilder = new Schema.Builder();
        schemaBuilder.addField("id", Schema.FieldType.INT32);
        schemaBuilder.addField("name", Schema.FieldType.STRING);
        Schema schema = schemaBuilder.build();
        PCollection rows = ((PCollection)((PCollection)((PCollection)this.pipelineWrite.apply((PTransform)GenerateSequence.from((long)0L).to((long)numberOfRows))).apply((PTransform)ParDo.of((DoFn)new TestRow.DeterministicallyConstructTestRowFn()))).apply("Convert TestRows to Rows", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptor.of(Row.class)).via((SerializableFunction & Serializable)testRow -> {
            Row.Builder rowBuilder = Row.withSchema((Schema)schema);
            rowBuilder.addValue((Object)testRow.id());
            rowBuilder.addValue((Object)testRow.name());
            return rowBuilder.build();
        }))).setRowSchema(schema);
        PCollectionRowTuple input = PCollectionRowTuple.of((String)"INPUT", (PCollection)rows);
        String tag = (String)provider.outputCollectionNames().get(0);
        PCollectionRowTuple output = (PCollectionRowTuple)input.apply(pCollectionRowTupleTransform);
        Assert.assertTrue((boolean)output.has(tag));
        PCollection writtenRows = (PCollection)output.get(tag).apply("Convert Rows to Integers", (PTransform)MapElements.into((TypeDescriptor)TypeDescriptor.of(Integer.class)).via((SerializableFunction & Serializable)row -> row.getInt32(0)));
        PAssert.thatSingleton((PCollection)((PCollection)writtenRows.apply("Sum All", (PTransform)Sum.integersGlobally()))).isEqualTo((Object)numberOfRows);
        return this.pipelineWrite.run();
    }

    private PipelineResult runRead() {
        SingleStoreSchemaTransformReadProvider provider = new SingleStoreSchemaTransformReadProvider();
        SingleStoreSchemaTransformReadConfiguration configuration = SingleStoreSchemaTransformReadConfiguration.builder().setDataSourceConfiguration(dataSourceConfiguration).setTable(tableName).setOutputParallelization(Boolean.valueOf(true)).build();
        Row configurationRow = configuration.toBeamRow();
        SchemaTransform schemaTransform = provider.from(configurationRow);
        PTransform pCollectionRowTupleTransform = schemaTransform.buildTransform();
        PCollectionRowTuple input = PCollectionRowTuple.empty((Pipeline)this.pipelineRead);
        String tag = (String)provider.outputCollectionNames().get(0);
        PCollectionRowTuple output = (PCollectionRowTuple)input.apply(pCollectionRowTupleTransform);
        Assert.assertTrue((boolean)output.has(tag));
        PCollection namesAndIds = (PCollection)output.get(tag).apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptor.of(TestRow.class)).via((SerializableFunction & Serializable)row -> TestRow.create((Integer)row.getInt32(0), (String)row.getString(1))));
        this.testReadResult((PCollection<TestRow>)namesAndIds);
        return this.pipelineRead.run();
    }

    private PipelineResult runReadWithPartitions() {
        SingleStoreSchemaTransformReadProvider provider = new SingleStoreSchemaTransformReadProvider();
        SingleStoreSchemaTransformReadConfiguration configuration = SingleStoreSchemaTransformReadConfiguration.builder().setDataSourceConfiguration(dataSourceConfiguration).setTable(tableName).setWithPartitions(Boolean.valueOf(true)).build();
        Row configurationRow = configuration.toBeamRow();
        SchemaTransform schemaTransform = provider.from(configurationRow);
        PTransform pCollectionRowTupleTransform = schemaTransform.buildTransform();
        PCollectionRowTuple input = PCollectionRowTuple.empty((Pipeline)this.pipelineReadWithPartitions);
        String tag = (String)provider.outputCollectionNames().get(0);
        PCollectionRowTuple output = (PCollectionRowTuple)input.apply(pCollectionRowTupleTransform);
        Assert.assertTrue((boolean)output.has(tag));
        PCollection namesAndIds = (PCollection)output.get(tag).apply((PTransform)MapElements.into((TypeDescriptor)TypeDescriptor.of(TestRow.class)).via((SerializableFunction & Serializable)row -> TestRow.create((Integer)row.getInt32(0), (String)row.getString(1))));
        this.testReadResult((PCollection<TestRow>)namesAndIds);
        return this.pipelineReadWithPartitions.run();
    }

    private void testReadResult(PCollection<TestRow> namesAndIds) {
        PAssert.thatSingleton((PCollection)((PCollection)namesAndIds.apply("Count All", Count.globally()))).isEqualTo((Object)numberOfRows);
        PCollection consolidatedHashcode = (PCollection)((PCollection)namesAndIds.apply((PTransform)ParDo.of((DoFn)new TestRow.SelectNameFn()))).apply("Hash row contents", (PTransform)Combine.globally((CombineFnBase.GlobalCombineFn)new HashingFn()).withoutDefaults());
        PAssert.that((PCollection)consolidatedHashcode).containsInAnyOrder((Object[])new String[]{TestRow.getExpectedHashForRowCount((int)numberOfRows)});
        PCollection frontOfList = (PCollection)namesAndIds.apply((PTransform)Top.smallest((int)500));
        Iterable expectedFrontOfList = TestRow.getExpectedValues((int)0, (int)500);
        PAssert.thatSingletonIterable((PCollection)frontOfList).containsInAnyOrder(expectedFrontOfList);
        PCollection backOfList = (PCollection)namesAndIds.apply((PTransform)Top.largest((int)500));
        Iterable expectedBackOfList = TestRow.getExpectedValues((int)(numberOfRows - 500), (int)numberOfRows);
        PAssert.thatSingletonIterable((PCollection)backOfList).containsInAnyOrder(expectedBackOfList);
    }
}

