package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;

import com.google.datastore.v1.Key;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.PropertyFilter;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.client.DatastoreHelper;
import java.util.UUID;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.io.gcp.datastore.EntityToRow;
import org.apache.beam.sdk.io.gcp.datastore.RowToEntity;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.avatica.util.ByteString;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.class */
public class DataStoreReadWriteIT {
    private static final BigQueryOptions options = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class);
    private static final Schema SOURCE_SCHEMA = Schema.builder().addNullableField("__key__", CalciteUtils.VARBINARY).addNullableField("content", Schema.FieldType.STRING).build();
    private static final Schema SOURCE_SCHEMA_WITHOUT_KEY = Schema.builder().addNullableField("content", Schema.FieldType.STRING).build();
    private static final String KIND = "writereadtest";
    private static final String KIND_ALL_TYPES = "writereadalltypestest";

    @Rule
    public final TestPipeline writePipeline = TestPipeline.create();

    @Rule
    public transient TestPipeline readPipeline = TestPipeline.create();

    @Test
    public void testDataStoreV1SqlWriteRead() {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new DataStoreV1TableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   `__key__` VARBINARY, \n   `content` VARCHAR \n) \nTYPE 'datastoreV1' \nLOCATION '" + options.getProject() + "/" + KIND + "'");
        BeamSqlRelUtils.toPCollection(this.writePipeline, inMemory.parseQuery("INSERT INTO TEST VALUES ( \n" + keyToSqlByteString(DatastoreHelper.makeKey(new Object[]{DatastoreHelper.makeKey(new Object[]{KIND, UUID.randomUUID().toString()}).build(), KIND, UUID.randomUUID().toString()}).build()) + ", \n'2000' \n)"));
        this.writePipeline.run().waitUntilFinish();
        MatcherAssert.assertThat(BeamSqlRelUtils.toPCollection(this.readPipeline, inMemory.parseQuery("SELECT * FROM TEST")).getSchema(), Matchers.equalTo(SOURCE_SCHEMA));
        MatcherAssert.assertThat(this.readPipeline.run().waitUntilFinish(Duration.standardMinutes(5L)), Matchers.equalTo(PipelineResult.State.DONE));
    }

    @Test
    public void testDataStoreV1SqlWriteRead_withoutKey() {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new DataStoreV1TableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   `content` VARCHAR \n) \nTYPE 'datastoreV1' \nLOCATION '" + options.getProject() + "/" + KIND + "'");
        BeamSqlRelUtils.toPCollection(this.writePipeline, inMemory.parseQuery("INSERT INTO TEST VALUES ( '3000' )"));
        this.writePipeline.run().waitUntilFinish();
        MatcherAssert.assertThat(BeamSqlRelUtils.toPCollection(this.readPipeline, inMemory.parseQuery("SELECT * FROM TEST")).getSchema(), Matchers.equalTo(SOURCE_SCHEMA_WITHOUT_KEY));
        MatcherAssert.assertThat(this.readPipeline.run().waitUntilFinish(Duration.standardMinutes(5L)), Matchers.equalTo(PipelineResult.State.DONE));
    }

    @Test
    public void testWriteRead_viaCoreBeamIO() {
        String project = options.getProject();
        Key build = DatastoreHelper.makeKey(new Object[]{DatastoreHelper.makeKey(new Object[]{KIND, UUID.randomUUID().toString()}).build(), KIND, UUID.randomUUID().toString()}).setPartitionId(PartitionId.newBuilder().setProjectId(project).build()).build();
        Row build2 = Row.withSchema(SOURCE_SCHEMA).addValues(new Object[]{build.toByteArray(), "4000"}).build();
        this.writePipeline.apply(Create.of(build2, new Row[0]).withRowSchema(SOURCE_SCHEMA)).apply(RowToEntity.create("__key__", KIND)).apply(DatastoreIO.v1().write().withProjectId(project));
        this.writePipeline.run().waitUntilFinish();
        Query.Builder newBuilder = Query.newBuilder();
        newBuilder.addKindBuilder().setName(KIND);
        newBuilder.setFilter(DatastoreHelper.makeFilter("__key__", PropertyFilter.Operator.EQUAL, DatastoreHelper.makeValue(build)));
        PAssert.that(this.readPipeline.apply(DatastoreIO.v1().read().withProjectId(project).withQuery(newBuilder.build())).apply(EntityToRow.create(SOURCE_SCHEMA, "__key__"))).containsInAnyOrder(new Row[]{build2});
        this.readPipeline.run().waitUntilFinish();
    }

    @Test
    public void testReadAllSupportedTypes() {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new DataStoreV1TableProvider()});
        String project = options.getProject();
        Schema build = Schema.builder().addNullableField("__key__", CalciteUtils.VARBINARY).addNullableField("boolean", Schema.FieldType.BOOLEAN).addNullableField("datetime", Schema.FieldType.DATETIME).addNullableField("floatingnumber", Schema.FieldType.DOUBLE).addNullableField("integer", Schema.FieldType.INT64).addNullableField("primitivearray", Schema.FieldType.array(Schema.FieldType.STRING)).addNullableField("string", Schema.FieldType.STRING).addNullableField("text", Schema.FieldType.STRING).build();
        inMemory.executeDdl("CREATE EXTERNAL TABLE TEST( \n   `__key__` VARBINARY, \n   `boolean` BOOLEAN, \n   `datetime` TIMESTAMP, \n   `floatingnumber` DOUBLE, \n   `integer` BIGINT, \n   `primitivearray` ARRAY<VARCHAR>, \n   `string` VARCHAR, \n   `text` VARCHAR) \nTYPE 'datastoreV1' \nLOCATION '" + project + "/" + KIND_ALL_TYPES + "'");
        MatcherAssert.assertThat(BeamSqlRelUtils.toPCollection(this.readPipeline, inMemory.parseQuery("SELECT * FROM TEST")).getSchema(), Matchers.equalTo(build));
        MatcherAssert.assertThat(this.readPipeline.run().waitUntilFinish(Duration.standardMinutes(5L)), Matchers.equalTo(PipelineResult.State.DONE));
    }

    private static String keyToSqlByteString(Key key) {
        return "X'" + ByteString.toString(key.toByteArray(), 16) + "'";
    }
}
