/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql;

import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.bigquery.BigQueryTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider;
import org.apache.beam.sdk.io.gcp.bigquery.TestBigQuery;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

/**
 * Integration test that uses Beam SQL to copy JSON messages from a Pubsub topic into a BigQuery
 * table.
 *
 * <p>The test registers a 'pubsub' external table and a 'bigquery' external table, runs an
 * {@code INSERT INTO ... SELECT} between them, publishes three messages, and then polls the
 * BigQuery table until the expected rows show up.
 *
 * <p>The class implements {@link Serializable} and marks its rule fields {@code transient}, so
 * the rules are not part of the serialized form of the test instance.
 */
@RunWith(JUnit4.class)
public class PubsubToBigqueryIT
implements Serializable {
    /** Schema of the rows expected in BigQuery: a nullable INT64 {@code id} and STRING {@code name}. */
    private static final Schema SOURCE_SCHEMA =
        Schema.builder()
            .addNullableField("id", Schema.FieldType.INT64)
            .addNullableField("name", Schema.FieldType.STRING)
            .build();

    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    @Rule
    public transient TestPubsub pubsub = TestPubsub.create();
    @Rule
    public transient TestBigQuery bigQuery = TestBigQuery.create(SOURCE_SCHEMA);

    /**
     * Publishes three JSON messages to the test topic and asserts that the corresponding
     * {@code (id, name)} rows eventually appear in the test BigQuery table.
     *
     * @throws Exception if any DDL, query parsing, publishing, or polling step fails
     */
    @Test
    public void testSimpleInsert() throws Exception {
        BeamSqlEnv sqlEnv =
            BeamSqlEnv.inMemory(
                new TableProvider[] {new PubsubJsonTableProvider(), new BigQueryTableProvider()});

        // Source table: the JSON payload is exposed as a nested ROW, and the "ts" message
        // attribute is configured as the timestamp attribute key.
        String createPubsubTable =
            "CREATE EXTERNAL TABLE pubsub_topic (\n"
                + "event_timestamp TIMESTAMP, \n"
                + "attributes MAP<VARCHAR, VARCHAR>, \n"
                + "payload ROW< \n"
                + "             id INTEGER, \n"
                + "             name VARCHAR \n"
                + "           > \n"
                + ") \n"
                + "TYPE 'pubsub' \n"
                + "LOCATION '"
                + pubsub.topicPath()
                + "' \n"
                + "TBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'";
        sqlEnv.executeDdl(createPubsubTable);

        // Sink table. Note that id is declared BIGINT here while the payload declares INTEGER.
        String createBigQueryTable =
            "CREATE EXTERNAL TABLE bq_table( \n"
                + "   id BIGINT, \n"
                + "   name VARCHAR \n"
                + " ) \n"
                + "TYPE 'bigquery' \n"
                + "LOCATION '"
                + bigQuery.tableSpec()
                + "'";
        sqlEnv.executeDdl(createBigQueryTable);

        String insertStatement =
            "INSERT INTO bq_table \n"
                + "SELECT \n"
                + "  pubsub_topic.payload.id, \n"
                + "  pubsub_topic.payload.name \n"
                + "FROM pubsub_topic";
        BeamSqlRelUtils.toPCollection(pipeline, sqlEnv.parseQuery(insertStatement));

        // The pipeline is started before anything is published; the result of run() is not
        // awaited here. NOTE(review): presumably run() returns while the streaming job keeps
        // running on the configured runner - confirm for the runner in use.
        pipeline.run();

        List<PubsubMessage> messages =
            ImmutableList.of(
                message(ts(1L), 3, "foo"),
                message(ts(2L), 5, "bar"),
                message(ts(3L), 7, "baz"));
        pubsub.publish(messages);

        // Rows are passed directly (no Object[] cast) so the matcher is typed over Row.
        bigQuery
            .assertThatAllRows(SOURCE_SCHEMA)
            .eventually(
                Matchers.containsInAnyOrder(
                    row(SOURCE_SCHEMA, 3L, "foo"),
                    row(SOURCE_SCHEMA, 5L, "bar"),
                    row(SOURCE_SCHEMA, 7L, "baz")))
            .pollFor(Duration.standardMinutes(5L));
    }

    /**
     * Builds a {@link Row} with the given schema and field values.
     *
     * @param schema schema of the row to build
     * @param values field values, passed through to the row builder as given
     * @return the built row
     */
    private Row row(Schema schema, Object ... values) {
        return Row.withSchema(schema).addValues(values).build();
    }

    /**
     * Builds a Pubsub message whose payload is the JSON object {@code { "id" : ..., "name" : ... }}.
     *
     * @param timestamp instant whose millisecond value is stored in the "ts" attribute
     * @param id value of the JSON {@code id} field
     * @param name value of the JSON {@code name} field
     * @return the message
     */
    private PubsubMessage message(Instant timestamp, int id, String name) {
        return message(timestamp, jsonString(id, name));
    }

    /**
     * Builds a Pubsub message from a ready-made JSON payload.
     *
     * @param timestamp instant whose millisecond value is stored, as a decimal string, in the
     *     "ts" attribute
     * @param jsonPayload payload text, encoded as UTF-8 bytes
     * @return the message
     */
    private PubsubMessage message(Instant timestamp, String jsonPayload) {
        return new PubsubMessage(
            jsonPayload.getBytes(StandardCharsets.UTF_8),
            ImmutableMap.of("ts", String.valueOf(timestamp.getMillis())));
    }

    /**
     * Renders the JSON payload text for one record.
     *
     * <p>{@code name} is inserted verbatim between double quotes; no JSON escaping is applied.
     *
     * @param id value of the {@code id} field
     * @param name value of the {@code name} field
     * @return the JSON text
     */
    private String jsonString(int id, String name) {
        return "{ \"id\" : " + id + ", \"name\" : \"" + name + "\" }";
    }

    /**
     * Wraps a millisecond value in a Joda {@link Instant}.
     *
     * @param millis millisecond value passed to the {@link Instant} constructor
     * @return the instant
     */
    private Instant ts(long millis) {
        return new Instant(millis);
    }
}

