/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;

import com.google.datastore.v1.Key;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.PropertyFilter;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.Value;
import com.google.datastore.v1.client.DatastoreHelper;
import java.util.UUID;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1TableProvider;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
import org.apache.beam.sdk.io.gcp.datastore.EntityToRow;
import org.apache.beam.sdk.io.gcp.datastore.RowToEntity;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.avatica.util.ByteString;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Integration tests that write to and read from Datastore, both through Beam SQL using the
 * {@code datastoreV1} external table type and directly through {@link DatastoreIO}.
 *
 * <p>The project id is taken from the testing pipeline options, so these tests need a project
 * to be configured there.
 */
@RunWith(JUnit4.class)
public class DataStoreReadWriteIT {
    // Only getProject() is used from these options.
    private static final BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);

    /** Name of the row field that carries the serialized Datastore key. */
    private static final String KEY_FIELD = "__key__";

    private static final Schema SOURCE_SCHEMA = Schema.builder()
            .addNullableField(KEY_FIELD, CalciteUtils.VARBINARY)
            .addNullableField("content", Schema.FieldType.STRING)
            .build();
    private static final Schema SOURCE_SCHEMA_WITHOUT_KEY = Schema.builder()
            .addNullableField("content", Schema.FieldType.STRING)
            .build();

    private static final String KIND = "writereadtest";
    private static final String KIND_ALL_TYPES = "writereadalltypestest";

    private static final String SELECT_ALL_STATEMENT = "SELECT * FROM TEST";

    /** Upper bound on how long a read pipeline is waited for before its state is checked. */
    private static final Duration READ_TIMEOUT = Duration.standardMinutes(5L);

    @Rule
    public final TestPipeline writePipeline = TestPipeline.create();

    // NOTE(review): `transient` is kept from the original declaration; nothing visible in this
    // class serializes it - confirm before removing.
    @Rule
    public final transient TestPipeline readPipeline = TestPipeline.create();

    /**
     * Inserts one row with an explicit key through SQL, then selects everything back and checks
     * the output schema and that the read pipeline reaches {@code DONE}. Row contents are not
     * asserted.
     */
    @Test
    public void testDataStoreV1SqlWriteRead() {
        BeamSqlEnv sqlEnv = newSqlEnv();
        String projectId = options.getProject();
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n"
                + "   `__key__` VARBINARY, \n"
                + "   `content` VARCHAR \n"
                + ") \n"
                + "TYPE 'datastoreV1' \n"
                + "LOCATION '" + projectId + "/" + KIND + "'";
        sqlEnv.executeDdl(createTableStatement);

        // Random names keep the inserted entity's key unique per run.
        Key ancestor = DatastoreHelper.makeKey(KIND, UUID.randomUUID().toString()).build();
        Key itemKey = DatastoreHelper.makeKey(ancestor, KIND, UUID.randomUUID().toString()).build();
        String insertStatement = "INSERT INTO TEST VALUES ( \n"
                + keyToSqlByteString(itemKey) + ", \n"
                + "'2000' \n"
                + ")";
        BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(insertStatement));
        writePipeline.run().waitUntilFinish();

        assertSelectAllSucceeds(sqlEnv, SOURCE_SCHEMA);
    }

    /**
     * Same flow as {@link #testDataStoreV1SqlWriteRead()}, but the table is declared without a
     * {@code __key__} column and the inserted row supplies only {@code content}.
     */
    @Test
    public void testDataStoreV1SqlWriteRead_withoutKey() {
        BeamSqlEnv sqlEnv = newSqlEnv();
        String projectId = options.getProject();
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n"
                + "   `content` VARCHAR \n"
                + ") \n"
                + "TYPE 'datastoreV1' \n"
                + "LOCATION '" + projectId + "/" + KIND + "'";
        sqlEnv.executeDdl(createTableStatement);

        String insertStatement = "INSERT INTO TEST VALUES ( '3000' )";
        BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(insertStatement));
        writePipeline.run().waitUntilFinish();

        assertSelectAllSucceeds(sqlEnv, SOURCE_SCHEMA_WITHOUT_KEY);
    }

    /**
     * Writes a single row with {@link RowToEntity} and {@link DatastoreIO}, then reads it back
     * with a query filtered on that row's key and asserts the row converted by
     * {@link EntityToRow} equals the one written.
     */
    @Test
    public void testWriteRead_viaCoreBeamIO() {
        String projectId = options.getProject();
        Key ancestor = DatastoreHelper.makeKey(KIND, UUID.randomUUID().toString()).build();
        Key itemKey = DatastoreHelper.makeKey(ancestor, KIND, UUID.randomUUID().toString())
                .setPartitionId(PartitionId.newBuilder().setProjectId(projectId).build())
                .build();
        Row testWriteRow = Row.withSchema(SOURCE_SCHEMA)
                .addValues(itemKey.toByteArray(), "4000")
                .build();

        writePipeline
                .apply(Create.of(testWriteRow).withRowSchema(SOURCE_SCHEMA))
                .apply(RowToEntity.create(KEY_FIELD, KIND))
                .apply(DatastoreIO.v1().write().withProjectId(projectId));
        writePipeline.run().waitUntilFinish();

        // Restrict the read to the entity written above.
        Query.Builder query = Query.newBuilder();
        query.addKindBuilder().setName(KIND);
        query.setFilter(DatastoreHelper.makeFilter(
                KEY_FIELD, PropertyFilter.Operator.EQUAL, DatastoreHelper.makeValue(itemKey)));
        DatastoreV1.Read read = DatastoreIO.v1().read().withProjectId(projectId).withQuery(query.build());

        PCollection<Row> rowsRead = readPipeline
                .apply(read)
                .apply(EntityToRow.create(SOURCE_SCHEMA, KEY_FIELD));
        PAssert.that(rowsRead).containsInAnyOrder(testWriteRow);
        readPipeline.run().waitUntilFinish();
    }

    /**
     * Declares a table over the {@code writereadalltypestest} kind with one column per supported
     * type and checks the resulting schema and that a full select reaches {@code DONE}.
     *
     * <p>NOTE(review): this test never writes; presumably the kind is populated outside this
     * class - confirm.
     */
    @Test
    public void testReadAllSupportedTypes() {
        BeamSqlEnv sqlEnv = newSqlEnv();
        String projectId = options.getProject();
        Schema expectedSchema = Schema.builder()
                .addNullableField(KEY_FIELD, CalciteUtils.VARBINARY)
                .addNullableField("boolean", Schema.FieldType.BOOLEAN)
                .addNullableField("datetime", Schema.FieldType.DATETIME)
                .addNullableField("floatingnumber", Schema.FieldType.DOUBLE)
                .addNullableField("integer", Schema.FieldType.INT64)
                .addNullableField("primitivearray", Schema.FieldType.array(Schema.FieldType.STRING))
                .addNullableField("string", Schema.FieldType.STRING)
                .addNullableField("text", Schema.FieldType.STRING)
                .build();
        String createTableStatement = "CREATE EXTERNAL TABLE TEST( \n"
                + "   `__key__` VARBINARY, \n"
                + "   `boolean` BOOLEAN, \n"
                + "   `datetime` TIMESTAMP, \n"
                + "   `floatingnumber` DOUBLE, \n"
                + "   `integer` BIGINT, \n"
                + "   `primitivearray` ARRAY<VARCHAR>, \n"
                + "   `string` VARCHAR, \n"
                + "   `text` VARCHAR) \n"
                + "TYPE 'datastoreV1' \n"
                + "LOCATION '" + projectId + "/" + KIND_ALL_TYPES + "'";
        sqlEnv.executeDdl(createTableStatement);

        assertSelectAllSucceeds(sqlEnv, expectedSchema);
    }

    /** Creates an in-memory SQL environment with only the Datastore V1 table provider. */
    private static BeamSqlEnv newSqlEnv() {
        return BeamSqlEnv.inMemory(new DataStoreV1TableProvider());
    }

    /**
     * Builds {@code SELECT * FROM TEST} on the read pipeline, asserts the output schema, runs the
     * pipeline and asserts it reports {@code DONE} within {@link #READ_TIMEOUT}.
     *
     * @param sqlEnv environment in which table {@code TEST} has already been created
     * @param expectedSchema schema the select output must equal
     */
    private void assertSelectAllSucceeds(BeamSqlEnv sqlEnv, Schema expectedSchema) {
        PCollection<Row> output =
                BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery(SELECT_ALL_STATEMENT));
        MatcherAssert.assertThat(output.getSchema(), Matchers.equalTo(expectedSchema));

        PipelineResult.State state = readPipeline.run().waitUntilFinish(READ_TIMEOUT);
        MatcherAssert.assertThat(state, Matchers.equalTo(PipelineResult.State.DONE));
    }

    /**
     * Renders a key as a SQL binary literal of the form {@code X'<hex>'}, using the key's
     * serialized bytes formatted in base 16.
     */
    private static String keyToSqlByteString(Key key) {
        return "X'" + ByteString.toString(key.toByteArray(), 16) + "'";
    }
}

