/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.offload;

import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
import org.apache.pulsar.tests.integration.suites.PulsarTieredStorageTestSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

public abstract class TestBaseOffload
extends PulsarTieredStorageTestSuite {
    private static final Logger log = LoggerFactory.getLogger(TestBaseOffload.class);
    private static final int ENTRY_SIZE = 1024;

    private static byte[] buildEntry(String pattern) {
        byte[] entry = new byte[1024];
        byte[] patternBytes = pattern.getBytes();
        for (int i = 0; i < entry.length; ++i) {
            entry[i] = patternBytes[i % patternBytes.length];
        }
        return entry;
    }

    public void testPublishOffloadAndConsumeViaCLI(String serviceUrl, String adminUrl) throws Exception {
        Throwable throwable;
        String tenant = "offload-test-cli-" + TestBaseOffload.randomName(4);
        String namespace = tenant + "/ns1";
        String topic = "persistent://" + namespace + "/topic1";
        this.pulsarCluster.runAdminCommandOnAnyBroker("tenants", "create", "--allowed-clusters", this.pulsarCluster.getClusterName(), "--admin-roles", "offload-admin", tenant);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "--clusters", this.pulsarCluster.getClusterName(), namespace);
        long firstLedger = -1L;
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();){
            throwable = null;
            try (Producer producer = client.newProducer().topic(topic).blockIfQueueFull(true).enableBatching(false).create();){
                client.newConsumer().topic(new String[]{topic}).subscriptionName("my-sub").subscribe().close();
                int i = 0;
                while ((double)i < 1536.0) {
                    producer.sendAsync((Object)TestBaseOffload.buildEntry("offload-message" + i));
                    ++i;
                }
                producer.flush();
            }
            catch (Throwable i) {
                throwable = i;
                throw i;
            }
        }
        var9_8 = null;
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build();){
            firstLedger = ((ManagedLedgerInternalStats.LedgerInfo)admin.topics().getInternalStats((String)topic).ledgers.get((int)0)).ledgerId;
            String output = this.pulsarCluster.runAdminCommandOnAnyBroker("topics", "offload", "--size-threshold", "100G", topic).getStdout();
            Assert.assertTrue((boolean)output.contains("Nothing to offload"));
            output = this.pulsarCluster.runAdminCommandOnAnyBroker("topics", "offload-status", topic).getStdout();
            Assert.assertTrue((boolean)output.contains("Offload has not been run"));
            output = this.pulsarCluster.runAdminCommandOnAnyBroker("topics", "offload", "--size-threshold", "1M", topic).getStdout();
            Assert.assertTrue((boolean)output.contains("Offload triggered"));
            output = this.pulsarCluster.runAdminCommandOnAnyBroker("topics", "offload-status", "-w", topic).getStdout();
            Assert.assertTrue((boolean)output.contains("Offload was a success"));
            ClientConfiguration bkConf = new ClientConfiguration();
            bkConf.setZkServers(this.pulsarCluster.getZKConnString());
            try (BookKeeper bk = new BookKeeper(bkConf);){
                bk.deleteLedger(firstLedger);
            }
            admin.topics().unload(topic);
        }
        catch (Throwable output) {
            var9_8 = output;
            throw output;
        }
        log.info("Read back the data (which would be in that first ledger)");
        client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        var9_8 = null;
        try {
            throwable = null;
            try (Consumer consumer = client.newConsumer().topic(new String[]{topic}).subscriptionName("my-sub").subscribe();){
                int i = 0;
                while ((double)i < 1536.0) {
                    Message m = consumer.receive(1, TimeUnit.MINUTES);
                    Assert.assertEquals((byte[])TestBaseOffload.buildEntry("offload-message" + i), (byte[])m.getData());
                    ++i;
                }
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
        }
        catch (Throwable throwable3) {
            var9_8 = throwable3;
            throw throwable3;
        }
        finally {
            if (client != null) {
                if (var9_8 != null) {
                    try {
                        client.close();
                    }
                    catch (Throwable throwable4) {
                        var9_8.addSuppressed(throwable4);
                    }
                } else {
                    client.close();
                }
            }
        }
    }

    public void testPublishOffloadAndConsumeViaThreshold(String serviceUrl, String adminUrl) throws Exception {
        Throwable throwable;
        String tenant = "offload-test-threshold-" + TestBaseOffload.randomName(4);
        String namespace = tenant + "/ns1";
        String topic = "persistent://" + namespace + "/topic1";
        this.pulsarCluster.runAdminCommandOnAnyBroker("tenants", "create", "--allowed-clusters", this.pulsarCluster.getClusterName(), "--admin-roles", "offload-admin", tenant);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "--clusters", this.pulsarCluster.getClusterName(), namespace);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-threshold", "--size", "1M", namespace);
        long firstLedger = 0L;
        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl).build();){
            throwable = null;
            try (Producer producer = client.newProducer().topic(topic).blockIfQueueFull(true).enableBatching(false).create();){
                client.newConsumer().topic(new String[]{topic}).subscriptionName("my-sub").subscribe().close();
                int i = 0;
                while ((double)i < 2560.0) {
                    producer.sendAsync((Object)TestBaseOffload.buildEntry("offload-message" + i));
                    ++i;
                }
                producer.flush();
            }
            catch (Throwable i) {
                throwable = i;
                throw i;
            }
        }
        var9_8 = null;
        try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build();){
            firstLedger = ((ManagedLedgerInternalStats.LedgerInfo)admin.topics().getInternalStats((String)topic).ledgers.get((int)0)).ledgerId;
            for (int i = 0; i < 300 && !((ManagedLedgerInternalStats.LedgerInfo)admin.topics().getInternalStats((String)topic).ledgers.get((int)0)).offloaded; ++i) {
                Thread.sleep(100L);
            }
            Assert.assertTrue((boolean)((ManagedLedgerInternalStats.LedgerInfo)admin.topics().getInternalStats((String)topic).ledgers.get((int)0)).offloaded);
            ClientConfiguration bkConf = new ClientConfiguration();
            bkConf.setZkServers(this.pulsarCluster.getZKConnString());
            try (BookKeeper bk = new BookKeeper(bkConf);){
                bk.deleteLedger(firstLedger);
            }
            admin.topics().unload(topic);
        }
        catch (Throwable bkConf) {
            var9_8 = bkConf;
            throw bkConf;
        }
        log.info("Read back the data (which would be in that first ledger)");
        client = PulsarClient.builder().serviceUrl(serviceUrl).build();
        var9_8 = null;
        try {
            throwable = null;
            try (Consumer consumer = client.newConsumer().topic(new String[]{topic}).subscriptionName("my-sub").subscribe();){
                int i = 0;
                while ((double)i < 2560.0) {
                    Message m = consumer.receive(1, TimeUnit.MINUTES);
                    Assert.assertEquals((byte[])TestBaseOffload.buildEntry("offload-message" + i), (byte[])m.getData());
                    ++i;
                }
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
        }
        catch (Throwable throwable3) {
            var9_8 = throwable3;
            throw throwable3;
        }
        finally {
            if (client != null) {
                if (var9_8 != null) {
                    try {
                        client.close();
                    }
                    catch (Throwable throwable4) {
                        var9_8.addSuppressed(throwable4);
                    }
                } else {
                    client.close();
                }
            }
        }
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-threshold", "--size", "-1", namespace);
        admin = PulsarAdmin.builder().serviceHttpUrl(adminUrl).build();
        var9_8 = null;
        try {
            Assert.assertEquals((long)admin.namespaces().getOffloadThreshold(namespace), (long)-1L);
        }
        catch (Throwable throwable5) {
            var9_8 = throwable5;
            throw throwable5;
        }
        finally {
            if (admin != null) {
                if (var9_8 != null) {
                    try {
                        admin.close();
                    }
                    catch (Throwable throwable6) {
                        var9_8.addSuppressed(throwable6);
                    }
                } else {
                    admin.close();
                }
            }
        }
    }

    private boolean ledgerOffloaded(List<ManagedLedgerInternalStats.LedgerInfo> ledgers, long ledgerId) {
        return ledgers.stream().filter(l -> l.ledgerId == ledgerId).map(l -> l.offloaded).findFirst().get();
    }

    /*
     * Exception decompiling
     */
    private long writeAndWaitForOffload(String serviceUrl, String adminUrl, String topic) throws Exception {
        /*
         * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
         * 
         * org.benf.cfr.reader.util.ConfusedCFRException: Started 4 blocks at once
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.getStartingBlocks(Op04StructuredStatement.java:412)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op04StructuredStatement.buildNestedBlocks(Op04StructuredStatement.java:487)
         *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.createInitialStructuredBlock(Op03SimpleStatement.java:736)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:850)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
         *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
         *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
         *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
         *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
         *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
         *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
         *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
         *     at org.benf.cfr.reader.Main.main(Main.java:54)
         */
        throw new IllegalStateException("Decompilation failed");
    }

    public boolean ledgerExistsInBookKeeper(long ledgerId) throws Exception {
        ClientConfiguration bkConf = new ClientConfiguration();
        bkConf.setZkServers(this.pulsarCluster.getZKConnString());
        Throwable throwable = null;
        try (BookKeeperAdmin bk = new BookKeeperAdmin(bkConf);){
            bk.openLedger(ledgerId).close();
            boolean bl = true;
            return bl;
        }
        catch (BKException.BKNoSuchLedgerExistsException | BKException.BKNoSuchLedgerExistsOnMetadataServerException e) {
            boolean bl = false;
            return bl;
        }
        catch (Throwable throwable2) {
            throwable = throwable2;
            throw throwable2;
        }
    }

    public void testPublishOffloadAndConsumeDeletionLag(String serviceUrl, String adminUrl) throws Exception {
        String tenant = "offload-test-deletion-lag-" + TestBaseOffload.randomName(4);
        String namespace = tenant + "/ns1";
        String topic = "persistent://" + namespace + "/topic1";
        this.pulsarCluster.runAdminCommandOnAnyBroker("tenants", "create", "--allowed-clusters", this.pulsarCluster.getClusterName(), "--admin-roles", "offload-admin", tenant);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "--clusters", this.pulsarCluster.getClusterName(), namespace);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-threshold", "--size", "0", namespace);
        String output = this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "get-offload-deletion-lag", namespace).getStdout();
        Assert.assertTrue((boolean)output.contains("Unset for namespace"));
        long offloadedLedger = this.writeAndWaitForOffload(serviceUrl, adminUrl, topic);
        Thread.sleep(5000L);
        Assert.assertTrue((boolean)this.ledgerExistsInBookKeeper(offloadedLedger));
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "set-offload-deletion-lag", namespace, "--lag", "0m");
        output = this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "get-offload-deletion-lag", namespace).getStdout();
        Assert.assertTrue((boolean)output.contains("0 minute(s)"));
        offloadedLedger = this.writeAndWaitForOffload(serviceUrl, adminUrl, topic);
        for (int i = 0; i < 10 && this.ledgerExistsInBookKeeper(offloadedLedger); ++i) {
            this.writeAndWaitForOffload(serviceUrl, adminUrl, topic);
            Thread.sleep(1000L);
        }
        Assert.assertFalse((boolean)this.ledgerExistsInBookKeeper(offloadedLedger));
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "clear-offload-deletion-lag", namespace);
        Thread.sleep(5L);
        output = this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "get-offload-deletion-lag", namespace).getStdout();
        Assert.assertTrue((boolean)output.contains("Unset for namespace"));
        offloadedLedger = this.writeAndWaitForOffload(serviceUrl, adminUrl, topic);
        Thread.sleep(5000L);
        Assert.assertTrue((boolean)this.ledgerExistsInBookKeeper(offloadedLedger));
    }
}

