/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.presto;

import com.google.common.base.Preconditions;
import java.util.Collections;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.tests.integration.containers.S3Container;
import org.apache.pulsar.tests.integration.presto.Stock;
import org.apache.pulsar.tests.integration.presto.TestPulsarSQLBase;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Integration test that runs the basic Pulsar SQL (Presto) checks against a topic whose
 * first ledger has been offloaded to an S3-compatible store and then deleted from BookKeeper.
 *
 * <p>On top of the cluster started by {@link TestPulsarSQLBase}, this test creates a dedicated
 * tenant/namespace, starts an {@link S3Container} and starts the Presto workers with matching
 * offload properties.
 */
public class TestPrestoQueryTieredStorage
extends TestPulsarSQLBase {
    private static final Logger log = LoggerFactory.getLogger(TestPrestoQueryTieredStorage.class);

    /** Tenant created for this test. */
    private static final String TENANT = "presto";
    /** Namespace (inside {@link #TENANT}) created for this test. */
    private static final String NAMESPACE = "ts";
    /** Network alias (and container name) of the S3 container. */
    private static final String S3_ALIAS = "s3";
    /** Bucket the ledgers are offloaded to. */
    private static final String OFFLOAD_BUCKET = "pulsar-integtest";
    /** Offload driver name handed to the Presto workers and to the namespace offload policy. */
    private static final String OFFLOAD_DRIVER = "aws-s3";
    /** Endpoint under which the S3 container is reachable inside the cluster network. */
    private static final String OFFLOAD_ENDPOINT = "http://s3:9090";

    private S3Container s3Container;

    /**
     * Starts the base cluster and then the extra containers this test needs.
     *
     * @throws Exception if the base cluster or any extra container fails to start
     */
    @Override
    public void setupCluster() throws Exception {
        super.setupCluster();
        this.setupExtraContainers();
    }

    /**
     * Stops the test-specific containers and then the base cluster.
     *
     * <p>The base-class teardown runs even when stopping the test-specific containers fails,
     * so that the cluster is not left running.
     *
     * @throws Exception if any teardown step fails
     */
    @Override
    public void tearDownCluster() throws Exception {
        try {
            this.teardownPresto();
        } finally {
            super.tearDownCluster();
        }
    }

    /**
     * Creates the tenant and namespace, starts the S3 container and the Presto workers
     * (configured for offloading to that container) and opens the JDBC connection.
     */
    private void setupExtraContainers() throws Exception {
        log.info("[TestPrestoQueryTieredStorage] setupExtraContainers...");
        String clusterName = this.pulsarCluster.getClusterName();
        this.pulsarCluster.runAdminCommandOnAnyBroker(
                "tenants", "create",
                "--allowed-clusters", clusterName,
                "--admin-roles", "offload-admin",
                TENANT);
        this.pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "create",
                "--clusters", clusterName,
                NamespaceName.get(TENANT, NAMESPACE).toString());

        // The casts are kept from the original expression: the generic signature of the
        // container builder methods is not visible from this file.
        this.s3Container = (S3Container) ((S3Container) new S3Container(clusterName, S3_ALIAS)
                .withNetwork(this.pulsarCluster.getNetwork()))
                .withNetworkAliases(S3_ALIAS);
        this.s3Container.start();

        String offloadProperties = this.getOffloadProperties(OFFLOAD_BUCKET, null, OFFLOAD_ENDPOINT);
        this.pulsarCluster.startPrestoWorker(OFFLOAD_DRIVER, offloadProperties);
        this.pulsarCluster.startPrestoFollowWorkers(1, OFFLOAD_DRIVER, offloadProperties);
        this.initJdbcConnection();
    }

    /**
     * Builds the JSON object with the S3 offload settings that is passed to the Presto workers.
     *
     * <p>The separator is written in front of each optional entry, so the result never ends
     * with a dangling comma regardless of which optional values are present.
     *
     * @param bucket   offload bucket; must not be {@code null}
     * @param region   offload region; omitted from the output when {@code null} or empty
     * @param endpoint service endpoint; omitted from the output when {@code null} or empty
     * @return the offload properties as a JSON object string
     * @throws NullPointerException if {@code bucket} is {@code null}
     */
    private String getOffloadProperties(String bucket, String region, String endpoint) {
        Preconditions.checkNotNull((Object)bucket);
        StringBuilder sb = new StringBuilder();
        sb.append("{");
        sb.append("\"s3ManagedLedgerOffloadBucket\":").append("\"").append(bucket).append("\"");
        if (StringUtils.isNotEmpty((CharSequence)region)) {
            sb.append(",\"s3ManagedLedgerOffloadRegion\":").append("\"").append(region).append("\"");
        }
        if (StringUtils.isNotEmpty((CharSequence)endpoint)) {
            sb.append(",\"s3ManagedLedgerOffloadServiceEndpoint\":").append("\"").append(endpoint).append("\"");
        }
        sb.append("}");
        return sb.toString();
    }

    /**
     * Stops the S3 container (if it was started) and the Presto worker.
     *
     * <p>The Presto worker is stopped even when stopping the S3 container throws.
     */
    public void teardownPresto() {
        log.info("[TestPrestoQueryTieredStorage] tearing down...");
        try {
            if (null != this.s3Container) {
                this.s3Container.stop();
            }
        } finally {
            this.pulsarCluster.stopPrestoWorker();
        }
    }

    /** Runs the basic SQL test on a topic that is offloaded via the explicit offload command only. */
    @Test
    public void testQueryTieredStorage1() throws Exception {
        TopicName topicName = this.newStockTopic("stocks_ts_nons_");
        this.pulsarSQLBasicTest(topicName, false, false, JSONSchema.of(Stock.class), CompressionType.NONE);
    }

    /** Runs the basic SQL test on a topic whose namespace additionally gets offload policies set. */
    @Test
    public void testQueryTieredStorage2() throws Exception {
        TopicName topicName = this.newStockTopic("stocks_ts_ns_");
        this.pulsarSQLBasicTest(topicName, false, true, JSONSchema.of(Stock.class), CompressionType.NONE);
    }

    /** Returns a persistent topic name in the test namespace made of {@code prefix} plus a random suffix. */
    private TopicName newStockTopic(String prefix) {
        return TopicName.get(TopicDomain.persistent.value(), TENANT, NAMESPACE, prefix + randomName(5));
    }

    /**
     * Publishes {@link Stock} messages until the producer reports a ledger id greater than the
     * first one seen, then offloads the topic and deletes the first ledger from BookKeeper.
     *
     * <p>A consumer is subscribed (from the earliest position) before producing and stays open
     * until this method returns. The {@code isBatch} and {@code schema} arguments are not used
     * by this implementation; messages are always sent with a JSON {@link Stock} schema.
     *
     * @param topicName           topic to publish to
     * @param isBatch             unused
     * @param useNsOffloadPolices whether to set namespace-level offload policies before offloading
     * @param schema              unused
     * @param compressionType     compression type configured on the producer
     * @return the number of messages sent
     * @throws Exception if creating the clients, sending, or closing them fails
     */
    @Override
    protected int prepareData(TopicName topicName, boolean isBatch, boolean useNsOffloadPolices, Schema<?> schema, CompressionType compressionType) throws Exception {
        // Resources are closed in reverse order: the producer first, then the consumer.
        try (Consumer<Stock> consumer = this.pulsarClient.newConsumer(JSONSchema.of(Stock.class))
                     .topic(topicName.toString())
                     .subscriptionName("test")
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .subscribe();
             Producer<Stock> producer = this.pulsarClient.newProducer(JSONSchema.of(Stock.class))
                     .topic(topicName.toString())
                     .compressionType(compressionType)
                     .create()) {
            long firstLedgerId = -1L;
            long currentLedgerId;
            int sendMessageCnt = 0;
            while (true) {
                Stock stock = new Stock(sendMessageCnt, "STOCK_" + sendMessageCnt, 100.0 + (double)(sendMessageCnt * 10));
                MessageIdImpl messageId = (MessageIdImpl) producer.send(stock);
                ++sendMessageCnt;
                currentLedgerId = messageId.getLedgerId();
                if (firstLedgerId == -1L) {
                    firstLedgerId = currentLedgerId;
                }
                // Stop as soon as a message lands in a later ledger than the first one.
                if (currentLedgerId > firstLedgerId) {
                    break;
                }
                Thread.sleep(100L);
            }
            log.info("ledger rollover firstLedgerId: {}, currentLedgerId: {}", firstLedgerId, currentLedgerId);
            this.offloadAndDeleteFromBK(useNsOffloadPolices, topicName);
            return sendMessageCnt;
        }
    }

    /**
     * Offloads the topic, waits for the offload to report success, deletes the topic's first
     * ledger from BookKeeper and unloads the topic.
     *
     * <p>Any exception fails the test; the exception is logged and attached as the cause of
     * the failure.
     *
     * @param useNsOffloadPolices whether to set (and verify) namespace offload policies first
     * @param topicName           topic to offload
     */
    private void offloadAndDeleteFromBK(boolean useNsOffloadPolices, TopicName topicName) {
        String adminUrl = this.pulsarCluster.getHttpServiceUrl();
        String topic = topicName.toString();
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
            ManagedLedgerInternalStats.LedgerInfo firstLedgerInfo = admin.topics().getInternalStats(topic).ledgers.get(0);
            long firstLedger = firstLedgerInfo.ledgerId;

            if (useNsOffloadPolices) {
                this.pulsarCluster.runAdminCommandOnAnyBroker(
                        "namespaces", "set-offload-policies",
                        "--bucket", OFFLOAD_BUCKET,
                        "--driver", OFFLOAD_DRIVER,
                        "--endpoint", OFFLOAD_ENDPOINT,
                        "--offloadAfterElapsed", "1000",
                        topicName.getNamespace());
                String policies = this.pulsarCluster.runAdminCommandOnAnyBroker(
                        "namespaces", "get-offload-policies", topicName.getNamespace()).getStdout();
                Assert.assertTrue(policies.contains(OFFLOAD_BUCKET));
                Assert.assertTrue(policies.contains(OFFLOAD_DRIVER));
            }

            String triggerOutput = this.pulsarCluster.runAdminCommandOnAnyBroker(
                    "topics", "offload", "--size-threshold", "0", topic).getStdout();
            Assert.assertTrue(triggerOutput.contains("Offload triggered"));

            String statusOutput = this.pulsarCluster.runAdminCommandOnAnyBroker(
                    "topics", "offload-status", "-w", topic).getStdout();
            Assert.assertTrue(statusOutput.contains("Offload was a success"));

            ClientConfiguration bkConf = new ClientConfiguration();
            bkConf.setZkServers(this.pulsarCluster.getZKConnString());
            try (BookKeeper bk = new BookKeeper(bkConf)) {
                bk.deleteLedger(firstLedger);
            }
            catch (Exception e) {
                log.error("Failed to delete from BookKeeper.", (Throwable)e);
                Assert.fail("Failed to delete from BookKeeper.", e);
            }
            admin.topics().unload(topic);
        }
        catch (Exception e) {
            // Keep the root cause visible instead of failing with a bare message.
            log.error("Failed to deleteOffloadedDataFromBK.", (Throwable)e);
            Assert.fail("Failed to deleteOffloadedDataFromBK.", e);
        }
    }

    /**
     * Asserts that, for every message index below {@code messageNum}, the query output contains
     * the quoted entry id, the quoted {@code STOCK_<i>} symbol and the quoted price.
     *
     * @param messageNum number of messages that were produced
     * @param contentArr values extracted from the query output
     * @param schema     unused
     */
    @Override
    protected void validateContent(int messageNum, String[] contentArr, Schema<?> schema) {
        for (int i = 0; i < messageNum; ++i) {
            String entryId = "\"" + i + "\"";
            String symbol = "\"STOCK_" + i + "\"";
            String price = "\"" + (100.0 + (double)(i * 10)) + "\"";
            Assertions.assertThat(contentArr).contains(entryId);
            Assertions.assertThat(contentArr).contains(symbol);
            Assertions.assertThat(contentArr).contains(price);
        }
    }
}

