/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.presto;

import java.nio.ByteBuffer;
import java.util.Collections;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.KeyValueSchemaImpl;
import org.apache.pulsar.client.impl.schema.ProtobufNativeSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.presto.Stock;
import org.apache.pulsar.tests.integration.presto.StockProtoMessage;
import org.apache.pulsar.tests.integration.presto.TestPulsarSQLBase;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class TestBasicPresto
extends TestPulsarSQLBase {
    private static final Logger log = LoggerFactory.getLogger(TestBasicPresto.class);
    private static final int NUM_OF_STOCKS = 10;

    private void setupPresto() throws Exception {
        log.info("[TestBasicPresto] setupPresto...");
        this.pulsarCluster.startPrestoWorker();
    }

    private void teardownPresto() {
        log.info("[TestBasicPresto] tearing down...");
        this.pulsarCluster.stopPrestoWorker();
    }

    @Override
    public void setupCluster() throws Exception {
        super.setupCluster();
        this.setupPresto();
    }

    @Override
    public void tearDownCluster() throws Exception {
        this.teardownPresto();
        super.tearDownCluster();
    }

    @DataProvider(name="schemaProvider")
    public Object[][] schemaProvider() {
        return new Object[][]{{Schema.BYTES}, {Schema.BYTEBUFFER}, {Schema.STRING}, {AvroSchema.of(Stock.class)}, {JSONSchema.of(Stock.class)}, {ProtobufNativeSchema.of(StockProtoMessage.Stock.class)}, {Schema.KeyValue((Schema)Schema.AVRO(Stock.class), (Schema)Schema.AVRO(Stock.class), (KeyValueEncodingType)KeyValueEncodingType.INLINE)}, {Schema.KeyValue((Schema)Schema.AVRO(Stock.class), (Schema)Schema.AVRO(Stock.class), (KeyValueEncodingType)KeyValueEncodingType.SEPARATED)}};
    }

    @Test(dataProvider="batchingAndCompression")
    public void testSimpleSQLQuery(boolean batchEnabled, CompressionType compressionType) throws Exception {
        TopicName topicName = TopicName.get((String)("public/default/stocks_batched_" + TestBasicPresto.randomName(5)));
        this.pulsarSQLBasicTest(topicName, batchEnabled, false, (Schema)JSONSchema.of(Stock.class), compressionType);
    }

    @Test(dataProvider="schemaProvider")
    public void testForSchema(Schema schema) throws Exception {
        String schemaFlag = schema.getSchemaInfo().getType().isStruct() ? schema.getSchemaInfo().getType().name() : (schema.getSchemaInfo().getType().equals((Object)SchemaType.KEY_VALUE) ? schema.getSchemaInfo().getType().name() + "_" + ((KeyValueSchemaImpl)schema).getKeyValueEncodingType() : schema.getSchemaInfo().getName());
        String topic = String.format("public/default/schema_%s_test_%s", schemaFlag, TestBasicPresto.randomName(5)).toLowerCase();
        this.pulsarSQLBasicTest(TopicName.get((String)topic), false, false, schema, CompressionType.NONE);
    }

    @Test
    public void testForUppercaseTopic() throws Exception {
        TopicName topicName = TopicName.get((String)("public/default/case_UPPER_topic_" + TestBasicPresto.randomName(5)));
        this.pulsarSQLBasicTest(topicName, false, false, (Schema)JSONSchema.of(Stock.class), CompressionType.NONE);
    }

    @Test
    public void testForDifferentCaseTopic() throws Exception {
        String tableName = "diff_case_topic_" + TestBasicPresto.randomName(5);
        String topic1 = "public/default/" + tableName.toUpperCase();
        TopicName topicName1 = TopicName.get((String)topic1);
        this.prepareData(topicName1, false, false, (Schema)JSONSchema.of(Stock.class), CompressionType.NONE);
        String topic2 = "public/default/" + tableName;
        TopicName topicName2 = TopicName.get((String)topic2);
        this.prepareData(topicName2, false, false, (Schema)JSONSchema.of(Stock.class), CompressionType.NONE);
        try {
            String query = "select * from pulsar.\"public/default\".\"" + tableName + "\"";
            this.execQuery(query);
            Assert.fail((String)("The testForDifferentCaseTopic query [" + query + "] should be failed."));
        }
        catch (ContainerExecException e) {
            log.warn("Expected exception. result stderr: {}", (Object)e.getResult().getStderr(), (Object)e);
            Assert.assertTrue((boolean)e.getResult().getStderr().contains("There are multiple topics"));
            Assert.assertTrue((boolean)e.getResult().getStderr().contains(topic1));
            Assert.assertTrue((boolean)e.getResult().getStderr().contains(topic2));
            Assert.assertTrue((boolean)e.getResult().getStderr().contains("matched the table name public/default/" + tableName));
        }
    }

    @Test
    public void testListTopicShouldNotShowNonPersistentTopics() throws Exception {
        String tableName = "non_persistent" + TestBasicPresto.randomName(5);
        String topic1 = "non-persistent://public/default/" + tableName.toUpperCase();
        TopicName topicName1 = TopicName.get((String)topic1);
        this.prepareData(topicName1, false, false, (Schema)JSONSchema.of(Stock.class), CompressionType.NONE);
        String query = "show tables from pulsar.\"public/default\"";
        ContainerExecResult result = this.execQuery(query);
        Assert.assertFalse((boolean)result.getStdout().contains("non_persistent"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices, Schema schema, CompressionType compressionType) throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();
        try {
            if (schema.getSchemaInfo().getName().equals(Schema.BYTES.getSchemaInfo().getName())) {
                this.prepareDataForBytesSchema(pulsarClient, topicName, isBatch, compressionType);
            } else if (schema.getSchemaInfo().getName().equals(Schema.BYTEBUFFER.getSchemaInfo().getName())) {
                this.prepareDataForByteBufferSchema(pulsarClient, topicName, isBatch, compressionType);
            } else if (schema.getSchemaInfo().getType().equals((Object)SchemaType.STRING)) {
                this.prepareDataForStringSchema(pulsarClient, topicName, isBatch, compressionType);
            } else if (schema.getSchemaInfo().getType().equals((Object)SchemaType.JSON) || schema.getSchemaInfo().getType().equals((Object)SchemaType.AVRO)) {
                this.prepareDataForStructSchema(pulsarClient, topicName, isBatch, (Schema<Stock>)schema, compressionType);
            } else if (schema.getSchemaInfo().getType().equals((Object)SchemaType.PROTOBUF_NATIVE)) {
                this.prepareDataForProtobufNativeSchema(pulsarClient, topicName, isBatch, (Schema<StockProtoMessage.Stock>)schema, compressionType);
            } else if (schema.getSchemaInfo().getType().equals((Object)SchemaType.KEY_VALUE)) {
                this.prepareDataForKeyValueSchema(pulsarClient, topicName, (Schema<KeyValue<Stock, Stock>>)schema, compressionType);
            }
            int n = 10;
            return n;
        }
        finally {
            if (Collections.singletonList(pulsarClient).get(0) != null) {
                pulsarClient.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void prepareDataForBytesSchema(PulsarClient pulsarClient, TopicName topicName, boolean isBatch, CompressionType compressionType) throws PulsarClientException {
        Producer producer = pulsarClient.newProducer(Schema.BYTES).topic(topicName.toString()).enableBatching(isBatch).compressionType(compressionType).create();
        try {
            for (int i = 0; i < 10; ++i) {
                producer.send((Object)("bytes schema test" + i).getBytes());
            }
            producer.flush();
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void prepareDataForByteBufferSchema(PulsarClient pulsarClient, TopicName topicName, boolean isBatch, CompressionType compressionType) throws PulsarClientException {
        Producer producer = pulsarClient.newProducer(Schema.BYTEBUFFER).topic(topicName.toString()).enableBatching(isBatch).compressionType(compressionType).create();
        try {
            for (int i = 0; i < 10; ++i) {
                producer.send((Object)ByteBuffer.wrap(("bytes schema test" + i).getBytes()));
            }
            producer.flush();
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void prepareDataForStringSchema(PulsarClient pulsarClient, TopicName topicName, boolean isBatch, CompressionType compressionType) throws PulsarClientException {
        Producer producer = pulsarClient.newProducer(Schema.STRING).topic(topicName.toString()).enableBatching(isBatch).compressionType(compressionType).create();
        try {
            for (int i = 0; i < 10; ++i) {
                producer.send((Object)("string" + i));
            }
            producer.flush();
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void prepareDataForStructSchema(PulsarClient pulsarClient, TopicName topicName, boolean isBatch, Schema<Stock> schema, CompressionType compressionType) throws Exception {
        Producer producer = pulsarClient.newProducer(schema).topic(topicName.toString()).enableBatching(isBatch).compressionType(compressionType).create();
        try {
            for (int i = 0; i < 10; ++i) {
                Stock stock = new Stock(i, "STOCK_" + i, 100.0 + (double)(i * 10));
                producer.send((Object)stock);
            }
            producer.flush();
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void prepareDataForProtobufNativeSchema(PulsarClient pulsarClient, TopicName topicName, boolean isBatch, Schema<StockProtoMessage.Stock> schema, CompressionType compressionType) throws Exception {
        Producer producer = pulsarClient.newProducer(schema).topic(topicName.toString()).enableBatching(isBatch).compressionType(compressionType).create();
        try {
            for (int i = 0; i < 10; ++i) {
                StockProtoMessage.Stock stock = StockProtoMessage.Stock.newBuilder().setEntryId(i).setSymbol("STOCK_" + i).setSharePrice(100.0 + (double)(i * 10)).build();
                producer.send((Object)stock);
            }
            producer.flush();
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void prepareDataForKeyValueSchema(PulsarClient pulsarClient, TopicName topicName, Schema<KeyValue<Stock, Stock>> schema, CompressionType compressionType) throws Exception {
        Producer producer = pulsarClient.newProducer(schema).topic(topicName.toString()).compressionType(compressionType).create();
        try {
            for (int i = 0; i < 10; ++i) {
                int j = 100 * i;
                Stock stock1 = new Stock(j, "STOCK_" + j, 100.0 + (double)(j * 10));
                Stock stock2 = new Stock(i, "STOCK_" + i, 100.0 + (double)(i * 10));
                producer.send((Object)new KeyValue((Object)stock1, (Object)stock2));
            }
        }
        finally {
            if (Collections.singletonList(producer).get(0) != null) {
                producer.close();
            }
        }
    }

    @Override
    protected void validateContent(int messageNum, String[] contentArr, Schema schema) {
        switch (schema.getSchemaInfo().getType()) {
            case BYTES: {
                log.info("Skip validate content for BYTES schema type.");
                break;
            }
            case STRING: {
                this.validateContentForStringSchema(messageNum, contentArr);
                log.info("finish validate content for STRING schema type.");
                break;
            }
            case JSON: 
            case AVRO: 
            case PROTOBUF_NATIVE: {
                this.validateContentForStructSchema(messageNum, contentArr);
                log.info("finish validate content for {} schema type.", (Object)schema.getSchemaInfo().getType());
                break;
            }
            case KEY_VALUE: {
                this.validateContentForKeyValueSchema(messageNum, contentArr);
                log.info("finish validate content for KEY_VALUE {} schema type.", (Object)((KeyValueSchemaImpl)schema).getKeyValueEncodingType());
            }
        }
    }

    private void validateContentForStringSchema(int messageNum, String[] contentArr) {
        for (int i = 0; i < messageNum; ++i) {
            Assertions.assertThat((Object[])contentArr).contains((Object[])new String[]{"\"string" + i + "\""});
        }
    }

    private void validateContentForStructSchema(int messageNum, String[] contentArr) {
        for (int i = 0; i < messageNum; ++i) {
            Assertions.assertThat((Object[])contentArr).contains((Object[])new String[]{"\"" + i + "\""});
            Assertions.assertThat((Object[])contentArr).contains((Object[])new String[]{"\"STOCK_" + i + "\""});
            Assertions.assertThat((Object[])contentArr).contains((Object[])new String[]{"\"" + (100.0 + (double)(i * 10)) + "\""});
        }
    }

    private void validateContentForKeyValueSchema(int messageNum, String[] contentArr) {
        for (int i = 0; i < messageNum; ++i) {
            int j = 100 * i;
            Assertions.assertThat((Object[])contentArr).contains((Object[])new String[]{"\"" + i + "\""});
            Assertions.assertThat((Object[])contentArr).contains((Object[])new String[]{"\"STOCK_" + i + "\""});
            Assertions.assertThat((Object[])contentArr).contains((Object[])new String[]{"\"" + (100.0 + (double)(i * 10)) + "\""});
            Assertions.assertThat((Object[])contentArr).contains((Object[])new String[]{"\"" + j + "\""});
            Assertions.assertThat((Object[])contentArr).contains((Object[])new String[]{"\"STOCK_" + j + "\""});
            Assertions.assertThat((Object[])contentArr).contains((Object[])new String[]{"\"" + (100.0 + (double)(j * 10)) + "\""});
        }
    }
}

