/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog.bk;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.bk.ImmutableQuorumConfigProvider;
import org.apache.distributedlog.bk.LedgerMetadata;
import org.apache.distributedlog.bk.QuorumConfigProvider;
import org.apache.distributedlog.bk.SimpleLedgerAllocator;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.DefaultZKOp;
import org.apache.distributedlog.zk.ZKTransaction;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerEntry;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.shade.org.apache.bookkeeper.versioning.LongVersion;
import org.apache.pulsar.shade.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.Op;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.shade.org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestLedgerAllocator
extends TestDistributedLogBase {
    private static final Logger logger = LoggerFactory.getLogger(TestLedgerAllocator.class);
    private static final String ledgersPath = "/ledgers";
    private static final Transaction.OpListener<LedgerHandle> NULL_LISTENER = new Transaction.OpListener<LedgerHandle>(){

        @Override
        public void onCommit(LedgerHandle r) {
        }

        @Override
        public void onAbort(Throwable t) {
        }
    };
    @Rule
    public TestName runtime = new TestName();
    private ZooKeeperClient zkc;
    private BookKeeperClient bkc;
    private DistributedLogConfiguration dlConf = new DistributedLogConfiguration();

    private URI createURI(String path) {
        return URI.create("distributedlog://" + zkServers + path);
    }

    @Override
    @Before
    public void setup() throws Exception {
        this.zkc = TestZooKeeperClientBuilder.newBuilder().uri(this.createURI("/")).zkServers(zkServers).build();
        this.bkc = BookKeeperClientBuilder.newBuilder().name("bkc").dlConfig(this.dlConf).ledgersPath(ledgersPath).zkc(this.zkc).build();
    }

    @Override
    @After
    public void teardown() throws Exception {
        this.bkc.close();
        this.zkc.close();
    }

    private QuorumConfigProvider newQuorumConfigProvider(DistributedLogConfiguration conf) {
        return new ImmutableQuorumConfigProvider(conf.getQuorumConfig());
    }

    private ZKTransaction newTxn() {
        return new ZKTransaction(this.zkc);
    }

    private SimpleLedgerAllocator createAllocator(String allocationPath) throws Exception {
        return this.createAllocator(allocationPath, this.dlConf);
    }

    private SimpleLedgerAllocator createAllocator(String allocationPath, DistributedLogConfiguration conf) throws Exception {
        return this.createAllocator(allocationPath, conf, null);
    }

    private SimpleLedgerAllocator createAllocator(String allocationPath, DistributedLogConfiguration conf, LedgerMetadata ledgerMetadata) throws Exception {
        return Utils.ioResult(SimpleLedgerAllocator.of(allocationPath, null, this.newQuorumConfigProvider(conf), this.zkc, this.bkc, ledgerMetadata));
    }

    public void testAllocation() throws Exception {
        String allocationPath = "/allocation1";
        SimpleLedgerAllocator allocator = this.createAllocator(allocationPath);
        allocator.allocate();
        ZKTransaction txn = this.newTxn();
        LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
        logger.info("Try obtaining ledger handle {}", (Object)lh.getId());
        byte[] data = this.zkc.get().getData(allocationPath, false, null);
        Assert.assertEquals((Object)lh.getId(), (Object)Long.valueOf(new String(data, StandardCharsets.UTF_8)));
        txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(StandardCharsets.UTF_8), -1), null));
        try {
            Utils.ioResult(txn.execute());
            Assert.fail((String)"Should fail the transaction when setting unexisted path");
        }
        catch (ZKException ke) {
            logger.info("Should fail on executing transaction when setting unexisted path", (Throwable)ke);
        }
        data = this.zkc.get().getData(allocationPath, false, null);
        Assert.assertEquals((Object)lh.getId(), (Object)Long.valueOf(new String(data, StandardCharsets.UTF_8)));
        txn = this.newTxn();
        LedgerHandle newLh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
        Assert.assertEquals((long)lh.getId(), (long)newLh.getId());
        Utils.ioResult(txn.execute());
        data = this.zkc.get().getData(allocationPath, false, null);
        Assert.assertEquals((long)0L, (long)data.length);
        Utils.close(allocator);
    }

    @Test(timeout=60000L)
    public void testBadVersionOnTwoAllocators() throws Exception {
        String allocationPath = "/allocation-bad-version";
        this.zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Stat stat = new Stat();
        byte[] data = this.zkc.get().getData(allocationPath, false, stat);
        Versioned<byte[]> allocationData = new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
        SimpleLedgerAllocator allocator1 = new SimpleLedgerAllocator(allocationPath, allocationData, this.newQuorumConfigProvider(this.dlConf), this.zkc, this.bkc);
        SimpleLedgerAllocator allocator2 = new SimpleLedgerAllocator(allocationPath, allocationData, this.newQuorumConfigProvider(this.dlConf), this.zkc, this.bkc);
        allocator1.allocate();
        ZKTransaction txn1 = this.newTxn();
        LedgerHandle lh = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
        allocator2.allocate();
        ZKTransaction txn2 = this.newTxn();
        try {
            Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
            Assert.fail((String)"Should fail allocating on second allocator as allocator1 is starting allocating something.");
        }
        catch (ZKException ke) {
            Assert.assertEquals((Object)KeeperException.Code.BADVERSION, (Object)ke.getKeeperExceptionCode());
        }
        Utils.ioResult(txn1.execute());
        Utils.close(allocator1);
        Utils.close(allocator2);
        long eid = lh.addEntry("hello world".getBytes());
        lh.close();
        LedgerHandle readLh = this.bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, this.dlConf.getBKDigestPW().getBytes());
        Enumeration<LedgerEntry> entries = readLh.readEntries(eid, eid);
        int i = 0;
        while (entries.hasMoreElements()) {
            LedgerEntry entry = entries.nextElement();
            Assert.assertEquals((Object)"hello world", (Object)new String(entry.getEntry(), StandardCharsets.UTF_8));
            ++i;
        }
        Assert.assertEquals((long)1L, (long)i);
    }

    @Test(timeout=60000L)
    public void testAllocatorWithoutEnoughBookies() throws Exception {
        String allocationPath = "/allocator-without-enough-bookies";
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setEnsembleSize(numBookies * 2);
        confLocal.setWriteQuorumSize(numBookies * 2);
        SimpleLedgerAllocator allocator1 = this.createAllocator(allocationPath, confLocal);
        allocator1.allocate();
        ZKTransaction txn1 = this.newTxn();
        try {
            Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
            Assert.fail((String)"Should fail allocating ledger if there aren't enough bookies");
        }
        catch (SimpleLedgerAllocator.AllocationException ioe) {
            Assert.assertEquals((Object)((Object)SimpleLedgerAllocator.Phase.ERROR), (Object)((Object)ioe.getPhase()));
        }
        byte[] data = this.zkc.get().getData(allocationPath, false, null);
        Assert.assertEquals((long)0L, (long)data.length);
    }

    @Test(timeout=60000L)
    public void testSuccessAllocatorShouldDeleteUnusedledger() throws Exception {
        String allocationPath = "/allocation-delete-unused-ledger";
        this.zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Stat stat = new Stat();
        byte[] data = this.zkc.get().getData(allocationPath, false, stat);
        Versioned<byte[]> allocationData = new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
        SimpleLedgerAllocator allocator1 = new SimpleLedgerAllocator(allocationPath, allocationData, this.newQuorumConfigProvider(this.dlConf), this.zkc, this.bkc);
        allocator1.allocate();
        ZKTransaction txn1 = this.newTxn();
        LedgerHandle lh1 = Utils.ioResult(allocator1.tryObtain(txn1, NULL_LISTENER));
        stat = new Stat();
        data = this.zkc.get().getData(allocationPath, false, stat);
        allocationData = new Versioned<byte[]>(data, new LongVersion(stat.getVersion()));
        SimpleLedgerAllocator allocator2 = new SimpleLedgerAllocator(allocationPath, allocationData, this.newQuorumConfigProvider(this.dlConf), this.zkc, this.bkc);
        allocator2.allocate();
        ZKTransaction txn2 = this.newTxn();
        LedgerHandle lh2 = Utils.ioResult(allocator2.tryObtain(txn2, NULL_LISTENER));
        try {
            Utils.ioResult(txn1.execute());
            Assert.fail((String)"Should fail commit obtaining ledger handle from first allocator as allocator is modified by second allocator.");
        }
        catch (ZKException zKException) {
            // empty catch block
        }
        Utils.ioResult(txn2.execute());
        Utils.close(allocator1);
        Utils.close(allocator2);
        try {
            lh1.close();
            Assert.fail((String)"LedgerHandle allocated by allocator1 should be deleted.");
        }
        catch (BKException bKException) {
            // empty catch block
        }
        try {
            this.bkc.get().openLedger(lh1.getId(), BookKeeper.DigestType.CRC32, this.dlConf.getBKDigestPW().getBytes());
            Assert.fail((String)"LedgerHandle allocated by allocator1 should be deleted.");
        }
        catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException bKNoSuchLedgerExistsOnMetadataServerException) {
            // empty catch block
        }
        long eid = lh2.addEntry("hello world".getBytes());
        lh2.close();
        LedgerHandle readLh = this.bkc.get().openLedger(lh2.getId(), BookKeeper.DigestType.CRC32, this.dlConf.getBKDigestPW().getBytes());
        Enumeration<LedgerEntry> entries = readLh.readEntries(eid, eid);
        int i = 0;
        while (entries.hasMoreElements()) {
            LedgerEntry entry = entries.nextElement();
            Assert.assertEquals((Object)"hello world", (Object)new String(entry.getEntry(), StandardCharsets.UTF_8));
            ++i;
        }
        Assert.assertEquals((long)1L, (long)i);
    }

    @Test(timeout=60000L)
    public void testCloseAllocatorDuringObtaining() throws Exception {
        String allocationPath = "/allocation2";
        SimpleLedgerAllocator allocator = this.createAllocator(allocationPath);
        allocator.allocate();
        ZKTransaction txn = this.newTxn();
        LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
        Utils.close(allocator);
        byte[] data = this.zkc.get().getData(allocationPath, false, null);
        Assert.assertEquals((Object)lh.getId(), (Object)Long.valueOf(new String(data, StandardCharsets.UTF_8)));
        this.bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, this.dlConf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
    }

    public void testCloseAllocatorAfterConfirm() throws Exception {
        String allocationPath = "/allocation2";
        SimpleLedgerAllocator allocator = this.createAllocator(allocationPath);
        allocator.allocate();
        ZKTransaction txn = this.newTxn();
        LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
        Utils.ioResult(txn.execute());
        Utils.close(allocator);
        byte[] data = this.zkc.get().getData(allocationPath, false, null);
        Assert.assertEquals((long)0L, (long)data.length);
        this.bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, this.dlConf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
    }

    @Test(timeout=60000L)
    public void testCloseAllocatorAfterAbort() throws Exception {
        String allocationPath = "/allocation3";
        SimpleLedgerAllocator allocator = this.createAllocator(allocationPath);
        allocator.allocate();
        ZKTransaction txn = this.newTxn();
        LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
        txn.addOp(DefaultZKOp.of(Op.setData("/unexistedpath", "data".getBytes(StandardCharsets.UTF_8), -1), null));
        try {
            Utils.ioResult(txn.execute());
            Assert.fail((String)"Should fail the transaction when setting unexisted path");
        }
        catch (ZKException zKException) {
            // empty catch block
        }
        Utils.close(allocator);
        byte[] data = this.zkc.get().getData(allocationPath, false, null);
        Assert.assertEquals((Object)lh.getId(), (Object)Long.valueOf(new String(data, StandardCharsets.UTF_8)));
        this.bkc.get().openLedger(lh.getId(), BookKeeper.DigestType.CRC32, this.dlConf.getBKDigestPW().getBytes(StandardCharsets.UTF_8));
    }

    @Test(timeout=60000L)
    public void testConcurrentAllocation() throws Exception {
        String allcationPath = "/" + this.runtime.getMethodName();
        SimpleLedgerAllocator allocator = this.createAllocator(allcationPath);
        allocator.allocate();
        ZKTransaction txn1 = this.newTxn();
        CompletableFuture<LedgerHandle> obtainFuture1 = allocator.tryObtain(txn1, NULL_LISTENER);
        ZKTransaction txn2 = this.newTxn();
        CompletableFuture<LedgerHandle> obtainFuture2 = allocator.tryObtain(txn2, NULL_LISTENER);
        Assert.assertTrue((boolean)obtainFuture2.isDone());
        Assert.assertTrue((boolean)obtainFuture2.isCompletedExceptionally());
        try {
            Utils.ioResult(obtainFuture2);
            Assert.fail((String)"Should fail the concurrent obtain since there is already a transaction obtaining the ledger handle");
        }
        catch (SimpleLedgerAllocator.ConcurrentObtainException concurrentObtainException) {
            // empty catch block
        }
    }

    @Test(timeout=60000L)
    public void testObtainMultipleLedgers() throws Exception {
        String allocationPath = "/" + this.runtime.getMethodName();
        SimpleLedgerAllocator allocator = this.createAllocator(allocationPath);
        int numLedgers = 10;
        HashSet<LedgerHandle> allocatedLedgers = new HashSet<LedgerHandle>();
        for (int i = 0; i < numLedgers; ++i) {
            allocator.allocate();
            ZKTransaction txn = this.newTxn();
            LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
            Utils.ioResult(txn.execute());
            allocatedLedgers.add(lh);
        }
        Assert.assertEquals((long)numLedgers, (long)allocatedLedgers.size());
    }

    @Test(timeout=60000L)
    public void testAllocationWithMetadata() throws Exception {
        String allocationPath = "/" + this.runtime.getMethodName();
        String application = "testApplicationMetadata";
        String component = "testComponentMetadata";
        String custom = "customMetadata";
        LedgerMetadata ledgerMetadata = new LedgerMetadata();
        ledgerMetadata.setApplication(application);
        ledgerMetadata.setComponent(component);
        ledgerMetadata.addCustomMetadata("custom", custom);
        SimpleLedgerAllocator allocator = this.createAllocator(allocationPath, this.dlConf, ledgerMetadata);
        allocator.allocate();
        ZKTransaction txn = this.newTxn();
        LedgerHandle lh = Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
        Map<String, byte[]> customMeta = lh.getCustomMetadata();
        Assert.assertEquals((Object)application, (Object)new String(customMeta.get("application"), StandardCharsets.UTF_8));
        Assert.assertEquals((Object)component, (Object)new String(customMeta.get("component"), StandardCharsets.UTF_8));
        Assert.assertEquals((Object)custom, (Object)new String(customMeta.get("custom"), StandardCharsets.UTF_8));
    }
}

