package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.AvroGenericCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.testing.JsonMatcher;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteConnection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT.class */
public class PubsubTableProviderIT implements Serializable {

    // Topic used as the LOCATION of the main external table in every test; tests publish to it
    // or assert on what it eventually receives.
    // NOTE(review): the rules are transient, presumably because the test instance is captured by
    // serializable lambdas (see $deserializeLambda$) — confirm.
    @Rule
    public transient TestPubsub eventsTopic = TestPubsub.create();

    // Topic backing the `javascript_people` table in
    // testSQLReadAndWriteWithSameFlatTableDefinition.
    @Rule
    public transient TestPubsub filteredEventsTopic = TestPubsub.create();

    // Topic whose path is written into the "deadLetterQueue" table property.
    @Rule
    public transient TestPubsubSignal resultSignal = TestPubsubSignal.create();

    // Pipeline that runs the query under test (or the writer side in the read-and-write test).
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();

    // Second pipeline, used only for the filtering INSERT ... SELECT in the read-and-write test.
    @Rule
    public transient TestPipeline filterPipeline = TestPipeline.create();
    // Only consulted for getTableType() when formatting the CREATE EXTERNAL TABLE statements.
    private final SchemaIOTableProviderWrapper tableProvider = new PubsubTableProvider();

    // Injected by the Parameterized runner from data(): one payload-format provider per run.
    @Parameterized.Parameter
    public PubsubObjectProvider objectsProvider;
    // Row schema of the (id, name) payload: nullable INT32 "id" and nullable STRING "name".
    private static final Schema PAYLOAD_SCHEMA = Schema.builder().addNullableField("id", Schema.FieldType.INT32).addNullableField("name", Schema.FieldType.STRING).build();
    // Shared Jackson mapper with every module discoverable through ReflectHelpers.findClassLoader().
    private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));

    /**
     * Builds and matches Pub/Sub payloads for the {@code "avro"} payload format.
     *
     * <p>Every payload is one Avro generic record, encoded with the coder obtained from
     * {@code AvroCoder.of} for the Avro schema derived from a Beam schema.
     */
    private static class PubsubAvroObjectProvider extends PubsubObjectProvider {
        /** Beam schema with nullable {@code name}, {@code height} and {@code knowsJavascript}. */
        private static final Schema NAME_HEIGHT_KNOWS_JS_SCHEMA =
                Schema.builder()
                        .addNullableField("name", Schema.FieldType.STRING)
                        .addNullableField("height", Schema.FieldType.INT32)
                        .addNullableField("knowsJavascript", Schema.FieldType.BOOLEAN)
                        .build();

        /** Beam schema with nullable {@code name} and {@code height}. */
        private static final Schema NAME_HEIGHT_SCHEMA =
                Schema.builder()
                        .addNullableField("name", Schema.FieldType.STRING)
                        .addNullableField("height", Schema.FieldType.INT32)
                        .build();

        private PubsubAvroObjectProvider() {
            super();
        }

        /** Returns {@code "avro"}, the value written into the table's {@code "format"} property. */
        @Override
        protected String getPayloadFormat() {
            return "avro";
        }

        /** Creates a message timestamped with {@code instant} carrying an Avro (id, name) record. */
        @Override
        protected PubsubMessage messageIdName(Instant instant, int i, String str) throws IOException {
            byte[] payload =
                    createEncodedGenericRecord(
                            PubsubTableProviderIT.PAYLOAD_SCHEMA, ImmutableList.of(Integer.valueOf(i), str));
            return PubsubTableProviderIT.message(instant, payload);
        }

        /** Matches a message whose payload equals the Avro encoding of a single-field name record. */
        @Override
        protected Matcher<PubsubMessage> matcherNames(String str) throws IOException {
            Schema nameOnly = Schema.builder().addStringField("name").build();
            byte[] expected = createEncodedGenericRecord(nameOnly, ImmutableList.of(str));
            return Matchers.hasProperty("payload", Matchers.equalTo(expected));
        }

        /** Matches a message whose payload equals the Avro encoding of (name, height). */
        @Override
        protected Matcher<PubsubMessage> matcherNameHeight(String str, int i) throws IOException {
            byte[] expected =
                    createEncodedGenericRecord(NAME_HEIGHT_SCHEMA, ImmutableList.of(str, Integer.valueOf(i)));
            return Matchers.hasProperty("payload", Matchers.equalTo(expected));
        }

        /** Matches a message whose payload equals the Avro encoding of (name, height, knowsJavascript). */
        @Override
        protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(String str, int i, boolean z) throws IOException {
            byte[] expected =
                    createEncodedGenericRecord(
                            NAME_HEIGHT_KNOWS_JS_SCHEMA,
                            ImmutableList.of(str, Integer.valueOf(i), Boolean.valueOf(z)));
            return Matchers.hasProperty("payload", Matchers.equalTo(expected));
        }

        /**
         * Encodes {@code list} as one Avro generic record laid out by {@code schema}.
         *
         * @param schema Beam schema describing the record; converted with {@code AvroUtils.toAvroSchema}
         * @param list field values, positionally aligned with the schema's fields
         * @return the bytes written by the Avro coder for that record
         * @throws IllegalArgumentException if the number of values differs from the number of fields
         * @throws IOException if the coder fails to encode the record
         */
        private byte[] createEncodedGenericRecord(Schema schema, List<Object> list) throws IOException {
            org.apache.avro.Schema avroSchema = AvroUtils.toAvroSchema(schema);
            // Fully qualified on purpose: this file also uses the Beam Schema under the same simple
            // name, and GenericRecordBuilder needs the Avro field type.
            List<org.apache.avro.Schema.Field> fields = avroSchema.getFields();
            if (list.size() != fields.size()) {
                throw new IllegalArgumentException(
                        "Expected " + fields.size() + " values for schema " + schema + " but got " + list.size());
            }

            GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
            for (int i = 0; i < fields.size(); i++) {
                recordBuilder.set(fields.get(i), list.get(i));
            }

            AvroGenericCoder coder = AvroCoder.of(avroSchema);
            ByteArrayOutputStream encoded = new ByteArrayOutputStream();
            coder.encode(recordBuilder.build(), encoded);
            return encoded.toByteArray();
        }
    }

    /**
     * Builds and matches Pub/Sub payloads written as JSON text.
     *
     * <p>Reports no explicit payload format, so no {@code "format"} property is added to the table
     * definition by the tests.
     */
    private static class PubsubJsonObjectProvider extends PubsubObjectProvider {
        private PubsubJsonObjectProvider() {
            super();
        }

        /** Returns {@code null}: this provider does not name a payload format. */
        @Override
        protected String getPayloadFormat() {
            return null;
        }

        /** Creates a message timestamped with {@code timestamp} whose payload is a JSON (id, name) object. */
        @Override
        protected PubsubMessage messageIdName(Instant timestamp, int id, String name) {
            String json = "{ \"id\" : " + id + ", \"name\" : \"" + name + "\" }";
            return PubsubTableProviderIT.message(timestamp, json.getBytes(StandardCharsets.UTF_8));
        }

        /** Matches a message whose payload is JSON-like {@code {"name": ...}}. */
        @Override
        protected Matcher<PubsubMessage> matcherNames(String name) throws IOException {
            String expectedJson = String.format("{\"name\":\"%s\"}", name);
            return payloadLike(expectedJson);
        }

        /** Matches a message whose payload is JSON-like (name, height). */
        @Override
        protected Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws IOException {
            String expectedJson = String.format("{\"name\":\"%s\", \"height\": %s}", name, height);
            return payloadLike(expectedJson);
        }

        /** Matches a message whose payload is JSON-like (name, height, knowsJavascript). */
        @Override
        protected Matcher<PubsubMessage> matcherNameHeightKnowsJS(String name, int height, boolean knowsJavascript)
                throws IOException {
            String expectedJson =
                    String.format(
                            "{\"name\":\"%s\", \"height\": %s, \"knowsJavascript\": %s}",
                            name, height, knowsJavascript);
            return payloadLike(expectedJson);
        }

        /** Wraps {@code JsonMatcher.jsonBytesLike} so it applies to a message's {@code payload} property. */
        private Matcher<PubsubMessage> payloadLike(String expectedJson) throws IOException {
            Matcher<byte[]> jsonBytes = JsonMatcher.jsonBytesLike(expectedJson);
            return Matchers.hasProperty("payload", jsonBytes);
        }
    }

    /**
     * Per-format factory for the messages the tests publish and the matchers they assert with.
     *
     * <p>One concrete subclass exists per payload format; the Parameterized runner injects an
     * instance into {@code objectsProvider}.
     *
     * <p>NOTE(review): the decompiler reports that this class was originally {@code private}.
     */
    public abstract static class PubsubObjectProvider implements Serializable {
        private PubsubObjectProvider() {
        }

        /**
         * Returns the value for the table's {@code "format"} property, or {@code null} when no such
         * property should be written.
         */
        protected abstract String getPayloadFormat();

        /** Creates a message for {@code timestamp} whose payload carries the given id and name. */
        protected abstract PubsubMessage messageIdName(Instant timestamp, int id, String name) throws Exception;

        /** Returns a matcher for a message whose payload carries only {@code name}. */
        protected abstract Matcher<PubsubMessage> matcherNames(String name) throws Exception;

        /** Returns a matcher for a message whose payload carries name, height and knowsJavascript. */
        protected abstract Matcher<PubsubMessage> matcherNameHeightKnowsJS(String name, int height, boolean knowsJavascript) throws Exception;

        /** Returns a matcher for a message whose payload carries name and height. */
        protected abstract Matcher<PubsubMessage> matcherNameHeight(String name, int height) throws Exception;
    }

    /** Supplies one parameter set per payload format: the JSON provider first, then the Avro one. */
    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        Object[] jsonCase = {new PubsubJsonObjectProvider()};
        Object[] avroCase = {new PubsubAvroObjectProvider()};
        return Arrays.asList(jsonCase, avroCase);
    }

    /**
     * Publishes three (id, name) messages to a table with a nested {@code payload} row and waits
     * until the query has produced exactly those three rows.
     */
    @Test
    public void testSQLSelectsPayloadContent() throws Exception {
        String createTable =
                String.format(
                        "CREATE EXTERNAL TABLE message (\n"
                                + "event_timestamp TIMESTAMP, \n"
                                + "attributes MAP<VARCHAR, VARCHAR>, \n"
                                + "payload ROW< \n"
                                + "             id INTEGER, \n"
                                + "             name VARCHAR \n"
                                + "           > \n"
                                + ") \n"
                                + "TYPE '%s' \n"
                                + "LOCATION '%s' \n"
                                + "TBLPROPERTIES '{ %s\"timestampAttributeKey\" : \"ts\" }'",
                        this.tableProvider.getTableType(),
                        this.eventsTopic.topicPath(),
                        payloadFormatParam());

        List<PubsubMessage> messages =
                org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList.of(
                        this.objectsProvider.messageIdName(ts(1L), 3, "foo"),
                        this.objectsProvider.messageIdName(ts(2L), 5, "bar"),
                        this.objectsProvider.messageIdName(ts(3L), 7, "baz"));

        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
        sqlEnv.executeDdl(createTable);

        PCollection<Row> selected =
                query(sqlEnv, this.pipeline, "SELECT message.payload.id, message.payload.name from message");
        // Success is signalled once the observed set of rows equals the three expected rows.
        selected.apply(
                "waitForSuccess",
                this.resultSignal.signalSuccessWhen(
                        SchemaCoder.of(PAYLOAD_SCHEMA),
                        observed ->
                                observed.equals(
                                        ImmutableSet.of(
                                                row(PAYLOAD_SCHEMA, 3, "foo"),
                                                row(PAYLOAD_SCHEMA, 5, "bar"),
                                                row(PAYLOAD_SCHEMA, 7, "baz")))));

        this.pipeline.run();

        // Publish only after the subscription for the topic has been observed.
        String project = this.pipeline.getOptions().as(GcpOptions.class).getProject();
        this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));
        this.eventsTopic.publish(messages);

        this.resultSignal.waitForSuccess(Duration.standardMinutes(5L));
    }

    /**
     * Publishes three well-formed messages plus two payloads ({@code "{ - }"} and {@code "{ + }"}),
     * waits for the three rows from the query, then expects the two extra payloads on the
     * dead-letter topic.
     */
    @Test
    public void testUsesDlq() throws Exception {
        String createTable =
                String.format(
                        "CREATE EXTERNAL TABLE message (\n"
                                + "event_timestamp TIMESTAMP, \n"
                                + "attributes MAP<VARCHAR, VARCHAR>, \n"
                                + "payload ROW< \n"
                                + "             id INTEGER, \n"
                                + "             name VARCHAR \n"
                                + "           > \n"
                                + ") \n"
                                + "TYPE '%s' \n"
                                + "LOCATION '%s' \n"
                                + "TBLPROPERTIES     '{        %s       \"timestampAttributeKey\" : \"ts\",        \"deadLetterQueue\" : \"%s\"     }'",
                        this.tableProvider.getTableType(),
                        this.eventsTopic.topicPath(),
                        payloadFormatParam(),
                        this.dlqTopic.topicPath());

        List<PubsubMessage> messages =
                org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList.of(
                        this.objectsProvider.messageIdName(ts(1L), 3, "foo"),
                        this.objectsProvider.messageIdName(ts(2L), 5, "bar"),
                        this.objectsProvider.messageIdName(ts(3L), 7, "baz"),
                        messagePayload(ts(4L), "{ - }"),
                        messagePayload(ts(5L), "{ + }"));

        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
        sqlEnv.executeDdl(createTable);

        PCollection<Row> selected =
                query(sqlEnv, this.pipeline, "SELECT message.payload.id, message.payload.name from message");
        selected.apply(
                "waitForSuccess",
                this.resultSignal.signalSuccessWhen(
                        SchemaCoder.of(PAYLOAD_SCHEMA),
                        observed ->
                                observed.equals(
                                        ImmutableSet.of(
                                                row(PAYLOAD_SCHEMA, 3, "foo"),
                                                row(PAYLOAD_SCHEMA, 5, "bar"),
                                                row(PAYLOAD_SCHEMA, 7, "baz")))));

        this.pipeline.run();

        // Publish only after the subscription for the topic has been observed.
        String project = this.pipeline.getOptions().as(GcpOptions.class).getProject();
        this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));
        this.eventsTopic.publish(messages);

        this.resultSignal.waitForSuccess(Duration.standardMinutes(4L));

        // The two extra payloads must arrive on the dead-letter topic with their "ts" attribute.
        this.dlqTopic
                .assertThatTopicEventuallyReceives(
                        matcherPayload(ts(4L), "{ - }"),
                        matcherPayload(ts(5L), "{ + }"))
                .waitForUpTo(Duration.standardSeconds(40L));
    }

    /**
     * Runs {@code SELECT ... LIMIT 3} through the JDBC connection on a background thread, publishes
     * seven messages, and expects the query to return exactly three rows within two minutes.
     *
     * <p>The single-thread executor is always torn down, including when the wait times out or the
     * assertion fails, so a query that never completes does not leave its worker thread behind.
     */
    @Test
    public void testSQLLimit() throws Exception {
        String createTable =
                String.format(
                        "CREATE EXTERNAL TABLE message (\n"
                                + "event_timestamp TIMESTAMP, \n"
                                + "attributes MAP<VARCHAR, VARCHAR>, \n"
                                + "payload ROW< \n"
                                + "             id INTEGER, \n"
                                + "             name VARCHAR \n"
                                + "           > \n"
                                + ") \n"
                                + "TYPE '%s' \n"
                                + "LOCATION '%s' \n"
                                + "TBLPROPERTIES     '{        %s       \"timestampAttributeKey\" : \"ts\",        \"deadLetterQueue\" : \"%s\"     }'",
                        this.tableProvider.getTableType(),
                        this.eventsTopic.topicPath(),
                        payloadFormatParam(),
                        this.dlqTopic.topicPath());

        List<PubsubMessage> messages =
                org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList.of(
                        this.objectsProvider.messageIdName(ts(1L), 3, "foo"),
                        this.objectsProvider.messageIdName(ts(2L), 5, "bar"),
                        this.objectsProvider.messageIdName(ts(3L), 7, "baz"),
                        this.objectsProvider.messageIdName(ts(4L), 9, "ba2"),
                        this.objectsProvider.messageIdName(ts(5L), 10, "ba3"),
                        this.objectsProvider.messageIdName(ts(6L), 13, "ba4"),
                        this.objectsProvider.messageIdName(ts(7L), 15, "ba5"));

        Statement statement = connect(this.pipeline.getOptions(), new PubsubTableProvider()).createStatement();
        statement.execute(createTable);

        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            // The query is submitted before anything is published, so it runs off the test thread.
            Future<List<String>> queryResult =
                    pool.submit(
                            () -> {
                                ResultSet resultSet =
                                        statement.executeQuery("SELECT message.payload.id FROM message LIMIT 3");
                                org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList.Builder<String> ids =
                                        org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList.builder();
                                while (resultSet.next()) {
                                    ids.add(resultSet.getString(1));
                                }
                                return ids.build();
                            });

            String project = this.pipeline.getOptions().as(GcpOptions.class).getProject();
            this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));
            this.eventsTopic.publish(messages);

            MatcherAssert.assertThat(queryResult.get(2L, TimeUnit.MINUTES).size(), Matchers.equalTo(3));
        } finally {
            // shutdownNow() also interrupts a query still running after a timeout or failure;
            // on the success path the task has already completed, so this equals shutdown().
            pool.shutdownNow();
        }
    }

    /**
     * Same scenario as the nested-payload select, but against a table that declares {@code id} and
     * {@code name} as top-level columns.
     */
    @Test
    public void testSQLSelectsPayloadContentFlat() throws Exception {
        String createTable =
                String.format(
                        "CREATE EXTERNAL TABLE message (\n"
                                + "event_timestamp TIMESTAMP, \n"
                                + "id INTEGER, \n"
                                + "name VARCHAR \n"
                                + ") \n"
                                + "TYPE '%s' \n"
                                + "LOCATION '%s' \n"
                                + "TBLPROPERTIES     '{        %s       \"timestampAttributeKey\" : \"ts\"      }'",
                        this.tableProvider.getTableType(),
                        this.eventsTopic.topicPath(),
                        payloadFormatParam());

        List<PubsubMessage> messages =
                org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList.of(
                        this.objectsProvider.messageIdName(ts(1L), 3, "foo"),
                        this.objectsProvider.messageIdName(ts(2L), 5, "bar"),
                        this.objectsProvider.messageIdName(ts(3L), 7, "baz"));

        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
        sqlEnv.executeDdl(createTable);

        PCollection<Row> selected = query(sqlEnv, this.pipeline, "SELECT message.id, message.name from message");
        selected.apply(
                "waitForSuccess",
                this.resultSignal.signalSuccessWhen(
                        SchemaCoder.of(PAYLOAD_SCHEMA),
                        observed ->
                                observed.equals(
                                        ImmutableSet.of(
                                                row(PAYLOAD_SCHEMA, 3, "foo"),
                                                row(PAYLOAD_SCHEMA, 5, "bar"),
                                                row(PAYLOAD_SCHEMA, 7, "baz")))));

        this.pipeline.run();

        // Publish only after the subscription for the topic has been observed.
        String project = this.pipeline.getOptions().as(GcpOptions.class).getProject();
        this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));
        this.eventsTopic.publish(messages);

        this.resultSignal.waitForSuccess(Duration.standardMinutes(5L));
    }

    /**
     * Inserts two rows into a flat table and expects the topic to receive one matching payload per
     * row.
     */
    @Test
    public void testSQLInsertRowsToPubsubFlat() throws Exception {
        String createTable =
                String.format(
                        "CREATE EXTERNAL TABLE message (\n"
                                + "event_timestamp TIMESTAMP, \n"
                                + "name VARCHAR, \n"
                                + "height INTEGER, \n"
                                + "knowsJavascript BOOLEAN \n"
                                + ") \n"
                                + "TYPE '%s' \n"
                                + "LOCATION '%s' \n"
                                + "TBLPROPERTIES     '{        %s       \"deadLetterQueue\" : \"%s\"     }'",
                        this.tableProvider.getTableType(),
                        this.eventsTopic.topicPath(),
                        payloadFormatParam(),
                        this.dlqTopic.topicPath());

        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
        sqlEnv.executeDdl(createTable);

        String insert =
                "INSERT INTO message (name, height, knowsJavascript) \n"
                        + "VALUES \n"
                        + "('person1', 80, TRUE), \n"
                        + "('person2', 70, FALSE)";
        query(sqlEnv, this.pipeline, insert);

        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));

        this.eventsTopic
                .assertThatTopicEventuallyReceives(
                        this.objectsProvider.matcherNameHeightKnowsJS("person1", 80, true),
                        this.objectsProvider.matcherNameHeightKnowsJS("person2", 70, false))
                .waitForUpTo(Duration.standardSeconds(40L));
    }

    /**
     * Inserts two rows with explicit event timestamps into a flat table configured with a
     * {@code "ts"} timestamp attribute, and expects each received message to carry both the
     * payload and the matching {@code "ts"} attribute (1 and 2 milliseconds).
     */
    @Test
    public void testSQLInsertRowsToPubsubWithTimestampAttributeFlat() throws Exception {
        String createTable =
                String.format(
                        "CREATE EXTERNAL TABLE message (\n"
                                + "  event_timestamp TIMESTAMP, \n"
                                + "  name VARCHAR, \n"
                                + "  height INTEGER, \n"
                                + "  knowsJavascript BOOLEAN \n"
                                + ") \n"
                                + "TYPE '%s' \n"
                                + "LOCATION '%s' \n"
                                + "TBLPROPERTIES   '{      %s      \"deadLetterQueue\" : \"%s\",     \"timestampAttributeKey\" : \"ts\"   }'",
                        this.tableProvider.getTableType(),
                        this.eventsTopic.topicPath(),
                        payloadFormatParam(),
                        this.dlqTopic.topicPath());

        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
        sqlEnv.executeDdl(createTable);

        String insert = "INSERT INTO message VALUES (TIMESTAMP '1970-01-01 00:00:00.001', 'person1', 80, TRUE), (TIMESTAMP '1970-01-01 00:00:00.002', 'person2', 70, FALSE)";
        query(sqlEnv, this.pipeline, insert);

        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));

        this.eventsTopic
                .assertThatTopicEventuallyReceives(
                        matcherTsNameHeightKnowsJS(ts(1L), "person1", 80, true),
                        matcherTsNameHeightKnowsJS(ts(2L), "person2", 70, false))
                .waitForUpTo(Duration.standardSeconds(40L));
    }

    /**
     * Chains two pipelines over two flat tables: a writer inserts five people into {@code people},
     * a filter pipeline copies those with {@code knowsJavascript} into {@code javascript_people},
     * and the filtered topic must receive the three matching (name, height) payloads.
     */
    @Test
    public void testSQLReadAndWriteWithSameFlatTableDefinition() throws Exception {
        // Unlike payloadFormatParam(), the whole TBLPROPERTIES clause is omitted when no format is set.
        String payloadFormat = this.objectsProvider.getPayloadFormat();
        String tblProperties = "";
        if (payloadFormat != null) {
            tblProperties = String.format("TBLPROPERTIES '{\"format\": \"%s\"}'", payloadFormat);
        }

        String createPeople =
                String.format(
                        "CREATE EXTERNAL TABLE people (\n"
                                + "event_timestamp TIMESTAMP, \n"
                                + "name VARCHAR, \n"
                                + "height INTEGER, \n"
                                + "knowsJavascript BOOLEAN \n"
                                + ") \n"
                                + "TYPE '%s' \n"
                                + "LOCATION '%s' \n"
                                + "%s",
                        this.tableProvider.getTableType(),
                        this.eventsTopic.topicPath(),
                        tblProperties);
        String createJavascriptPeople =
                String.format(
                        "CREATE EXTERNAL TABLE javascript_people (\n"
                                + "event_timestamp TIMESTAMP, \n"
                                + "name VARCHAR, \n"
                                + "height INTEGER \n"
                                + ") \n"
                                + "TYPE '%s' \n"
                                + "LOCATION '%s' \n"
                                + "%s",
                        this.tableProvider.getTableType(),
                        this.filteredEventsTopic.topicPath(),
                        tblProperties);

        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubTableProvider());
        sqlEnv.executeDdl(createPeople);
        sqlEnv.executeDdl(createJavascriptPeople);

        String filterInsert =
                "INSERT INTO javascript_people (name, height) (\n"
                        + "  SELECT \n"
                        + "    name, \n"
                        + "    height \n"
                        + "  FROM people \n"
                        + "  WHERE knowsJavascript \n"
                        + ")";
        query(sqlEnv, this.filterPipeline, filterInsert);

        String writerInsert =
                "INSERT INTO people (name, height, knowsJavascript) VALUES \n"
                        + "('person1', 80, TRUE),  \n"
                        + "('person2', 70, FALSE), \n"
                        + "('person3', 60, TRUE),  \n"
                        + "('person4', 50, FALSE), \n"
                        + "('person5', 40, TRUE)";
        query(sqlEnv, this.pipeline, writerInsert);

        // Start the filter first and wait for its subscription before the writer publishes.
        this.filterPipeline.run();
        String project = this.pipeline.getOptions().as(GcpOptions.class).getProject();
        this.eventsTopic.assertSubscriptionEventuallyCreated(project, Duration.standardMinutes(5L));

        this.pipeline.run().waitUntilFinish(Duration.standardMinutes(5L));

        this.filteredEventsTopic
                .assertThatTopicEventuallyReceives(
                        this.objectsProvider.matcherNameHeight("person1", 80),
                        this.objectsProvider.matcherNameHeight("person3", 60),
                        this.objectsProvider.matcherNameHeight("person5", 40))
                .waitForUpTo(Duration.standardMinutes(5L));
    }

    /**
     * Opens a Beam SQL JDBC connection backed by an in-memory meta store holding the given table
     * providers.
     *
     * <p>The options are converted to a map through Jackson and the entries found under its
     * {@code "options"} key are flattened to strings with {@link #toArg(Object)}; that map is
     * handed to the connection via {@code setPipelineOptionsMap}.
     *
     * @param pipelineOptions options used both to open the connection and to build the options map
     * @param tableProviderArr providers to register with the meta store
     * @return the connection, with its pipeline options map set
     */
    private CalciteConnection connect(PipelineOptions pipelineOptions, TableProvider... tableProviderArr) {
        // Jackson's convertValue(..., Map.class) only yields a raw Map, hence the unchecked locals.
        @SuppressWarnings("unchecked")
        Map<String, Object> converted = MAPPER.convertValue(pipelineOptions, Map.class);
        @SuppressWarnings("unchecked")
        Map<String, Object> options = (Map<String, Object>) converted.get("options");

        Map<String, String> argsMap =
                options.entrySet().stream()
                        .collect(Collectors.toMap(Map.Entry::getKey, entry -> toArg(entry.getValue())));

        InMemoryMetaStore metaStore = new InMemoryMetaStore();
        for (TableProvider provider : tableProviderArr) {
            metaStore.registerProvider(provider);
        }

        JdbcConnection connection = JdbcDriver.connect(metaStore, pipelineOptions);
        connection.setPipelineOptionsMap(argsMap);
        return connection;
    }

    /**
     * Serializes {@code obj} to JSON with the shared mapper and, when the result both starts and
     * ends with a double quote, strips that one outer pair of quotes.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code private}.
     *
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    public static String toArg(Object obj) {
        final String json;
        try {
            json = MAPPER.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }

        boolean quoted = json.startsWith("\"") && json.endsWith("\"");
        if (!quoted) {
            return json;
        }
        return json.substring(1, json.length() - 1);
    }

    /**
     * Returns the {@code "format" : "...", } fragment to splice into a TBLPROPERTIES JSON object,
     * or the empty string when the current provider reports no payload format.
     */
    private String payloadFormatParam() {
        String payloadFormat = this.objectsProvider.getPayloadFormat();
        if (payloadFormat == null) {
            return "";
        }
        return String.format("\"format\" : \"%s\", ", payloadFormat);
    }

    /** Parses {@code str} with {@code beamSqlEnv} and attaches the resulting plan to {@code testPipeline}. */
    private PCollection<Row> query(BeamSqlEnv beamSqlEnv, TestPipeline testPipeline, String str) {
        PCollection<Row> result = BeamSqlRelUtils.toPCollection(testPipeline, beamSqlEnv.parseQuery(str));
        return result;
    }

    /** Builds a {@link Row} of {@code schema} populated with {@code objArr} in field order. */
    private Row row(org.apache.beam.sdk.schemas.Schema schema, Object... objArr) {
        Row built = Row.withSchema(schema).addValues(objArr).build();
        return built;
    }

    /**
     * Creates a message with the given payload and a single {@code "ts"} attribute holding the
     * instant's epoch milliseconds as a decimal string.
     *
     * <p>NOTE(review): the decompiler reports that this method was originally {@code private}.
     */
    public static PubsubMessage message(Instant instant, byte[] bArr) {
        String epochMillis = String.valueOf(instant.getMillis());
        return new PubsubMessage(bArr, ImmutableMap.of("ts", epochMillis));
    }

    /**
     * Matches a message that satisfies the provider's (name, height, knowsJavascript) payload
     * matcher and whose attribute map has {@code "ts"} set to the instant's epoch milliseconds.
     */
    private Matcher<PubsubMessage> matcherTsNameHeightKnowsJS(Instant instant, String str, int i, boolean z) throws Exception {
        Matcher<PubsubMessage> payloadMatches = this.objectsProvider.matcherNameHeightKnowsJS(str, i, z);
        String expectedTs = String.valueOf(instant.getMillis());
        return Matchers.allOf(
                payloadMatches,
                Matchers.hasProperty("attributeMap", Matchers.hasEntry("ts", expectedTs)));
    }

    /**
     * Matches a message whose payload equals the US-ASCII bytes of {@code str} and whose attribute
     * map has {@code "ts"} set to the instant's epoch milliseconds.
     */
    private Matcher<PubsubMessage> matcherPayload(Instant instant, String str) {
        byte[] expectedPayload = str.getBytes(StandardCharsets.US_ASCII);
        String expectedTs = String.valueOf(instant.getMillis());
        return Matchers.allOf(
                Matchers.hasProperty("payload", Matchers.equalTo(expectedPayload)),
                Matchers.hasProperty("attributeMap", Matchers.hasEntry("ts", expectedTs)));
    }

    /** Shorthand for the instant {@code epochMillis} milliseconds after the epoch. */
    private Instant ts(long epochMillis) {
        Instant instant = Instant.ofEpochMilli(epochMillis);
        return instant;
    }

    /** Creates a message for {@code instant} whose payload is the US-ASCII bytes of {@code str}. */
    private PubsubMessage messagePayload(Instant instant, String str) {
        byte[] payload = str.getBytes(StandardCharsets.US_ASCII);
        return message(instant, payload);
    }

    /*
     * Synthetic lambda-deserialization hook as emitted by the decompiler.
     *
     * Dispatches on the implementation-method name recorded in the SerializedLambda; the three
     * recognised names belong to the success-predicate lambdas of testSQLSelectsPayloadContent,
     * testUsesDlq and testSQLSelectsPayloadContentFlat. Each branch re-checks the recorded
     * functional interface, method name and signatures, and returns a predicate comparing the
     * observed set with the three expected (id, name) rows. Any other input ends in
     * IllegalArgumentException.
     *
     * NOTE(review): `boolean z = -1`, the mixed `z = 2` / `z = true` / `z = false` assignments and
     * the duplicate `case true` labels look like decompiler artifacts (presumably an int selector)
     * — this method is unlikely to compile as written; confirm before relying on it.
     * NOTE(review): the captured instance is read into a local but the returned lambdas call
     * row(...) unqualified from a static context — presumably the receiver was dropped; confirm.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // First switch: map the implementation-method name (via its hash code) to a selector.
        switch (implMethodName.hashCode()) {
            case 964779213:
                if (implMethodName.equals("lambda$testSQLSelectsPayloadContent$43268ee4$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1424696290:
                if (implMethodName.equals("lambda$testUsesDlq$43268ee4$1")) {
                    z = true;
                    break;
                }
                break;
            case 2051086612:
                if (implMethodName.equals("lambda$testSQLSelectsPayloadContentFlat$43268ee4$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: validate the recorded lambda shape and rebuild the matching predicate.
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;)Ljava/lang/Boolean;")) {
                    PubsubTableProviderIT pubsubTableProviderIT = (PubsubTableProviderIT) serializedLambda.getCapturedArg(0);
                    return set -> {
                        return Boolean.valueOf(set.equals(ImmutableSet.of(row(PAYLOAD_SCHEMA, 3, "foo"), row(PAYLOAD_SCHEMA, 5, "bar"), row(PAYLOAD_SCHEMA, 7, "baz"))));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;)Ljava/lang/Boolean;")) {
                    PubsubTableProviderIT pubsubTableProviderIT2 = (PubsubTableProviderIT) serializedLambda.getCapturedArg(0);
                    return set2 -> {
                        return Boolean.valueOf(set2.equals(ImmutableSet.of(row(PAYLOAD_SCHEMA, 3, "foo"), row(PAYLOAD_SCHEMA, 5, "bar"), row(PAYLOAD_SCHEMA, 7, "baz"))));
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubTableProviderIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;)Ljava/lang/Boolean;")) {
                    PubsubTableProviderIT pubsubTableProviderIT3 = (PubsubTableProviderIT) serializedLambda.getCapturedArg(0);
                    return set3 -> {
                        return Boolean.valueOf(set3.equals(ImmutableSet.of(row(PAYLOAD_SCHEMA, 3, "foo"), row(PAYLOAD_SCHEMA, 5, "bar"), row(PAYLOAD_SCHEMA, 7, "baz"))));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
