package org.apache.beam.sdk.extensions.sql;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/PubsubToBigqueryIT.class */
public class PubsubToBigqueryIT implements Serializable {
    private static final Schema SOURCE_SCHEMA = Schema.builder().addNullableField("id", Schema.FieldType.INT64).addNullableField("name", Schema.FieldType.STRING).build();

    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    @Rule
    public transient TestPubsub pubsub = TestPubsub.create();

    @Rule
    public transient TestBigQuery bigQuery = TestBigQuery.create(SOURCE_SCHEMA);

    @Test
    public void testSimpleInsert() throws Exception {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new PubsubJsonTableProvider(), new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE pubsub_topic (\nevent_timestamp TIMESTAMP, \nattributes MAP<VARCHAR, VARCHAR>, \npayload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n) \nTYPE 'pubsub' \nLOCATION '" + this.pubsub.topicPath() + "' \nTBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'");
        inMemory.executeDdl("CREATE EXTERNAL TABLE bq_table( \n   id BIGINT, \n   name VARCHAR \n ) \nTYPE 'bigquery' \nLOCATION '" + this.bigQuery.tableSpec() + "'");
        BeamSqlRelUtils.toPCollection(this.pipeline, inMemory.parseQuery("INSERT INTO bq_table \nSELECT \n  pubsub_topic.payload.id, \n  pubsub_topic.payload.name \nFROM pubsub_topic"));
        this.pipeline.run();
        this.pubsub.publish(ImmutableList.of(message(ts(1L), 3, "foo"), message(ts(2L), 5, "bar"), message(ts(3L), 7, "baz")));
        this.bigQuery.assertThatAllRows(SOURCE_SCHEMA).eventually(Matchers.containsInAnyOrder(new Row[]{row(SOURCE_SCHEMA, 3L, "foo"), row(SOURCE_SCHEMA, 5L, "bar"), row(SOURCE_SCHEMA, 7L, "baz")})).pollFor(Duration.standardMinutes(5L));
    }

    @Test
    public void testSimpleInsertFlat() throws Exception {
        BeamSqlEnv inMemory = BeamSqlEnv.inMemory(new TableProvider[]{new PubsubJsonTableProvider(), new BigQueryTableProvider()});
        inMemory.executeDdl("CREATE EXTERNAL TABLE pubsub_topic (\nevent_timestamp TIMESTAMP, \nid INTEGER, \nname VARCHAR \n) \nTYPE 'pubsub' \nLOCATION '" + this.pubsub.topicPath() + "' \nTBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'");
        inMemory.executeDdl("CREATE EXTERNAL TABLE bq_table( \n   id BIGINT, \n   name VARCHAR \n ) \nTYPE 'bigquery' \nLOCATION '" + this.bigQuery.tableSpec() + "'");
        BeamSqlRelUtils.toPCollection(this.pipeline, inMemory.parseQuery("INSERT INTO bq_table \nSELECT \n  id, \n  name \nFROM pubsub_topic"));
        this.pipeline.run();
        this.pubsub.publish(ImmutableList.of(message(ts(1L), 3, "foo"), message(ts(2L), 5, "bar"), message(ts(3L), 7, "baz")));
        this.bigQuery.assertThatAllRows(SOURCE_SCHEMA).eventually(Matchers.containsInAnyOrder(new Row[]{row(SOURCE_SCHEMA, 3L, "foo"), row(SOURCE_SCHEMA, 5L, "bar"), row(SOURCE_SCHEMA, 7L, "baz")})).pollFor(Duration.standardMinutes(5L));
    }

    private Row row(Schema schema, Object... objArr) {
        return Row.withSchema(schema).addValues(objArr).build();
    }

    private PubsubMessage message(Instant instant, int i, String str) {
        return message(instant, jsonString(i, str));
    }

    private PubsubMessage message(Instant instant, String str) {
        return new PubsubMessage(str.getBytes(StandardCharsets.UTF_8), ImmutableMap.of("ts", String.valueOf(instant.getMillis())));
    }

    private String jsonString(int i, String str) {
        return "{ \"id\" : " + i + ", \"name\" : \"" + str + "\" }";
    }

    private Instant ts(long j) {
        return new Instant(j);
    }
}
