/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.beam.repackaged.sql.org.apache.calcite.jdbc.CalciteConnection;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.JdbcConnection;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SerializableFunctions;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.guava.v20_0.com.google.common.base.Supplier;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableSet;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=JUnit4.class)
public class PubsubJsonIT
implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(PubsubJsonIT.class);
    private static final Schema PAYLOAD_SCHEMA = Schema.builder().addNullableField("id", Schema.FieldType.INT32).addNullableField("name", Schema.FieldType.STRING).build();
    private static final String CONNECT_STRING_PREFIX = "jdbc:beam:";
    private static final String BEAM_CALCITE_SCHEMA = "beamCalciteSchema";
    private static final JdbcDriver INSTANCE = new JdbcDriver();
    private static volatile Boolean checked = false;
    @Rule
    public transient TestPubsub eventsTopic = TestPubsub.create();
    @Rule
    public transient TestPubsub dlqTopic = TestPubsub.create();
    @Rule
    public transient TestPubsubSignal resultSignal = TestPubsubSignal.create();
    @Rule
    public transient TestPubsubSignal dlqSignal = TestPubsubSignal.create();
    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    private static final ObjectMapper MAPPER = new ObjectMapper().registerModules((Iterable)ObjectMapper.findModules((ClassLoader)ReflectHelpers.findClassLoader()));

    @Test
    public void testSelectsPayloadContent() throws Exception {
        String createTableString = "CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nattributes MAP<VARCHAR, VARCHAR>, \npayload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n) \nTYPE 'pubsub' \nLOCATION '" + this.eventsTopic.topicPath() + "' \nTBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'";
        String queryString = "SELECT message.payload.id, message.payload.name from message";
        ImmutableList messages = ImmutableList.of((Object)this.message(this.ts(1L), 3, "foo"), (Object)this.message(this.ts(2L), 5, "bar"), (Object)this.message(this.ts(3L), 7, "baz"));
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new PubsubJsonTableProvider()});
        sqlEnv.executeDdl(createTableString);
        PCollection<Row> queryOutput = this.query(sqlEnv, this.pipeline, queryString);
        queryOutput.apply("waitForSuccess", this.resultSignal.signalSuccessWhen((Coder)SchemaCoder.of((Schema)PAYLOAD_SCHEMA, (SerializableFunction)SerializableFunctions.identity(), (SerializableFunction)SerializableFunctions.identity()), (SerializableFunction & Serializable)observedRows -> observedRows.equals(ImmutableSet.of((Object)this.row(PAYLOAD_SCHEMA, 3, "foo"), (Object)this.row(PAYLOAD_SCHEMA, 5, "bar"), (Object)this.row(PAYLOAD_SCHEMA, 7, "baz")))));
        Supplier start = this.resultSignal.waitForStart(Duration.standardMinutes((long)5L));
        this.pipeline.begin().apply(this.resultSignal.signalStart());
        this.pipeline.run();
        start.get();
        this.eventsTopic.publish((List)messages);
        this.resultSignal.waitForSuccess(Duration.standardSeconds((long)60L));
    }

    @Ignore(value="Disable flake tracked at https://issues.apache.org/jira/browse/BEAM-5122")
    @Test
    public void testUsesDlq() throws Exception {
        String createTableString = "CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nattributes MAP<VARCHAR, VARCHAR>, \npayload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n) \nTYPE 'pubsub' \nLOCATION '" + this.eventsTopic.topicPath() + "' \nTBLPROPERTIES     '{        \"timestampAttributeKey\" : \"ts\",        \"deadLetterQueue\" : \"" + this.dlqTopic.topicPath() + "\"     }'";
        String queryString = "SELECT message.payload.id, message.payload.name from message";
        ImmutableList messages = ImmutableList.of((Object)this.message(this.ts(1L), 3, "foo"), (Object)this.message(this.ts(2L), 5, "bar"), (Object)this.message(this.ts(3L), 7, "baz"), (Object)this.message(this.ts(4L), "{ - }"), (Object)this.message(this.ts(5L), "{ + }"));
        BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory((TableProvider[])new TableProvider[]{new PubsubJsonTableProvider()});
        sqlEnv.executeDdl(createTableString);
        PCollection<Row> queryOutput = this.query(sqlEnv, this.pipeline, queryString);
        queryOutput.apply("waitForSuccess", this.resultSignal.signalSuccessWhen((Coder)SchemaCoder.of((Schema)PAYLOAD_SCHEMA, (SerializableFunction)SerializableFunctions.identity(), (SerializableFunction)SerializableFunctions.identity()), (SerializableFunction & Serializable)observedRows -> observedRows.equals(ImmutableSet.of((Object)this.row(PAYLOAD_SCHEMA, 3, "foo"), (Object)this.row(PAYLOAD_SCHEMA, 5, "bar"), (Object)this.row(PAYLOAD_SCHEMA, 7, "baz")))));
        Supplier start = this.resultSignal.waitForStart(Duration.standardMinutes((long)5L));
        this.pipeline.begin().apply("signal query results started", this.resultSignal.signalStart());
        PCollection dlq = (PCollection)this.pipeline.apply((PTransform)PubsubIO.readMessagesWithAttributes().fromTopic(this.dlqTopic.topicPath().getPath()));
        dlq.apply("waitForDlq", this.dlqSignal.signalSuccessWhen((Coder)PubsubMessageWithAttributesCoder.of(), (SerializableFunction & Serializable)dlqMessages -> PubsubJsonIT.containsAll(dlqMessages, this.message(this.ts(4L), "{ - }"), this.message(this.ts(5L), "{ + }"))));
        Supplier startDlq = this.dlqSignal.waitForStart(Duration.standardMinutes((long)5L));
        this.pipeline.begin().apply("signal DLQ started", this.dlqSignal.signalStart());
        this.pipeline.run();
        start.get();
        startDlq.get();
        this.eventsTopic.publish((List)messages);
        this.resultSignal.waitForSuccess(Duration.standardMinutes((long)2L));
        this.dlqSignal.waitForSuccess(Duration.standardMinutes((long)2L));
    }

    @Test
    @Ignore(value="https://jira.apache.org/jira/browse/BEAM-7582")
    public void testSQLLimit() throws Exception {
        String createTableString = "CREATE EXTERNAL TABLE message (\nevent_timestamp TIMESTAMP, \nattributes MAP<VARCHAR, VARCHAR>, \npayload ROW< \n             id INTEGER, \n             name VARCHAR \n           > \n) \nTYPE 'pubsub' \nLOCATION '" + this.eventsTopic.topicPath() + "' \nTBLPROPERTIES     '{        \"timestampAttributeKey\" : \"ts\",        \"deadLetterQueue\" : \"" + this.dlqTopic.topicPath() + "\"     }'";
        ImmutableList messages = ImmutableList.of((Object)this.message(this.ts(1L), 3, "foo"), (Object)this.message(this.ts(2L), 5, "bar"), (Object)this.message(this.ts(3L), 7, "baz"), (Object)this.message(this.ts(4L), 9, "ba2"), (Object)this.message(this.ts(5L), 10, "ba3"), (Object)this.message(this.ts(6L), 13, "ba4"), (Object)this.message(this.ts(7L), 15, "ba5"));
        CalciteConnection connection = this.connect(this.pipeline.getOptions(), new TableProvider[]{new PubsubJsonTableProvider()});
        Statement statement = connection.createStatement();
        statement.execute(createTableString);
        ExecutorService pool = Executors.newFixedThreadPool(1);
        Future<Object> queryResult = pool.submit(() -> {
            ResultSet resultSet = statement.executeQuery("SELECT message.payload.id FROM message LIMIT 3");
            ImmutableList.Builder result = ImmutableList.builder();
            while (resultSet.next()) {
                result.add((Object)resultSet.getString(1));
            }
            return result.build();
        });
        this.eventsTopic.checkIfAnySubscriptionExists(((GcpOptions)this.pipeline.getOptions().as(GcpOptions.class)).getProject(), Duration.standardMinutes((long)1L));
        this.eventsTopic.publish((List)messages);
        Assert.assertThat((Object)((List)queryResult.get(2L, TimeUnit.MINUTES)).size(), (Matcher)Matchers.equalTo((Object)3));
        pool.shutdown();
    }

    private static String toArg(Object o) {
        try {
            String jsonRepr = MAPPER.writeValueAsString(o);
            if (jsonRepr.startsWith("\"") && jsonRepr.endsWith("\"")) {
                return jsonRepr.substring(1, jsonRepr.length() - 1);
            }
            return jsonRepr;
        }
        catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    private CalciteConnection connect(PipelineOptions options, TableProvider ... tableProviders) throws SQLException {
        Map<String, String> argsMap = ((Map)((Map)MAPPER.convertValue((Object)this.pipeline.getOptions(), Map.class)).get("options")).entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, entry -> PubsubJsonIT.toArg(entry.getValue())));
        InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
        for (TableProvider tableProvider : tableProviders) {
            inMemoryMetaStore.registerProvider(tableProvider);
        }
        JdbcConnection connection = JdbcDriver.connect((TableProvider)inMemoryMetaStore);
        connection.setPipelineOptionsMap(argsMap);
        return connection;
    }

    private static Boolean containsAll(Set<PubsubMessage> set, PubsubMessage ... subsetCandidate) {
        return Arrays.stream(subsetCandidate).allMatch(candidate -> set.stream().anyMatch(element -> PubsubJsonIT.messagesEqual(element, candidate)));
    }

    private static boolean messagesEqual(PubsubMessage message1, PubsubMessage message2) {
        return message1.getAttributeMap().equals(message2.getAttributeMap()) && Arrays.equals(message1.getPayload(), message2.getPayload());
    }

    private Row row(Schema schema, Object ... values) {
        return Row.withSchema((Schema)schema).addValues(values).build();
    }

    private PCollection<Row> query(BeamSqlEnv sqlEnv, TestPipeline pipeline, String queryString) throws Exception {
        return BeamSqlRelUtils.toPCollection((Pipeline)pipeline, (BeamRelNode)sqlEnv.parseQuery(queryString));
    }

    private PubsubMessage message(Instant timestamp, int id, String name) {
        return this.message(timestamp, this.jsonString(id, name));
    }

    private PubsubMessage message(Instant timestamp, String jsonPayload) {
        return new PubsubMessage(jsonPayload.getBytes(StandardCharsets.UTF_8), (Map)ImmutableMap.of((Object)"ts", (Object)String.valueOf(timestamp.getMillis())));
    }

    private String jsonString(int id, String name) {
        return "{ \"id\" : " + id + ", \"name\" : \"" + name + "\" }";
    }

    private Instant ts(long millis) {
        return new Instant(millis);
    }
}

