package org.apache.distributedlog.bk;

import com.google.common.base.Charsets;
import java.net.URI;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.versioning.LongVersion;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.distributedlog.BookKeeperClient;
import org.apache.distributedlog.BookKeeperClientBuilder;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.TestDistributedLogBase;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.bk.SimpleLedgerAllocator;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.util.Transaction;
import org.apache.distributedlog.util.Utils;
import org.apache.distributedlog.zk.DefaultZKOp;
import org.apache.distributedlog.zk.ZKTransaction;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/bk/TestLedgerAllocator.class */
/**
 * Tests for {@link SimpleLedgerAllocator}.
 *
 * <p>Every test talks to the ZooKeeper and BookKeeper clients built in {@link #setup()} and uses
 * its own allocation path in ZooKeeper. The helpers at the top of the class wrap the steps that
 * the individual tests share: obtaining a ledger from an allocator, inspecting the bytes stored
 * at the allocation path, and writing/reading back an entry on an obtained ledger.
 */
public class TestLedgerAllocator extends TestDistributedLogBase {
    private static final Logger logger = LoggerFactory.getLogger(TestLedgerAllocator.class);

    private static final String LEDGERS_PATH = "/ledgers";
    /** A znode that is never created; a setData on it makes the enclosing transaction fail. */
    private static final String UNEXISTED_PATH = "/unexistedpath";
    /** Payload written to (and expected back from) ledgers handed out by an allocator. */
    private static final String ENTRY_PAYLOAD = "hello world";

    /** Listener that ignores both the commit and the abort notification. */
    private static final Transaction.OpListener<LedgerHandle> NULL_LISTENER =
            new Transaction.OpListener<LedgerHandle>() {
                @Override
                public void onCommit(LedgerHandle ledgerHandle) {
                    // intentionally empty
                }

                @Override
                public void onAbort(Throwable th) {
                    // intentionally empty
                }
            };

    @Rule
    public TestName runtime = new TestName();

    private final DistributedLogConfiguration dlConf = new DistributedLogConfiguration();
    private ZooKeeperClient zkc;
    private BookKeeperClient bkc;

    /** Builds a {@code distributedlog://} URI for {@code str} on the test ZooKeeper servers. */
    private URI createURI(String str) {
        return URI.create("distributedlog://" + zkServers + str);
    }

    /**
     * Creates the ZooKeeper and BookKeeper clients used by each test.
     *
     * <p>Overrides the base-class fixture without delegating to it.
     */
    @Override // org.apache.distributedlog.TestDistributedLogBase
    @Before
    public void setup() throws Exception {
        zkc = TestZooKeeperClientBuilder.newBuilder()
                .uri(createURI("/"))
                .zkServers(zkServers)
                .build();
        bkc = BookKeeperClientBuilder.newBuilder()
                .name("bkc")
                .dlConfig(dlConf)
                .ledgersPath(LEDGERS_PATH)
                .zkc(zkc)
                .build();
    }

    /**
     * Closes the BookKeeper client, then the ZooKeeper client.
     *
     * <p>The ZooKeeper client is closed even when closing the BookKeeper client throws, and a
     * client that was never created (because {@link #setup()} failed part-way) is skipped.
     */
    @Override // org.apache.distributedlog.TestDistributedLogBase
    @After
    public void teardown() throws Exception {
        try {
            if (bkc != null) {
                bkc.close();
            }
        } finally {
            if (zkc != null) {
                zkc.close();
            }
        }
    }

    /** Wraps the quorum configuration of {@code distributedLogConfiguration} in an immutable provider. */
    private QuorumConfigProvider newQuorumConfigProvider(DistributedLogConfiguration distributedLogConfiguration) {
        return new ImmutableQuorumConfigProvider(distributedLogConfiguration.getQuorumConfig());
    }

    /** Starts a new ZooKeeper transaction on the test ZooKeeper client. */
    private ZKTransaction newTxn() {
        return new ZKTransaction(zkc);
    }

    /** Creates an allocator on {@code str} using the default test configuration. */
    private SimpleLedgerAllocator createAllocator(String str) throws Exception {
        return createAllocator(str, dlConf);
    }

    /**
     * Creates an allocator on {@code str}, passing no pre-read allocation data, and waits for the
     * creation to complete.
     */
    private SimpleLedgerAllocator createAllocator(String str, DistributedLogConfiguration distributedLogConfiguration)
            throws Exception {
        return Utils.ioResult(SimpleLedgerAllocator.of(
                str,
                (Versioned<byte[]>) null,
                newQuorumConfigProvider(distributedLogConfiguration),
                zkc,
                bkc));
    }

    /** Asks {@code allocator} for a ledger under {@code txn} and waits for the handle. */
    private LedgerHandle obtain(SimpleLedgerAllocator allocator, ZKTransaction txn) throws Exception {
        return Utils.ioResult(allocator.tryObtain(txn, NULL_LISTENER));
    }

    /** Reads the raw bytes currently stored at {@code path}. */
    private byte[] readAllocationData(String path) throws Exception {
        return zkc.get().getData(path, false, null);
    }

    /**
     * Reads the bytes stored at {@code path} together with the znode version reported for that
     * same read.
     */
    private Versioned<byte[]> readVersionedAllocationData(String path) throws Exception {
        Stat stat = new Stat();
        byte[] data = zkc.get().getData(path, false, stat);
        // The Stat is filled in by getData, so the version must be read after the call returns.
        return new Versioned<>(data, new LongVersion(stat.getVersion()));
    }

    /** Asserts that {@code path} stores {@code ledgerId} as a UTF-8 decimal string. */
    private void assertAllocationPathHolds(String path, long ledgerId) throws Exception {
        String stored = new String(readAllocationData(path), Charsets.UTF_8);
        Assert.assertEquals(ledgerId, Long.parseLong(stored));
    }

    /** Asserts that {@code path} currently stores zero bytes. */
    private void assertAllocationPathEmpty(String path) throws Exception {
        Assert.assertEquals(0, readAllocationData(path).length);
    }

    /** Adds a setData on {@link #UNEXISTED_PATH} to {@code txn}; no listener is attached. */
    private static void addFailingOp(ZKTransaction txn) throws Exception {
        txn.addOp(DefaultZKOp.of(
                Op.setData(UNEXISTED_PATH, "data".getBytes(Charsets.UTF_8), -1),
                (Transaction.OpListener) null));
    }

    /** Opens ledger {@code ledgerId} with the CRC32 digest and the configured digest password. */
    private LedgerHandle openLedger(long ledgerId) throws Exception {
        return bkc.get().openLedger(
                ledgerId,
                BookKeeper.DigestType.CRC32,
                dlConf.getBKDigestPW().getBytes(Charsets.UTF_8));
    }

    /** Verifies that ledger {@code ledgerId} can be opened, then releases the handle. */
    private void assertLedgerCanBeOpened(long ledgerId) throws Exception {
        openLedger(ledgerId).close();
    }

    /**
     * Writes {@link #ENTRY_PAYLOAD} to {@code writer}, closes it, re-opens the ledger and asserts
     * that exactly that one entry is read back.
     */
    private void assertLedgerHoldsSingleEntry(LedgerHandle writer) throws Exception {
        long entryId = writer.addEntry(ENTRY_PAYLOAD.getBytes(Charsets.UTF_8));
        writer.close();

        LedgerHandle reader = openLedger(writer.getId());
        try {
            Enumeration<LedgerEntry> entries = reader.readEntries(entryId, entryId);
            int numEntries = 0;
            while (entries.hasMoreElements()) {
                String payload = new String(entries.nextElement().getEntry(), Charsets.UTF_8);
                Assert.assertEquals(ENTRY_PAYLOAD, payload);
                numEntries++;
            }
            Assert.assertEquals(1, numEntries);
        } finally {
            reader.close();
        }
    }

    /**
     * Obtains a ledger, fails the transaction, and checks that the same ledger is handed out again
     * and that the allocation path is emptied once a transaction commits.
     *
     * <p>NOTE(review): this method carries no {@code @Test} annotation, so the default JUnit 4
     * runner will not execute it — confirm that this is intentional.
     */
    public void testAllocation() throws Exception {
        final String allocationPath = "/allocation1";
        SimpleLedgerAllocator allocator = createAllocator(allocationPath);
        allocator.allocate();

        ZKTransaction failingTxn = newTxn();
        LedgerHandle lh = obtain(allocator, failingTxn);
        logger.info("Try obtaining ledger handle {}", lh.getId());
        assertAllocationPathHolds(allocationPath, lh.getId());

        addFailingOp(failingTxn);
        try {
            Utils.ioResult(failingTxn.execute());
            Assert.fail("Should fail the transaction when setting unexisted path");
        } catch (ZKException expected) {
            logger.info("Should fail on executing transaction when setting unexisted path", expected);
        }
        // The failed transaction must leave the allocated ledger id in place.
        assertAllocationPathHolds(allocationPath, lh.getId());

        ZKTransaction retryTxn = newTxn();
        LedgerHandle retried = obtain(allocator, retryTxn);
        Assert.assertEquals(lh.getId(), retried.getId());
        Utils.ioResult(retryTxn.execute());
        assertAllocationPathEmpty(allocationPath);
        Utils.close(allocator);
    }

    /**
     * Two allocators built from the same versioned data: once the first has obtained a ledger,
     * obtaining from the second must fail with {@code BADVERSION}, while the first one's ledger
     * stays usable after its transaction commits.
     */
    @Test(timeout = 60000)
    public void testBadVersionOnTwoAllocators() throws Exception {
        final String allocationPath = "/allocation-bad-version";
        zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        Versioned<byte[]> allocationData = readVersionedAllocationData(allocationPath);

        SimpleLedgerAllocator allocator1 = new SimpleLedgerAllocator(
                allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);
        SimpleLedgerAllocator allocator2 = new SimpleLedgerAllocator(
                allocationPath, allocationData, newQuorumConfigProvider(dlConf), zkc, bkc);

        allocator1.allocate();
        ZKTransaction txn1 = newTxn();
        LedgerHandle lh = obtain(allocator1, txn1);

        allocator2.allocate();
        try {
            obtain(allocator2, newTxn());
            Assert.fail("Should fail allocating on second allocator as allocator1 is starting allocating something.");
        } catch (ZKException zke) {
            Assert.assertEquals(KeeperException.Code.BADVERSION, zke.getKeeperExceptionCode());
        }

        Utils.ioResult(txn1.execute());
        Utils.close(allocator1);
        Utils.close(allocator2);

        assertLedgerHoldsSingleEntry(lh);
    }

    /**
     * With ensemble and write-quorum sizes set to twice the number of bookies, obtaining a ledger
     * must fail in the {@code ERROR} phase and leave the allocation path empty.
     */
    @Test(timeout = 60000)
    public void testAllocatorWithoutEnoughBookies() throws Exception {
        final String allocationPath = "/allocator-without-enough-bookies";
        DistributedLogConfiguration confLocal = new DistributedLogConfiguration();
        confLocal.addConfiguration(conf);
        confLocal.setEnsembleSize(numBookies * 2);
        confLocal.setWriteQuorumSize(numBookies * 2);

        SimpleLedgerAllocator allocator = createAllocator(allocationPath, confLocal);
        allocator.allocate();
        try {
            obtain(allocator, newTxn());
            Assert.fail("Should fail allocating ledger if there aren't enough bookies");
        } catch (SimpleLedgerAllocator.AllocationException ae) {
            Assert.assertEquals(SimpleLedgerAllocator.Phase.ERROR, ae.getPhase());
        }
        assertAllocationPathEmpty(allocationPath);
    }

    /**
     * Two allocators obtain ledgers on the same path; the first transaction is expected to fail,
     * the second to commit. Afterwards the first ledger must be gone and the second usable.
     */
    @Test(timeout = 60000)
    public void testSuccessAllocatorShouldDeleteUnusedledger() throws Exception {
        final String allocationPath = "/allocation-delete-unused-ledger";
        zkc.get().create(allocationPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

        SimpleLedgerAllocator allocator1 = new SimpleLedgerAllocator(
                allocationPath,
                readVersionedAllocationData(allocationPath),
                newQuorumConfigProvider(dlConf),
                zkc,
                bkc);
        allocator1.allocate();
        ZKTransaction txn1 = newTxn();
        LedgerHandle lh1 = obtain(allocator1, txn1);

        // The second allocator is built from a fresh read taken after the first one obtained.
        SimpleLedgerAllocator allocator2 = new SimpleLedgerAllocator(
                allocationPath,
                readVersionedAllocationData(allocationPath),
                newQuorumConfigProvider(dlConf),
                zkc,
                bkc);
        allocator2.allocate();
        ZKTransaction txn2 = newTxn();
        LedgerHandle lh2 = obtain(allocator2, txn2);

        try {
            Utils.ioResult(txn1.execute());
            Assert.fail("Should fail commit obtaining ledger handle from first allocator as allocator is modified by second allocator.");
        } catch (ZKException expected) {
            // expected: the first allocator's transaction must not commit
        }
        Utils.ioResult(txn2.execute());
        Utils.close(allocator1);
        Utils.close(allocator2);

        try {
            lh1.close();
            Assert.fail("LedgerHandle allocated by allocator1 should be deleted.");
        } catch (BKException expected) {
            // expected: closing the handle of the unused ledger fails
        }
        try {
            openLedger(lh1.getId());
            Assert.fail("LedgerHandle allocated by allocator1 should be deleted.");
        } catch (BKException.BKNoSuchLedgerExistsOnMetadataServerException expected) {
            // expected: the unused ledger no longer exists
        }

        assertLedgerHoldsSingleEntry(lh2);
    }

    /**
     * Closing the allocator while an obtain is still uncommitted keeps the ledger id at the
     * allocation path and the ledger itself openable.
     */
    @Test(timeout = 60000)
    public void testCloseAllocatorDuringObtaining() throws Exception {
        final String allocationPath = "/allocation2";
        SimpleLedgerAllocator allocator = createAllocator(allocationPath);
        allocator.allocate();
        LedgerHandle lh = obtain(allocator, newTxn());
        Utils.close(allocator);
        assertAllocationPathHolds(allocationPath, lh.getId());
        assertLedgerCanBeOpened(lh.getId());
    }

    /**
     * Closing the allocator after the obtain transaction committed leaves the allocation path
     * empty and the obtained ledger openable.
     *
     * <p>NOTE(review): this method carries no {@code @Test} annotation, so the default JUnit 4
     * runner will not execute it — confirm that this is intentional.
     */
    public void testCloseAllocatorAfterConfirm() throws Exception {
        final String allocationPath = "/allocation2";
        SimpleLedgerAllocator allocator = createAllocator(allocationPath);
        allocator.allocate();
        ZKTransaction txn = newTxn();
        LedgerHandle lh = obtain(allocator, txn);
        Utils.ioResult(txn.execute());
        Utils.close(allocator);
        assertAllocationPathEmpty(allocationPath);
        assertLedgerCanBeOpened(lh.getId());
    }

    /**
     * Closing the allocator after the obtain transaction failed keeps the ledger id at the
     * allocation path and the ledger itself openable.
     */
    @Test(timeout = 60000)
    public void testCloseAllocatorAfterAbort() throws Exception {
        final String allocationPath = "/allocation3";
        SimpleLedgerAllocator allocator = createAllocator(allocationPath);
        allocator.allocate();
        ZKTransaction txn = newTxn();
        LedgerHandle lh = obtain(allocator, txn);
        addFailingOp(txn);
        try {
            Utils.ioResult(txn.execute());
            Assert.fail("Should fail the transaction when setting unexisted path");
        } catch (ZKException expected) {
            // expected: the extra setData targets a path that does not exist
        }
        Utils.close(allocator);
        assertAllocationPathHolds(allocationPath, lh.getId());
        assertLedgerCanBeOpened(lh.getId());
    }

    /**
     * A second obtain issued while the first is still outstanding must already be completed
     * exceptionally with a {@code ConcurrentObtainException} when it is returned.
     */
    @Test(timeout = 60000)
    public void testConcurrentAllocation() throws Exception {
        SimpleLedgerAllocator allocator = createAllocator("/" + runtime.getMethodName());
        allocator.allocate();
        // First obtain is deliberately left outstanding; its result is not awaited.
        allocator.tryObtain(newTxn(), NULL_LISTENER);
        CompletableFuture<LedgerHandle> secondObtain = allocator.tryObtain(newTxn(), NULL_LISTENER);
        Assert.assertTrue(secondObtain.isDone());
        Assert.assertTrue(secondObtain.isCompletedExceptionally());
        try {
            Utils.ioResult(secondObtain);
            Assert.fail("Should fail the concurrent obtain since there is already a transaction obtaining the ledger handle");
        } catch (SimpleLedgerAllocator.ConcurrentObtainException expected) {
            // expected
        }
    }

    /** Allocates, obtains and commits ten times in a row and expects ten distinct handles. */
    @Test(timeout = 60000)
    public void testObtainMultipleLedgers() throws Exception {
        final int numLedgers = 10;
        SimpleLedgerAllocator allocator = createAllocator("/" + runtime.getMethodName());
        HashSet<LedgerHandle> allocatedLedgers = new HashSet<>();
        for (int i = 0; i < numLedgers; i++) {
            allocator.allocate();
            ZKTransaction txn = newTxn();
            LedgerHandle lh = obtain(allocator, txn);
            Utils.ioResult(txn.execute());
            allocatedLedgers.add(lh);
        }
        Assert.assertEquals(numLedgers, allocatedLedgers.size());
    }
}
