/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.offload;

import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

public abstract class TestBaseOffload
extends PulsarTieredStorageTestSuite {
    private static final Logger log = LoggerFactory.getLogger(TestBaseOffload.class);

    /**
     * Returns the length, in bytes, of each payload array created by {@code buildEntry}.
     *
     * <p>The method is protected and non-final, so a subclass may supply a different size.
     *
     * @return the entry size; this base implementation always returns 1024
     */
    protected int getEntrySize() {
        final int oneKibibyte = 1 << 10;
        return oneKibibyte;
    }

    /**
     * Builds a message payload of {@link #getEntrySize()} bytes by repeating the bytes of
     * {@code pattern} until the array is full; the final repetition is truncated if it does not fit.
     *
     * <p>The pattern is encoded as UTF-8 explicitly so the payload bytes do not depend on the
     * JVM's default charset.
     *
     * @param pattern text whose bytes are repeated; must not be empty
     * @return a newly allocated array of exactly {@code getEntrySize()} bytes
     * @throws IllegalArgumentException if {@code pattern} is empty
     */
    private byte[] buildEntry(String pattern) {
        byte[] patternBytes = pattern.getBytes(java.nio.charset.StandardCharsets.UTF_8);
        if (patternBytes.length == 0) {
            // Without this guard the modulo below fails with an opaque ArithmeticException.
            throw new IllegalArgumentException("pattern must not be empty");
        }
        byte[] entry = new byte[getEntrySize()];
        for (int i = 0; i < entry.length; i++) {
            entry[i] = patternBytes[i % patternBytes.length];
        }
        return entry;
    }

    /**
     * Publishes messages to a freshly created topic, offloads it through the {@code topics offload}
     * admin command, deletes the topic's first ledger from BookKeeper, unloads the topic, and then
     * checks that every message can still be read back through the {@code my-sub} subscription.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>create a randomly named tenant and the namespace {@code <tenant>/ns1};</li>
     *   <li>publish {@code getNumEntriesPerLedger() * 1.5} unbatched messages;</li>
     *   <li>run {@code topics offload} with a 100G threshold (expects "Nothing to offload") and
     *       then with a 1M threshold (expects "Offload triggered" followed by
     *       "Offload was a success");</li>
     *   <li>delete the first ledger reported by the topic's internal stats directly in BookKeeper
     *       and unload the topic;</li>
     *   <li>consume all messages and compare each payload with the one that was sent.</li>
     * </ol>
     *
     * @param serviceUrl service URL passed to {@code PulsarClient.builder().serviceUrl(...)}
     * @param adminUrl   HTTP URL passed to {@code PulsarAdmin.builder().serviceHttpUrl(...)}
     * @throws Exception if any client, admin or BookKeeper call fails
     */
    protected void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
        final String tenant = "offload-test-cli-" + randomName(4);
        final String namespace = tenant + "/ns1";
        final String topic = "persistent://" + namespace + "/topic1";

        pulsarCluster.runAdminCommandOnAnyBroker("tenants", "create", "--allowed-clusters",
                pulsarCluster.getClusterName(), "--admin-roles", "offload-admin", tenant);
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "--clusters",
                pulsarCluster.getClusterName(), namespace);

        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Producer<byte[]> producer = client.newProducer().topic(topic)
                     .maxPendingMessages(getNumEntriesPerLedger() / 2)
                     .sendTimeout(60, TimeUnit.SECONDS)
                     .blockIfQueueFull(true)
                     .enableBatching(false)
                     .create()) {
            // Create (and immediately close) the subscription before publishing; the read-back
            // below re-attaches to the same subscription name.
            client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();

            // Flipped to false by any asynchronous send that completes exceptionally.
            AtomicBoolean success = new AtomicBoolean(true);
            for (int i = 0; i < getNumEntriesPerLedger() * 1.5; i++) {
                producer.sendAsync(buildEntry("offload-message" + i)).exceptionally(e -> {
                    log.error("failed to send a message", e);
                    success.set(false);
                    return null;
                });
            }
            producer.flush();
            Assert.assertTrue(success.get());
        }

        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
            long firstLedger = admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;

            // A threshold far above the amount written must not trigger anything.
            String output = pulsarCluster.runAdminCommandOnAnyBroker(
                    "topics", "offload", "--size-threshold", "100G", topic).getStdout();
            Assert.assertTrue(output.contains("Nothing to offload"));

            output = pulsarCluster.runAdminCommandOnAnyBroker("topics", "offload-status", topic).getStdout();
            Assert.assertTrue(output.contains("Offload has not been run"));

            output = pulsarCluster.runAdminCommandOnAnyBroker(
                    "topics", "offload", "--size-threshold", "1M", topic).getStdout();
            Assert.assertTrue(output.contains("Offload triggered"));

            output = pulsarCluster.runAdminCommandOnAnyBroker("topics", "offload-status", "-w", topic).getStdout();
            Assert.assertTrue(output.contains("Offload was a success"));

            // Remove the ledger from BookKeeper so the read-back cannot be served from there.
            ClientConfiguration bkConf = new ClientConfiguration();
            bkConf.setZkServers(pulsarCluster.getZKConnString());
            try (BookKeeper bk = new BookKeeper(bkConf)) {
                bk.deleteLedger(firstLedger);
            }

            admin.topics().unload(topic);
        }

        log.info("Read back the data (which would be in that first ledger)");
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
            for (int i = 0; i < getNumEntriesPerLedger() * 1.5; i++) {
                Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
                // receive(timeout) returns null on timeout; fail with a clear assertion, not an NPE.
                Assert.assertNotNull(m, "timed out waiting for message " + i);
                // TestNG argument order is (actual, expected).
                Assert.assertEquals(m.getData(), buildEntry("offload-message" + i));
            }
        }
    }

    /**
     * Verifies offloading driven by the namespace offload threshold instead of an explicit
     * {@code topics offload} command.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>create a randomly named tenant and the namespace {@code <tenant>/ns1}, then set the
     *       namespace offload threshold to {@code 1M};</li>
     *   <li>publish {@code getNumEntriesPerLedger() * 2.5} unbatched messages;</li>
     *   <li>poll the topic's internal stats (up to 100 times, 300 ms apart) until the first
     *       ledger is flagged as offloaded;</li>
     *   <li>delete that ledger directly in BookKeeper and unload the topic;</li>
     *   <li>consume all messages through the {@code my-sub} subscription and compare payloads;</li>
     *   <li>set the threshold to {@code -1} and check that the admin API reports {@code -1}.</li>
     * </ol>
     *
     * @param serviceUrl service URL passed to {@code PulsarClient.builder().serviceUrl(...)}
     * @param adminUrl   HTTP URL passed to {@code PulsarAdmin.builder().serviceHttpUrl(...)}
     * @throws Exception if any client, admin or BookKeeper call fails
     */
    protected void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
        final String tenant = "offload-test-threshold-" + randomName(4);
        final String namespace = tenant + "/ns1";
        final String topic = "persistent://" + namespace + "/topic1";

        pulsarCluster.runAdminCommandOnAnyBroker("tenants", "create", "--allowed-clusters",
                pulsarCluster.getClusterName(), "--admin-roles", "offload-admin", tenant);
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "--clusters",
                pulsarCluster.getClusterName(), namespace);
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-threshold", "--size", "1M", namespace);

        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Producer<byte[]> producer = client.newProducer().topic(topic)
                     .maxPendingMessages(getNumEntriesPerLedger() / 2)
                     .sendTimeout(60, TimeUnit.SECONDS)
                     .blockIfQueueFull(true)
                     .enableBatching(false)
                     .create()) {
            // Create (and immediately close) the subscription before publishing; the read-back
            // below re-attaches to the same subscription name.
            client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe().close();

            // Flipped to false by any asynchronous send that completes exceptionally.
            AtomicBoolean success = new AtomicBoolean(true);
            for (int i = 0; i < getNumEntriesPerLedger() * 2.5; i++) {
                producer.sendAsync(buildEntry("offload-message" + i)).exceptionally(e -> {
                    log.error("failed to send a message", e);
                    success.set(false);
                    return null;
                });
            }
            producer.flush();
            Assert.assertTrue(success.get());
        }

        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
            long firstLedger = admin.topics().getInternalStats(topic).ledgers.get(0).ledgerId;

            // Poll up to 100 times, 300 ms apart, for the first ledger to be flagged as offloaded.
            for (int i = 0; i < 100 && !admin.topics().getInternalStats(topic).ledgers.get(0).offloaded; i++) {
                Thread.sleep(300L);
            }
            Assert.assertTrue(admin.topics().getInternalStats(topic).ledgers.get(0).offloaded);

            // Remove the ledger from BookKeeper so the read-back cannot be served from there.
            ClientConfiguration bkConf = new ClientConfiguration();
            bkConf.setZkServers(pulsarCluster.getZKConnString());
            try (BookKeeper bk = new BookKeeper(bkConf)) {
                bk.deleteLedger(firstLedger);
            }

            admin.topics().unload(topic);
        }

        log.info("Read back the data (which would be in that first ledger)");
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();
             Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName("my-sub").subscribe()) {
            for (int i = 0; i < getNumEntriesPerLedger() * 2.5; i++) {
                Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
                Assert.assertNotNull(m);
                // TestNG argument order is (actual, expected).
                Assert.assertEquals(m.getData(), buildEntry("offload-message" + i));
            }
        }

        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-threshold", "--size", "-1", namespace);
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build()) {
            Assert.assertEquals((long) admin.namespaces().getOffloadThreshold(namespace), -1L);
        }
    }

    /**
     * Looks up the first entry in {@code ledgers} whose id equals {@code ledgerId} and returns its
     * {@code offloaded} flag.
     *
     * @param ledgers  ledger descriptors to search
     * @param ledgerId id of the ledger to look for
     * @return the {@code offloaded} flag of the first matching ledger
     * @throws java.util.NoSuchElementException if no ledger in the list has the given id
     */
    private boolean ledgerOffloaded(List<ManagedLedgerInternalStats.LedgerInfo> ledgers, long ledgerId) {
        return ledgers.stream()
                .filter(ledger -> ledger.ledgerId == ledgerId)
                .map(ledger -> ledger.offloaded)
                .findFirst()
                // Same exception type as Optional.get(), but the message names the missing ledger.
                .orElseThrow(() -> new java.util.NoSuchElementException(
                        "ledger " + ledgerId + " is not present in the supplied ledger list"));
    }

    /**
     * Convenience overload that delegates to the four-argument
     * {@code writeAndWaitForOffload}, passing {@code -1} as the partition number.
     *
     * <p>NOTE(review): callers in this class that do specify a partition number pass
     * {@code numPartitions - 1}, so {@code -1} presumably stands for a non-partitioned topic;
     * confirm against the four-argument implementation.
     *
     * @param serviceUrl forwarded unchanged
     * @param adminUrl   forwarded unchanged
     * @param topic      forwarded unchanged
     * @return whatever the four-argument overload returns
     * @throws Exception if the delegate throws
     */
    private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) throws Exception {
        final int defaultPartitionNum = -1;
        return writeAndWaitForOffload(serviceUrl, adminUrl, topic, defaultPartitionNum);
    }

    /**
     * NOT FUNCTIONAL: the body of this method was lost when the class was decompiled (CFR failed
     * with "Started 2 blocks at once"), so every call currently throws
     * {@link IllegalStateException}. Every test in this class that calls it therefore fails
     * until the body is restored from the original sources.
     *
     * <p>What the rest of this file shows about the intended contract:
     * <ul>
     *   <li>callers pass the returned {@code long} to {@code ledgerExistsInBookKeeper} and
     *       {@code offloadedLedgerExists}, i.e. they treat it as a ledger id;</li>
     *   <li>callers pass {@code numPartitions - 1} as {@code partitionNum}, and the
     *       three-argument overload passes {@code -1};</li>
     *   <li>the synthetic member {@code lambda$writeAndWaitForOffload$4} is a send-failure
     *       handler that logs "failed to send a message" and clears an {@code AtomicBoolean}.</li>
     * </ul>
     *
     * <p>NOTE(review): presumably the method publishes messages to {@code topic}, waits for a
     * ledger to be reported as offloaded and returns that ledger's id. This cannot be confirmed
     * from the decompiled output; restore from the original source rather than re-deriving it.
     *
     * @param serviceUrl   not used by the current stub
     * @param adminUrl     not used by the current stub
     * @param topic        not used by the current stub
     * @param partitionNum not used by the current stub
     * @return never returns normally in the current stub
     * @throws IllegalStateException always, until the original body is restored
     */
    private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic, int partitionNum) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 2 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    /**
     * Reports whether the given ledger can still be opened through BookKeeper.
     *
     * <p>A {@link BookKeeperAdmin} is created against the cluster's ZooKeeper connection string
     * and asked to open the ledger. A "no such ledger" failure is translated into {@code false},
     * because callers in this class use a {@code false} result as a loop-exit condition and as
     * the argument of {@code Assert.assertFalse}. Any other failure is propagated.
     *
     * @param ledgerId id of the ledger to probe
     * @return {@code true} if the ledger was opened (and closed again) successfully,
     *         {@code false} if BookKeeper reports that the ledger does not exist
     * @throws Exception on any other BookKeeper or connection failure
     */
    public boolean ledgerExistsInBookKeeper(long ledgerId) throws Exception {
        ClientConfiguration bkConf = new ClientConfiguration();
        bkConf.setZkServers(pulsarCluster.getZKConnString());
        try (BookKeeperAdmin bk = new BookKeeperAdmin(bkConf)) {
            try {
                bk.openLedger(ledgerId).close();
                return true;
            } catch (org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsException
                    | org.apache.bookkeeper.client.BKException.BKNoSuchLedgerExistsOnMetadataServerException ignored) {
                // Expected once the ledger has been deleted: that is the "does not exist" answer.
                return false;
            }
        }
    }

    /**
     * Exercises the namespace-level offload deletion lag setting.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>create a randomly named tenant and the namespace {@code <tenant>/ns1}, with the
     *       offload threshold set to {@code 0};</li>
     *   <li>with the deletion lag unset: write and wait for an offload, pause, and assert that
     *       the offloaded ledger still exists in BookKeeper;</li>
     *   <li>with the deletion lag set to {@code 0m}: write and wait for an offload, then repeat
     *       (up to 10 more rounds, 1 s apart) until the ledger is gone from BookKeeper;</li>
     *   <li>after clearing the deletion lag: write and wait for an offload, pause, and assert
     *       that the offloaded ledger exists in BookKeeper again.</li>
     * </ol>
     *
     * @param serviceUrl forwarded to {@code writeAndWaitForOffload}
     * @param adminUrl   forwarded to {@code writeAndWaitForOffload}
     * @throws Exception if any admin command, client or BookKeeper call fails
     */
    protected void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
        // Fixed pause used wherever the test gives the cluster time to act on a policy.
        final long settleMillis = 5000L;

        final String tenant = "offload-test-deletion-lag-" + randomName(4);
        final String namespace = tenant + "/ns1";
        final String topic = "persistent://" + namespace + "/topic1";

        pulsarCluster.runAdminCommandOnAnyBroker("tenants", "create", "--allowed-clusters",
                pulsarCluster.getClusterName(), "--admin-roles", "offload-admin", tenant);
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "--clusters",
                pulsarCluster.getClusterName(), namespace);
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-threshold", "--size", "0", namespace);

        // Phase 1: no deletion lag configured, so the ledger must survive in BookKeeper.
        String output = pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
        Assert.assertTrue(output.contains("Unset for namespace"));

        long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
        Thread.sleep(settleMillis);
        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));

        // Phase 2: zero deletion lag, so the ledger must eventually disappear from BookKeeper.
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace, "--lag", "0m");
        output = pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
        Assert.assertTrue(output.contains("0 minute(s)"));

        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
        // Each extra round writes more data; retry up to 10 times while the ledger is still there.
        for (int i = 0; i < 10 && ledgerExistsInBookKeeper(offloadedLedger); i++) {
            writeAndWaitForOffload(serviceUrl, adminUrl, topic);
            Thread.sleep(1000L);
        }
        Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));

        // Phase 3: deletion lag cleared again, so the ledger must survive once more.
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag", namespace);
        // NOTE(review): this pause used to be 5 ms, which is effectively no wait at all and looks
        // like a seconds/milliseconds mix-up; it now matches the other settle pauses.
        Thread.sleep(settleMillis);
        output = pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
        Assert.assertTrue(output.contains("Unset for namespace"));

        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic);
        Thread.sleep(settleMillis);
        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
    }

    /**
     * Checks that deleting a topic also removes its offloaded ledger once that ledger has already
     * been deleted from BookKeeper.
     *
     * <p>The namespace is created with an offload threshold of {@code 0} and a retention of
     * 100M / 100m. After a first offload the ledger must still exist in BookKeeper. The
     * deletion lag is then set to {@code 0m}, more data is written until the ledger disappears
     * from BookKeeper, and the offloaded copy is asserted to exist. Finally the topic is
     * optionally unloaded, then deleted, and the test waits up to 30 seconds for
     * {@code offloadedLedgerExists} to report {@code false}.
     *
     * @param serviceUrl         forwarded to {@code writeAndWaitForOffload}
     * @param adminUrl           forwarded to {@code writeAndWaitForOffload}
     * @param unloadBeforeDelete whether to run {@code topics unload} before deleting the topic
     * @param numPartitions      when greater than zero a partitioned topic with that many
     *                           partitions is created; otherwise a plain topic is created
     * @throws Exception if any admin command, client or BookKeeper call fails
     */
    protected void testDeleteOffloadedTopic(String serviceUrl, String adminUrl, boolean unloadBeforeDelete, int numPartitions) throws Exception {
        final boolean partitioned = numPartitions > 0;
        // Value handed to the helpers as their partition number (numPartitions - 1).
        final int partitionNum = numPartitions - 1;

        final String tenant = "offload-test-cli-" + randomName(4);
        final String namespace = tenant + "/ns1";
        final String topic = "persistent://" + namespace + "/topic1";
        final String clusterName = pulsarCluster.getClusterName();

        pulsarCluster.runAdminCommandOnAnyBroker(
                "tenants", "create", "--allowed-clusters", clusterName, "--admin-roles", "offload-admin", tenant);
        pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "create", "--clusters", pulsarCluster.getClusterName(), namespace);
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-threshold", "--size", "0", namespace);
        pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "set-retention", "--size", "100M", "--time", "100m", namespace);

        String lagOutput = pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
        Assert.assertTrue(lagOutput.contains("Unset for namespace"));

        if (!partitioned) {
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "create", topic);
        } else {
            pulsarCluster.runAdminCommandOnAnyBroker(
                    "topics", "create-partitioned-topic", topic, "--partitions", Integer.toString(numPartitions));
        }

        // With no deletion lag configured the offloaded ledger must remain in BookKeeper.
        long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic, partitionNum);
        Thread.sleep(5000L);
        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));

        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace, "--lag", "0m");
        lagOutput = pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
        Assert.assertTrue(lagOutput.contains("0 minute(s)"));

        // Keep writing (at most 10 extra rounds) until the ledger is gone from BookKeeper.
        offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic, partitionNum);
        int rounds = 0;
        while (rounds < 10 && ledgerExistsInBookKeeper(offloadedLedger)) {
            writeAndWaitForOffload(serviceUrl, adminUrl, topic, partitionNum);
            Thread.sleep(1000L);
            rounds++;
        }
        Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));
        Assert.assertTrue(offloadedLedgerExists(topic, partitionNum, offloadedLedger));

        if (unloadBeforeDelete) {
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "unload", topic);
        }
        if (!partitioned) {
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete", topic);
        } else {
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete-partitioned-topic", topic);
        }

        // Effectively-final copy for the lambda; offloadedLedger itself was reassigned above.
        final long deletedLedgerId = offloadedLedger;
        Awaitility.await()
                .atMost(30L, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertFalse(offloadedLedgerExists(topic, partitionNum, deletedLedgerId)));
    }

    /**
     * Checks that deleting a topic removes both its offloaded ledger and the copy that is still
     * held in BookKeeper.
     *
     * <p>The namespace is created with an offload threshold of {@code 0} and a retention of
     * 100M / 100m, and no deletion lag is configured. After one offload the ledger is asserted to
     * exist both in BookKeeper and in offloaded storage. The topic is then optionally unloaded
     * and deleted; the test waits up to 30 seconds for {@code offloadedLedgerExists} to report
     * {@code false} and finally asserts that the ledger is gone from BookKeeper too.
     *
     * @param serviceUrl         forwarded to {@code writeAndWaitForOffload}
     * @param adminUrl           forwarded to {@code writeAndWaitForOffload}
     * @param unloadBeforeDelete whether to run {@code topics unload} before deleting the topic
     * @param numPartitions      when greater than zero a partitioned topic with that many
     *                           partitions is created; otherwise a plain topic is created
     * @throws Exception if any admin command, client or BookKeeper call fails
     */
    protected void testDeleteOffloadedTopicExistsInBk(String serviceUrl, String adminUrl, boolean unloadBeforeDelete, int numPartitions) throws Exception {
        final boolean partitioned = numPartitions > 0;
        // Value handed to the helpers as their partition number (numPartitions - 1).
        final int partitionNum = numPartitions - 1;

        final String tenant = "offload-test-cli-" + randomName(4);
        final String namespace = tenant + "/ns1";
        final String topic = "persistent://" + namespace + "/topic1";
        final String clusterName = pulsarCluster.getClusterName();

        pulsarCluster.runAdminCommandOnAnyBroker(
                "tenants", "create", "--allowed-clusters", clusterName, "--admin-roles", "offload-admin", tenant);
        pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "create", "--clusters", pulsarCluster.getClusterName(), namespace);
        pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-threshold", "--size", "0", namespace);
        pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "set-retention", "--size", "100M", "--time", "100m", namespace);

        if (!partitioned) {
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "create", topic);
        } else {
            pulsarCluster.runAdminCommandOnAnyBroker(
                    "topics", "create-partitioned-topic", topic, "--partitions", Integer.toString(numPartitions));
        }

        String lagOutput = pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-offload-deletion-lag", namespace).getStdout();
        Assert.assertTrue(lagOutput.contains("Unset for namespace"));

        // After the offload the ledger is expected in BookKeeper as well as in offloaded storage.
        final long offloadedLedger = writeAndWaitForOffload(serviceUrl, adminUrl, topic, partitionNum);
        Thread.sleep(5000L);
        Assert.assertTrue(ledgerExistsInBookKeeper(offloadedLedger));
        Assert.assertTrue(offloadedLedgerExists(topic, partitionNum, offloadedLedger));

        if (unloadBeforeDelete) {
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "unload", topic);
        }
        if (!partitioned) {
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete", topic);
        } else {
            pulsarCluster.runAdminCommandOnAnyBroker("topics", "delete-partitioned-topic", topic);
        }

        Awaitility.await()
                .atMost(30L, TimeUnit.SECONDS)
                .untilAsserted(() -> Assert.assertFalse(offloadedLedgerExists(topic, partitionNum, offloadedLedger)));
        Assert.assertFalse(ledgerExistsInBookKeeper(offloadedLedger));
    }

    /**
     * Hook used by the topic-deletion tests to ask whether an offloaded ledger is still present.
     *
     * <p>This base implementation is a placeholder that always throws; a subclass that runs
     * {@code testDeleteOffloadedTopic} or {@code testDeleteOffloadedTopicExistsInBk} has to
     * override it.
     *
     * @param topic        topic name as used by the calling test
     * @param partitionNum partition number as passed by the calling test
     *                     ({@code numPartitions - 1})
     * @param firstLedger  id of the ledger to look for
     * @return never returns normally in this base implementation
     * @throws UnsupportedOperationException always; this is a subclass of
     *         {@link RuntimeException}, so existing handlers for the previous type still apply
     */
    protected boolean offloadedLedgerExists(String topic, int partitionNum, long firstLedger) {
        throw new UnsupportedOperationException("not implemented");
    }

    /**
     * Compiler-generated lambda body that the decompiler emitted as a standalone method; going
     * by its name it belongs to {@code writeAndWaitForOffload}.
     *
     * <p>It has the same shape as the {@code exceptionally} handlers attached to
     * {@code producer.sendAsync(...)} elsewhere in this class: log the failure, record it in the
     * shared flag, and complete the future with {@code null}. Nothing visible in this file calls
     * it directly.
     *
     * @param success flag that is set to {@code false} to record the failure
     * @param e       the failure to log
     * @return always {@code null}
     */
    private static /* synthetic */ MessageId lambda$writeAndWaitForOffload$4(AtomicBoolean success, Throwable e) {
        log.error("failed to send a message", e);
        // Remember the failure so the caller's assertion on the flag can fail the test.
        success.set(false);
        return null;
    }
}

