/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.presto;

import com.google.common.base.Stopwatch;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarSQLTestSuite;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.shaded.okhttp3.OkHttpClient;
import org.testcontainers.shaded.okhttp3.Request;
import org.testcontainers.shaded.okhttp3.Response;
import org.testng.Assert;
import org.testng.annotations.DataProvider;

public class TestPulsarSQLBase
extends PulsarSQLTestSuite {
    private static final Logger log = LoggerFactory.getLogger(TestPulsarSQLBase.class);

    protected void pulsarSQLBasicTest(TopicName topic, boolean isBatch, boolean useNsOffloadPolices, Schema schema, CompressionType compressionType) throws Exception {
        log.info("Pulsar SQL basic test. topic: {}", (Object)topic);
        this.waitPulsarSQLReady();
        log.info("start prepare data for query. topic: {}", (Object)topic);
        int messageCnt = this.prepareData(topic, isBatch, useNsOffloadPolices, schema, compressionType);
        log.info("finish prepare data for query. topic: {}, messageCnt: {}", (Object)topic, (Object)messageCnt);
        this.validateMetadata(topic);
        this.validateData(topic, messageCnt, schema);
        log.info("Finish Pulsar SQL basic test. topic: {}", (Object)topic);
    }

    @DataProvider(name="batchingAndCompression")
    public static Object[][] batchingAndCompression() {
        return new Object[][]{{true, CompressionType.ZLIB}, {true, CompressionType.ZSTD}, {true, CompressionType.SNAPPY}, {true, CompressionType.LZ4}, {true, CompressionType.NONE}, {false, CompressionType.ZLIB}, {false, CompressionType.ZSTD}, {false, CompressionType.SNAPPY}, {false, CompressionType.LZ4}, {false, CompressionType.NONE}};
    }

    public void waitPulsarSQLReady() throws Exception {
        while (true) {
            try {
                ContainerExecResult result = this.execQuery("show catalogs;");
                Assertions.assertThat((int)result.getExitCode()).isEqualTo(0);
                Assertions.assertThat((String)result.getStdout()).contains(new CharSequence[]{"pulsar", "system"});
            }
            catch (ContainerExecException cee) {
                if (cee.getResult().getStderr().contains("Presto server is still initializing")) {
                    Thread.sleep(10000L);
                    continue;
                }
                throw cee;
            }
            break;
        }
        if (this.pulsarCluster.getSqlFollowWorkerContainers() != null && this.pulsarCluster.getSqlFollowWorkerContainers().size() > 0) {
            OkHttpClient okHttpClient = new OkHttpClient();
            Request request = new Request.Builder().url("http://" + this.pulsarCluster.getPrestoWorkerContainer().getUrl() + "/v1/node").build();
            while (true) {
                Response response = okHttpClient.newCall(request).execute();
                try {
                    Assert.assertNotNull((Object)response.body());
                    String nodeJsonStr = response.body().string();
                    Assert.assertTrue((nodeJsonStr.length() > 0 ? 1 : 0) != 0);
                    log.info("presto node info: {}", (Object)nodeJsonStr);
                    if (nodeJsonStr.contains("uri")) {
                        log.info("presto node exist.");
                        break;
                    }
                    Thread.sleep(1000L);
                    continue;
                }
                finally {
                    if (response == null) continue;
                    response.close();
                    continue;
                }
                break;
            }
        }
    }

    protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices, Schema schema, CompressionType compressionType) throws Exception {
        throw new Exception("Unsupported operation prepareData.");
    }

    public void validateMetadata(TopicName topicName) throws Exception {
        ContainerExecResult result = this.execQuery("show schemas in pulsar;");
        Assertions.assertThat((int)result.getExitCode()).isEqualTo(0);
        Assertions.assertThat((String)result.getStdout()).contains(new CharSequence[]{topicName.getNamespace()});
        this.pulsarCluster.getBroker(0).execCmd("/bin/bash", "-c", "bin/pulsar-admin namespaces unload " + topicName.getNamespace());
        Awaitility.await().untilAsserted(() -> {
            ContainerExecResult r = this.execQuery(String.format("show tables in pulsar.\"%s\";", topicName.getNamespace()));
            Assertions.assertThat((int)r.getExitCode()).isEqualTo(0);
            Assertions.assertThat((String)r.getStdout()).containsIgnoringCase((CharSequence)topicName.getLocalName());
        });
    }

    protected void validateContent(int messageNum, String[] contentArr, Schema schema) throws Exception {
        throw new Exception("Unsupported operation validateContent.");
    }

    private void validateData(TopicName topicName, int messageNum, Schema schema) throws Exception {
        String namespace = topicName.getNamespace();
        String topic = topicName.getLocalName();
        String queryAllDataSql = schema.getSchemaInfo().getType().isStruct() || schema.getSchemaInfo().getType().equals((Object)SchemaType.KEY_VALUE) ? String.format("select * from pulsar.\"%s\".\"%s\" order by entryid;", namespace, topic) : String.format("select * from pulsar.\"%s\".\"%s\";", namespace, topic);
        Awaitility.await().pollDelay(Duration.ofMillis(0L)).pollInterval(Duration.ofSeconds(3L)).atMost(Duration.ofSeconds(15L)).untilAsserted(() -> {
            ContainerExecResult containerExecResult = this.execQuery(queryAllDataSql);
            Assertions.assertThat((int)containerExecResult.getExitCode()).isEqualTo(0);
            log.info("select sql query output \n{}", (Object)containerExecResult.getStdout());
            String[] split = containerExecResult.getStdout().split("\n");
            Assertions.assertThat((int)split.length).isEqualTo(messageNum);
            String[] contentArr = containerExecResult.getStdout().split("\n|,");
            this.validateContent(messageNum, contentArr, schema);
        });
        String url = String.format("jdbc:presto://%s", this.pulsarCluster.getPrestoWorkerContainer().getUrl());
        Connection connection = DriverManager.getConnection(url, "test", null);
        String query = String.format("select * from pulsar.\"%s\".\"%s\" order by __publish_time__", namespace, topic);
        log.info("Executing query: {}", (Object)query);
        ResultSet res = connection.createStatement().executeQuery(query);
        LinkedList<Timestamp> timestamps = new LinkedList<Timestamp>();
        while (res.next()) {
            TestPulsarSQLBase.printCurrent(res);
            timestamps.add(res.getTimestamp("__publish_time__"));
        }
        log.info("Executing query: result for topic {} timestamps size {}", (Object)topic, (Object)timestamps.size());
        Assertions.assertThat((int)timestamps.size()).isGreaterThan(messageNum - 2);
        query = String.format("select * from pulsar.\"%s\".\"%s\" where __publish_time__ > timestamp '%s' order by __publish_time__", namespace, topic, timestamps.get(timestamps.size() / 2));
        log.info("Executing query: {}", (Object)query);
        res = connection.createStatement().executeQuery(query);
        LinkedList<Timestamp> returnedTimestamps = new LinkedList<Timestamp>();
        while (res.next()) {
            TestPulsarSQLBase.printCurrent(res);
            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
        }
        log.info("Executing query: result for topic {} returnedTimestamps size: {}", (Object)topic, (Object)returnedTimestamps.size());
        if (timestamps.size() % 2 == 0) {
            Assertions.assertThat((int)(returnedTimestamps.size() + 1)).isEqualTo(timestamps.size() / 2);
        } else {
            Assertions.assertThat((int)returnedTimestamps.size()).isEqualTo((timestamps.size() - 1) / 2);
        }
        query = String.format("select * from pulsar.\"%s\".\"%s\" where __publish_time__ > from_unixtime(%s) order by __publish_time__", namespace, topic, 0);
        log.info("Executing query: {}", (Object)query);
        res = connection.createStatement().executeQuery(query);
        returnedTimestamps = new LinkedList();
        while (res.next()) {
            TestPulsarSQLBase.printCurrent(res);
            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
        }
        log.info("Executing query: result for topic {} returnedTimestamps size: {}", (Object)topic, (Object)returnedTimestamps.size());
        Assertions.assertThat((int)returnedTimestamps.size()).isEqualTo(timestamps.size());
        query = String.format("select * from pulsar.\"%s\".\"%s\" where __publish_time__ > from_unixtime(%s) order by __publish_time__", namespace, topic, 99999999999L);
        log.info("Executing query: {}", (Object)query);
        res = connection.createStatement().executeQuery(query);
        returnedTimestamps = new LinkedList();
        while (res.next()) {
            TestPulsarSQLBase.printCurrent(res);
            returnedTimestamps.add(res.getTimestamp("__publish_time__"));
        }
        log.info("Executing query: result for topic {} returnedTimestamps size: {}", (Object)topic, (Object)returnedTimestamps.size());
        Assertions.assertThat((int)returnedTimestamps.size()).isEqualTo(0);
        query = String.format("select count(*) from pulsar.\"%s\".\"%s\"", namespace, topic);
        log.info("Executing query: {}", (Object)query);
        res = connection.createStatement().executeQuery(query);
        res.next();
        int count = res.getInt("_col0");
        Assertions.assertThat((int)count).isGreaterThan(messageNum - 2);
    }

    public ContainerExecResult execQuery(String query) throws Exception {
        ContainerExecResult containerExecResult = this.pulsarCluster.getPrestoWorkerContainer().execCmd("/bin/bash", "-c", "/pulsar/bin/pulsar sql --execute '" + query + "'");
        Stopwatch sw = Stopwatch.createStarted();
        while (containerExecResult.getExitCode() != 0 && sw.elapsed(TimeUnit.SECONDS) < 120L) {
            TimeUnit.MILLISECONDS.sleep(500L);
            containerExecResult = this.pulsarCluster.getPrestoWorkerContainer().execCmd("/bin/bash", "-c", "/pulsar/bin/pulsar sql --execute '" + query + "'");
        }
        return containerExecResult;
    }

    private static void printCurrent(ResultSet rs) throws SQLException {
        ResultSetMetaData rsmd = rs.getMetaData();
        int columnsNumber = rsmd.getColumnCount();
        for (int i = 1; i <= columnsNumber; ++i) {
            if (i > 1) {
                System.out.print(",  ");
            }
            String columnValue = rs.getString(i);
            System.out.print(columnValue + " " + rsmd.getColumnName(i));
        }
        System.out.println("");
    }
}

