/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.protobuf.PayloadMessages;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.testing.JsonMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.jdbc.CalciteConnection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class PubsubTableProviderIT
implements Serializable {
    // Logger used for diagnostic output from test predicates.
    private static final Logger LOG = LoggerFactory.getLogger(PubsubTableProviderIT.class);
    // Row schema with a nullable INT32 "id" and a nullable STRING "name"; shared by the payload-reading tests.
    private static final Schema PAYLOAD_SCHEMA = Schema.builder().addNullableField("id", Schema.FieldType.INT32).addNullableField("name", Schema.FieldType.STRING).build();
    // Topic the tests publish to and read from. NOTE(review): the rules are transient presumably because
    // this class is Serializable and test lambdas capture `this` — confirm.
    @Rule
    public transient TestPubsub eventsTopic = TestPubsub.create();
    // Topic that receives the output of the filter query in testSQLReadAndWriteWithSameFlatTableDefinition.
    @Rule
    public transient TestPubsub filteredEventsTopic = TestPubsub.create();
    // Topic whose path is passed as the "deadLetterQueue" table property.
    @Rule
    public transient TestPubsub dlqTopic = TestPubsub.create();
    // Signal the streaming tests attach to the query output and then wait on.
    @Rule
    public transient TestPubsubSignal resultSignal = TestPubsubSignal.create();
    // Primary pipeline used by every test.
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    // Second pipeline, used only for the filter query in testSQLReadAndWriteWithSameFlatTableDefinition.
    @Rule
    public transient TestPipeline filterPipeline = TestPipeline.create();
    // Provider instance the tests query for the table type string placed in the DDL.
    private final SchemaIOTableProviderWrapper tableProvider = new PubsubTableProvider();
    // Maximum time the tests wait on resultSignal (10 minutes).
    private final Duration timeout = Duration.standardMinutes((long)10L);
    // Payload-format specific message/matcher factory, injected by the Parameterized runner from data().
    @Parameterized.Parameter
    public PubsubObjectProvider objectsProvider;
    // Jackson mapper with every module found on the class loader registered; used by connect() and toArg().
    private static final ObjectMapper MAPPER = new ObjectMapper().registerModules((Iterable)ObjectMapper.findModules((ClassLoader)ReflectHelpers.findClassLoader()));

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        return Arrays.asList({new PubsubJsonObjectProvider()}, {new PubsubAvroObjectProvider()}, {new PubsubProtoObjectProvider()});
    }

    /**
     * Declares an external table with a nested {@code payload} row over the events topic,
     * selects {@code payload.id} and {@code payload.name}, publishes three messages once the
     * subscription exists, and waits until the signal reports exactly the three expected rows.
     */
    @Test
    public void testSQLSelectsPayloadContent() throws Exception {
        String ddl = String.format("CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nattributes MAP<VARCHAR, VARCHAR>, \npayload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n) \nTYPE '%s' \nLOCATION '%s' \nTBLPROPERTIES '{ %s\"protoClass\" : \"%s\", \"timestampAttributeKey\" : \"ts\" }'", this.tableProvider.getTableType(), this.eventsTopic.topicPath(), this.payloadFormatParam(), PayloadMessages.SimpleMessage.class.getName());
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new PubsubTableProvider()});
        env.executeDdl(ddl);
        PCollection<Row> selected = this.query(env, this.pipeline, "SELECT message.payload.id, message.payload.name from message");
        // The expected rows are built inside the predicate, exactly as before, so the lambda captures `this`.
        selected.apply("waitForSuccess", this.resultSignal.signalSuccessWhen((Coder)SchemaCoder.of((Schema)PAYLOAD_SCHEMA), (SerializableFunction & Serializable)observedRows -> observedRows.equals(ImmutableSet.of((Object)this.row(PAYLOAD_SCHEMA, 3, "foo"), (Object)this.row(PAYLOAD_SCHEMA, 5, "bar"), (Object)this.row(PAYLOAD_SCHEMA, 7, "baz")))));
        this.pipeline.run();
        // Publishing happens only after the subscription is confirmed to exist.
        String project = ((GcpOptions)this.pipeline.getOptions().as(GcpOptions.class)).getProject();
        this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));
        List toPublish = (List)org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList.of((Object)this.objectsProvider.messageIdName(this.ts(1L), 3, "foo"), (Object)this.objectsProvider.messageIdName(this.ts(2L), 5, "bar"), (Object)this.objectsProvider.messageIdName(this.ts(3L), 7, "baz"));
        this.eventsTopic.publish(toPublish);
        this.resultSignal.waitForSuccess(this.timeout);
    }

    /**
     * Declares the table with {@code attributes} as an array of key/value rows, selects the
     * keys of the first two attributes together with {@code payload.id}, and waits until the
     * non-"ts" attribute key observed per id matches the published names. Currently ignored
     * (see the linked issue).
     */
    @Ignore(value="https://issues.apache.org/jira/browse/BEAM-12320")
    @Test
    public void testSQLSelectsArrayAttributes() throws Exception {
        String ddl = String.format("CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nattributes ARRAY<ROW<key VARCHAR, `value` VARCHAR>>, \npayload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n) \nTYPE '%s' \nLOCATION '%s' \nTBLPROPERTIES '{ %s\"protoClass\" : \"%s\", \"timestampAttributeKey\" : \"ts\" }'", this.tableProvider.getTableType(), this.eventsTopic.topicPath(), this.payloadFormatParam(), PayloadMessages.SimpleMessage.class.getName());
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new PubsubTableProvider()});
        env.executeDdl(ddl);
        PCollection<Row> selected = this.query(env, this.pipeline, "SELECT message.payload.id, attributes[1].key AS a1, attributes[2].key AS a2 FROM message");
        selected.apply("waitForSuccess", this.resultSignal.signalSuccessWhen((Coder)SchemaCoder.of((Schema)PAYLOAD_SCHEMA), (SerializableFunction & Serializable)observedRows -> {
            Map<Integer, String> entries = new HashMap<>();
            for (Row row : observedRows) {
                // Whichever of the two attribute keys is not the timestamp key "ts" is recorded for the id.
                String keyField = "ts".equals(row.getString("a1")) ? "a2" : "a1";
                entries.put(row.getInt32("id"), row.getString(keyField));
            }
            LOG.info("Entries: {}", entries);
            return entries.equals(ImmutableMap.of((Object)3, (Object)"foo", (Object)5, (Object)"bar", (Object)7, (Object)"baz"));
        }));
        this.pipeline.run();
        String project = ((GcpOptions)this.pipeline.getOptions().as(GcpOptions.class)).getProject();
        this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));
        List toPublish = (List)org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList.of((Object)this.objectsProvider.messageIdName(this.ts(1L), 3, "foo"), (Object)this.objectsProvider.messageIdName(this.ts(2L), 5, "bar"), (Object)this.objectsProvider.messageIdName(this.ts(3L), 7, "baz"));
        this.eventsTopic.publish(toPublish);
        this.resultSignal.waitForSuccess(this.timeout);
    }

    /**
     * Declares the table with a raw {@code VARBINARY} payload, selects it as
     * {@code some_bytes}, and waits until the observed rows equal one row per published
     * message carrying that message's payload bytes.
     */
    @Test
    public void testSQLWithBytePayload() throws Exception {
        org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList published = org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList.of((Object)this.objectsProvider.messageIdName(this.ts(1L), 3, "foo"), (Object)this.objectsProvider.messageIdName(this.ts(2L), 5, "bar"), (Object)this.objectsProvider.messageIdName(this.ts(3L), 7, "baz"));
        String ddl = String.format("CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nattributes MAP<VARCHAR, VARCHAR>, \npayload VARBINARY \n) \nTYPE '%s' \nLOCATION '%s' \nTBLPROPERTIES '{ \"protoClass\" : \"%s\", \"timestampAttributeKey\" : \"ts\" }'", this.tableProvider.getTableType(), this.eventsTopic.topicPath(), PayloadMessages.SimpleMessage.class.getName());
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new PubsubTableProvider()});
        env.executeDdl(ddl);
        PCollection<Row> selected = this.query(env, this.pipeline, "SELECT message.payload AS some_bytes FROM message");
        Schema bytesOnlySchema = Schema.builder().addField("some_bytes", Schema.FieldType.BYTES.withNullable(true)).build();
        // Each payload is wrapped in an explicit Object[] so it is passed as one row value.
        ImmutableSet expected = ImmutableSet.of(
            (Object)this.row(bytesOnlySchema, new Object[]{((PubsubMessage)published.get(0)).getPayload()}),
            (Object)this.row(bytesOnlySchema, new Object[]{((PubsubMessage)published.get(1)).getPayload()}),
            (Object)this.row(bytesOnlySchema, new Object[]{((PubsubMessage)published.get(2)).getPayload()}));
        selected.apply("waitForSuccess", this.resultSignal.signalSuccessWhen((Coder)SchemaCoder.of((Schema)bytesOnlySchema), observed -> PubsubTableProviderIT.lambda$testSQLWithBytePayload$90b56436$1((Set)expected, observed)));
        this.pipeline.run();
        String project = ((GcpOptions)this.pipeline.getOptions().as(GcpOptions.class)).getProject();
        this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));
        this.eventsTopic.publish((List)published);
        this.resultSignal.waitForSuccess(this.timeout);
    }

    /**
     * Publishes three messages built by the current provider plus two with the payloads
     * "{ - }" and "{ + }", waits for the three expected rows on the query output, and then
     * checks that the dead-letter topic receives the two extra payloads with their "ts"
     * attributes.
     */
    @Test
    public void testUsesDlq() throws Exception {
        String ddl = String.format("CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nattributes MAP<VARCHAR, VARCHAR>, \npayload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n) \nTYPE '%s' \nLOCATION '%s' \nTBLPROPERTIES     '{        %s       \"timestampAttributeKey\" : \"ts\",        \"deadLetterQueue\" : \"%s\",        \"protoClass\" : \"%s\"      }'", this.tableProvider.getTableType(), this.eventsTopic.topicPath(), this.payloadFormatParam(), this.dlqTopic.topicPath(), PayloadMessages.SimpleMessage.class.getName());
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new PubsubTableProvider()});
        env.executeDdl(ddl);
        PCollection<Row> selected = this.query(env, this.pipeline, "SELECT message.payload.id, message.payload.name from message");
        selected.apply("waitForSuccess", this.resultSignal.signalSuccessWhen((Coder)SchemaCoder.of((Schema)PAYLOAD_SCHEMA), (SerializableFunction & Serializable)observedRows -> observedRows.equals(ImmutableSet.of((Object)this.row(PAYLOAD_SCHEMA, 3, "foo"), (Object)this.row(PAYLOAD_SCHEMA, 5, "bar"), (Object)this.row(PAYLOAD_SCHEMA, 7, "baz")))));
        this.pipeline.run();
        String project = ((GcpOptions)this.pipeline.getOptions().as(GcpOptions.class)).getProject();
        this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));
        List toPublish = (List)org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList.of((Object)this.objectsProvider.messageIdName(this.ts(1L), 3, "foo"), (Object)this.objectsProvider.messageIdName(this.ts(2L), 5, "bar"), (Object)this.objectsProvider.messageIdName(this.ts(3L), 7, "baz"), (Object)this.messagePayload(this.ts(4L), "{ - }", (Map<String, String>)ImmutableMap.of()), (Object)this.messagePayload(this.ts(5L), "{ + }", (Map<String, String>)ImmutableMap.of()));
        this.eventsTopic.publish(toPublish);
        this.resultSignal.waitForSuccess(this.timeout);
        // Only after the valid rows arrived is the dead-letter topic checked for the two other payloads.
        Matcher[] deadLetters = new Matcher[]{this.matcherPayload(this.ts(4L), "{ - }"), this.matcherPayload(this.ts(5L), "{ + }")};
        this.dlqTopic.assertThatTopicEventuallyReceives(deadLetters).waitForUpTo(Duration.standardSeconds(40L));
    }

    /**
     * Runs {@code SELECT ... LIMIT 3} through the JDBC connection on a background thread,
     * publishes seven messages once the subscription exists, and asserts that the query
     * returns exactly three rows within two minutes.
     *
     * <p>The query is submitted to a single-thread executor because the test thread has to
     * publish the messages while {@code executeQuery} is still in progress. The executor is
     * always shut down in a {@code finally} block so a failed assertion or a timeout does not
     * leave its worker thread behind.
     *
     * @throws Exception if table creation, publishing, or the background query fails
     */
    @Test
    public void testSQLLimit() throws Exception {
        String createTableString = String.format("CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nattributes MAP<VARCHAR, VARCHAR>, \npayload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n) \nTYPE '%s' \nLOCATION '%s' \nTBLPROPERTIES     '{        %s       \"timestampAttributeKey\" : \"ts\",        \"deadLetterQueue\" : \"%s\",        \"protoClass\" : \"%s\"      }'", this.tableProvider.getTableType(), this.eventsTopic.topicPath(), this.payloadFormatParam(), this.dlqTopic.topicPath(), PayloadMessages.SimpleMessage.class.getName());
        org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList messages = org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList.of((Object)this.objectsProvider.messageIdName(this.ts(1L), 3, "foo"), (Object)this.objectsProvider.messageIdName(this.ts(2L), 5, "bar"), (Object)this.objectsProvider.messageIdName(this.ts(3L), 7, "baz"), (Object)this.objectsProvider.messageIdName(this.ts(4L), 9, "ba2"), (Object)this.objectsProvider.messageIdName(this.ts(5L), 10, "ba3"), (Object)this.objectsProvider.messageIdName(this.ts(6L), 13, "ba4"), (Object)this.objectsProvider.messageIdName(this.ts(7L), 15, "ba5"));
        CalciteConnection connection = this.connect(this.pipeline.getOptions(), new TableProvider[]{new PubsubTableProvider()});
        Statement statement = connection.createStatement();
        statement.execute(createTableString);
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            Future<List<String>> queryResult = pool.submit(() -> {
                ResultSet resultSet = statement.executeQuery("SELECT message.payload.id FROM message LIMIT 3");
                org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList.Builder<String> result = org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList.builder();
                while (resultSet.next()) {
                    result.add(resultSet.getString(1));
                }
                return result.build();
            });
            try {
                this.eventsTopic.assertSubscriptionEventuallyCreated(((GcpOptions)this.pipeline.getOptions().as(GcpOptions.class)).getProject(), Duration.standardMinutes(5L));
            }
            catch (AssertionError assertionError) {
                // The subscription never appeared; surface a failure from the query thread if
                // one already happened, since that is the more likely root cause.
                try {
                    queryResult.get(0L, TimeUnit.SECONDS);
                }
                catch (TimeoutException ignored) {
                    // Query is still running, so it has not failed; report the original assertion.
                }
                catch (ExecutionException e) {
                    throw new AssertionError("Exception occurred in statement.executeQuery thread", e);
                }
                throw assertionError;
            }
            this.eventsTopic.publish((List)messages);
            MatcherAssert.assertThat(queryResult.get(2L, TimeUnit.MINUTES).size(), Matchers.equalTo(3));
        }
        finally {
            // On success the task is already done; on failure this interrupts the blocked query
            // so the executor's worker thread does not outlive the test.
            pool.shutdownNow();
        }
    }

    /**
     * Same scenario as the nested-payload select, but the table declares {@code id} and
     * {@code name} as top-level columns and the query selects them directly.
     */
    @Test
    public void testSQLSelectsPayloadContentFlat() throws Exception {
        String ddl = String.format("CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nid INTEGER, \nname VARCHAR \n) \nTYPE '%s' \nLOCATION '%s' \nTBLPROPERTIES     '{        %s       \"protoClass\" : \"%s\",        \"timestampAttributeKey\" : \"ts\"      }'", this.tableProvider.getTableType(), this.eventsTopic.topicPath(), this.payloadFormatParam(), PayloadMessages.SimpleMessage.class.getName());
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new PubsubTableProvider()});
        env.executeDdl(ddl);
        PCollection<Row> selected = this.query(env, this.pipeline, "SELECT message.id, message.name from message");
        selected.apply("waitForSuccess", this.resultSignal.signalSuccessWhen((Coder)SchemaCoder.of((Schema)PAYLOAD_SCHEMA), (SerializableFunction & Serializable)observedRows -> observedRows.equals(ImmutableSet.of((Object)this.row(PAYLOAD_SCHEMA, 3, "foo"), (Object)this.row(PAYLOAD_SCHEMA, 5, "bar"), (Object)this.row(PAYLOAD_SCHEMA, 7, "baz")))));
        this.pipeline.run();
        String project = ((GcpOptions)this.pipeline.getOptions().as(GcpOptions.class)).getProject();
        this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));
        List toPublish = (List)org.apache.beam.vendor.calcite.v1_28_0.com.google.common.collect.ImmutableList.of((Object)this.objectsProvider.messageIdName(this.ts(1L), 3, "foo"), (Object)this.objectsProvider.messageIdName(this.ts(2L), 5, "bar"), (Object)this.objectsProvider.messageIdName(this.ts(3L), 7, "baz"));
        this.eventsTopic.publish(toPublish);
        this.resultSignal.waitForSuccess(this.timeout);
    }

    /**
     * Inserts two rows into a flat table backed by the events topic and checks that the topic
     * eventually receives a message matching each row in the current payload format.
     */
    @Test
    public void testSQLInsertRowsToPubsubFlat() throws Exception {
        String ddl = String.format("CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nname VARCHAR, \nheight INTEGER, \nknows_javascript BOOLEAN \n) \nTYPE '%s' \nLOCATION '%s' \nTBLPROPERTIES     '{        %s       \"protoClass\" : \"%s\",        \"deadLetterQueue\" : \"%s\"     }'", this.tableProvider.getTableType(), this.eventsTopic.topicPath(), this.payloadFormatParam(), PayloadMessages.NameHeightKnowsJSMessage.class.getName(), this.dlqTopic.topicPath());
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new PubsubTableProvider()});
        env.executeDdl(ddl);
        String insertSql = "INSERT INTO message (name, height, knows_javascript) \nVALUES \n('person1', 80, TRUE), \n('person2', 70, FALSE)";
        this.query(env, this.pipeline, insertSql);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        Matcher[] expectedMessages = new Matcher[]{this.objectsProvider.matcherNameHeightKnowsJS("person1", 80, true), this.objectsProvider.matcherNameHeightKnowsJS("person2", 70, false)};
        this.eventsTopic.assertThatTopicEventuallyReceives(expectedMessages).waitForUpTo(Duration.standardSeconds(40L));
    }

    /**
     * Inserts one row with an explicit event timestamp into a table configured with
     * {@code timestampAttributeKey = "ts"} and checks that the topic receives the matching
     * payload carrying a "ts" attribute equal to 2 milliseconds since the epoch.
     */
    @Test
    public void testSQLInsertRowsToPubsubWithTimestampAttributeFlat() throws Exception {
        String ddl = String.format("CREATE EXTERNAL TABLE message (\n  event_timestamp TIMESTAMP, \n  name VARCHAR, \n  height INTEGER, \n  knows_javascript BOOLEAN \n) \nTYPE '%s' \nLOCATION '%s' \nTBLPROPERTIES   '{      %s      \"protoClass\" : \"%s\",      \"deadLetterQueue\" : \"%s\",     \"timestampAttributeKey\" : \"ts\"   }'", this.tableProvider.getTableType(), this.eventsTopic.topicPath(), this.payloadFormatParam(), PayloadMessages.NameHeightKnowsJSMessage.class.getName(), this.dlqTopic.topicPath());
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new PubsubTableProvider()});
        env.executeDdl(ddl);
        String insertSql = "INSERT INTO message VALUES (TIMESTAMP '1970-01-01 00:00:00.002', 'person2', 70, FALSE)";
        this.query(env, this.pipeline, insertSql);
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        Matcher[] expectedMessages = new Matcher[]{this.matcherTsNameHeightKnowsJS(this.ts(2L), "person2", 70, false)};
        this.eventsTopic.assertThatTopicEventuallyReceives(expectedMessages).waitForUpTo(Duration.standardSeconds(40L));
    }

    /**
     * Defines two flat tables ({@code people} on the events topic and
     * {@code javascript_people} on the filtered topic), starts a pipeline that copies the rows
     * with {@code knows_javascript} set from the first to the second, then runs a pipeline
     * that inserts five people and checks that the filtered topic receives the three expected
     * ones.
     */
    @Test
    public void testSQLReadAndWriteWithSameFlatTableDefinition() throws Exception {
        String payloadFormat = this.objectsProvider.getPayloadFormat();
        // Without an explicit payload format the TBLPROPERTIES clause is omitted entirely.
        String peopleProperties = "";
        if (payloadFormat != null) {
            peopleProperties = String.format("TBLPROPERTIES '{ \"protoClass\" : \"%s\", \"format\": \"%s\" }'", PayloadMessages.NameHeightKnowsJSMessage.class.getName(), payloadFormat);
        }
        String peopleDdl = String.format("CREATE EXTERNAL TABLE people (\nevent_timestamp TIMESTAMP, \nname VARCHAR, \nheight INTEGER, \nknows_javascript BOOLEAN \n) \nTYPE '%s' \nLOCATION '%s' \n%s", this.tableProvider.getTableType(), this.eventsTopic.topicPath(), peopleProperties);
        String filteredProperties = "";
        if (payloadFormat != null) {
            filteredProperties = String.format("TBLPROPERTIES '{ \"protoClass\" : \"%s\", \"format\": \"%s\" }'", PayloadMessages.NameHeightMessage.class.getName(), payloadFormat);
        }
        String filteredDdl = String.format("CREATE EXTERNAL TABLE javascript_people (\nevent_timestamp TIMESTAMP, \nname VARCHAR, \nheight INTEGER \n) \nTYPE '%s' \nLOCATION '%s' \n%s", this.tableProvider.getTableType(), this.filteredEventsTopic.topicPath(), filteredProperties);
        BeamSqlEnv env = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new PubsubTableProvider()});
        env.executeDdl(peopleDdl);
        env.executeDdl(filteredDdl);
        String filterSql = "INSERT INTO javascript_people (name, height) (\n  SELECT \n    name, \n    height \n  FROM people \n  WHERE knows_javascript \n)";
        String injectSql = "INSERT INTO people (name, height, knows_javascript) VALUES \n('person1', 80, TRUE),  \n('person2', 70, FALSE), \n('person3', 60, TRUE),  \n('person4', 50, FALSE), \n('person5', 40, TRUE)";
        this.query(env, this.filterPipeline, filterSql);
        this.query(env, this.pipeline, injectSql);
        // The filter pipeline is started first; rows are injected only once its subscription exists.
        this.filterPipeline.run();
        String project = ((GcpOptions)this.pipeline.getOptions().as(GcpOptions.class)).getProject();
        this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));
        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));
        Matcher[] expectedMessages = new Matcher[]{this.objectsProvider.matcherNameHeight("person1", 80), this.objectsProvider.matcherNameHeight("person3", 60), this.objectsProvider.matcherNameHeight("person5", 40)};
        this.filteredEventsTopic.assertThatTopicEventuallyReceives(expectedMessages).waitForUpTo(Duration.standardMinutes(5L));
    }

    /**
     * Opens a JDBC connection backed by an in-memory meta store that has the given table
     * providers registered, and forwards the pipeline options to it as a string map.
     *
     * <p>The options are converted to a map with Jackson; the entries under its "options" key
     * are turned into string arguments via {@link #toArg(Object)}. Empty list values are
     * dropped, and a non-empty list value is rejected because it cannot be encoded as a single
     * argument.
     *
     * @param options pipeline options used both for the connection and for the argument map
     * @param tableProviders providers to register with the in-memory meta store
     * @return the connection, with its pipeline options map set
     * @throws IllegalArgumentException if any option value is a non-empty list
     */
    private CalciteConnection connect(PipelineOptions options, TableProvider ... tableProviders) {
        // The argument map is derived from the same options object that is handed to the
        // driver below, rather than from the pipeline field.
        @SuppressWarnings("unchecked")
        Map<String, Object> optionValues = (Map<String, Object>)MAPPER.convertValue((Object)options, Map.class).get("options");
        Map<String, String> argsMap = optionValues.entrySet().stream().filter(entry -> {
            if (entry.getValue() instanceof List) {
                if (!((List<?>)entry.getValue()).isEmpty()) {
                    throw new IllegalArgumentException("Cannot encode list arguments");
                }
                return false;
            }
            return true;
        }).collect(Collectors.toMap(Map.Entry::getKey, entry -> PubsubTableProviderIT.toArg(entry.getValue())));
        InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
        for (TableProvider tableProvider : tableProviders) {
            inMemoryMetaStore.registerProvider(tableProvider);
        }
        JdbcConnection connection = JdbcDriver.connect((TableProvider)inMemoryMetaStore, (PipelineOptions)options);
        connection.setPipelineOptionsMap(argsMap);
        return connection;
    }

    /**
     * Serializes a value to JSON with the shared mapper and, when the result is wrapped in
     * double quotes, returns it without the first and last character.
     *
     * @param o value to serialize
     * @return the JSON text, with one pair of enclosing double quotes removed if present
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    private static String toArg(Object o) {
        String json;
        try {
            json = MAPPER.writeValueAsString(o);
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
        boolean quoted = json.startsWith("\"") && json.endsWith("\"");
        return quoted ? json.substring(1, json.length() - 1) : json;
    }

    /**
     * Builds the {@code "format"} entry (including its trailing comma) that is spliced into
     * the TBLPROPERTIES JSON, or an empty string when the current provider reports no format.
     *
     * @return the JSON fragment, or {@code ""} if the payload format is {@code null}
     */
    private String payloadFormatParam() {
        String payloadFormat = this.objectsProvider.getPayloadFormat();
        if (payloadFormat == null) {
            return "";
        }
        return String.format("\"format\" : \"%s\", ", payloadFormat);
    }

    /**
     * Parses the SQL text with the given environment and applies the resulting plan to the
     * pipeline.
     *
     * @param sqlEnv environment used to parse the query
     * @param pipeline pipeline the plan is applied to
     * @param queryString SQL statement to parse
     * @return the collection produced by the query
     */
    private PCollection<Row> query(BeamSqlEnv sqlEnv, TestPipeline pipeline, String queryString) {
        BeamRelNode plan = (BeamRelNode)sqlEnv.parseQuery(queryString);
        return BeamSqlRelUtils.toPCollection((Pipeline)pipeline, plan);
    }

    /**
     * Builds a row of the given schema from the supplied values.
     *
     * @param schema schema of the row
     * @param values field values, passed on to the row builder as given
     * @return the built row
     */
    private Row row(Schema schema, Object ... values) {
        Row.Builder rowBuilder = Row.withSchema((Schema)schema);
        return rowBuilder.addValues(values).build();
    }

    /**
     * Creates a Pub/Sub message with the given payload whose attributes are the supplied
     * ones plus a "ts" attribute holding the timestamp's epoch milliseconds as a string.
     *
     * @param timestamp instant written to the "ts" attribute
     * @param payload message body
     * @param attributes additional attributes to copy onto the message
     * @return the assembled message
     */
    private static PubsubMessage message(Instant timestamp, byte[] payload, Map<String, String> attributes) {
        ImmutableMap.Builder<Object, Object> allAttributes = ImmutableMap.builder();
        allAttributes.putAll(attributes);
        allAttributes.put((Object)"ts", (Object)String.valueOf(timestamp.getMillis()));
        return new PubsubMessage(payload, (Map)allAttributes.build());
    }

    /**
     * Combines the current provider's name/height/knowsJS payload matcher with a check that
     * the message's attribute map has a "ts" entry equal to the timestamp's epoch
     * milliseconds.
     *
     * @param ts expected timestamp attribute
     * @param name expected name
     * @param height expected height
     * @param knowsJS expected knows-javascript flag
     * @return a matcher requiring both conditions
     * @throws Exception if the provider fails to build its payload matcher
     */
    private Matcher<PubsubMessage> matcherTsNameHeightKnowsJS(Instant ts, String name, int height, boolean knowsJS) throws Exception {
        Matcher<PubsubMessage> payloadMatcher = this.objectsProvider.matcherNameHeightKnowsJS(name, height, knowsJS);
        Matcher timestampMatcher = (Matcher)Matchers.hasProperty((String)"attributeMap", (Matcher)Matchers.hasEntry((Object)"ts", (Object)String.valueOf(ts.getMillis())));
        return Matchers.allOf(payloadMatcher, timestampMatcher);
    }

    /**
     * Matches a message whose payload equals the US-ASCII bytes of the given string and whose
     * attribute map has a "ts" entry equal to the timestamp's epoch milliseconds.
     *
     * @param timestamp expected timestamp attribute
     * @param payload expected payload text
     * @return a matcher requiring both conditions
     */
    private Matcher<PubsubMessage> matcherPayload(Instant timestamp, String payload) {
        Matcher payloadMatcher = (Matcher)Matchers.hasProperty((String)"payload", (Matcher)Matchers.equalTo((Object)payload.getBytes(StandardCharsets.US_ASCII)));
        Matcher timestampMatcher = (Matcher)Matchers.hasProperty((String)"attributeMap", (Matcher)Matchers.hasEntry((Object)"ts", (Object)String.valueOf(timestamp.getMillis())));
        return Matchers.allOf(payloadMatcher, timestampMatcher);
    }

    /**
     * Shorthand for creating an instant from epoch milliseconds.
     *
     * @param millis milliseconds since the epoch
     * @return the corresponding instant
     */
    private Instant ts(long millis) {
        Instant instant = Instant.ofEpochMilli(millis);
        return instant;
    }

    /**
     * Creates a message whose payload is the US-ASCII encoding of the given text; the
     * timestamp and attributes are handled by {@code message}.
     *
     * @param timestamp instant passed on to {@code message}
     * @param payload payload text
     * @param attributes additional attributes passed on to {@code message}
     * @return the assembled message
     */
    private PubsubMessage messagePayload(Instant timestamp, String payload, Map<String, String> attributes) {
        byte[] payloadBytes = payload.getBytes(StandardCharsets.US_ASCII);
        return PubsubTableProviderIT.message(timestamp, payloadBytes, attributes);
    }

    /**
     * Success predicate used by {@code testSQLWithBytePayload}: reports whether the observed
     * rows equal the expected set.
     *
     * @param expected rows the test expects
     * @param observedRows rows seen so far
     * @return the result of {@code observedRows.equals(expected)}
     */
    private static /* synthetic */ Boolean lambda$testSQLWithBytePayload$90b56436$1(Set expected, Set observedRows) {
        boolean allObserved = observedRows.equals(expected);
        return allObserved;
    }

    /**
     * Provider for the "avro" payload format: builds messages and payload matchers whose
     * bytes are Avro-encoded generic records derived from Beam schemas.
     */
    private static class PubsubAvroObjectProvider
    extends PubsubObjectProvider {
        // Beam schema for records with name, height and knows_javascript (all nullable).
        private static final Schema NAME_HEIGHT_KNOWS_JS_SCHEMA = Schema.builder().addNullableField("name", Schema.FieldType.STRING).addNullableField("height", Schema.FieldType.INT32).addNullableField("knows_javascript", Schema.FieldType.BOOLEAN).build();
        // Beam schema for records with name and height (both nullable).
        private static final Schema NAME_HEIGHT_SCHEMA = Schema.builder().addNullableField("name", Schema.FieldType.STRING).addNullableField("height", Schema.FieldType.INT32).build();

        private PubsubAvroObjectProvider() {
        }

        /** Returns the format name placed in the table properties. */
        @Override
        protected String getPayloadFormat() {
            return "avro";
        }

        /**
         * Builds a message whose payload is the Avro encoding of (id, name) under
         * {@code PAYLOAD_SCHEMA} and which carries an attribute mapping the name to the id.
         */
        @Override
        protected PubsubMessage messageIdName(Instant timestamp, int id, String name) throws IOException {
            byte[] encodedRecord = this.createEncodedGenericRecord(PAYLOAD_SCHEMA, (List<Object>)ImmutableList.of((Object)id, (Object)name));
            return PubsubTableProviderIT.message(timestamp, encodedRecord, (Map)ImmutableMap.of((Object)name, (Object)Integer.toString(id)));
        }

        /** Matches a message whose payload is the Avro encoding of a single required name field. */
        @Override
        protected Matcher<PubsubMessage> matcherNames(String name) throws IOException {
            Schema schema = Schema.builder().addStringField("name").build();
            byte[] encodedRecord = this.createEncodedGenericRecord(schema, (List<Object>)ImmutableList.of((Object)name));
            return Matchers.hasProperty((String)"payload", (Matcher)Matchers.equalTo((Object)encodedRecord));
        }

        /** Matches a message whose payload is the Avro encoding of (name, height). */
        @Override
        protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws IOException {
            byte[] encodedRecord = this.createEncodedGenericRecord(NAME_HEIGHT_SCHEMA, (List<Object>)ImmutableList.of((Object)name, (Object)height));
            return Matchers.hasProperty((String)"payload", (Matcher)Matchers.equalTo((Object)encodedRecord));
        }

        /** Matches a message whose payload is the Avro encoding of (name, height, knowsJS). */
        @Override
        protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(String name, int height, boolean knowsJS) throws IOException {
            byte[] encodedRecord = this.createEncodedGenericRecord(NAME_HEIGHT_KNOWS_JS_SCHEMA, (List<Object>)ImmutableList.of((Object)name, (Object)height, (Object)knowsJS));
            return Matchers.hasProperty((String)"payload", (Matcher)Matchers.equalTo((Object)encodedRecord));
        }

        /**
         * Converts the Beam schema to an Avro schema, fills a generic record by assigning
         * {@code values.get(i)} to the i-th Avro field, and encodes it with an Avro coder.
         *
         * @param beamSchema Beam schema describing the record
         * @param values field values in schema field order; must have at least as many
         *     elements as the schema has fields
         * @return the encoded record bytes
         * @throws IOException if encoding fails
         */
        private byte[] createEncodedGenericRecord(Schema beamSchema, List<Object> values) throws IOException {
            org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema((Schema)beamSchema);
            GenericRecordBuilder builder = new GenericRecordBuilder(avroSchema);
            // The field type is spelled out in full: the simple name Schema denotes the Beam
            // schema in this class, whose Field type the Avro builder does not accept.
            List<org.apache.avro.Schema.Field> fields = avroSchema.getFields();
            for (int i = 0; i < fields.size(); ++i) {
                builder.set(fields.get(i), values.get(i));
            }
            AvroGenericCoder coder = AvroCoder.of((org.apache.avro.Schema)avroSchema);
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            coder.encode(builder.build(), (OutputStream)out);
            return out.toByteArray();
        }
    }

    /**
     * Provider for JSON payloads: reports no explicit payload format, builds messages from
     * hand-written JSON text, and matches payloads with a JSON-aware byte matcher.
     */
    private static class PubsubJsonObjectProvider
    extends PubsubObjectProvider {
        private PubsubJsonObjectProvider() {
        }

        /** Returns {@code null}, so no "format" table property is emitted for this provider. */
        @Override
        protected String getPayloadFormat() {
            return null;
        }

        /**
         * Builds a message whose payload is a JSON object with "id" and "name" and which
         * carries an attribute mapping the name to the id.
         */
        @Override
        protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
            StringBuilder json = new StringBuilder();
            json.append("{ \"id\" : ").append(id).append(", \"name\" : \"").append(name).append("\" }");
            Map<String, String> attributes = (Map<String, String>)ImmutableMap.of((Object)name, (Object)Integer.toString(id));
            return this.message(timestamp, json.toString(), attributes);
        }

        /** Matches a message whose payload is JSON-equivalent to an object with only "name". */
        @Override
        protected Matcher<PubsubMessage> matcherNames(String name) throws IOException {
            String expectedJson = String.format("{\"name\":\"%s\"}", name);
            return Matchers.hasProperty((String)"payload", this.toJsonByteLike(expectedJson));
        }

        /** Matches a message whose payload is JSON-equivalent to name, height and knows_javascript. */
        @Override
        protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(String name, int height, boolean knowsJS) throws IOException {
            String expectedJson = String.format("{\"name\":\"%s\", \"height\": %s, \"knows_javascript\": %s}", name, height, knowsJS);
            Matcher<byte[]> payloadMatcher = this.toJsonByteLike(expectedJson);
            return Matchers.hasProperty((String)"payload", payloadMatcher);
        }

        /** Matches a message whose payload is JSON-equivalent to name and height. */
        @Override
        protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws IOException {
            String expectedJson = String.format("{\"name\":\"%s\", \"height\": %s}", name, height);
            Matcher<byte[]> payloadMatcher = this.toJsonByteLike(expectedJson);
            return Matchers.hasProperty((String)"payload", payloadMatcher);
        }

        /** Encodes the JSON text as UTF-8 and delegates to the outer class's message factory. */
        private PubsubMessage message(Instant timestamp, String jsonPayload, Map<String, String> attributes) {
            byte[] payloadBytes = jsonPayload.getBytes(StandardCharsets.UTF_8);
            return PubsubTableProviderIT.message(timestamp, payloadBytes, attributes);
        }

        /** Wraps {@code JsonMatcher.jsonBytesLike} for the given expected JSON text. */
        private Matcher<byte[]> toJsonByteLike(String jsonString) throws IOException {
            return JsonMatcher.jsonBytesLike((String)jsonString);
        }
    }

    private static class PubsubProtoObjectProvider
    extends PubsubObjectProvider {
        private PubsubProtoObjectProvider() {
        }

        @Override
        protected String getPayloadFormat() {
            return "proto";
        }

        @Override
        protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
            PayloadMessages.SimpleMessage.Builder simpleMessage = PayloadMessages.SimpleMessage.newBuilder().setId(id).setName(name);
            return PubsubTableProviderIT.message(timestamp, simpleMessage.build().toByteArray(), (Map)ImmutableMap.of((Object)name, (Object)Integer.toString(id)));
        }

        @Override
        protected Matcher<PubsubMessage> matcherNames(String name) throws IOException {
            PayloadMessages.NameMessage.Builder nameMessage = PayloadMessages.NameMessage.newBuilder().setName(name);
            return Matchers.hasProperty((String)"payload", (Matcher)Matchers.equalTo((Object)nameMessage.build().toByteArray()));
        }

        @Override
        protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(String name, int height, boolean knowsJS) throws IOException {
            PayloadMessages.NameHeightKnowsJSMessage.Builder nameHeightKnowsJSMessage = PayloadMessages.NameHeightKnowsJSMessage.newBuilder().setHeight(height).setName(name).setKnowsJavascript(knowsJS);
            return Matchers.hasProperty((String)"payload", (Matcher)Matchers.equalTo((Object)nameHeightKnowsJSMessage.build().toByteArray()));
        }

        @Override
        protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws IOException {
            PayloadMessages.NameHeightMessage.Builder nameHeightMessage = PayloadMessages.NameHeightMessage.newBuilder().setName(name).setHeight(height);
            return Matchers.hasProperty((String)"payload", (Matcher)Matchers.equalTo((Object)nameHeightMessage.build().toByteArray()));
        }
    }

    /**
     * Base type for the per-format helpers the parameterized tests run against. Each subclass
     * in this file (JSON, Avro, proto) supplies the payload format name plus factories for
     * test messages and for matchers on received messages. The parameter names below are
     * decompiler placeholders; the subclasses name them as noted on each method.
     */
    private static abstract class PubsubObjectProvider
    implements Serializable {
        private PubsubObjectProvider() {
        }

        /**
         * Returns the value for the "format" table property, or {@code null} when no format
         * entry should be emitted (the JSON subclass returns {@code null}).
         */
        protected abstract String getPayloadFormat();

        /**
         * Builds a message for the given timestamp ({@code var1}), id ({@code var2}) and
         * name ({@code var3}).
         */
        protected abstract PubsubMessage messageIdName(Instant var1, int var2, String var3) throws Exception;

        /** Returns a matcher for a message whose payload carries only the name ({@code var1}). */
        protected abstract Matcher<PubsubMessage> matcherNames(String var1) throws Exception;

        /**
         * Returns a matcher for a message whose payload carries the name ({@code var1}),
         * height ({@code var2}) and knows-javascript flag ({@code var3}).
         */
        protected abstract Matcher<PubsubMessage> matcherNameHeightKnowsJS(String var1, int var2, boolean var3) throws Exception;

        /**
         * Returns a matcher for a message whose payload carries the name ({@code var1}) and
         * height ({@code var2}).
         */
        protected abstract Matcher<PubsubMessage> matcherNameHeight(String var1, int var2) throws Exception;
    }
}

