/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeperTestClient;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerOfflineBacklog;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.testng.Assert;
import org.testng.annotations.Test;

public class ManagedLedgerBkTest
extends BookKeeperClusterTestCase {
    public ManagedLedgerBkTest() {
        super(2);
    }

    @Test
    public void testSimpleRead() throws Exception {
        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
        factoryConf.setMaxCacheSize(0L);
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.zkc, factoryConf);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataAckQuorumSize(1);
        ManagedLedger ledger = factory.open("my-ledger", config);
        ManagedCursor cursor = ledger.openCursor("c1");
        int N = 1;
        for (int i = 0; i < N; ++i) {
            String entry = "entry-" + i;
            ledger.addEntry(entry.getBytes());
        }
        List entries = cursor.readEntries(N);
        Assert.assertEquals((int)N, (int)entries.size());
        entries.forEach(e -> e.release());
        factory.shutdown();
    }

    @Test
    public void testBookieFailure() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.zkc);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
        ManagedLedger ledger = factory.open("my-ledger", config);
        ManagedCursor cursor = ledger.openCursor("my-cursor");
        ledger.addEntry("entry-0".getBytes());
        this.killBookie(1);
        this.bkc.getZkHandle().close();
        try {
            ledger.addEntry("entry-1".getBytes());
            Assert.fail((String)"should fail");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        this.bkc.close();
        this.bkc = new BookKeeperTestClient(this.baseClientConf);
        this.startNewBookie();
        factory.shutdown();
        factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.zkc);
        ledger = factory.open("my-ledger", config);
        cursor = ledger.openCursor("my-cursor");
        ledger.addEntry("entry-2".getBytes());
        Assert.assertEquals((long)3L, (long)cursor.getNumberOfEntriesInBacklog(false));
        List entries = cursor.readEntries(1);
        Assert.assertEquals((int)1, (int)entries.size());
        Assert.assertEquals((String)"entry-0", (String)new String(((Entry)entries.get(0)).getData()));
        entries.forEach(e -> e.release());
        entries = cursor.readEntries(1);
        Assert.assertEquals((int)1, (int)entries.size());
        Assert.assertEquals((String)"entry-1", (String)new String(((Entry)entries.get(0)).getData()));
        entries.forEach(e -> e.release());
        entries = cursor.readEntries(1);
        Assert.assertEquals((int)1, (int)entries.size());
        Assert.assertEquals((String)"entry-2", (String)new String(((Entry)entries.get(0)).getData()));
        entries.forEach(e -> e.release());
        factory.shutdown();
    }

    @Test
    public void verifyConcurrentUsage() throws Exception {
        int i;
        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
        config.setMaxCacheSize(0x6400000L);
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle(), config);
        EntryCacheManager cacheManager = factory.getEntryCacheManager();
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my-ledger", conf);
        int NumProducers = 1;
        int NumConsumers = 1;
        AtomicBoolean done = new AtomicBoolean();
        CyclicBarrier barrier = new CyclicBarrier(NumProducers + NumConsumers + 1);
        ArrayList futures = Lists.newArrayList();
        for (i = 0; i < NumProducers; ++i) {
            futures.add(this.executor.submit(() -> {
                try {
                    barrier.await();
                    while (!done.get()) {
                        ledger.addEntry("entry".getBytes());
                        Thread.sleep(1L);
                    }
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }));
        }
        i = 0;
        while (i < NumConsumers) {
            int idx = i++;
            futures.add(this.executor.submit(() -> {
                try {
                    barrier.await();
                    ManagedCursor cursor = ledger.openCursor("my-cursor-" + idx);
                    while (!done.get()) {
                        List entries = cursor.readEntries(1);
                        if (!entries.isEmpty()) {
                            cursor.markDelete(((Entry)entries.get(0)).getPosition());
                        }
                        entries.forEach(e -> e.release());
                        Thread.sleep(2L);
                    }
                }
                catch (Exception e2) {
                    e2.printStackTrace();
                }
            }));
        }
        barrier.await();
        Thread.sleep(1000L);
        done.set(true);
        for (Future future : futures) {
            future.get();
        }
        cacheManager.mlFactoryMBean.refreshStats(1L, TimeUnit.SECONDS);
        Assert.assertTrue((cacheManager.mlFactoryMBean.getCacheHitsRate() > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((double)cacheManager.mlFactoryMBean.getCacheMissesRate(), (double)0.0);
        Assert.assertTrue((cacheManager.mlFactoryMBean.getCacheHitsThroughput() > 0.0 ? 1 : 0) != 0);
        Assert.assertEquals((long)cacheManager.mlFactoryMBean.getNumberOfCacheEvictions(), (long)0L);
        factory.shutdown();
    }

    @Test
    public void testSimple() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.zkc);
        ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
        mlConfig.setEnsembleSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setWriteQuorumSize(1);
        mlConfig.setMaxEntriesPerLedger(100);
        mlConfig.setMetadataMaxEntriesPerLedger(2);
        ManagedLedger ledger = factory.open("ml-simple-ledger", mlConfig);
        ledger.addEntry("test".getBytes());
        factory.shutdown();
    }

    @Test
    public void testConcurrentMarkDelete() throws Exception {
        int i;
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.zkc);
        ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
        mlConfig.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1).setMetadataAckQuorumSize(1);
        mlConfig.setMaxEntriesPerLedger(100);
        mlConfig.setMetadataMaxEntriesPerLedger(10);
        ManagedLedger ledger = factory.open("ml-markdelete-ledger", mlConfig);
        ArrayList addedEntries = Lists.newArrayList();
        int numCursors = 10;
        CyclicBarrier barrier = new CyclicBarrier(numCursors);
        ArrayList cursors = Lists.newArrayList();
        for (i = 0; i < numCursors; ++i) {
            cursors.add(ledger.openCursor(String.format("c%d", i)));
        }
        for (i = 0; i < 50; ++i) {
            Position pos = ledger.addEntry("entry".getBytes());
            addedEntries.add(pos);
        }
        ArrayList futures = Lists.newArrayList();
        for (ManagedCursor cursor : cursors) {
            futures.add(this.executor.submit(() -> {
                barrier.await();
                for (Position position : addedEntries) {
                    cursor.markDelete(position);
                }
                return null;
            }));
        }
        for (Future future : futures) {
            future.get();
        }
        Thread.sleep(1000L);
        factory.shutdown();
    }

    @Test
    public void asyncMarkDeleteAndClose() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.zkc);
        ManagedLedgerConfig config = new ManagedLedgerConfig().setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1).setMetadataAckQuorumSize(1);
        ManagedLedger ledger = factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("c1");
        ArrayList positions = Lists.newArrayList();
        for (int i = 0; i < 10; ++i) {
            Position p = ledger.addEntry("entry".getBytes());
            positions.add(p);
        }
        final CountDownLatch counter = new CountDownLatch(positions.size());
        final AtomicBoolean gotException = new AtomicBoolean(false);
        for (Position p : positions) {
            cursor.asyncDelete(p, new AsyncCallbacks.DeleteCallback(){

                public void deleteComplete(Object ctx) {
                    counter.countDown();
                }

                public void deleteFailed(ManagedLedgerException exception, Object ctx) {
                    exception.printStackTrace();
                    gotException.set(true);
                    counter.countDown();
                }
            }, null);
        }
        cursor.close();
        ledger.close();
        counter.await();
        Assert.assertFalse((boolean)gotException.get());
        factory.shutdown();
    }

    @Test
    public void ledgerFencedByAutoReplication() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
        ManagedCursor c1 = ledger.openCursor("c1");
        PositionImpl p1 = (PositionImpl)ledger.addEntry("entry-1".getBytes());
        this.bkc.openLedger(p1.getLedgerId(), BookKeeper.DigestType.CRC32C, new byte[0]);
        ledger.addEntry("entry-2".getBytes());
        Assert.assertEquals((long)2L, (long)c1.getNumberOfEntries());
        Assert.assertEquals((long)2L, (long)c1.getNumberOfEntriesInBacklog(false));
        PositionImpl p3 = (PositionImpl)ledger.addEntry("entry-3".getBytes());
        Assert.assertEquals((long)3L, (long)c1.getNumberOfEntries());
        Assert.assertEquals((long)3L, (long)c1.getNumberOfEntriesInBacklog(false));
        Assert.assertTrue((p1.getLedgerId() != p3.getLedgerId() ? 1 : 0) != 0);
        factory.shutdown();
    }

    @Test
    public void ledgerFencedByFailover() throws Exception {
        ManagedLedgerFactoryImpl factory1 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
        ManagedLedgerImpl ledger1 = (ManagedLedgerImpl)factory1.open("my_test_ledger", config);
        ledger1.openCursor("c");
        ledger1.addEntry("entry-1".getBytes());
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerImpl ledger2 = (ManagedLedgerImpl)factory2.open("my_test_ledger", config);
        ManagedCursor c2 = ledger2.openCursor("c");
        try {
            ledger1.addEntry("entry-2".getBytes());
            Assert.fail((String)"Should have failed");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        ledger2.addEntry("entry-2".getBytes());
        try {
            ledger1.addEntry("entry-2".getBytes());
            Assert.fail((String)"Should have failed");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        Assert.assertEquals((long)2L, (long)c2.getNumberOfEntriesInBacklog(false));
        factory1.shutdown();
        factory2.shutdown();
    }

    @Test
    public void testOfflineTopicBacklog() throws Exception {
        ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
        factoryConf.setMaxCacheSize(0L);
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.zkc, factoryConf);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataAckQuorumSize(1);
        ManagedLedger ledger = factory.open("property/cluster/namespace/my-ledger", config);
        ManagedCursor cursor = ledger.openCursor("c1");
        int N = 1;
        for (int i = 0; i < N; ++i) {
            String entry = "entry-" + i;
            ledger.addEntry(entry.getBytes());
        }
        List entries = cursor.readEntries(N);
        Assert.assertEquals((int)N, (int)entries.size());
        entries.forEach(e -> e.release());
        ledger.close();
        ManagedLedgerOfflineBacklog offlineTopicBacklog = new ManagedLedgerOfflineBacklog(DigestType.CRC32, "".getBytes(Charsets.UTF_8), "", false);
        PersistentOfflineTopicStats offlineTopicStats = offlineTopicBacklog.getEstimatedUnloadedTopicBacklog(factory, "property/cluster/namespace/my-ledger");
        factory.shutdown();
        Assert.assertNotNull((Object)offlineTopicStats);
    }

    @Test(timeOut=20000L)
    void testResetCursorAfterRecovery() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.zkc);
        ManagedLedgerConfig conf = new ManagedLedgerConfig().setMaxEntriesPerLedger(10).setEnsembleSize(1).setWriteQuorumSize(1).setAckQuorumSize(1).setMetadataEnsembleSize(1).setMetadataWriteQuorumSize(1).setMetadataAckQuorumSize(1);
        ManagedLedger ledger = factory.open("my_test_move_cursor_ledger", conf);
        ManagedCursor cursor = ledger.openCursor("trc1");
        Position p1 = ledger.addEntry("dummy-entry-1".getBytes());
        Position p2 = ledger.addEntry("dummy-entry-2".getBytes());
        Position p3 = ledger.addEntry("dummy-entry-3".getBytes());
        Position p4 = ledger.addEntry("dummy-entry-4".getBytes());
        cursor.markDelete(p3);
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.zkc);
        ledger = factory2.open("my_test_move_cursor_ledger", conf);
        cursor = ledger.openCursor("trc1");
        Assert.assertEquals((Object)cursor.getMarkDeletedPosition(), (Object)p3);
        Assert.assertEquals((Object)cursor.getReadPosition(), (Object)p4);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(false), (long)1L);
        cursor.resetCursor(p2);
        Assert.assertEquals((Object)cursor.getMarkDeletedPosition(), (Object)p1);
        Assert.assertEquals((Object)cursor.getReadPosition(), (Object)p2);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(false), (long)3L);
        factory2.shutdown();
        factory.shutdown();
    }

    @Test(timeOut=30000L)
    public void managedLedgerClosed() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
        ManagedLedgerImpl ledger1 = (ManagedLedgerImpl)factory.open("my_test_ledger", config);
        int N = 100;
        final AtomicReference res = new AtomicReference();
        final CountDownLatch latch = new CountDownLatch(N);
        for (int i = 0; i < N; ++i) {
            ledger1.asyncAddEntry(("entry-" + i).getBytes(), new AsyncCallbacks.AddEntryCallback(){

                public void addComplete(Position position, Object ctx) {
                    latch.countDown();
                }

                public void addFailed(ManagedLedgerException exception, Object ctx) {
                    res.compareAndSet(null, exception);
                    latch.countDown();
                }
            }, null);
            if (i != 1) continue;
            ledger1.close();
        }
        latch.await();
        Assert.assertNotNull(res.get());
        Assert.assertEquals(((Object)((Object)((ManagedLedgerException)((Object)res.get())))).getClass(), ManagedLedgerException.ManagedLedgerAlreadyClosedException.class);
        factory.shutdown();
    }

    @Test
    public void testChangeCrcType() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
        config.setDigestType(DigestType.CRC32);
        ManagedLedger ledger = factory.open("my_test_ledger", config);
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("entry-0".getBytes());
        ledger.addEntry("entry-1".getBytes());
        ledger.addEntry("entry-2".getBytes());
        ledger.close();
        config.setDigestType(DigestType.CRC32C);
        ledger = factory.open("my_test_ledger", config);
        c1 = ledger.openCursor("c1");
        ledger.addEntry("entry-3".getBytes());
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)4L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(false), (long)4L);
        List entries = c1.readEntries(4);
        Assert.assertEquals((int)entries.size(), (int)4);
        for (int i = 0; i < 4; ++i) {
            Assert.assertEquals((String)new String(((Entry)entries.get(i)).getData()), (String)("entry-" + i));
        }
        factory.shutdown();
    }
}

