package org.apache.beam.sdk.io.singlestore;

import com.singlestore.jdbc.SingleStoreDataSource;
import java.lang.invoke.SerializedLambda;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.common.DatabaseTestHelper;
import org.apache.beam.sdk.io.common.HashingFn;
import org.apache.beam.sdk.io.common.IOITHelper;
import org.apache.beam.sdk.io.common.TestRow;
import org.apache.beam.sdk.io.singlestore.SingleStoreIO;
import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformReadConfiguration;
import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformReadProvider;
import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformWriteConfiguration;
import org.apache.beam.sdk.io.singlestore.schematransform.SingleStoreSchemaTransformWriteProvider;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.Top;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT.class */
public class SingleStoreIOSchemaTransformIT {
    private static final String DATABASE_NAME = "SingleStoreIOIT";
    private static int numberOfRows;
    private static String tableName;
    private static String serverName;
    private static String username;
    private static String password;
    private static Integer port;
    private static SingleStoreIO.DataSourceConfiguration dataSourceConfiguration;

    @Rule
    public TestPipeline pipelineWrite = TestPipeline.create();

    @Rule
    public TestPipeline pipelineRead = TestPipeline.create();

    @Rule
    public TestPipeline pipelineReadWithPartitions = TestPipeline.create();

    @BeforeClass
    public static void setup() {
        SingleStoreIOTestPipelineOptions singleStoreIOTestPipelineOptions;
        try {
            singleStoreIOTestPipelineOptions = (SingleStoreIOTestPipelineOptions) IOITHelper.readIOTestPipelineOptions(SingleStoreIOTestPipelineOptions.class);
        } catch (IllegalArgumentException e) {
            singleStoreIOTestPipelineOptions = null;
        }
        Assume.assumeNotNull(new Object[]{singleStoreIOTestPipelineOptions});
        numberOfRows = singleStoreIOTestPipelineOptions.getNumberOfRecords().intValue();
        serverName = singleStoreIOTestPipelineOptions.getSingleStoreServerName();
        username = singleStoreIOTestPipelineOptions.getSingleStoreUsername();
        password = singleStoreIOTestPipelineOptions.getSingleStorePassword();
        port = singleStoreIOTestPipelineOptions.getSingleStorePort();
        tableName = DatabaseTestHelper.getTestTableName("IT");
        dataSourceConfiguration = SingleStoreIO.DataSourceConfiguration.create(serverName + ":" + port).withDatabase(DATABASE_NAME).withPassword(password).withUsername(username);
    }

    @Test
    @Category({NeedsRunner.class})
    public void testWriteThenRead() throws Exception {
        TestHelper.createDatabaseIfNotExists(serverName, port, username, password, DATABASE_NAME);
        SingleStoreDataSource singleStoreDataSource = new SingleStoreDataSource(String.format("jdbc:singlestore://%s:%d/%s?user=%s&password=%s&allowLocalInfile=TRUE", serverName, port, DATABASE_NAME, username, password));
        DatabaseTestHelper.createTable(singleStoreDataSource, tableName);
        try {
            Assert.assertEquals(PipelineResult.State.DONE, runWrite().waitUntilFinish());
            Assert.assertEquals(PipelineResult.State.DONE, runRead().waitUntilFinish());
            Assert.assertEquals(PipelineResult.State.DONE, runReadWithPartitions().waitUntilFinish());
            DatabaseTestHelper.deleteTable(singleStoreDataSource, tableName);
        } catch (Throwable th) {
            DatabaseTestHelper.deleteTable(singleStoreDataSource, tableName);
            throw th;
        }
    }

    private PipelineResult runWrite() {
        SingleStoreSchemaTransformWriteProvider singleStoreSchemaTransformWriteProvider = new SingleStoreSchemaTransformWriteProvider();
        PTransform buildTransform = singleStoreSchemaTransformWriteProvider.from(SingleStoreSchemaTransformWriteConfiguration.builder().setDataSourceConfiguration(dataSourceConfiguration).setTable(tableName).setBatchSize(100).build().toBeamRow()).buildTransform();
        Schema.Builder builder = new Schema.Builder();
        builder.addField("id", Schema.FieldType.INT32);
        builder.addField("name", Schema.FieldType.STRING);
        Schema build = builder.build();
        PCollectionRowTuple of = PCollectionRowTuple.of("INPUT", this.pipelineWrite.apply(GenerateSequence.from(0L).to(numberOfRows)).apply(ParDo.of(new TestRow.DeterministicallyConstructTestRowFn())).apply("Convert TestRows to Rows", MapElements.into(TypeDescriptor.of(Row.class)).via(testRow -> {
            Row.Builder withSchema = Row.withSchema(build);
            withSchema.addValue(testRow.id());
            withSchema.addValue(testRow.name());
            return withSchema.build();
        })).setRowSchema(build));
        String str = (String) singleStoreSchemaTransformWriteProvider.outputCollectionNames().get(0);
        PCollectionRowTuple apply = of.apply(buildTransform);
        Assert.assertTrue(apply.has(str));
        PAssert.thatSingleton(apply.get(str).apply("Convert Rows to Integers", MapElements.into(TypeDescriptor.of(Integer.class)).via(row -> {
            return row.getInt32(0);
        })).apply("Sum All", Sum.integersGlobally())).isEqualTo(Integer.valueOf(numberOfRows));
        return this.pipelineWrite.run();
    }

    private PipelineResult runRead() {
        SingleStoreSchemaTransformReadProvider singleStoreSchemaTransformReadProvider = new SingleStoreSchemaTransformReadProvider();
        PTransform buildTransform = singleStoreSchemaTransformReadProvider.from(SingleStoreSchemaTransformReadConfiguration.builder().setDataSourceConfiguration(dataSourceConfiguration).setTable(tableName).setOutputParallelization(true).build().toBeamRow()).buildTransform();
        PCollectionRowTuple empty = PCollectionRowTuple.empty(this.pipelineRead);
        String str = (String) singleStoreSchemaTransformReadProvider.outputCollectionNames().get(0);
        PCollectionRowTuple apply = empty.apply(buildTransform);
        Assert.assertTrue(apply.has(str));
        testReadResult((PCollection) apply.get(str).apply(MapElements.into(TypeDescriptor.of(TestRow.class)).via(row -> {
            return TestRow.create(row.getInt32(0), row.getString(1));
        })));
        return this.pipelineRead.run();
    }

    private PipelineResult runReadWithPartitions() {
        SingleStoreSchemaTransformReadProvider singleStoreSchemaTransformReadProvider = new SingleStoreSchemaTransformReadProvider();
        PTransform buildTransform = singleStoreSchemaTransformReadProvider.from(SingleStoreSchemaTransformReadConfiguration.builder().setDataSourceConfiguration(dataSourceConfiguration).setTable(tableName).setWithPartitions(true).build().toBeamRow()).buildTransform();
        PCollectionRowTuple empty = PCollectionRowTuple.empty(this.pipelineReadWithPartitions);
        String str = (String) singleStoreSchemaTransformReadProvider.outputCollectionNames().get(0);
        PCollectionRowTuple apply = empty.apply(buildTransform);
        Assert.assertTrue(apply.has(str));
        testReadResult((PCollection) apply.get(str).apply(MapElements.into(TypeDescriptor.of(TestRow.class)).via(row -> {
            return TestRow.create(row.getInt32(0), row.getString(1));
        })));
        return this.pipelineReadWithPartitions.run();
    }

    private void testReadResult(PCollection<TestRow> pCollection) {
        PAssert.thatSingleton(pCollection.apply("Count All", Count.globally())).isEqualTo(Long.valueOf(numberOfRows));
        PAssert.that(pCollection.apply(ParDo.of(new TestRow.SelectNameFn())).apply("Hash row contents", Combine.globally(new HashingFn()).withoutDefaults())).containsInAnyOrder(new String[]{TestRow.getExpectedHashForRowCount(numberOfRows)});
        PAssert.thatSingletonIterable(pCollection.apply(Top.smallest(500))).containsInAnyOrder(TestRow.getExpectedValues(0, 500));
        PAssert.thatSingletonIterable(pCollection.apply(Top.largest(500))).containsInAnyOrder(TestRow.getExpectedValues(numberOfRows - 500, numberOfRows));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1693966082:
                if (implMethodName.equals("lambda$runWrite$a17f48fa$1")) {
                    z = true;
                    break;
                }
                break;
            case -131859149:
                if (implMethodName.equals("lambda$runRead$2987aefb$1")) {
                    z = false;
                    break;
                }
                break;
            case 484237444:
                if (implMethodName.equals("lambda$runReadWithPartitions$2987aefb$1")) {
                    z = 2;
                    break;
                }
                break;
            case 2071641302:
                if (implMethodName.equals("lambda$runWrite$8288314$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/Row;)Lorg/apache/beam/sdk/io/common/TestRow;")) {
                    return row -> {
                        return TestRow.create(row.getInt32(0), row.getString(1));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/schemas/Schema;Lorg/apache/beam/sdk/io/common/TestRow;)Lorg/apache/beam/sdk/values/Row;")) {
                    Schema schema = (Schema) serializedLambda.getCapturedArg(0);
                    return testRow -> {
                        Row.Builder withSchema = Row.withSchema(schema);
                        withSchema.addValue(testRow.id());
                        withSchema.addValue(testRow.name());
                        return withSchema.build();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/Row;)Lorg/apache/beam/sdk/io/common/TestRow;")) {
                    return row2 -> {
                        return TestRow.create(row2.getInt32(0), row2.getString(1));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/io/singlestore/SingleStoreIOSchemaTransformIT") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/beam/sdk/values/Row;)Ljava/lang/Integer;")) {
                    return row3 -> {
                        return row3.getInt32(0);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
