package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.collect.ImmutableSet;
import org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.jdbc.CalciteConnection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Supplier;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.class */
public class PubsubJsonIT implements Serializable {
    // JDBC URL prefix; not referenced elsewhere in this class.
    private static final String CONNECT_STRING_PREFIX = "jdbc:beam:";
    // Schema name constant; not referenced elsewhere in this class.
    private static final String BEAM_CALCITE_SCHEMA = "beamCalciteSchema";

    // Topic the tests publish JSON messages to; its path is used as the LOCATION of the
    // external table. Transient because the enclosing test class is Serializable.
    @Rule
    public transient TestPubsub eventsTopic = TestPubsub.create();

    // Topic whose path is passed as the "deadLetterQueue" table property in the DLQ and LIMIT tests.
    @Rule
    public transient TestPubsub dlqTopic = TestPubsub.create();

    // Signal used to wait for the query pipeline to start and for the expected result rows.
    @Rule
    public transient TestPubsubSignal resultSignal = TestPubsubSignal.create();

    // Signal used to wait for the DLQ reader to start and for the expected dead-letter messages.
    @Rule
    public transient TestPubsubSignal dlqSignal = TestPubsubSignal.create();

    // Pipeline under test; also the source of the PipelineOptions used by connect(...).
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    // Class logger; not referenced elsewhere in this class.
    private static final Logger LOG = LoggerFactory.getLogger(PubsubJsonIT.class);
    // Schema of the rows selected from the message payload: nullable INT32 "id" and nullable STRING "name".
    private static final Schema PAYLOAD_SCHEMA = Schema.builder().addNullableField("id", Schema.FieldType.INT32).addNullableField("name", Schema.FieldType.STRING).build();
    // Not referenced elsewhere in this class.
    // NOTE(review): presumably kept so that the JDBC driver class is loaded/instantiated - confirm before removing.
    private static final JdbcDriver INSTANCE = new JdbcDriver();
    // Not referenced elsewhere in this class.
    private static volatile Boolean checked = false;
    // Jackson mapper with the modules discovered through ReflectHelpers.findClassLoader();
    // used by toArg(...) and connect(...) to turn pipeline options into string arguments.
    private static final ObjectMapper MAPPER = new ObjectMapper().registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));

    /**
     * Declares an external Pubsub table over the events topic, runs a query selecting the nested
     * {@code payload.id} and {@code payload.name} fields, publishes three JSON messages and waits
     * until the result signal has observed exactly the three corresponding rows.
     *
     * @throws Exception if DDL execution, query planning, publishing or waiting on a signal fails
     */
    @Test
    public void testSelectsPayloadContent() throws Exception {
        String createTableDdl =
                "CREATE EXTERNAL TABLE message (\n"
                        + "event_timestamp TIMESTAMP, \n"
                        + "attributes MAP<VARCHAR, VARCHAR>, \n"
                        + "payload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n"
                        + ") \n"
                        + "TYPE 'pubsub' \n"
                        + "LOCATION '"
                        + this.eventsTopic.topicPath()
                        + "' \nTBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'";
        List<PubsubMessage> messages =
                ImmutableList.of(
                        message(ts(1L), 3, "foo"),
                        message(ts(2L), 5, "bar"),
                        message(ts(3L), 7, "baz"));

        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new TableProvider[]{new PubsubJsonTableProvider()});
        sqlEnv.executeDdl(createTableDdl);

        PCollection<Row> queryOutput =
                query(sqlEnv, this.pipeline, "SELECT message.payload.id, message.payload.name from message");
        // The predicate is evaluated against the set of rows seen so far; success is signalled
        // once that set equals the three expected rows.
        queryOutput.apply(
                "waitForSuccess",
                this.resultSignal.signalSuccessWhen(
                        SchemaCoder.of(PAYLOAD_SCHEMA),
                        observed ->
                                observed.equals(
                                        ImmutableSet.of(
                                                row(PAYLOAD_SCHEMA, 3, "foo"),
                                                row(PAYLOAD_SCHEMA, 5, "bar"),
                                                row(PAYLOAD_SCHEMA, 7, "baz")))));

        // Register the start listener before the pipeline is launched, then block on it so that
        // messages are only published once the pipeline has signalled that it started.
        Supplier<Void> pipelineStarted = this.resultSignal.waitForStart(Duration.standardMinutes(5L));
        this.pipeline.begin().apply(this.resultSignal.signalStart());
        this.pipeline.run();
        pipelineStarted.get();

        this.eventsTopic.publish(messages);
        this.resultSignal.waitForSuccess(Duration.standardSeconds(60L));
    }

    /**
     * Declares an external Pubsub table with a dead-letter queue, publishes three well-formed and
     * two malformed JSON messages, and waits until the query has produced the three expected rows
     * and the DLQ topic has delivered both malformed messages.
     *
     * @throws Exception if DDL execution, query planning, publishing or waiting on a signal fails
     */
    @Test
    @Ignore("Disable flake tracked at https://issues.apache.org/jira/browse/BEAM-5122")
    public void testUsesDlq() throws Exception {
        String createTableDdl =
                "CREATE EXTERNAL TABLE message (\n"
                        + "event_timestamp TIMESTAMP, \n"
                        + "attributes MAP<VARCHAR, VARCHAR>, \n"
                        + "payload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n"
                        + ") \n"
                        + "TYPE 'pubsub' \n"
                        + "LOCATION '"
                        + this.eventsTopic.topicPath()
                        + "' \nTBLPROPERTIES     '{        \"timestampAttributeKey\" : \"ts\",        \"deadLetterQueue\" : \""
                        + this.dlqTopic.topicPath()
                        + "\"     }'";
        List<PubsubMessage> messages =
                ImmutableList.of(
                        message(ts(1L), 3, "foo"),
                        message(ts(2L), 5, "bar"),
                        message(ts(3L), 7, "baz"),
                        message(ts(4L), "{ - }"),
                        message(ts(5L), "{ + }"));

        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new TableProvider[]{new PubsubJsonTableProvider()});
        sqlEnv.executeDdl(createTableDdl);

        // Main query output: success once exactly the three well-formed rows have been seen.
        PCollection<Row> queryOutput =
                query(sqlEnv, this.pipeline, "SELECT message.payload.id, message.payload.name from message");
        queryOutput.apply(
                "waitForSuccess",
                this.resultSignal.signalSuccessWhen(
                        SchemaCoder.of(PAYLOAD_SCHEMA),
                        observed ->
                                observed.equals(
                                        ImmutableSet.of(
                                                row(PAYLOAD_SCHEMA, 3, "foo"),
                                                row(PAYLOAD_SCHEMA, 5, "bar"),
                                                row(PAYLOAD_SCHEMA, 7, "baz")))));
        Supplier<Void> resultsStarted = this.resultSignal.waitForStart(Duration.standardMinutes(5L));
        this.pipeline.begin().apply("signal query results started", this.resultSignal.signalStart());

        // DLQ branch: read the dead-letter topic and succeed once both malformed messages arrived.
        PCollection<PubsubMessage> dlqMessages =
                this.pipeline.apply(
                        PubsubIO.readMessagesWithAttributes().fromTopic(this.dlqTopic.topicPath().getPath()));
        dlqMessages.apply(
                "waitForDlq",
                this.dlqSignal.signalSuccessWhen(
                        PubsubMessageWithAttributesCoder.of(),
                        deadLetters ->
                                containsAll(deadLetters, message(ts(4L), "{ - }"), message(ts(5L), "{ + }"))));
        Supplier<Void> dlqStarted = this.dlqSignal.waitForStart(Duration.standardMinutes(5L));
        this.pipeline.begin().apply("signal DLQ started", this.dlqSignal.signalStart());

        this.pipeline.run();
        // Publish only after both branches have signalled that they started.
        resultsStarted.get();
        dlqStarted.get();

        this.eventsTopic.publish(messages);
        this.resultSignal.waitForSuccess(Duration.standardMinutes(2L));
        this.dlqSignal.waitForSuccess(Duration.standardMinutes(2L));
    }

    /**
     * Runs {@code SELECT ... LIMIT 3} through the JDBC connection on a background thread, publishes
     * seven messages once a subscription exists on the events topic, and asserts that the query
     * returns exactly three values.
     *
     * <p>The executor is shut down in a {@code finally} block so that a timeout or a failed
     * assertion does not leave the worker thread (and the query it is blocked in) running after
     * the test has finished.
     *
     * @throws Exception if the connection, the DDL, publishing or the background query fails, or
     *     if the query does not complete within two minutes
     */
    @Test
    @Ignore("https://jira.apache.org/jira/browse/BEAM-7582")
    public void testSQLLimit() throws Exception {
        String createTableDdl = "CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nattributes MAP<VARCHAR, VARCHAR>, \npayload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n) \nTYPE 'pubsub' \nLOCATION '" + this.eventsTopic.topicPath() + "' \nTBLPROPERTIES     '{        \"timestampAttributeKey\" : \"ts\",        \"deadLetterQueue\" : \"" + this.dlqTopic.topicPath() + "\"     }'";
        List<PubsubMessage> messages = ImmutableList.of(message(ts(1L), 3, "foo"), message(ts(2L), 5, "bar"), message(ts(3L), 7, "baz"), message(ts(4L), 9, "ba2"), message(ts(5L), 10, "ba3"), message(ts(6L), 13, "ba4"), message(ts(7L), 15, "ba5"));
        Statement statement = connect(this.pipeline.getOptions(), new PubsubJsonTableProvider()).createStatement();
        statement.execute(createTableDdl);

        ExecutorService queryExecutor = Executors.newFixedThreadPool(1);
        try {
            // The query blocks until the LIMIT is satisfied, so it runs off the test thread
            // while the messages are being published.
            Future<List<String>> pendingIds = queryExecutor.submit(() -> {
                ResultSet resultSet = statement.executeQuery("SELECT message.payload.id FROM message LIMIT 3");
                ImmutableList.Builder<String> ids = ImmutableList.builder();
                while (resultSet.next()) {
                    ids.add(resultSet.getString(1));
                }
                return ids.build();
            });

            // Wait until a subscription exists before publishing.
            this.eventsTopic.checkIfAnySubscriptionExists(this.pipeline.getOptions().as(GcpOptions.class).getProject(), Duration.standardMinutes(1L));
            this.eventsTopic.publish(messages);

            Assert.assertThat(pendingIds.get(2L, TimeUnit.MINUTES).size(), Matchers.equalTo(3));
        } finally {
            // shutdownNow() also interrupts a query that is still running after a timeout or
            // failure; on the success path the task has already completed.
            queryExecutor.shutdownNow();
        }
    }

    /**
     * Renders a value as a string argument by serializing it to JSON with {@code MAPPER}.
     *
     * <p>If the serialized form both starts and ends with a double quote, that single pair of
     * surrounding quotes is removed; any other output is returned unchanged. No unescaping of the
     * remaining content is performed.
     *
     * @param obj the value to serialize
     * @return the JSON text of {@code obj}, without surrounding double quotes if it had them
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization fails
     */
    public static String toArg(Object obj) {
        final String json;
        try {
            json = MAPPER.writeValueAsString(obj);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }

        boolean quoted = json.startsWith("\"") && json.endsWith("\"");
        if (!quoted) {
            return json;
        }
        return json.substring(1, json.length() - 1);
    }

    /**
     * Opens a Beam SQL JDBC connection backed by an in-memory meta store with the given table
     * providers registered.
     *
     * <p>The pipeline-options map set on the connection is derived from {@code this.pipeline}'s
     * options (not from {@code pipelineOptions}): the options object is converted to a map with
     * {@code MAPPER}, its {@code "options"} entry is taken, and every value is rendered with
     * {@link #toArg(Object)}.
     *
     * @param pipelineOptions options passed to {@code JdbcDriver.connect}
     * @param tableProviderArr table providers to register in the meta store
     * @return the connection, with its pipeline-options map set
     * @throws SQLException declared for failures while establishing the connection
     */
    private CalciteConnection connect(PipelineOptions pipelineOptions, TableProvider... tableProviderArr) throws SQLException {
        // The converted options map keeps the actual option values under its "options" key.
        @SuppressWarnings("unchecked")
        Map<String, Object> rawOptions =
                (Map<String, Object>) MAPPER.convertValue(this.pipeline.getOptions(), Map.class).get("options");
        Map<String, String> optionArgs =
                rawOptions.entrySet().stream()
                        .collect(Collectors.toMap(Map.Entry::getKey, option -> toArg(option.getValue())));

        InMemoryMetaStore metaStore = new InMemoryMetaStore();
        Arrays.stream(tableProviderArr).forEach(metaStore::registerProvider);

        JdbcConnection connection = JdbcDriver.connect(metaStore, pipelineOptions);
        connection.setPipelineOptionsMap(optionArgs);
        return connection;
    }

    /**
     * Checks whether every given message has a match in {@code set}, where two messages match if
     * {@link #messagesEqual(PubsubMessage, PubsubMessage)} returns true for them.
     *
     * <p>The two lambda parameters carry distinct names: reusing one name for both is rejected by
     * the compiler and would make the comparison test a message against itself rather than a set
     * element against the expected message.
     *
     * @param set the messages observed so far
     * @param pubsubMessageArr the messages that must all be present; an empty array yields true
     * @return {@code Boolean.TRUE} if each expected message matches some element of {@code set}
     */
    private static Boolean containsAll(Set<PubsubMessage> set, PubsubMessage... pubsubMessageArr) {
        return Boolean.valueOf(
                Arrays.stream(pubsubMessageArr)
                        .allMatch(expected -> set.stream().anyMatch(actual -> messagesEqual(actual, expected))));
    }

    /**
     * Compares two Pubsub messages by content: their attribute maps must be equal and their
     * payload byte arrays must have identical contents.
     *
     * @param pubsubMessage the first message
     * @param pubsubMessage2 the second message
     * @return true if both the attributes and the payload bytes match
     */
    public static boolean messagesEqual(PubsubMessage pubsubMessage, PubsubMessage pubsubMessage2) {
        // Attributes are checked first; the payloads are only compared when they match.
        if (!pubsubMessage.getAttributeMap().equals(pubsubMessage2.getAttributeMap())) {
            return false;
        }
        return Arrays.equals(pubsubMessage.getPayload(), pubsubMessage2.getPayload());
    }

    /**
     * Builds a {@link Row} of the given schema from the given field values.
     *
     * @param schema the schema of the row
     * @param objArr the field values, passed to the row builder as given
     * @return the built row
     */
    private Row row(Schema schema, Object... objArr) {
        Row.Builder rowBuilder = Row.withSchema(schema).addValues(objArr);
        return rowBuilder.build();
    }

    /**
     * Parses {@code str} with the given SQL environment and converts the parsed query into a
     * {@code PCollection} of rows on the given pipeline via {@code BeamSqlRelUtils.toPCollection}.
     *
     * @param beamSqlEnv the SQL environment used to parse the query
     * @param testPipeline the pipeline the query is applied to
     * @param str the SQL query text
     * @return the collection of rows produced by the query
     * @throws Exception declared for failures while parsing or converting the query
     */
    private PCollection<Row> query(BeamSqlEnv beamSqlEnv, TestPipeline testPipeline, String str) throws Exception {
        return BeamSqlRelUtils.toPCollection(testPipeline, beamSqlEnv.parseQuery(str));
    }

    /**
     * Creates a Pubsub message whose payload is the JSON object produced by
     * {@link #jsonString(int, String)} for the given id and name.
     *
     * @param instant the timestamp for the message's {@code "ts"} attribute
     * @param i the value of the JSON {@code id} field
     * @param str the value of the JSON {@code name} field
     * @return the message
     */
    private PubsubMessage message(Instant instant, int i, String str) {
        String jsonPayload = jsonString(i, str);
        return message(instant, jsonPayload);
    }

    /**
     * Creates a Pubsub message with the given text as its UTF-8 encoded payload and a single
     * {@code "ts"} attribute holding the instant's millisecond value as a decimal string.
     *
     * @param instant the timestamp for the {@code "ts"} attribute
     * @param str the payload text
     * @return the message
     */
    private PubsubMessage message(Instant instant, String str) {
        byte[] payloadBytes = str.getBytes(StandardCharsets.UTF_8);
        Map<String, String> attributes = ImmutableMap.of("ts", String.valueOf(instant.getMillis()));
        return new PubsubMessage(payloadBytes, attributes);
    }

    /**
     * Formats a JSON object with an {@code id} number and a {@code name} string, e.g.
     * <code>{ "id" : 3, "name" : "foo" }</code>. The name is inserted verbatim, without escaping.
     *
     * @param i the id value
     * @param str the name value
     * @return the JSON text
     */
    private String jsonString(int i, String str) {
        StringBuilder json = new StringBuilder();
        json.append("{ \"id\" : ").append(i);
        json.append(", \"name\" : \"").append(str).append("\" }");
        return json.toString();
    }

    /**
     * Wraps a raw {@code long} value in an {@link Instant}.
     *
     * @param j the value passed to the {@code Instant} constructor
     * @return the instant
     */
    private Instant ts(long j) {
        Instant instant = new Instant(j);
        return instant;
    }

    /**
     * Rebuilds one of this class's serializable lambdas from its {@link SerializedLambda} form.
     *
     * <p>The implementation method name selects the lambda: the two lambdas of
     * {@code testUsesDlq} and the one of {@code testSelectsPayloadContent}. For the selected case
     * the functional interface ({@code SerializableFunction.apply}), the implementing class and
     * the implementation signature are verified before the lambda is recreated; anything else
     * ends in an {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): this method is marked synthetic and looks like decompiler output of the
     * compiler-generated lambda deserialization hook rather than hand-written code. Several
     * constructs below are not valid Java as written ({@code boolean z = -1}, {@code z = 2},
     * a duplicated {@code case true}, and instance methods called from this static method while
     * the captured instance is left unused). Presumably the method should be dropped from
     * hand-maintained source so the compiler can regenerate it - confirm before relying on it.
     *
     * @param serializedLambda the serialized form of the lambda to recreate
     * @return the recreated lambda
     * @throws IllegalArgumentException if the serialized form matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // First switch: map the implementation method name (via its hash code) to a selector.
        switch (implMethodName.hashCode()) {
            case -777594851:
                if (implMethodName.equals("lambda$testSelectsPayloadContent$43268ee4$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1424696290:
                if (implMethodName.equals("lambda$testUsesDlq$43268ee4$1")) {
                    z = true;
                    break;
                }
                break;
            case 1424696291:
                if (implMethodName.equals("lambda$testUsesDlq$43268ee4$2")) {
                    z = false;
                    break;
                }
                break;
        }
        // Second switch: validate the serialized form and recreate the matching lambda.
        switch (z) {
            case false:
                // testUsesDlq, second lambda: the DLQ predicate based on containsAll.
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;)Ljava/lang/Boolean;")) {
                    PubsubJsonIT pubsubJsonIT = (PubsubJsonIT) serializedLambda.getCapturedArg(0);
                    return set2 -> {
                        return containsAll(set2, message(ts(4L), "{ - }"), message(ts(5L), "{ + }"));
                    };
                }
                break;
            case true:
                // testUsesDlq, first lambda: the expected-rows predicate.
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;)Ljava/lang/Boolean;")) {
                    PubsubJsonIT pubsubJsonIT2 = (PubsubJsonIT) serializedLambda.getCapturedArg(0);
                    return set -> {
                        return Boolean.valueOf(set.equals(ImmutableSet.of(row(PAYLOAD_SCHEMA, 3, "foo"), row(PAYLOAD_SCHEMA, 5, "bar"), row(PAYLOAD_SCHEMA, 7, "baz"))));
                    };
                }
                break;
            case true:
                // NOTE(review): presumably the selector-2 case (testSelectsPayloadContent) - confirm.
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/Set;)Ljava/lang/Boolean;")) {
                    PubsubJsonIT pubsubJsonIT3 = (PubsubJsonIT) serializedLambda.getCapturedArg(0);
                    return set3 -> {
                        return Boolean.valueOf(set3.equals(ImmutableSet.of(row(PAYLOAD_SCHEMA, 3, "foo"), row(PAYLOAD_SCHEMA, 5, "bar"), row(PAYLOAD_SCHEMA, 7, "baz"))));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
