/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class ManagedCursorTest
extends MockedBookKeeperTestCase {
    /** Charset used by the tests in this class to turn entry payload strings into bytes and back. */
    private static final Charset Encoding = Charsets.UTF_8;
    /** SLF4J logger for this test class. */
    private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);

    /**
     * Reads through a cursor before and after a single entry is appended.
     *
     * <p>Checks that a read on the empty ledger returns no entries, that the appended entry is
     * returned exactly once, and that the cursor's {@code toString()} matches the expected
     * acknowledged and read positions.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void readFromEmptyLedger() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");

        // Nothing has been written yet, so the first read must come back empty.
        List<Entry> entries = c1.readEntries(10);
        Assert.assertEquals(entries.size(), 0);
        entries.forEach(e -> e.release());

        ledger.addEntry("test".getBytes(Encoding));

        // The freshly added entry is visible to the cursor.
        entries = c1.readEntries(10);
        Assert.assertEquals(entries.size(), 1);
        entries.forEach(e -> e.release());

        // The same entry must not be delivered a second time.
        entries = c1.readEntries(10);
        Assert.assertEquals(entries.size(), 0);
        entries.forEach(e -> e.release());

        Assert.assertEquals(c1.toString(),
                "ManagedCursorImpl{ledger=my_test_ledger, name=c1, ackPos=3:-1, readPos=3:1}");
    }

    /**
     * Verifies that two cursors on the same ledger consume independently.
     *
     * <p>The ledger is configured with one entry per ledger. Each cursor must receive both
     * entries on its first read and nothing on its second read.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void readTwice() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        ManagedCursor c1 = ledger.openCursor("c1");
        ManagedCursor c2 = ledger.openCursor("c2");

        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));

        // First cursor: both entries, then nothing.
        List<Entry> entries = c1.readEntries(2);
        Assert.assertEquals(entries.size(), 2);
        entries.forEach(e -> e.release());

        entries = c1.readEntries(2);
        Assert.assertEquals(entries.size(), 0);
        entries.forEach(e -> e.release());

        // Second cursor is unaffected by what the first one already consumed.
        entries = c2.readEntries(2);
        Assert.assertEquals(entries.size(), 2);
        entries.forEach(e -> e.release());

        entries = c2.readEntries(2);
        Assert.assertEquals(entries.size(), 0);
        entries.forEach(e -> e.release());
    }

    /**
     * Repeats the two-cursor read scenario against a factory whose maximum cache size is zero.
     *
     * <p>The inherited {@code factory} field is replaced by a new factory built with that
     * configuration. The payloads returned to the first cursor are also checked for content.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void readWithCacheDisabled() throws Exception {
        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
        config.setMaxCacheSize(0L);
        factory = new ManagedLedgerFactoryImpl((BookKeeper) bkc, bkc.getZkHandle(), config);

        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        ManagedCursor c1 = ledger.openCursor("c1");
        ManagedCursor c2 = ledger.openCursor("c2");

        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));

        // First cursor: both entries arrive in order with the original payloads.
        List<Entry> entries = c1.readEntries(2);
        Assert.assertEquals(entries.size(), 2);
        Assert.assertEquals(new String(entries.get(0).getData(), Encoding), "entry-1");
        Assert.assertEquals(new String(entries.get(1).getData(), Encoding), "entry-2");
        entries.forEach(e -> e.release());

        entries = c1.readEntries(2);
        Assert.assertEquals(entries.size(), 0);
        entries.forEach(e -> e.release());

        // Second cursor sees the same two entries independently.
        entries = c2.readEntries(2);
        Assert.assertEquals(entries.size(), 2);
        entries.forEach(e -> e.release());

        entries = c2.readEntries(2);
        Assert.assertEquals(entries.size(), 0);
        entries.forEach(e -> e.release());
    }

    /**
     * Verifies that {@code getData()} can be called twice on the same entry and returns equal
     * content both times, and that {@code getLength()} matches the payload length.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void getEntryDataTwice() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");

        ledger.addEntry("entry-1".getBytes(Encoding));

        // Ask for more than is available; only the single written entry comes back.
        List<Entry> entries = c1.readEntries(2);
        Assert.assertEquals(entries.size(), 1);

        Entry entry = entries.get(0);
        Assert.assertEquals(entry.getLength(), "entry-1".length());

        byte[] data1 = entry.getData();
        byte[] data2 = entry.getData();
        Assert.assertEquals(data1, data2);
        entry.release();
    }

    /**
     * Expects a read through a cursor to be rejected with a {@link ManagedLedgerException}
     * once the owning ledger has been closed.
     *
     * @throws Exception if opening or closing the ledger fails
     */
    @Test(timeOut = 20000)
    void readFromClosedLedger() throws Exception {
        ManagedLedgerConfig singleEntryConfig = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger closedLedger = factory.open("my_test_ledger", singleEntryConfig);
        ManagedCursor reader = closedLedger.openCursor("c1");
        closedLedger.close();

        ManagedLedgerException rejection = null;
        try {
            reader.readEntries(2);
        } catch (ManagedLedgerException expected) {
            // This is the outcome the test is looking for.
            rejection = expected;
        }

        if (rejection == null) {
            Assert.fail("ledger is closed, should fail");
        }
    }

    /**
     * Checks {@code getNumberOfEntries()} and {@code hasMoreEntries()} for cursors opened at
     * different points while four entries are being written.
     *
     * <p>Each cursor is opened just before the next entry is added, so the expected counts are
     * 4, 3, 2, 1 and 0. After the first cursor reads two entries and mark-deletes the second
     * one, it is expected to report two remaining entries.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void testNumberOfEntries() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));

        // Interleave cursor creation with writes so every cursor starts one entry later.
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ManagedCursor c2 = ledger.openCursor("c2");
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ManagedCursor c3 = ledger.openCursor("c3");
        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        ManagedCursor c4 = ledger.openCursor("c4");
        ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        ManagedCursor c5 = ledger.openCursor("c5");

        Assert.assertEquals(c1.getNumberOfEntries(), 4L);
        Assert.assertEquals(c1.hasMoreEntries(), true);
        Assert.assertEquals(c2.getNumberOfEntries(), 3L);
        Assert.assertEquals(c2.hasMoreEntries(), true);
        Assert.assertEquals(c3.getNumberOfEntries(), 2L);
        Assert.assertEquals(c3.hasMoreEntries(), true);
        Assert.assertEquals(c4.getNumberOfEntries(), 1L);
        Assert.assertEquals(c4.hasMoreEntries(), true);
        Assert.assertEquals(c5.getNumberOfEntries(), 0L);
        Assert.assertEquals(c5.hasMoreEntries(), false);

        List<Entry> entries = c1.readEntries(2);
        Assert.assertEquals(entries.size(), 2);
        c1.markDelete(entries.get(1).getPosition());
        Assert.assertEquals(c1.getNumberOfEntries(), 2L);
        entries.forEach(e -> e.release());
    }

    /**
     * Checks {@code getNumberOfEntriesInBacklog()} for cursors opened at different points, and
     * how it changes relative to {@code getNumberOfEntries()} as the first cursor reads,
     * mark-deletes and individually deletes entries.
     *
     * <p>The assertions expect that reading alone leaves the backlog unchanged, while
     * {@code markDelete} and {@code delete} reduce it.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void testNumberOfEntriesInBacklog() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));

        // Interleave cursor creation with writes so every cursor starts one entry later.
        ManagedCursor c1 = ledger.openCursor("c1");
        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ManagedCursor c2 = ledger.openCursor("c2");
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ManagedCursor c3 = ledger.openCursor("c3");
        Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        ManagedCursor c4 = ledger.openCursor("c4");
        Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        ManagedCursor c5 = ledger.openCursor("c5");

        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 4L);
        Assert.assertEquals(c2.getNumberOfEntriesInBacklog(), 3L);
        Assert.assertEquals(c3.getNumberOfEntriesInBacklog(), 2L);
        Assert.assertEquals(c4.getNumberOfEntriesInBacklog(), 1L);
        Assert.assertEquals(c5.getNumberOfEntriesInBacklog(), 0L);

        // Reading moves the read position but leaves the backlog untouched.
        List<Entry> entries = c1.readEntries(2);
        Assert.assertEquals(entries.size(), 2);
        entries.forEach(e -> e.release());

        Assert.assertEquals(c1.getNumberOfEntries(), 2L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 4L);

        c1.markDelete(p1);
        Assert.assertEquals(c1.getNumberOfEntries(), 2L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 3L);

        // Individually deleting an unread entry lowers both counters.
        c1.delete(p3);
        Assert.assertEquals(c1.getNumberOfEntries(), 1L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 2L);

        c1.markDelete(p4);
        Assert.assertEquals(c1.getNumberOfEntries(), 0L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 0L);
    }

    /**
     * Checks the backlog counts after every cursor's private {@code messagesConsumedCounter}
     * field has been overwritten, via reflection, with the ledger's entries-added counter plus
     * one.
     *
     * <p>The expected backlog values (4, 3, 2, 1, 0) are the same as without the overwrite.
     * NOTE(review): judging by the test name this presumably forces a fallback computation
     * inside {@code ManagedCursorImpl}; confirm against that class.
     *
     * @throws Exception if a managed-ledger operation or the reflective access fails
     */
    @Test(timeOut = 20000)
    void testNumberOfEntriesInBacklogWithFallback() throws Exception {
        ManagedLedgerConfig twoPerLedger = new ManagedLedgerConfig().setMaxEntriesPerLedger(2);
        ManagedLedger ml = factory.open("my_test_ledger", twoPerLedger);

        // Each cursor is opened one entry later than the previous one.
        ManagedCursor first = ml.openCursor("c1");
        ml.addEntry("dummy-entry-1".getBytes(Encoding));
        ManagedCursor second = ml.openCursor("c2");
        ml.addEntry("dummy-entry-2".getBytes(Encoding));
        ManagedCursor third = ml.openCursor("c3");
        ml.addEntry("dummy-entry-3".getBytes(Encoding));
        ManagedCursor fourth = ml.openCursor("c4");
        ml.addEntry("dummy-entry-4".getBytes(Encoding));
        ManagedCursor fifth = ml.openCursor("c5");

        Field consumedCounter = ManagedCursorImpl.class.getDeclaredField("messagesConsumedCounter");
        consumedCounter.setAccessible(true);
        long forcedValue = ((ManagedLedgerImpl) ml).getEntriesAddedCounter() + 1L;

        // Apply the same forced counter value to all cursors, in creation order.
        for (ManagedCursor each : Arrays.asList(first, second, third, fourth, fifth)) {
            consumedCounter.setLong(each, forcedValue);
        }

        Assert.assertEquals(first.getNumberOfEntriesInBacklog(), 4L);
        Assert.assertEquals(second.getNumberOfEntriesInBacklog(), 3L);
        Assert.assertEquals(third.getNumberOfEntriesInBacklog(), 2L);
        Assert.assertEquals(fourth.getNumberOfEntriesInBacklog(), 1L);
        Assert.assertEquals(fifth.getNumberOfEntriesInBacklog(), 0L);
    }

    /**
     * Verifies that per-cursor entry counts are still correct when the ledger and its cursors
     * are reopened through a second factory instance.
     *
     * <p>The second factory is always shut down, even when an assertion fails, so a failing
     * run does not leave it open.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void testNumberOfEntriesWithReopen() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));

        // Create the cursors at staggered positions; the handles themselves are not needed here.
        ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ledger.openCursor("c2");
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ledger.openCursor("c3");

        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper) bkc, bkc.getZkHandle());
        try {
            ledger = factory2.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));

            ManagedCursor c1 = ledger.openCursor("c1");
            ManagedCursor c2 = ledger.openCursor("c2");
            ManagedCursor c3 = ledger.openCursor("c3");

            Assert.assertEquals(c1.getNumberOfEntries(), 2L);
            Assert.assertEquals(c1.hasMoreEntries(), true);

            Assert.assertEquals(c2.getNumberOfEntries(), 1L);
            Assert.assertEquals(c2.hasMoreEntries(), true);

            Assert.assertEquals(c3.getNumberOfEntries(), 0L);
            Assert.assertEquals(c3.hasMoreEntries(), false);
        } finally {
            factory2.shutdown();
        }
    }

    /**
     * Issues an asynchronous read and expects it to complete with the single written entry and
     * a {@code null} context.
     *
     * <p>Problems seen inside the callback are recorded and rethrown on the test thread. The
     * latch is always counted down, so a failure is reported immediately with its cause
     * instead of the test blocking until its timeout.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void asyncReadWithoutErrors() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");

        ledger.addEntry("dummy-entry-1".getBytes(Encoding));

        final CountDownLatch counter = new CountDownLatch(1);
        final AtomicReference<Throwable> failure = new AtomicReference<>();

        cursor.asyncReadEntries(100, new AsyncCallbacks.ReadEntriesCallback() {
            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                try {
                    Assert.assertNull(ctx);
                    Assert.assertEquals(entries.size(), 1);
                } catch (Throwable t) {
                    // Assertion errors raised here may never reach the test thread; keep them.
                    failure.set(t);
                } finally {
                    entries.forEach(e -> e.release());
                    counter.countDown();
                }
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                failure.set(exception);
                counter.countDown();
            }
        }, null);

        counter.await();

        Throwable problem = failure.get();
        if (problem != null) {
            throw new AssertionError("asynchronous read did not complete as expected", problem);
        }
    }

    /**
     * Exercises asynchronous reads after BookKeeper has been stopped.
     *
     * <p>The first read is expected to succeed even though BookKeeper is down. After the cursor
     * is rewound and the ledger's entry cache is cleared, the same read is expected to fail.
     *
     * <p>Unexpected outcomes are recorded by the callbacks and asserted on the test thread.
     * Both latches are always counted down, so a wrong outcome fails the test right away
     * rather than leaving it blocked until the timeout.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void asyncReadWithErrors() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");

        ledger.addEntry("dummy-entry-1".getBytes(Encoding));

        final CountDownLatch counter = new CountDownLatch(1);
        final AtomicReference<ManagedLedgerException> firstReadError = new AtomicReference<>();

        stopBookKeeper();

        cursor.asyncReadEntries(100, new AsyncCallbacks.ReadEntriesCallback() {
            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                entries.forEach(e -> e.release());
                counter.countDown();
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                firstReadError.set(exception);
                counter.countDown();
            }
        }, null);

        counter.await();
        if (firstReadError.get() != null) {
            throw new AssertionError("async-call should not have failed", firstReadError.get());
        }

        // Start over from the beginning with an empty cache before reading again.
        cursor.rewind();
        ledger.entryCache.clear();

        final CountDownLatch counter2 = new CountDownLatch(1);
        final AtomicBoolean secondReadSucceeded = new AtomicBoolean(false);

        cursor.asyncReadEntries(100, new AsyncCallbacks.ReadEntriesCallback() {
            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                // Not the expected outcome: release what was handed over and report it below.
                entries.forEach(e -> e.release());
                secondReadSucceeded.set(true);
                counter2.countDown();
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                counter2.countDown();
            }
        }, null);

        counter2.await();
        Assert.assertFalse(secondReadSucceeded.get(), "async-call should have failed");
    }

    /**
     * Requests zero entries from an asynchronous read. The test is declared to pass only when
     * an {@link IllegalArgumentException} is thrown.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000, expectedExceptions = { IllegalArgumentException.class })
    void asyncReadWithInvalidParameter() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursor reader = ml.openCursor("c1");

        ml.addEntry("dummy-entry-1".getBytes(Encoding));

        final CountDownLatch failureSeen = new CountDownLatch(1);

        stopBookKeeper();

        AsyncCallbacks.ReadEntriesCallback callback = new AsyncCallbacks.ReadEntriesCallback() {
            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                Assert.fail("async-call should have failed");
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                failureSeen.countDown();
            }
        };

        // A request for zero entries is the invalid argument under test.
        reader.asyncReadEntries(0, callback, null);

        failureSeen.await();
    }

    /**
     * Mark-deletes an entry after BookKeeper has been stopped. The call is expected to return
     * normally, because any exception would propagate and fail the test.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void markDeleteWithErrors() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        List<Entry> entries = cursor.readEntries(100);

        stopBookKeeper();
        Assert.assertEquals(entries.size(), 1);

        // No try/catch on purpose: the mark-delete must not throw with BookKeeper down.
        cursor.markDelete(entries.get(0).getPosition());

        entries.forEach(e -> e.release());
    }

    /**
     * Mark-deletes an entry after both BookKeeper and ZooKeeper have been stopped and expects
     * the call to throw.
     *
     * @throws Exception if any managed-ledger operation outside the guarded call fails
     */
    @Test(timeOut = 20000)
    void markDeleteWithZKErrors() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        List<Entry> entries = cursor.readEntries(100);

        Assert.assertEquals(entries.size(), 1);

        stopBookKeeper();
        stopZooKeeper();

        try {
            cursor.markDelete(entries.get(0).getPosition());
            // Assert.fail throws an AssertionError, which the catch below does not intercept.
            Assert.fail("Should have failed");
        } catch (Exception expected) {
            // Expected: the mark-delete cannot succeed with both services stopped.
        }

        entries.forEach(e -> e.release());
    }

    /**
     * Closes a ledger and its cursor, reopens both under the same names, then writes, reads and
     * deletes one entry.
     *
     * <p>Afterwards the mark-delete position must equal the deleted entry's position, and the
     * read position must be the position right after it.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void markDeleteAcrossLedgers() throws Exception {
        ManagedLedger ml1 = factory.open("my_test_ledger");
        ManagedCursor mc1 = ml1.openCursor("c1");

        // Close everything without writing any entry.
        ml1.close();
        mc1.close();
        factory.close(ml1);

        // Reopen under the same names.
        ManagedLedger ml2 = factory.open("my_test_ledger");
        ManagedCursor mc2 = ml2.openCursor("c1");

        Position pos = ml2.addEntry("dummy-entry-1".getBytes(Encoding));

        List<Entry> entries = mc2.readEntries(1);
        Assert.assertEquals(entries.size(), 1);
        Assert.assertEquals(new String(entries.get(0).getData(), Encoding), "dummy-entry-1");
        entries.forEach(e -> e.release());

        mc2.delete(pos);

        Assert.assertEquals(mc2.getMarkDeletedPosition(), pos);
        Assert.assertEquals(mc2.getMarkDeletedPosition().getNext(), mc2.getReadPosition());
    }

    /**
     * Resets a cursor synchronously to the position two entries before the last written entry
     * and checks that the read position ends up there.
     *
     * <p>An exception from {@code resetCursor} propagates and fails the test with its full
     * stack trace. It is not swallowed and reduced to a bare boolean assertion.
     *
     * @throws Exception if the reset or any other managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void testResetCursor() throws Exception {
        ManagedLedger ledger = factory.open("my_test_move_cursor_ledger",
                new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
        ManagedCursor cursor = ledger.openCursor("trc1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        PositionImpl lastPosition = (PositionImpl) ledger.addEntry("dummy-entry-4".getBytes(Encoding));

        PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2L);

        cursor.resetCursor(resetPosition);

        Assert.assertTrue(cursor.getReadPosition().equals(resetPosition));
        cursor.close();
        ledger.close();
    }

    /**
     * Resets a cursor asynchronously to the position two entries before the last written entry,
     * waits for the callback, and checks that the reset succeeded and the read position moved.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void testasyncResetCursor() throws Exception {
        ManagedLedgerConfig tenPerLedger = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
        ManagedLedger ml = factory.open("my_test_move_cursor_ledger", tenPerLedger);
        ManagedCursor target = ml.openCursor("tarc1");

        ml.addEntry("dummy-entry-1".getBytes(Encoding));
        ml.addEntry("dummy-entry-2".getBytes(Encoding));
        ml.addEntry("dummy-entry-3".getBytes(Encoding));
        PositionImpl tail = (PositionImpl) ml.addEntry("dummy-entry-4".getBytes(Encoding));

        final AtomicBoolean resetSucceeded = new AtomicBoolean(false);
        final CountDownLatch resetDone = new CountDownLatch(1);
        PositionImpl wanted = new PositionImpl(tail.getLedgerId(), tail.getEntryId() - 2L);

        // Either callback records the outcome and wakes up the waiting test thread.
        AsyncCallbacks.ResetCursorCallback onReset = new AsyncCallbacks.ResetCursorCallback() {
            @Override
            public void resetComplete(Object ctx) {
                resetSucceeded.set(true);
                resetDone.countDown();
            }

            @Override
            public void resetFailed(ManagedLedgerException exception, Object ctx) {
                resetSucceeded.set(false);
                resetDone.countDown();
            }
        };
        target.asyncResetCursor(wanted, onReset);

        resetDone.await();
        Assert.assertTrue(resetSucceeded.get());
        Assert.assertTrue(target.getReadPosition().equals(wanted));

        target.close();
        ml.close();
    }

    /**
     * Resets several cursors on the same ledger concurrently.
     *
     * <p>One task per cursor is submitted to a thread pool. All tasks and the test thread meet
     * at a barrier so the resets start together. Each task resets its cursor to a distinct
     * position counted back from the last written entry, checks the resulting read position
     * and reports whether the reset callback signalled success.
     *
     * <p>The thread pool is shut down when the test finishes, whether it passed or failed.
     *
     * @throws Exception if any managed-ledger operation or any worker task fails
     */
    @Test(timeOut = 20000)
    void testConcurrentResetCursor() throws Exception {
        ManagedLedger ledger = factory.open("my_test_concurrent_move_ledger");

        final int messages = 100;
        final int consumers = 5;

        List<Future<AtomicBoolean>> futures = Lists.newArrayList();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // One party per consumer task plus the test thread itself.
            final CyclicBarrier barrier = new CyclicBarrier(consumers + 1);

            for (int i = 0; i < messages; i++) {
                ledger.addEntry("test".getBytes(Encoding));
            }
            final PositionImpl lastPosition = (PositionImpl) ledger.addEntry("dummy-entry-4".getBytes(Encoding));

            for (int i = 0; i < consumers; i++) {
                final ManagedCursor cursor = ledger.openCursor("tcrc" + i);
                final int idx = i;

                futures.add(executor.submit(new Callable<AtomicBoolean>() {
                    @Override
                    public AtomicBoolean call() throws Exception {
                        barrier.await();

                        final AtomicBoolean moveStatus = new AtomicBoolean(false);
                        final CountDownLatch countDownLatch = new CountDownLatch(1);
                        // Every consumer targets a different position: 0, 5, 10, ... entries back.
                        final PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(),
                                lastPosition.getEntryId() - (long) (5 * idx));

                        cursor.asyncResetCursor(resetPosition, new AsyncCallbacks.ResetCursorCallback() {
                            @Override
                            public void resetComplete(Object ctx) {
                                moveStatus.set(true);
                                // Log ctx as-is so a null or unexpected context cannot throw
                                // before the latch is released.
                                log.info("move to [{}] completed for consumer [{}]", ctx, idx);
                                countDownLatch.countDown();
                            }

                            @Override
                            public void resetFailed(ManagedLedgerException exception, Object ctx) {
                                moveStatus.set(false);
                                log.warn("move to [{}] failed for consumer [{}]", ctx, idx);
                                countDownLatch.countDown();
                            }
                        });
                        countDownLatch.await();
                        Assert.assertTrue(cursor.getReadPosition().equals(resetPosition));
                        cursor.close();

                        return moveStatus;
                    }
                }));
            }

            // Release all consumer tasks at once.
            barrier.await();

            // f.get() rethrows any assertion failure raised inside a task.
            for (Future<AtomicBoolean> f : futures) {
                Assert.assertTrue(f.get().get());
            }
        } finally {
            executor.shutdownNow();
        }
        ledger.close();
    }

    /**
     * Seeks a cursor to the entry just before the last written one. The test makes no explicit
     * assertion; it passes as long as the seek does not throw.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void seekPosition() throws Exception {
        ManagedLedgerConfig tenPerLedger = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
        ManagedLedger ml = factory.open("my_test_ledger", tenPerLedger);
        ManagedCursor seeker = ml.openCursor("c1");

        for (String payload : new String[] { "dummy-entry-1", "dummy-entry-2", "dummy-entry-3" }) {
            ml.addEntry(payload.getBytes(Encoding));
        }
        PositionImpl tail = (PositionImpl) ml.addEntry("dummy-entry-4".getBytes(Encoding));

        // Target: same ledger as the last entry, one entry id earlier.
        PositionImpl beforeTail = new PositionImpl(tail.getLedgerId(), tail.getEntryId() - 1L);
        seeker.seek(beforeTail);
    }

    /**
     * Seeks a cursor to the third of six entries on a ledger limited to two entries per ledger.
     * The test makes no explicit assertion; it passes as long as the seek does not throw.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void seekPosition2() throws Exception {
        ManagedLedgerConfig twoPerLedger = new ManagedLedgerConfig().setMaxEntriesPerLedger(2);
        ManagedLedger ml = factory.open("my_test_ledger", twoPerLedger);
        ManagedCursor seeker = ml.openCursor("c1");

        ml.addEntry("dummy-entry-1".getBytes(Encoding));
        ml.addEntry("dummy-entry-2".getBytes(Encoding));
        PositionImpl third = (PositionImpl) ml.addEntry("dummy-entry-3".getBytes(Encoding));
        for (String payload : new String[] { "dummy-entry-4", "dummy-entry-5", "dummy-entry-6" }) {
            ml.addEntry(payload.getBytes(Encoding));
        }

        // Seek using a freshly built position carrying the same ledger and entry ids.
        PositionImpl target = new PositionImpl(third.getLedgerId(), third.getEntryId());
        seeker.seek(target);
    }

    /**
     * Seeks a cursor to two different positions on a ledger limited to two entries per ledger
     * and checks, after each seek, both the read position and the payload of the next entry.
     *
     * <p>The second seek targets {@code entry5.getNext()}, which the test expects to resolve to
     * the position of the sixth entry.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void seekPosition3() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        PositionImpl seekPosition = (PositionImpl) ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        Position entry5 = ledger.addEntry("dummy-entry-5".getBytes(Encoding));
        Position entry6 = ledger.addEntry("dummy-entry-6".getBytes(Encoding));

        cursor.seek(new PositionImpl(seekPosition.getLedgerId(), seekPosition.getEntryId()));

        Assert.assertEquals(cursor.getReadPosition(), seekPosition);
        List<Entry> entries = cursor.readEntries(1);
        Assert.assertEquals(entries.size(), 1);
        Assert.assertEquals(new String(entries.get(0).getData(), Encoding), "dummy-entry-4");
        entries.forEach(e -> e.release());

        cursor.seek(entry5.getNext());
        Assert.assertEquals(cursor.getReadPosition(), entry6);
        entries = cursor.readEntries(1);
        Assert.assertEquals(entries.size(), 1);
        Assert.assertEquals(new String(entries.get(0).getData(), Encoding), "dummy-entry-6");
        entries.forEach(e -> e.release());
    }

    /**
     * Verifies that seeking back to an already-read position restores the read position
     * without changing the mark-delete position.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void seekPosition4() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        ledger.addEntry("dummy-entry-5".getBytes(Encoding));
        ledger.addEntry("dummy-entry-6".getBytes(Encoding));

        cursor.markDelete(p1);
        Assert.assertEquals(cursor.getMarkDeletedPosition(), p1);
        Assert.assertEquals(cursor.getReadPosition(), p2);

        // Read two entries to move the read position forward; the entries are only released.
        List<Entry> entries = cursor.readEntries(2);
        entries.forEach(e -> e.release());

        cursor.seek(p2);
        Assert.assertEquals(cursor.getMarkDeletedPosition(), p1);
        Assert.assertEquals(cursor.getReadPosition(), p2);
    }

    /**
     * Tracks {@code getNumberOfEntries()} and {@code getNumberOfEntriesInBacklog()} through a
     * sequence of mark-deletes, reads and {@code rewind()} calls, and finally through two
     * additional writes.
     *
     * <p>The assertions expect a read to bring the entry count to zero while leaving the
     * backlog unchanged, and a rewind to bring the entry count back to the backlog size.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void rewind() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ManagedCursor c1 = ledger.openCursor("c1");
        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));

        log.debug("p1: {}", p1);
        log.debug("p2: {}", p2);
        log.debug("p3: {}", p3);
        log.debug("p4: {}", p4);

        Assert.assertEquals(c1.getNumberOfEntries(), 4L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 4L);
        c1.markDelete(p1);
        Assert.assertEquals(c1.getNumberOfEntries(), 3L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 3L);

        // Read everything that is left: nothing more to read, backlog unchanged.
        List<Entry> entries = c1.readEntries(10);
        Assert.assertEquals(entries.size(), 3);
        entries.forEach(e -> e.release());

        Assert.assertEquals(c1.getNumberOfEntries(), 0L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 3L);

        c1.rewind();
        Assert.assertEquals(c1.getNumberOfEntries(), 3L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 3L);

        c1.markDelete(p2);
        Assert.assertEquals(c1.getNumberOfEntries(), 2L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 2L);

        entries = c1.readEntries(10);
        Assert.assertEquals(entries.size(), 2);
        entries.forEach(e -> e.release());

        Assert.assertEquals(c1.getNumberOfEntries(), 0L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 2L);

        c1.rewind();
        Assert.assertEquals(c1.getNumberOfEntries(), 2L);

        // Acknowledge everything; a rewind then has nothing to go back to.
        c1.markDelete(p4);
        Assert.assertEquals(c1.getNumberOfEntries(), 0L);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 0L);

        c1.rewind();
        Assert.assertEquals(c1.getNumberOfEntries(), 0L);

        ledger.addEntry("dummy-entry-5".getBytes(Encoding));
        Assert.assertEquals(c1.getNumberOfEntries(), 1L);
        ledger.addEntry("dummy-entry-6".getBytes(Encoding));
        Assert.assertEquals(c1.getNumberOfEntries(), 2L);
    }

    /**
     * Mark-deletes a position beyond what the cursor has read so far.
     *
     * <p>After reading only the second of four entries, the cursor mark-deletes the fourth.
     * The test expects no entries to remain and the read position to be the entry id right
     * after the fourth entry in the same ledger.
     *
     * @throws Exception if any managed-ledger operation fails
     */
    @Test(timeOut = 20000)
    void markDeleteSkippingMessage() throws Exception {
        ManagedLedger ledger = factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
        ManagedCursor cursor = ledger.openCursor("c1");
        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        PositionImpl p4 = (PositionImpl) ledger.addEntry("dummy-entry-4".getBytes(Encoding));

        Assert.assertEquals(cursor.getNumberOfEntries(), 4L);

        cursor.markDelete(p1);
        Assert.assertEquals(cursor.hasMoreEntries(), true);
        Assert.assertEquals(cursor.getNumberOfEntries(), 3L);

        Assert.assertEquals(cursor.getReadPosition(), p2);

        List<Entry> entries = cursor.readEntries(1);
        Assert.assertEquals(entries.size(), 1);
        Assert.assertEquals(new String(entries.get(0).getData(), Encoding), "dummy-entry-2");
        entries.forEach(e -> e.release());

        // Jump the acknowledgement past the unread third entry straight to the fourth.
        cursor.markDelete(p4);
        Assert.assertEquals(cursor.hasMoreEntries(), false);
        Assert.assertEquals(cursor.getNumberOfEntries(), 0L);

        Assert.assertEquals(cursor.getReadPosition(), new PositionImpl(p4.getLedgerId(), p4.getEntryId() + 1L));
    }

    /**
     * Deletes a cursor and re-creates it under the same name, checking that the new cursor has
     * nothing to read, then polls until the ledger reports at most two entries.
     */
    @Test(timeOut=20000L)
    void removingCursor() throws Exception {
        ManagedLedger ml = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        ManagedCursor c = ml.openCursor("c1");
        for (int n = 1; n <= 6; n++) {
            ml.addEntry(("dummy-entry-" + n).getBytes(Encoding));
        }
        Assert.assertEquals(c.getNumberOfEntries(), 6L);
        Assert.assertEquals(ml.getNumberOfEntries(), 6L);

        ml.deleteCursor("c1");
        c = ml.openCursor("c1");
        Assert.assertEquals(c.getNumberOfEntries(), 0L);

        ml.addEntry("dummy-entry-7".getBytes(Encoding));
        // NOTE(review): presumably waits for old ledgers to be trimmed in the background - confirm.
        // The test timeout bounds this loop.
        long remaining = ml.getNumberOfEntries();
        while (remaining > 2L) {
            Thread.sleep(10L);
            remaining = ml.getNumberOfEntries();
        }
    }

    /**
     * Checks that mark-delete positions of two cursors are visible after the ledger is reopened
     * through a second factory.
     *
     * <p>Both positions are taken from entries read through {@code c1}; the second one is
     * applied to {@code c2}.
     */
    @Test(timeOut=20000L)
    void cursorPersistence() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ManagedCursor c2 = ledger.openCursor("c2");
        for (int n = 1; n <= 6; n++) {
            ledger.addEntry(("dummy-entry-" + n).getBytes(Encoding));
        }

        List<Entry> entries = c1.readEntries(3);
        Position p1 = entries.get(2).getPosition();
        c1.markDelete(p1);
        entries.forEach(Entry::release);

        entries = c1.readEntries(4);
        Position p2 = entries.get(2).getPosition();
        c2.markDelete(p2);
        entries.forEach(Entry::release);

        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        try {
            ledger = factory2.open("my_test_ledger");
            c1 = ledger.openCursor("c1");
            c2 = ledger.openCursor("c2");
            Assert.assertEquals(c1.getMarkDeletedPosition(), p1);
            Assert.assertEquals(c2.getMarkDeletedPosition(), p2);
        } finally {
            // Release the second factory even when an assertion above fails.
            factory2.shutdown();
        }
    }

    /**
     * Checks the mark-delete positions of four cursors after the ledger is reopened through a
     * second factory.
     *
     * <p>{@code c1} is advanced five times, {@code c2} once, and {@code c3}/{@code c4} are never
     * advanced ({@code c4} is opened after the first entry was written).
     */
    @Test(timeOut=20000L)
    void cursorPersistence2() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMetadataMaxEntriesPerLedger(1));
        ManagedCursor c1 = ledger.openCursor("c1");
        ManagedCursor c2 = ledger.openCursor("c2");
        ManagedCursor c3 = ledger.openCursor("c3");
        Position p0 = c3.getMarkDeletedPosition();
        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ManagedCursor c4 = ledger.openCursor("c4");
        Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        Position p5 = ledger.addEntry("dummy-entry-5".getBytes(Encoding));
        ledger.addEntry("dummy-entry-6".getBytes(Encoding));

        c1.markDelete(p1);
        c1.markDelete(p2);
        c1.markDelete(p3);
        c1.markDelete(p4);
        c1.markDelete(p5);
        c2.markDelete(p1);

        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        try {
            // Reopen through the NEW factory; going through this.factory would bypass factory2
            // entirely and leave it unused.
            ledger = factory2.open("my_test_ledger");
            c1 = ledger.openCursor("c1");
            c2 = ledger.openCursor("c2");
            c3 = ledger.openCursor("c3");
            c4 = ledger.openCursor("c4");
            Assert.assertEquals(c1.getMarkDeletedPosition(), p5);
            Assert.assertEquals(c2.getMarkDeletedPosition(), p1);
            Assert.assertEquals(c3.getMarkDeletedPosition(), p0);
            Assert.assertEquals(c4.getMarkDeletedPosition(), p1);
        } finally {
            factory2.shutdown();
        }
    }

    /**
     * Issues an asynchronous mark-delete from inside each asynchronous add callback, then checks
     * that a cursor reopened through a second factory reports the last recorded position.
     *
     * <p>Failure callbacks are intentionally no-ops: a failed operation never counts the latch
     * down, so the test ends through its timeout.
     */
    @Test(timeOut=20000L)
    public void asyncMarkDeleteBlocking() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMetadataMaxEntriesPerLedger(5);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        final ManagedCursor c1 = ledger.openCursor("c1");
        final AtomicReference<Position> lastPosition = new AtomicReference<>();
        final int entryCount = 100;
        final CountDownLatch latch = new CountDownLatch(entryCount);

        for (int i = 0; i < entryCount; ++i) {
            ledger.asyncAddEntry("entry".getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

                @Override
                public void addFailed(ManagedLedgerException exception, Object ctx) {
                }

                @Override
                public void addComplete(Position position, Object ctx) {
                    lastPosition.set(position);
                    c1.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback(){

                        @Override
                        public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                        }

                        @Override
                        public void markDeleteComplete(Object ctx) {
                            latch.countDown();
                        }
                    }, null);
                }
            }, null);
        }

        latch.await();
        Assert.assertEquals(c1.getNumberOfEntries(), 0L);

        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        try {
            ledger = factory2.open("my_test_ledger");
            ManagedCursor c2 = ledger.openCursor("c1");
            Assert.assertEquals(c2.getMarkDeletedPosition(), lastPosition.get());
        } finally {
            factory2.shutdown();
        }
    }

    /**
     * Writes 100 entries, issues an asynchronous mark-delete for every position from the test
     * thread, and checks that a cursor reopened through a second factory reports the last
     * position.
     *
     * <p>Both callback outcomes count the latch down, so a failed mark-delete is logged and the
     * final assertion decides the result.
     */
    @Test(timeOut=20000L)
    void cursorPersistenceAsyncMarkDeleteSameThread() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMetadataMaxEntriesPerLedger(5));
        ManagedCursor c1 = ledger.openCursor("c1");
        final int entryCount = 100;

        List<Position> positions = new ArrayList<>(entryCount);
        for (int i = 0; i < entryCount; ++i) {
            positions.add(ledger.addEntry("dummy-entry".getBytes(Encoding)));
        }
        Position lastPosition = positions.get(entryCount - 1);

        final CountDownLatch latch = new CountDownLatch(entryCount);
        for (Position p : positions) {
            c1.asyncMarkDelete(p, new AsyncCallbacks.MarkDeleteCallback(){

                @Override
                public void markDeleteComplete(Object ctx) {
                    latch.countDown();
                }

                @Override
                public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                    log.error("Failed to markdelete", (Throwable)exception);
                    latch.countDown();
                }
            }, null);
        }
        latch.await();

        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        try {
            ledger = factory2.open("my_test_ledger");
            ManagedCursor c2 = ledger.openCursor("c1");
            Assert.assertEquals(c2.getMarkDeletedPosition(), lastPosition);
        } finally {
            factory2.shutdown();
        }
    }

    /**
     * Mark-deleting an earlier position after a later one must throw
     * {@link ManagedLedgerException} and leave the mark-delete position unchanged.
     */
    @Test(timeOut=20000L)
    void unorderedMarkDelete() throws Exception {
        ManagedLedger ml = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ml.openCursor("c1");
        Position first = ml.addEntry("entry-1".getBytes(Encoding));
        Position second = ml.addEntry("entry-2".getBytes(Encoding));
        cursor.markDelete(second);

        boolean rejected;
        try {
            cursor.markDelete(first);
            rejected = false;
        }
        catch (ManagedLedgerException expected) {
            // Going backwards is expected to be refused.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Should have thrown exception");
        }
        Assert.assertEquals(cursor.getMarkDeletedPosition(), second);
    }

    /**
     * Asynchronous variant of the out-of-order check: the mark-delete of the later position must
     * complete, the following mark-delete of the earlier position must fail, and the cursor must
     * stay on the later position.
     */
    @Test(timeOut=20000L)
    void unorderedAsyncMarkDelete() throws Exception {
        ManagedLedger ml = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ml.openCursor("c1");
        Position first = ml.addEntry("entry-1".getBytes(Encoding));
        Position second = ml.addEntry("entry-2".getBytes(Encoding));
        final CountDownLatch done = new CountDownLatch(2);

        // Counts down only on success.
        AsyncCallbacks.MarkDeleteCallback mustSucceed = new AsyncCallbacks.MarkDeleteCallback(){

            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail();
            }

            public void markDeleteComplete(Object ctx) {
                done.countDown();
            }
        };
        // Counts down only on failure.
        AsyncCallbacks.MarkDeleteCallback mustFail = new AsyncCallbacks.MarkDeleteCallback(){

            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                done.countDown();
            }

            public void markDeleteComplete(Object ctx) {
                Assert.fail();
            }
        };

        cursor.asyncMarkDelete(second, mustSucceed, null);
        cursor.asyncMarkDelete(first, mustFail, null);
        done.await();
        Assert.assertEquals(cursor.getMarkDeletedPosition(), second);
    }

    /**
     * After a cursor is deleted from the ledger, or closed, reads and mark-deletes on the stale
     * handle must throw {@link ManagedLedgerException}; closing twice must not throw.
     */
    @Test(timeOut=20000L)
    void deleteCursor() throws Exception {
        ManagedLedger ml = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ml.openCursor("c1");
        ml.addEntry("entry-1".getBytes(Encoding));
        Position second = ml.addEntry("entry-2".getBytes(Encoding));
        Assert.assertEquals(cursor.getNumberOfEntries(), 2L);

        ml.deleteCursor("c1");

        // Reading through the deleted cursor.
        try {
            cursor.readEntries(10);
            Assert.fail("must fail, the cursor should be closed");
        }
        catch (ManagedLedgerException expected) {
            // expected
        }

        // Acknowledging through the deleted cursor.
        try {
            cursor.markDelete(second);
            Assert.fail("must fail, the cursor should be closed");
        }
        catch (ManagedLedgerException expected) {
            // expected
        }

        // A cursor re-created under the same name has nothing to read.
        cursor = ml.openCursor("c1");
        Assert.assertEquals(cursor.getNumberOfEntries(), 0L);
        cursor.close();

        // Reading through the closed cursor.
        try {
            cursor.readEntries(10);
            Assert.fail("must fail, the cursor should be closed");
        }
        catch (ManagedLedgerException expected) {
            // expected
        }

        // Second close on an already closed cursor.
        cursor.close();
    }

    /**
     * With failures injected into both the BookKeeper and ZooKeeper mocks, opening a new cursor
     * must throw {@link ManagedLedgerException}.
     */
    @Test(timeOut=20000L)
    void errorCreatingCursor() throws Exception {
        ManagedLedger ml = this.factory.open("my_test_ledger");

        // NOTE(review): -6 is presumably BKException.Code.NotEnoughBookiesException - confirm.
        this.bkc.failAfter(1, -6);
        this.zkc.failNow(KeeperException.Code.SESSIONEXPIRED);

        boolean opened;
        try {
            ml.openCursor("c1");
            opened = true;
        }
        catch (ManagedLedgerException expected) {
            opened = false;
        }
        if (opened) {
            Assert.fail("should have failed");
        }
    }

    /**
     * Checks cursor recovery when the BookKeeper mock is told to return an empty ledger.
     *
     * <p>The cursor is advanced to {@code p2}, the ledger is closed and reopened, and the cursor
     * is advanced to {@code p3}. After {@code returnEmptyLedgerAfter(1)} the ledger is opened
     * through a second factory and the cursor is expected to report {@code p2}.
     */
    @Test
    void failDuringRecoveryWithEmptyLedger() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("cursor");
        // Use the file-wide Encoding instead of the platform default charset.
        ledger.addEntry("entry-1".getBytes(Encoding));
        Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));
        Position p3 = ledger.addEntry("entry-3".getBytes(Encoding));
        cursor.markDelete(p2);
        ledger.close();

        ledger = this.factory.open("my_test_ledger");
        cursor = ledger.openCursor("cursor");
        cursor.markDelete(p3);

        this.bkc.returnEmptyLedgerAfter(1);
        ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, (ZooKeeper)this.zkc, conf);
        try {
            ledger = factory2.open("my_test_ledger");
            cursor = ledger.openCursor("cursor");
            Assert.assertEquals(cursor.getMarkDeletedPosition(), p2);
        } finally {
            factory2.shutdown();
        }
    }

    /**
     * Reopens a ledger through a second factory while a BookKeeper failure is scheduled, and
     * checks that the recovered cursor still reports the read position it had when it was
     * created.
     */
    @Test(timeOut=20000L)
    void errorRecoveringCursor() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("entry".getBytes(Encoding));
        ledger.addEntry("entry".getBytes(Encoding));
        ManagedCursor c1 = ledger.openCursor("c1");
        Position p3 = ledger.addEntry("entry".getBytes(Encoding));
        Assert.assertEquals(c1.getReadPosition(), p3);

        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        try {
            // NOTE(review): -10 is presumably BKException.Code.LedgerRecoveryException - confirm.
            this.bkc.failAfter(3, -10);
            ledger = factory2.open("my_test_ledger");
            c1 = ledger.openCursor("c1");
            Assert.assertEquals(c1.getReadPosition(), p3);
        } finally {
            factory2.shutdown();
        }
    }

    /**
     * With a BookKeeper failure scheduled, opening the ledger through a second factory must
     * throw {@link ManagedLedgerException}.
     */
    @Test(timeOut=20000L)
    void errorRecoveringCursor2() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("c1");

        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        try {
            // NOTE(review): -17 is presumably BKException.Code.MetadataVersionException - confirm.
            this.bkc.failAfter(4, -17);
            try {
                factory2.open("my_test_ledger");
                Assert.fail("should have failed");
            }
            catch (ManagedLedgerException expected) {
                // expected: recovery could not complete
            }
        } finally {
            // Runs even when Assert.fail above throws.
            factory2.shutdown();
        }
    }

    /**
     * Same scenario as {@code errorRecoveringCursor}, but with a different injected BookKeeper
     * error code and trigger count: the recovered cursor must still report the read position it
     * had when it was created.
     */
    @Test(timeOut=20000L)
    void errorRecoveringCursor3() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("entry".getBytes(Encoding));
        ledger.addEntry("entry".getBytes(Encoding));
        ManagedCursor c1 = ledger.openCursor("c1");
        Position p3 = ledger.addEntry("entry".getBytes(Encoding));
        Assert.assertEquals(c1.getReadPosition(), p3);

        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        try {
            // NOTE(review): -1 is presumably BKException.Code.ReadException - confirm.
            this.bkc.failAfter(4, -1);
            ledger = factory2.open("my_test_ledger");
            c1 = ledger.openCursor("c1");
            Assert.assertEquals(c1.getReadPosition(), p3);
        } finally {
            factory2.shutdown();
        }
    }

    /**
     * Checks how individual deletes move the mark-delete position: it advances only once the
     * deleted entries form a contiguous run from the current mark-delete position.
     */
    @Test(timeOut=20000L)
    void testSingleDelete() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3));
        ManagedCursor cursor = ledger.openCursor("c1");
        // Use the file-wide Encoding instead of the platform default charset.
        Position p1 = ledger.addEntry("entry1".getBytes(Encoding));
        Position p2 = ledger.addEntry("entry2".getBytes(Encoding));
        Position p3 = ledger.addEntry("entry3".getBytes(Encoding));
        Position p4 = ledger.addEntry("entry4".getBytes(Encoding));
        Position p5 = ledger.addEntry("entry5".getBytes(Encoding));
        Position p6 = ledger.addEntry("entry6".getBytes(Encoding));
        Position p0 = cursor.getMarkDeletedPosition();

        // Deleting in the middle leaves a hole, so nothing moves.
        cursor.delete(p4);
        Assert.assertEquals(cursor.getMarkDeletedPosition(), p0);

        cursor.delete(p1);
        Assert.assertEquals(cursor.getMarkDeletedPosition(), p1);

        // Deleting the same entry twice has no further effect.
        cursor.delete(p3);
        cursor.delete(p3);
        Assert.assertEquals(cursor.getMarkDeletedPosition(), p1);

        // Filling the hole at p2 lets the position jump to p4.
        cursor.delete(p2);
        Assert.assertEquals(cursor.getMarkDeletedPosition(), p4);

        cursor.delete(p5);
        Assert.assertEquals(cursor.getMarkDeletedPosition(), p5);

        cursor.close();
        try {
            cursor.delete(p6);
        }
        catch (ManagedLedgerException ignored) {
            // Deleting on a closed cursor may throw; either outcome is accepted here.
        }
    }

    /**
     * Checks that an individually deleted entry ahead of the read position is excluded from the
     * entry counts and from a later read.
     */
    @Test(timeOut=20000L)
    void testFilteringReadEntries() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3));
        ManagedCursor cursor = ledger.openCursor("c1");
        // Use the file-wide Encoding instead of the platform default charset.
        ledger.addEntry("entry1".getBytes(Encoding));
        ledger.addEntry("entry2".getBytes(Encoding));
        ledger.addEntry("entry3".getBytes(Encoding));
        ledger.addEntry("entry4".getBytes(Encoding));
        Position p5 = ledger.addEntry("entry5".getBytes(Encoding));
        ledger.addEntry("entry6".getBytes(Encoding));
        Assert.assertEquals(cursor.getNumberOfEntries(), 6L);
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), 6L);

        List<Entry> entries = cursor.readEntries(3);
        Assert.assertEquals(entries.size(), 3);
        entries.forEach(Entry::release);
        // Reading moves the read position but nothing has been acknowledged yet.
        Assert.assertEquals(cursor.getNumberOfEntries(), 3L);
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), 6L);

        log.info("Deleting {}", (Object)p5);
        cursor.delete(p5);
        Assert.assertEquals(cursor.getNumberOfEntries(), 2L);
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), 5L);

        // Three requested, but only two remain once p5 is filtered out.
        entries = cursor.readEntries(3);
        Assert.assertEquals(entries.size(), 2);
        entries.forEach(Entry::release);
        Assert.assertEquals(cursor.getNumberOfEntries(), 0L);
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), 5L);
    }

    /**
     * After one entry is read and the next two are individually deleted, the following read must
     * return the fourth and fifth entries.
     */
    @Test(timeOut=20000L)
    void testReadingAllFilteredEntries() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3));
        ledger.openCursor("c1");
        ManagedCursor c2 = ledger.openCursor("c2");
        // Use the file-wide Encoding instead of the platform default charset.
        ledger.addEntry("entry1".getBytes(Encoding));
        Position p2 = ledger.addEntry("entry2".getBytes(Encoding));
        Position p3 = ledger.addEntry("entry3".getBytes(Encoding));
        Position p4 = ledger.addEntry("entry4".getBytes(Encoding));
        Position p5 = ledger.addEntry("entry5".getBytes(Encoding));

        c2.readEntries(1).get(0).release();
        c2.delete(p2);
        c2.delete(p3);

        List<Entry> entries = c2.readEntries(2);
        Assert.assertEquals(entries.size(), 2);
        Assert.assertEquals(entries.get(0).getPosition(), p4);
        Assert.assertEquals(entries.get(1).getPosition(), p5);
        entries.forEach(Entry::release);
    }

    /**
     * Checks that the entry count and the backlog count both drop by one for every individually
     * deleted entry, whether it is at the head or at the tail.
     */
    @Test(timeOut=20000L)
    void testCountingWithDeletedEntries() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ManagedCursor cursor = ledger.openCursor("c1");
        // Use the file-wide Encoding instead of the platform default charset.
        Position p1 = ledger.addEntry("entry1".getBytes(Encoding));
        ledger.addEntry("entry2".getBytes(Encoding));
        ledger.addEntry("entry3".getBytes(Encoding));
        ledger.addEntry("entry4".getBytes(Encoding));
        Position p5 = ledger.addEntry("entry5".getBytes(Encoding));
        Position p6 = ledger.addEntry("entry6".getBytes(Encoding));
        Position p7 = ledger.addEntry("entry7".getBytes(Encoding));
        Position p8 = ledger.addEntry("entry8".getBytes(Encoding));
        Assert.assertEquals(cursor.getNumberOfEntries(), 8L);
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), 8L);

        cursor.delete(p8);
        Assert.assertEquals(cursor.getNumberOfEntries(), 7L);
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), 7L);

        cursor.delete(p1);
        Assert.assertEquals(cursor.getNumberOfEntries(), 6L);
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), 6L);

        cursor.delete(p7);
        cursor.delete(p6);
        cursor.delete(p5);
        Assert.assertEquals(cursor.getNumberOfEntries(), 3L);
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), 3L);
    }

    /**
     * Mark-deleting the same position twice must not throw and must leave the mark-delete
     * position on that entry.
     */
    @Test(timeOut=20000L)
    void testMarkDeleteTwice() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ManagedCursor cursor = ledger.openCursor("c1");
        // Use the file-wide Encoding instead of the platform default charset.
        Position p1 = ledger.addEntry("entry1".getBytes(Encoding));
        cursor.markDelete(p1);
        cursor.markDelete(p1);
        Assert.assertEquals(cursor.getMarkDeletedPosition(), p1);
    }

    /**
     * Exercises {@code skipEntries} on an empty ledger, one entry at a time, and with a request
     * larger than the number of available entries.
     */
    @Test(timeOut=20000L)
    void testSkipEntries() throws Exception {
        final ManagedCursor.IndividualDeletedEntries mode = ManagedCursor.IndividualDeletedEntries.Exclude;
        ManagedLedger ml = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ManagedCursor cursor = ml.openCursor("c1");

        // Skipping on an empty ledger leaves the read position untouched.
        Position last = cursor.getReadPosition();
        cursor.skipEntries(1, mode);
        Assert.assertEquals(cursor.getReadPosition(), last);

        // Two entries, skipped one at a time.
        ml.addEntry("dummy-entry-1".getBytes(Encoding));
        last = ml.addEntry("dummy-entry-2".getBytes(Encoding));
        cursor.skipEntries(1, mode);
        Assert.assertEquals(cursor.getNumberOfEntries(), 1L);
        cursor.skipEntries(1, mode);
        Assert.assertEquals(cursor.getNumberOfEntries(), 0L);
        Assert.assertEquals(cursor.getReadPosition(), last.getNext());
        Assert.assertEquals(cursor.getMarkDeletedPosition(), last);

        // Six more entries: skip five, then ask for more than what is left.
        int written = 0;
        while (written < 6) {
            last = ml.addEntry("dummy-entry".getBytes(Encoding));
            written++;
        }
        cursor.skipEntries(5, mode);
        Assert.assertEquals(cursor.getNumberOfEntries(), 1L);
        cursor.skipEntries(10, mode);
        Assert.assertEquals(cursor.getNumberOfEntries(), 0L);
        Assert.assertEquals(cursor.hasMoreEntries(), false);
        Assert.assertEquals(cursor.getReadPosition(), last.getNext());
        Assert.assertEquals(cursor.getMarkDeletedPosition(), last);
    }

    /**
     * Exercises {@code skipEntries} when some entries were individually deleted, once with
     * {@code Exclude} and once with {@code Include}.
     */
    @Test(timeOut=20000L)
    void testSkipEntriesWithIndividualDeletedMessages() throws Exception {
        ManagedLedger ml = this.factory.open("testSkipEntriesWithIndividualDeletedMessages", new ManagedLedgerConfig().setMaxEntriesPerLedger(5));
        ManagedCursor cursor = ml.openCursor("c1");

        // Round 1: entries 2 and 4 deleted, skip 3 with Exclude.
        ml.addEntry("dummy-entry-1".getBytes(Encoding));
        Position second = ml.addEntry("dummy-entry-2".getBytes(Encoding));
        ml.addEntry("dummy-entry-3".getBytes(Encoding));
        Position fourth = ml.addEntry("dummy-entry-4".getBytes(Encoding));
        Position fifth = ml.addEntry("dummy-entry-5".getBytes(Encoding));
        cursor.delete(second);
        cursor.delete(fourth);
        cursor.skipEntries(3, ManagedCursor.IndividualDeletedEntries.Exclude);
        Assert.assertEquals(cursor.getNumberOfEntries(), 0L);
        Assert.assertEquals(cursor.getReadPosition(), fifth.getNext());
        Assert.assertEquals(cursor.getMarkDeletedPosition(), fifth);

        // Round 2: same pattern on five new entries, skip 4 with Include.
        ml.addEntry("dummy-entry-1".getBytes(Encoding));
        second = ml.addEntry("dummy-entry-2".getBytes(Encoding));
        ml.addEntry("dummy-entry-3".getBytes(Encoding));
        fourth = ml.addEntry("dummy-entry-4".getBytes(Encoding));
        fifth = ml.addEntry("dummy-entry-5".getBytes(Encoding));
        cursor.delete(second);
        cursor.delete(fourth);
        cursor.skipEntries(4, ManagedCursor.IndividualDeletedEntries.Include);
        Assert.assertEquals(cursor.getNumberOfEntries(), 1L);
        Assert.assertEquals(cursor.getReadPosition(), fifth);
        Assert.assertEquals(cursor.getMarkDeletedPosition(), fourth);
    }

    /**
     * Clears the backlog of two out of three cursors and checks the counters of all three, both
     * immediately and after the ledger is reopened through a second factory.
     */
    @Test(timeOut=20000L)
    void testClearBacklog() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        // Each cursor is opened one entry later than the previous one.
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ManagedCursor c2 = ledger.openCursor("c2");
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ManagedCursor c3 = ledger.openCursor("c3");
        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        this.assertBacklogAndReadable(c1, 3L, true);

        c1.clearBacklog();
        c3.clearBacklog();
        this.assertBacklogAndReadable(c1, 0L, false);
        this.assertBacklogAndReadable(c2, 2L, true);
        this.assertBacklogAndReadable(c3, 0L, false);

        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        try {
            ledger = factory2.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
            c1 = ledger.openCursor("c1");
            c2 = ledger.openCursor("c2");
            c3 = ledger.openCursor("c3");
            this.assertBacklogAndReadable(c1, 0L, false);
            this.assertBacklogAndReadable(c2, 2L, true);
            this.assertBacklogAndReadable(c3, 0L, false);
        } finally {
            // Release the second factory even when an assertion above fails.
            factory2.shutdown();
        }
    }

    /**
     * Asserts that {@code cursor} reports {@code expectedEntries} both as backlog and as
     * readable entries, and that {@code hasMoreEntries()} equals {@code expectedHasMore}.
     */
    private void assertBacklogAndReadable(ManagedCursor cursor, long expectedEntries, boolean expectedHasMore) {
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), expectedEntries);
        Assert.assertEquals(cursor.getNumberOfEntries(), expectedEntries);
        Assert.assertEquals(cursor.hasMoreEntries(), expectedHasMore);
    }

    /**
     * With mark-delete throttling set to 1.0, three back-to-back mark-deletes empty the backlog
     * of the live cursor, while a cursor reopened through a second factory reports a backlog
     * of two.
     */
    @Test(timeOut=20000L)
    void testRateLimitMarkDelete() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        // NOTE(review): presumably a per-second rate, so only the first mark-delete is persisted - confirm.
        config.setThrottleMarkDelete(1.0);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor c1 = ledger.openCursor("c1");
        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 3L);

        c1.markDelete(p1);
        c1.markDelete(p2);
        c1.markDelete(p3);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 0L);

        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        try {
            ledger = factory2.open("my_test_ledger", new ManagedLedgerConfig());
            c1 = ledger.openCursor("c1");
            Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 2L);
        } finally {
            // Release the second factory even when the assertion above fails.
            factory2.shutdown();
        }
    }

    /**
     * Deletes each of four entries twice, in order, and checks after every call that the
     * counters, the mark-delete position and the read position reflect exactly one deletion.
     */
    @Test(timeOut=20000L)
    void deleteSingleMessageTwice() throws Exception {
        ManagedLedger ml = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ml.openCursor("c1");

        Position[] added = new Position[4];
        for (int i = 0; i < added.length; i++) {
            added[i] = ml.addEntry(("entry-" + (i + 1)).getBytes(Encoding));
        }
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), 4L);
        Assert.assertEquals(cursor.getNumberOfEntries(), 4L);

        for (int i = 0; i < added.length; i++) {
            Position target = added[i];
            boolean isLast = i == added.length - 1;
            long remaining = added.length - 1 - i;
            // The second pass repeats the delete and must observe the same state.
            for (int attempt = 0; attempt < 2; attempt++) {
                cursor.delete(target);
                Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), remaining);
                Assert.assertEquals(cursor.getNumberOfEntries(), remaining);
                Assert.assertEquals(cursor.getMarkDeletedPosition(), target);
                // After the last entry the read position is the slot following it.
                Assert.assertEquals(cursor.getReadPosition(), isLast ? target.getNext() : added[i + 1]);
            }
        }
    }

    /**
     * Registers a pending {@code asyncReadEntriesOrWait} on ten cursors, then writes one entry
     * and waits until every callback has completed with exactly one entry.
     *
     * <p>A read failure is only logged and does not count the latch down, so such a run ends
     * through the test timeout.
     */
    @Test(timeOut=10000L)
    void testReadEntriesOrWait() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        final int consumers = 10;
        final CountDownLatch counter = new CountDownLatch(consumers);

        for (int i = 0; i < consumers; ++i) {
            ManagedCursor c = ledger.openCursor("c" + i);
            c.asyncReadEntriesOrWait(1, new AsyncCallbacks.ReadEntriesCallback(){

                @Override
                public void readEntriesComplete(List<Entry> entries, Object ctx) {
                    Assert.assertEquals(entries.size(), 1);
                    entries.forEach(Entry::release);
                    counter.countDown();
                }

                @Override
                public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                    log.error("Error reading", (Throwable)exception);
                }
            }, null);
        }

        // Use the file-wide Encoding instead of the platform default charset.
        ledger.addEntry("test".getBytes(Encoding));
        counter.await();
    }

    /**
     * Runs ten reader tasks, each on its own cursor, that call the blocking
     * {@code readEntriesOrWait} until they have consumed 100 entries, while the test thread
     * writes those 100 entries.
     *
     * <p>A barrier shared by the readers and the test thread makes sure no entry is written
     * before every reader task has started.
     */
    @Test(timeOut=20000L)
    void testReadEntriesOrWaitBlocking() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        final int messages = 100;
        final int consumers = 10;
        List<Future<Void>> futures = new ArrayList<>(consumers);
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            // One party per reader plus the test thread.
            final CyclicBarrier barrier = new CyclicBarrier(consumers + 1);
            for (int i = 0; i < consumers; ++i) {
                final ManagedCursor cursor = ledger.openCursor("c" + i);
                Callable<Void> reader = () -> {
                    barrier.await();
                    int remaining = messages;
                    while (remaining > 0) {
                        List<Entry> entries = cursor.readEntriesOrWait(10);
                        Assert.assertTrue(entries.size() <= 10);
                        entries.forEach(Entry::release);
                        remaining -= entries.size();
                    }
                    return null;
                };
                futures.add(executor.submit(reader));
            }

            barrier.await();
            for (int i = 0; i < messages; ++i) {
                // Use the file-wide Encoding instead of the platform default charset.
                ledger.addEntry("test".getBytes(Encoding));
            }
            // Propagates any assertion failure raised inside a reader task.
            for (Future<Void> f : futures) {
                f.get();
            }
        } finally {
            // Without this the pool threads outlive the test.
            executor.shutdownNow();
        }
    }

    /**
     * With five non-matching entries in the ledger, the search must return no position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatching() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        for (int i = 0; i < 5; ++i) {
            ml.addEntry("not-expired".getBytes(Encoding));
        }
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertNull(found);
    }

    /**
     * Five entries, only the first one matching: the search must return the first position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingOdd1() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        for (int i = 0; i < 4; ++i) {
            ml.addEntry("not-expired".getBytes(Encoding));
        }
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Five entries, the first two matching: the search must return the second position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingOdd2() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        ml.addEntry("expired".getBytes(Encoding));
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        for (int i = 0; i < 3; ++i) {
            ml.addEntry("not-expired".getBytes(Encoding));
        }
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Five entries, the first three matching: the search must return the third position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingOdd3() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        for (int i = 0; i < 2; ++i) {
            ml.addEntry("expired".getBytes(Encoding));
        }
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        for (int i = 0; i < 2; ++i) {
            ml.addEntry("not-expired".getBytes(Encoding));
        }
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Five entries, the first four matching: the search must return the fourth position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingOdd4() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        for (int i = 0; i < 3; ++i) {
            ml.addEntry("expired".getBytes(Encoding));
        }
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        ml.addEntry("not-expired".getBytes(Encoding));
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Five entries, all matching: the search must return the last position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingOdd5() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        for (int i = 0; i < 4; ++i) {
            ml.addEntry("expired".getBytes(Encoding));
        }
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Four entries, only the first one matching: the search must return the first position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEven1() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        for (int i = 0; i < 3; ++i) {
            ml.addEntry("not-expired".getBytes(Encoding));
        }
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Four entries, the first two matching: the search must return the second position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEven2() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        ml.addEntry("expired".getBytes(Encoding));
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        for (int i = 0; i < 2; ++i) {
            ml.addEntry("not-expired".getBytes(Encoding));
        }
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Four entries, the first three matching: the search must return the third position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEven3() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        for (int i = 0; i < 2; ++i) {
            ml.addEntry("expired".getBytes(Encoding));
        }
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        ml.addEntry("not-expired".getBytes(Encoding));
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Four entries, all matching: the search must return the last position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEven4() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        for (int i = 0; i < 3; ++i) {
            ml.addEntry("expired".getBytes(Encoding));
        }
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Searching a ledger that has no entries at all must return no position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEdgeCase1() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertNull(found);
    }

    /**
     * A single matching entry: the search must return its position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEdgeCase2() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        Position onlyEntry = ml.addEntry("expired".getBytes(Encoding));
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, onlyEntry);
    }

    /**
     * One matching entry followed by one non-matching entry: the search must return the first position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEdgeCase3() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        ml.addEntry("not-expired".getBytes(Encoding));
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * One matching entry followed by two non-matching entries: the search must return the first position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEdgeCase4() throws Exception {
        ManagedLedger ml = factory.open("my_test_ledger");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        for (int i = 0; i < 2; ++i) {
            ml.addEntry("not-expired".getBytes(Encoding));
        }
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Two matching entries followed by one non-matching entry: the search must return the second position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEdgeCase5() throws Exception {
        ManagedLedger ml = factory.open("testFindNewestMatchingEdgeCase5");
        ManagedCursorImpl cursor = (ManagedCursorImpl) ml.openCursor("c1");
        ml.addEntry("expired".getBytes(Encoding));
        Position lastExpired = ml.addEntry("expired".getBytes(Encoding));
        ml.addEntry("not-expired".getBytes(Encoding));
        Position found = cursor.findNewestMatching(
                entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding)));
        Assert.assertEquals(found, lastExpired);
    }

    /**
     * Uses a ledger limited to three entries per ledger, writes four matching entries and one
     * non-matching entry, mark-deletes the third entry read, and expects the search to return
     * the fourth matching position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEdgeCase6() throws Exception {
        ManagedLedger ledger = this.factory.open("testFindNewestMatchingEdgeCase6", new ManagedLedgerConfig().setMaxEntriesPerLedger(3));
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        ledger.addEntry("expired".getBytes(Encoding));
        ledger.addEntry("expired".getBytes(Encoding));
        ledger.addEntry("expired".getBytes(Encoding));
        Position newPosition = ledger.addEntry("expired".getBytes(Encoding));
        ledger.addEntry("not-expired".getBytes(Encoding));
        // Typed list: with a raw List the release lambda below would see Object elements.
        List<Entry> entries = c1.readEntries(3);
        c1.markDelete(entries.get(2).getPosition());
        entries.forEach(Entry::release);
        Assert.assertEquals(c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))), newPosition);
    }

    /**
     * Writes six matching entries, reads four, mark-deletes the first and individually deletes
     * the third, and expects the search to return the last written position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEdgeCase7() throws Exception {
        ManagedLedger ledger = this.factory.open("testFindNewestMatchingEdgeCase7");
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        for (int i = 0; i < 5; ++i) {
            ledger.addEntry("expired".getBytes(Encoding));
        }
        Position lastPosition = ledger.addEntry("expired".getBytes(Encoding));
        // Typed list: with a raw List the release lambda below would see Object elements.
        List<Entry> entries = c1.readEntries(4);
        c1.markDelete(entries.get(0).getPosition());
        c1.delete(entries.get(2).getPosition());
        entries.forEach(Entry::release);
        Assert.assertEquals(c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))), lastPosition);
    }

    /**
     * Writes six matching entries and one non-matching entry, reads four, individually deletes
     * the second and third, and expects the search to return the last matching position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEdgeCase8() throws Exception {
        ManagedLedger ledger = this.factory.open("testFindNewestMatchingEdgeCase8");
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        for (int i = 0; i < 5; ++i) {
            ledger.addEntry("expired".getBytes(Encoding));
        }
        Position lastPosition = ledger.addEntry("expired".getBytes(Encoding));
        ledger.addEntry("not-expired".getBytes(Encoding));
        // Typed list: with a raw List the release lambda below would see Object elements.
        List<Entry> entries = c1.readEntries(4);
        c1.delete(entries.get(1).getPosition());
        c1.delete(entries.get(2).getPosition());
        entries.forEach(Entry::release);
        Assert.assertEquals(c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))), lastPosition);
    }

    /**
     * Writes seven matching entries and one non-matching entry, reads five, individually
     * deletes the second and fourth, and expects the search to return the last matching position.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEdgeCase9() throws Exception {
        ManagedLedger ledger = this.factory.open("testFindNewestMatchingEdgeCase9");
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        for (int i = 0; i < 6; ++i) {
            ledger.addEntry("expired".getBytes(Encoding));
        }
        Position lastPosition = ledger.addEntry("expired".getBytes(Encoding));
        ledger.addEntry("not-expired".getBytes(Encoding));
        // Typed list: with a raw List the release lambda below would see Object elements.
        List<Entry> entries = c1.readEntries(5);
        c1.delete(entries.get(1).getPosition());
        c1.delete(entries.get(3).getPosition());
        entries.forEach(Entry::release);
        Assert.assertEquals(c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))), lastPosition);
    }

    /**
     * Writes seven matching entries and one non-matching entry, reads seven, individually
     * deletes the second, fourth and seventh, and expects the search to return the position
     * of the seventh (last matching) entry.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingEdgeCase10() throws Exception {
        ManagedLedger ledger = this.factory.open("testFindNewestMatchingEdgeCase10");
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        for (int i = 0; i < 6; ++i) {
            ledger.addEntry("expired".getBytes(Encoding));
        }
        Position lastPosition = ledger.addEntry("expired".getBytes(Encoding));
        ledger.addEntry("not-expired".getBytes(Encoding));
        // Typed list: with a raw List the release lambda below would see Object elements.
        List<Entry> entries = c1.readEntries(7);
        c1.delete(entries.get(1).getPosition());
        c1.delete(entries.get(3).getPosition());
        c1.delete(entries.get(6).getPosition());
        entries.forEach(Entry::release);
        Assert.assertEquals(c1.findNewestMatching(entry -> Arrays.equals(entry.getDataAndRelease(), "expired".getBytes(Encoding))), lastPosition);
    }

    /**
     * Individually deletes the second and third of four read entries, then mark-deletes the
     * fourth, and expects the cursor to report no individually deleted entries.
     */
    @Test(timeOut=20000L)
    void testIndividuallyDeletedMessages() throws Exception {
        ManagedLedger ledger = this.factory.open("testIndividuallyDeletedMessages");
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        for (int i = 0; i < 5; ++i) {
            ledger.addEntry(("entry-" + i).getBytes(Encoding));
        }
        // Typed list: with a raw List the release lambda below would see Object elements.
        List<Entry> entries = c1.readEntries(4);
        c1.delete(entries.get(1).getPosition());
        c1.delete(entries.get(2).getPosition());
        c1.markDelete(entries.get(3).getPosition());
        entries.forEach(Entry::release);
        Assert.assertTrue(c1.isIndividuallyDeletedEntriesEmpty());
    }

    /**
     * Individually deletes the second of four read entries, then mark-deletes the fourth,
     * and expects the cursor to report no individually deleted entries.
     */
    @Test(timeOut=20000L)
    void testIndividuallyDeletedMessages1() throws Exception {
        ManagedLedger ledger = this.factory.open("testIndividuallyDeletedMessages1");
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        for (int i = 0; i < 5; ++i) {
            ledger.addEntry(("entry-" + i).getBytes(Encoding));
        }
        // Typed list: with a raw List the release lambda below would see Object elements.
        List<Entry> entries = c1.readEntries(4);
        c1.delete(entries.get(1).getPosition());
        c1.markDelete(entries.get(3).getPosition());
        entries.forEach(Entry::release);
        Assert.assertTrue(c1.isIndividuallyDeletedEntriesEmpty());
    }

    /**
     * Individually deletes the second, third and finally the first of four read entries,
     * and expects the cursor to report no individually deleted entries.
     */
    @Test(timeOut=20000L)
    void testIndividuallyDeletedMessages2() throws Exception {
        ManagedLedger ledger = this.factory.open("testIndividuallyDeletedMessages2");
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        for (int i = 0; i < 5; ++i) {
            ledger.addEntry(("entry-" + i).getBytes(Encoding));
        }
        // Typed list: with a raw List the release lambda below would see Object elements.
        List<Entry> entries = c1.readEntries(4);
        c1.delete(entries.get(1).getPosition());
        c1.delete(entries.get(2).getPosition());
        c1.delete(entries.get(0).getPosition());
        entries.forEach(Entry::release);
        Assert.assertTrue(c1.isIndividuallyDeletedEntriesEmpty());
    }

    /**
     * Individually deletes the second and third of four read entries, then mark-deletes the
     * first, and expects the cursor to report no individually deleted entries.
     */
    @Test(timeOut=20000L)
    void testIndividuallyDeletedMessages3() throws Exception {
        ManagedLedger ledger = this.factory.open("testIndividuallyDeletedMessages3");
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        for (int i = 0; i < 5; ++i) {
            ledger.addEntry(("entry-" + i).getBytes(Encoding));
        }
        // Typed list: with a raw List the release lambda below would see Object elements.
        List<Entry> entries = c1.readEntries(4);
        c1.delete(entries.get(1).getPosition());
        c1.delete(entries.get(2).getPosition());
        c1.markDelete(entries.get(0).getPosition());
        entries.forEach(Entry::release);
        Assert.assertTrue(c1.isIndividuallyDeletedEntriesEmpty());
    }

    /**
     * Builds an entry payload holding the current wall-clock time in milliseconds,
     * rendered as decimal text and encoded with the platform default charset.
     *
     * @param msg label for the entry; it is not used when building the payload
     * @return the bytes of the decimal millisecond timestamp
     */
    public static byte[] getEntryPublishTime(String msg) throws Exception {
        long nowMillis = System.currentTimeMillis();
        String decimal = String.valueOf(nowMillis);
        return decimal.getBytes();
    }

    /**
     * Blocking wrapper around {@code asyncFindNewestMatching} that searches with the
     * {@code SearchAllAvailableEntries} constraint for the newest entry whose payload,
     * parsed as a decimal millisecond timestamp, is not later than {@code timestamp}.
     *
     * @param c1 cursor to search with
     * @param timestamp upper bound (inclusive) for the publish time stored in the entry
     * @return the position reported by the find callback
     * @throws Exception the {@link ManagedLedgerException} reported by the callback, if any
     */
    public Position findPositionFromAllEntries(ManagedCursor c1, long timestamp) throws Exception {
        final CountDownLatch done = new CountDownLatch(1);
        final AtomicReference<Position> foundPosition = new AtomicReference<>();
        final AtomicReference<ManagedLedgerException> failure = new AtomicReference<>();
        AsyncCallbacks.FindEntryCallback callback = new AsyncCallbacks.FindEntryCallback(){

            public void findEntryComplete(Position position, Object ctx) {
                foundPosition.set(position);
                done.countDown();
            }

            public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
                failure.set(exception);
                done.countDown();
            }
        };
        ManagedCursor.FindPositionConstraint constraint = ManagedCursor.FindPositionConstraint.SearchAllAvailableEntries;
        c1.asyncFindNewestMatching(constraint, entry -> {
            // Entries that cannot be parsed are logged and treated as non-matching.
            boolean matches = false;
            try {
                matches = Long.parseLong(new String(entry.getData())) <= timestamp;
            }
            catch (Exception e) {
                log.error("Error de-serializing message for message position find", (Throwable)e);
            }
            finally {
                entry.release();
            }
            return matches;
        }, callback, (Object)constraint);
        done.await();
        ManagedLedgerException error = failure.get();
        if (error != null) {
            throw error;
        }
        return foundPosition.get();
    }

    /**
     * Shared scenario for the "find newest matching across all entries" tests.
     *
     * <p>Writes four timestamp entries spaced by short sleeps, records the wall-clock time right
     * after the fourth, writes a fifth entry, mark-deletes the third entry, closes and reopens
     * the ledger and cursor, and then checks that the timestamp search lands on the fourth entry.
     *
     * @param name name used for both the managed ledger and its cursor
     * @param entriesPerLedger value passed to {@code setMaxEntriesPerLedger}
     * @param expectedEntryId entry id the found position must have
     */
    void internalTestFindNewestMatchingAllEntries(String name, int entriesPerLedger, int expectedEntryId) throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(10L);
        config.setMaxEntriesPerLedger(entriesPerLedger);
        config.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedger ledger = this.factory.open(name, config);
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor(name);
        ledger.addEntry(ManagedCursorTest.getEntryPublishTime("retained1"));
        Thread.sleep(100L);
        ledger.addEntry(ManagedCursorTest.getEntryPublishTime("retained2"));
        Thread.sleep(100L);
        ledger.addEntry(ManagedCursorTest.getEntryPublishTime("retained3"));
        Thread.sleep(100L);
        Position newPosition = ledger.addEntry(ManagedCursorTest.getEntryPublishTime("expectedresetposition"));
        long timestamp = System.currentTimeMillis();
        long ledgerId = ((PositionImpl)newPosition).getLedgerId();
        // Make sure the next entry carries a publish time strictly later than the search timestamp.
        Thread.sleep(2L);
        ledger.addEntry(ManagedCursorTest.getEntryPublishTime("not-read"));
        // Typed list: with a raw List the release call below would see Object elements.
        List<Entry> entries = c1.readEntries(3);
        c1.markDelete(entries.get(2).getPosition());
        c1.close();
        ledger.close();
        entries.forEach(Entry::release);
        Thread.sleep(100L);
        ledger = this.factory.open(name, config);
        c1 = (ManagedCursorImpl)ledger.openCursor(name);
        PositionImpl found = (PositionImpl)this.findPositionFromAllEntries((ManagedCursor)c1, timestamp);
        Assert.assertEquals(found.getLedgerId(), ledgerId);
        Assert.assertEquals(found.getEntryId(), (long)expectedEntryId);
    }

    /**
     * Runs the shared all-entries scenario with two entries per ledger and expects
     * the match at entry id 1.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingAllEntries() throws Exception {
        internalTestFindNewestMatchingAllEntries("testFindNewestMatchingAllEntries", 2, 1);
    }

    /**
     * Runs the shared all-entries scenario with one entry per ledger and expects
     * the match at entry id 0.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingAllEntries2() throws Exception {
        internalTestFindNewestMatchingAllEntries("testFindNewestMatchingAllEntries2", 1, 0);
    }

    /**
     * Runs the shared all-entries scenario with the default per-ledger entry limit of a
     * fresh {@link ManagedLedgerConfig} and expects the match at entry id 3.
     */
    @Test(timeOut=20000L)
    void testFindNewestMatchingAllEntriesSingleLedger() throws Exception {
        int defaultEntriesPerLedger = new ManagedLedgerConfig().getMaxEntriesPerLedger();
        internalTestFindNewestMatchingAllEntries("testFindNewestMatchingAllEntriesSingleLedger", defaultEntriesPerLedger, 3);
    }

    /**
     * Exercises {@code replayEntries}: an empty position set yields nothing, two valid
     * positions yield the two matching entries (in either order), an unknown position makes
     * the call fail, and after mark-deleting the second entry only one of the two requested
     * positions is replayed.
     */
    @Test(timeOut=20000L)
    void testReplayEntries() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        PositionImpl p1 = (PositionImpl)ledger.addEntry("entry1".getBytes(Encoding));
        PositionImpl p2 = (PositionImpl)ledger.addEntry("entry2".getBytes(Encoding));
        PositionImpl p3 = (PositionImpl)ledger.addEntry("entry3".getBytes(Encoding));
        ledger.addEntry("entry4".getBytes(Encoding));
        // Typed collections: raw types here would stop Entry::release from type-checking.
        Set<Position> positions = Sets.newHashSet();
        Assert.assertTrue(c1.replayEntries(positions).isEmpty());
        positions.add(p1);
        positions.add(p3);
        List<Entry> entries = c1.replayEntries(positions);
        Assert.assertEquals(entries.size(), 2);
        byte[] first = entries.get(0).getData();
        byte[] second = entries.get(1).getData();
        byte[] entry1 = "entry1".getBytes(Encoding);
        byte[] entry3 = "entry3".getBytes(Encoding);
        boolean inOrder = Arrays.equals(first, entry1) && Arrays.equals(second, entry3);
        boolean reversed = Arrays.equals(first, entry3) && Arrays.equals(second, entry1);
        Assert.assertTrue(inOrder || reversed);
        entries.forEach(Entry::release);
        PositionImpl invalidPosition = new PositionImpl(100L, 100L);
        positions.add(invalidPosition);
        try {
            c1.replayEntries(positions);
            Assert.fail("Should fail");
        }
        catch (ManagedLedgerException expected) {
            // Expected: the set contains a position that does not exist in the ledger.
        }
        positions.remove(invalidPosition);
        c1.markDelete(p2);
        try {
            List<Entry> remaining = c1.replayEntries(positions);
            Assert.assertEquals(remaining.size(), 1);
            // Release the replayed entries, as done for every other read in this test.
            remaining.forEach(Entry::release);
        }
        catch (ManagedLedgerException e) {
            Assert.fail("Should have not failed", e);
        }
    }

    /**
     * Adds ten entries and individually deletes the entries at indexes 3, 2, 1 and 0
     * (in that order), checking that the backlog shrinks by one after every delete.
     */
    @Test(timeOut=20000L)
    void outOfOrderAcks() throws Exception {
        ManagedLedger ml = factory.open("outOfOrderAcks");
        ManagedCursor cursor = ml.openCursor("c1");
        final int total = 10;
        List<Position> added = new ArrayList<Position>();
        int written = 0;
        while (written < total) {
            added.add(ml.addEntry("entry".getBytes()));
            ++written;
        }
        long expectedBacklog = total;
        Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), expectedBacklog);
        for (int idx = 3; idx >= 0; --idx) {
            cursor.delete(added.get(idx));
            --expectedBacklog;
            Assert.assertEquals(cursor.getNumberOfEntriesInBacklog(), expectedBacklog);
        }
    }

    /**
     * Adds ten entries and individually deletes them in a shuffled order, checking the
     * backlog count before and after every delete.
     *
     * <p>The ledger is named after this test so it cannot collide with the ledger used by
     * {@code outOfOrderAcks}.
     */
    @Test(timeOut=20000L)
    void randomOrderAcks() throws Exception {
        ManagedLedger ledger = this.factory.open("randomOrderAcks");
        ManagedCursor c1 = ledger.openCursor("c1");
        final int N = 10;
        List<Position> positions = new ArrayList<Position>();
        for (int i = 0; i < N; ++i) {
            positions.add(ledger.addEntry("entry".getBytes()));
        }
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), (long)N);
        Collections.shuffle(positions);
        int toDelete = N;
        for (Position p : positions) {
            Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), (long)toDelete);
            c1.delete(p);
            Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), (long)(--toDelete));
        }
    }

    /**
     * Checks {@code getNthEntry} with individually deleted entries excluded: it returns the
     * N-th entry without moving the mark-delete or read position, returns {@code null} past
     * the end, and counts from the new mark-delete position after {@code markDelete}.
     */
    @Test(timeOut=20000L)
    void testGetEntryAfterN() throws Exception {
        ManagedLedger ledger = this.factory.open("testGetEntryAfterN");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("msg1".getBytes());
        ledger.addEntry("msg2".getBytes());
        ledger.addEntry("msg3".getBytes());
        Position pos4 = ledger.addEntry("msg4".getBytes());
        Position pos5 = ledger.addEntry("msg5".getBytes());
        // Typed list: with a raw List the release call below would see Object elements.
        List<Entry> entries = c1.readEntries(4);
        entries.forEach(Entry::release);
        long currentLedger = ((PositionImpl)c1.getMarkDeletedPosition()).getLedgerId();
        Entry nth = c1.getNthEntry(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        Assert.assertEquals(nth.getDataAndRelease(), "msg1".getBytes());
        // Asking again gives the same entry.
        nth = c1.getNthEntry(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        Assert.assertEquals(nth.getDataAndRelease(), "msg1".getBytes());
        nth = c1.getNthEntry(3, ManagedCursor.IndividualDeletedEntries.Exclude);
        Assert.assertEquals(nth.getDataAndRelease(), "msg3".getBytes());
        nth = c1.getNthEntry(5, ManagedCursor.IndividualDeletedEntries.Exclude);
        Assert.assertEquals(nth.getDataAndRelease(), "msg5".getBytes());
        nth = c1.getNthEntry(10, ManagedCursor.IndividualDeletedEntries.Exclude);
        Assert.assertNull(nth);
        // None of the lookups above may have moved the cursor.
        Assert.assertEquals(c1.getMarkDeletedPosition(), new PositionImpl(currentLedger, -1L));
        Assert.assertEquals(c1.getReadPosition(), new PositionImpl(currentLedger, 4L));
        c1.markDelete(pos4);
        Assert.assertEquals(c1.getMarkDeletedPosition(), pos4);
        nth = c1.getNthEntry(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        Assert.assertEquals(nth.getDataAndRelease(), "msg5".getBytes());
        // Release what this read returns instead of dropping the reference.
        c1.readEntries(1).forEach(Entry::release);
        c1.markDelete(pos5);
        nth = c1.getNthEntry(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        Assert.assertNull(nth);
    }

    /**
     * With the third and fourth of five entries individually deleted, the third entry is
     * "msg5" when deleted entries are excluded and "msg3" when they are included.
     */
    @Test(timeOut=20000L)
    void testGetEntryAfterNWithIndividualDeletedMessages() throws Exception {
        ManagedLedger ml = factory.open("testGetEnteryAfterNWithIndividualDeletedMessages");
        ManagedCursor cursor = ml.openCursor("c1");
        ml.addEntry("msg1".getBytes());
        ml.addEntry("msg2".getBytes());
        Position third = ml.addEntry("msg3".getBytes());
        Position fourth = ml.addEntry("msg4".getBytes());
        ml.addEntry("msg5".getBytes());
        cursor.delete(third);
        cursor.delete(fourth);

        Entry skippingDeleted = cursor.getNthEntry(3, ManagedCursor.IndividualDeletedEntries.Exclude);
        byte[] excludedPayload = skippingDeleted.getDataAndRelease();
        Assert.assertEquals(excludedPayload, "msg5".getBytes());

        Entry countingDeleted = cursor.getNthEntry(3, ManagedCursor.IndividualDeletedEntries.Include);
        byte[] includedPayload = countingDeleted.getDataAndRelease();
        Assert.assertEquals(includedPayload, "msg3".getBytes());
    }

    /**
     * Checks {@code cancelPendingReadRequest}: it returns {@code false} when nothing is
     * pending, {@code true} while a waiting read is registered, and {@code false} again once
     * a later waiting read has been satisfied by a newly added entry.
     */
    @Test(timeOut=20000L)
    void cancelReadOperation() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        ManagedCursor c1 = ledger.openCursor("c1");
        Assert.assertFalse(c1.cancelPendingReadRequest());
        final CountDownLatch counter = new CountDownLatch(1);
        c1.asyncReadEntriesOrWait(1, new AsyncCallbacks.ReadEntriesCallback(){

            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                // Release any delivered entries, as every other read in this file does.
                entries.forEach(Entry::release);
                counter.countDown();
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                counter.countDown();
            }
        }, null);
        Assert.assertTrue(c1.cancelPendingReadRequest());
        final CountDownLatch counter2 = new CountDownLatch(1);
        c1.asyncReadEntriesOrWait(1, new AsyncCallbacks.ReadEntriesCallback(){

            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                entries.forEach(Entry::release);
                counter2.countDown();
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                counter2.countDown();
            }
        }, null);
        ledger.addEntry("entry-1".getBytes(Encoding));
        // Give the pending read time to be served before checking that nothing is left to cancel.
        Thread.sleep(100L);
        Assert.assertFalse(c1.cancelPendingReadRequest());
        counter2.await();
    }

    /**
     * Opens, closes and reopens the same ledger and cursor twice. After the first reopen
     * the cursor's mark-delete position must differ from the one seen on the first open.
     */
    @Test(timeOut=20000L)
    public void testReopenMultipleTimes() throws Exception {
        final String ledgerName = "testReopenMultipleTimes";
        final String cursorName = "c1";

        // First open: remember where the cursor starts.
        ManagedLedger ml = factory.open(ledgerName);
        ManagedCursor cursor = ml.openCursor(cursorName);
        Position initialMarkDelete = cursor.getMarkDeletedPosition();
        cursor.close();
        ml.close();

        // Second open: the mark-delete position is expected to have changed.
        ml = factory.open(ledgerName);
        cursor = ml.openCursor(cursorName);
        Position reopenedMarkDelete = cursor.getMarkDeletedPosition();
        Assert.assertNotEquals(reopenedMarkDelete, initialMarkDelete);
        cursor.close();
        ml.close();

        // Third open only has to succeed.
        ml = factory.open(ledgerName);
        cursor = ml.openCursor(cursorName);
    }

    /**
     * Writes 20 entries, individually deletes five of them out of order (indexes 2, 5, 7, 8
     * and 9), closes the ledger and shuts the factory down, then re-opens everything through
     * a new factory and checks that the cursor still reports 15 entries in its backlog and
     * that reading skips the deleted entries.
     */
    @Test(timeOut=20000L)
    public void testOutOfOrderDeletePersistenceWithClose() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig());
        ManagedCursor c1 = ledger.openCursor("c1");

        List<Position> addedPositions = new ArrayList<>();
        for (int i = 0; i < 20; ++i) {
            addedPositions.add(ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding)));
        }

        // Punch holes into the backlog without moving in write order.
        for (int deletedIndex : new int[] {2, 5, 7, 8, 9}) {
            c1.delete(addedPositions.get(deletedIndex));
        }
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 15L);

        ledger.close();
        this.factory.shutdown();

        // Re-open through a brand new factory on the same BookKeeper/ZooKeeper handles.
        this.factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig());
        c1 = ledger.openCursor("c1");
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 15L);

        // The element types must be spelled out: with raw List the lambda parameter below is
        // typed as Object and getDataAndRelease() cannot be resolved.
        List<Entry> entries = c1.readEntries(20);
        Assert.assertEquals(entries.size(), 15);
        List<String> entriesStr = entries.stream()
                .map(e -> new String(e.getDataAndRelease(), Encoding))
                .collect(Collectors.toList());
        Assert.assertEquals(entriesStr.get(0), "dummy-entry-0");
        Assert.assertEquals(entriesStr.get(1), "dummy-entry-1");
        Assert.assertEquals(entriesStr.get(2), "dummy-entry-3");
        Assert.assertEquals(entriesStr.get(3), "dummy-entry-4");
        Assert.assertEquals(entriesStr.get(4), "dummy-entry-6");
        Assert.assertFalse(c1.hasMoreEntries());
    }

    /**
     * Writes 20 entries and individually deletes five of them out of order (indexes 2, 5, 7,
     * 8 and 9). The first ledger is then NOT closed; instead the same ledger and cursor are
     * opened through a second factory, and the test checks that the cursor reports 15
     * entries in its backlog and that reading skips the deleted entries.
     */
    @Test(timeOut=20000L)
    public void testOutOfOrderDeletePersistenceAfterCrash() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig());
        ManagedCursor c1 = ledger.openCursor("c1");

        List<Position> addedPositions = new ArrayList<>();
        for (int i = 0; i < 20; ++i) {
            addedPositions.add(ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding)));
        }

        // Punch holes into the backlog without moving in write order.
        for (int deletedIndex : new int[] {2, 5, 7, 8, 9}) {
            c1.delete(addedPositions.get(deletedIndex));
        }
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 15L);

        // Open the same ledger via an independent factory without closing the first one.
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        try {
            ledger = factory2.open("my_test_ledger", new ManagedLedgerConfig());
            c1 = ledger.openCursor("c1");
            Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), 15L);

            // The element types must be spelled out: with raw List the lambda parameter below
            // is typed as Object and getDataAndRelease() cannot be resolved.
            List<Entry> entries = c1.readEntries(20);
            Assert.assertEquals(entries.size(), 15);
            List<String> entriesStr = entries.stream()
                    .map(e -> new String(e.getDataAndRelease(), Encoding))
                    .collect(Collectors.toList());
            Assert.assertEquals(entriesStr.get(0), "dummy-entry-0");
            Assert.assertEquals(entriesStr.get(1), "dummy-entry-1");
            Assert.assertEquals(entriesStr.get(2), "dummy-entry-3");
            Assert.assertEquals(entriesStr.get(3), "dummy-entry-4");
            Assert.assertEquals(entriesStr.get(4), "dummy-entry-6");
            Assert.assertFalse(c1.hasMoreEntries());
        } finally {
            // Shut the extra factory down even when an assertion above fails, so a failing
            // run does not leave it behind for later tests.
            factory2.shutdown();
        }
    }

    /**
     * Creates a new cursor metadata ledger, overwrites the cursor znode behind the cursor's
     * back, and then asks for another metadata ledger. After that second request has
     * finished, ledger id 6 must no longer be openable, i.e. the ledger created for the
     * failed switch must not be leaked.
     */
    @Test(timeOut=5000L)
    public void testLeakFailedLedgerOfManageCursor() throws Exception {
        ManagedLedgerConfig mlConfig = new ManagedLedgerConfig();
        ManagedLedger ledger = this.factory.open("my_test_ledger", mlConfig);
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");

        final CountDownLatch latch = new CountDownLatch(1);
        c1.createNewMetadataLedger(new ManagedCursorImpl.VoidCallback(){

            @Override
            public void operationComplete() {
                latch.countDown();
            }

            @Override
            public void operationFailed(ManagedLedgerException exception) {
                latch.countDown();
            }
        });
        // Let the first request finish (either way) before touching the znode, so the
        // overwrite below cannot interleave with it. Previously this latch was never awaited.
        latch.await();

        // Overwrite the cursor znode unconditionally (version -1).
        // NOTE(review): presumably this makes the version cached by the cursor stale so the
        // next metadata update is rejected — confirm against the MetaStore implementation.
        CountDownLatch latch1 = new CountDownLatch(1);
        String path = "/managed-ledgers/my_test_ledger/c1";
        this.zkc.setData(path, new byte[0], -1, (rc, path1, ctx, stat) -> latch1.countDown(), null);
        latch1.await();

        final CountDownLatch latch2 = new CountDownLatch(1);
        // NOTE(review): assumes the ledger created by the request below gets id 6 — confirm
        // against the id allocation of the BookKeeper client used by this test.
        long ledgerId = 6L;
        c1.createNewMetadataLedger(new ManagedCursorImpl.VoidCallback(){

            @Override
            public void operationComplete() {
                latch2.countDown();
            }

            @Override
            public void operationFailed(ManagedLedgerException exception) {
                latch2.countDown();
            }
        });
        latch2.await();

        try {
            this.bkc.openLedgerNoRecovery(ledgerId, BookKeeper.DigestType.fromApiDigestType((DigestType)mlConfig.getDigestType()), mlConfig.getPassword());
            Assert.fail((String)"ledger should have deleted due to update-cursor failure");
        }
        catch (BKException expected) {
            // Expected outcome: the ledger cannot be opened any more.
        }
    }

    /**
     * Writes 100 entries and deletes every even-indexed one, with
     * {@code maxUnackedRangesToPersistInZk} set to 10, then closes the ledger. The test
     * verifies that the cursor-ledger id stored in the metadata store matches the cursor's,
     * that the last entry of that cursor ledger carries 49 individually-deleted ranges, and
     * that a cursor re-opened through a new factory still sees 50 entries.
     */
    @Test(timeOut=20000L)
    public void testOutOfOrderDeletePersistenceIntoLedgerWithClose() throws Exception {
        final int totalAddEntries = 100;
        final int expectedRemaining = totalAddEntries / 2;
        final String ledgerName = "my_test_ledger";
        final String cursorName = "c1";

        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setMaxUnackedRangesToPersistInZk(10);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open(ledgerName, managedLedgerConfig);
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor(cursorName);

        // Delete each even-indexed entry right after writing it, leaving a hole per pair.
        for (int i = 0; i < totalAddEntries; ++i) {
            Position written = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
            if (i % 2 == 0) {
                c1.delete(written);
            }
        }
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), (long)expectedRemaining);
        ledger.close();

        // Step 1: the metadata store must point at the cursor's current cursor ledger.
        final CountDownLatch cursorInfoFetched = new CountDownLatch(1);
        final AtomicLong storedCursorLedgerId = new AtomicLong(0L);
        ledger.getStore().asyncGetCursorInfo(ledger.getName(), cursorName, new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>(){

            @Override
            public void operationComplete(MLDataFormats.ManagedCursorInfo result, MetaStore.Stat stat) {
                storedCursorLedgerId.set(result.getCursorsLedgerId());
                cursorInfoFetched.countDown();
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                // Leaves the id at 0; the comparison below then reports the mismatch.
                cursorInfoFetched.countDown();
            }
        });
        cursorInfoFetched.await();
        Assert.assertEquals(storedCursorLedgerId.get(), (long)c1.getCursorLedger());

        // Step 2: read the last entry of the cursor ledger and count the deleted ranges in it.
        final CountDownLatch lastEntryRead = new CountDownLatch(1);
        final AtomicInteger persistedRangeCount = new AtomicInteger(0);
        this.bkc.asyncOpenLedger(c1.getCursorLedger(), BookKeeper.DigestType.CRC32C, "".getBytes(), (rc, lh, ctx) -> {
            if (rc != BKException.Code.OK) {
                lastEntryRead.countDown();
                return;
            }
            long lastEntryId = lh.getLastAddConfirmed();
            lh.asyncReadEntries(lastEntryId, lastEntryId, (readRc, readLh, seq, readCtx) -> {
                try {
                    LedgerEntry lastEntry = (LedgerEntry)seq.nextElement();
                    MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom((byte[])lastEntry.getEntry());
                    persistedRangeCount.set(positionInfo.getIndividualDeletedMessagesCount());
                }
                catch (Exception ignored) {
                    // Any failure leaves the count at 0, which fails the assertion below.
                }
                lastEntryRead.countDown();
            }, null);
        }, null);
        lastEntryRead.await();
        Assert.assertEquals(persistedRangeCount.get(), expectedRemaining - 1);

        // Step 3: re-open through a new factory and check what the recovered cursor sees.
        this.factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ledger = (ManagedLedgerImpl)this.factory.open(ledgerName, managedLedgerConfig);
        c1 = (ManagedCursorImpl)ledger.openCursor(cursorName);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), (long)expectedRemaining);
        List<Entry> entries = c1.readEntries(totalAddEntries);
        Assert.assertEquals(entries.size(), expectedRemaining);
    }

    /**
     * Writes 100 entries and deletes every even-indexed one using a default
     * {@code ManagedLedgerConfig}, then closes the ledger. The test verifies that the cursor
     * info read back from the metadata store carries 49 individually-deleted ranges and that
     * a cursor re-opened through a new factory still sees 50 entries.
     */
    @Test(timeOut=20000L)
    public void testOutOfOrderDeletePersistenceIntoZkWithClose() throws Exception {
        final int totalAddEntries = 100;
        final int expectedRemaining = totalAddEntries / 2;
        final String ledgerName = "my_test_ledger_zk";
        final String cursorName = "c1";

        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open(ledgerName, managedLedgerConfig);
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor(cursorName);

        // Delete each even-indexed entry right after writing it, leaving a hole per pair.
        for (int i = 0; i < totalAddEntries; ++i) {
            Position written = ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
            if (i % 2 == 0) {
                c1.delete(written);
            }
        }
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), (long)expectedRemaining);
        ledger.close();

        // Fetch the cursor info from the metadata store and count the deleted ranges in it.
        final CountDownLatch cursorInfoFetched = new CountDownLatch(1);
        final AtomicInteger persistedRangeCount = new AtomicInteger(0);
        ledger.getStore().asyncGetCursorInfo(ledger.getName(), cursorName, new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>(){

            @Override
            public void operationComplete(MLDataFormats.ManagedCursorInfo result, MetaStore.Stat stat) {
                persistedRangeCount.set(result.getIndividualDeletedMessagesCount());
                cursorInfoFetched.countDown();
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                // Leaves the count at 0, which fails the assertion below.
                cursorInfoFetched.countDown();
            }
        });
        cursorInfoFetched.await();
        Assert.assertEquals(persistedRangeCount.get(), expectedRemaining - 1);

        // Re-open through a new factory and check what the recovered cursor sees.
        this.factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ledger = (ManagedLedgerImpl)this.factory.open(ledgerName, managedLedgerConfig);
        c1 = (ManagedCursorImpl)ledger.openCursor(cursorName);
        Assert.assertEquals(c1.getNumberOfEntriesInBacklog(), (long)expectedRemaining);
        List<Entry> entries = c1.readEntries(totalAddEntries);
        Assert.assertEquals(entries.size(), expectedRemaining);
    }

    /**
     * Issues {@code asyncMarkDelete} and then {@code asyncDelete} for position (100, 100)
     * after writing 20 entries, and checks after each call that the cursor's read position
     * and mark-delete position are still the ones captured right after the cursor was opened.
     */
    @Test
    public void testInvalidMarkDelete() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig());
        ManagedCursor cursor = ledger.openCursor("c1");
        final Position initialReadPosition = cursor.getReadPosition();
        final Position initialMarkDeletePosition = cursor.getMarkDeletedPosition();

        for (int i = 0; i < 20; ++i) {
            ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
        }

        // NOTE(review): (100, 100) is presumably beyond anything written above — confirm.
        final Position bogusPosition = PositionImpl.get((long)100L, (long)100L);

        // NOTE(review): this flag is set by the failure callbacks but never asserted, so the
        // test does not actually check that either call was rejected — confirm intent.
        final AtomicBoolean markDeleteCallFailed = new AtomicBoolean(false);

        // Part 1: asyncMarkDelete must leave both cursor positions untouched.
        final CountDownLatch markDeleteDone = new CountDownLatch(1);
        cursor.asyncMarkDelete(bogusPosition, new AsyncCallbacks.MarkDeleteCallback(){

            @Override
            public void markDeleteComplete(Object ctx) {
                markDeleteDone.countDown();
            }

            @Override
            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                markDeleteCallFailed.set(true);
                markDeleteDone.countDown();
            }
        }, null);
        markDeleteDone.await();
        Assert.assertEquals((Object)initialReadPosition, (Object)cursor.getReadPosition());
        Assert.assertEquals((Object)initialMarkDeletePosition, (Object)cursor.getMarkDeletedPosition());

        // Part 2: asyncDelete must leave both cursor positions untouched as well.
        final CountDownLatch deleteDone = new CountDownLatch(1);
        markDeleteCallFailed.set(false);
        cursor.asyncDelete(bogusPosition, new AsyncCallbacks.DeleteCallback(){

            @Override
            public void deleteComplete(Object ctx) {
                deleteDone.countDown();
            }

            @Override
            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
                markDeleteCallFailed.set(true);
                deleteDone.countDown();
            }
        }, null);
        deleteDone.await();
        Assert.assertEquals((Object)initialReadPosition, (Object)cursor.getReadPosition());
        Assert.assertEquals((Object)initialMarkDeletePosition, (Object)cursor.getMarkDeletedPosition());
    }

    /**
     * Writes 15 entries of 5 bytes each into a ledger configured with at most 10 entries
     * per ledger and checks {@code getEstimatedSizeSinceMarkDeletePosition()}: it must equal
     * the size of all 15 entries before any mark-delete, and the size of the last 10 entries
     * once the fifth entry has been mark-deleted.
     */
    @Test
    public void testEstimatedUnackedSize() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        // Open with the config prepared above. It used to be built and then dropped in
        // favour of a fresh default config, so the per-ledger limit was never in effect.
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("c1");

        byte[] entryData = new byte[5];
        for (int i = 0; i < 4; ++i) {
            ledger.addEntry(entryData);
        }
        // Fifth entry: everything up to and including it is acknowledged later on.
        Position deleteAt = ledger.addEntry(entryData);
        for (int i = 0; i < 10; ++i) {
            ledger.addEntry(entryData);
        }

        Assert.assertEquals((long)cursor.getEstimatedSizeSinceMarkDeletePosition(), (long)(15 * entryData.length));
        cursor.markDelete(deleteAt);
        Assert.assertEquals((long)cursor.getEstimatedSizeSinceMarkDeletePosition(), (long)(10 * entryData.length));
    }

    /**
     * Recovers a cursor against fully mocked collaborators. The mocked metadata store hands
     * back cursor info with no cursor ledger (id -1) and a mark-delete position in ledger 2,
     * while the mocked managed ledger reports that ledger 2 does not exist and that its last
     * position is (1, 10). On successful recovery the cursor must report (1, 10) as its
     * mark-delete position, (3, -1) as its read position, and zero entries.
     */
    @Test(timeOut=20000L)
    public void testRecoverCursorAheadOfLastPosition() throws Exception {
        final String mlName = "my_test_ledger";
        final String cursorName = "my_test_cursor";
        final PositionImpl lastPosition = new PositionImpl(1L, 10L);
        final PositionImpl nextPosition = new PositionImpl(3L, -1L);
        final long cursorsLedgerId = -1L;
        final long markDeleteLedgerId = 2L;
        final long markDeleteEntryId = -1L;

        // Metadata store stub: answer every cursor-info lookup for this ledger/cursor by
        // completing the supplied callback (third argument) with the canned info.
        MetaStoreImplZookeeper mockMetaStore = Mockito.mock(MetaStoreImplZookeeper.class);
        Mockito.doAnswer(invocation -> {
            MLDataFormats.ManagedCursorInfo info = MLDataFormats.ManagedCursorInfo.newBuilder()
                    .setCursorsLedgerId(cursorsLedgerId)
                    .setMarkDeleteLedgerId(markDeleteLedgerId)
                    .setMarkDeleteEntryId(markDeleteEntryId)
                    .setLastActive(0L)
                    .build();
            MetaStore.Stat stat = Mockito.mock(MetaStore.Stat.class);
            @SuppressWarnings("unchecked")
            MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo> callback =
                    (MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>)invocation.getArguments()[2];
            callback.operationComplete(info, stat);
            return null;
        }).when(mockMetaStore).asyncGetCursorInfo(Mockito.eq(mlName), Mockito.eq(cursorName), Mockito.any(MetaStore.MetaStoreCallback.class));

        // Managed ledger stub.
        ManagedLedgerImpl ml = Mockito.mock(ManagedLedgerImpl.class);
        Mockito.when(ml.getName()).thenReturn(mlName);
        Mockito.when(ml.getStore()).thenReturn(mockMetaStore);
        Mockito.when(ml.getLastPosition()).thenReturn(lastPosition);
        Mockito.when(ml.getNextValidLedger(markDeleteLedgerId)).thenReturn(3L);
        Mockito.when(ml.getNextValidPosition(lastPosition)).thenReturn(nextPosition);
        Mockito.when(ml.ledgerExists(markDeleteLedgerId)).thenReturn(false);

        BookKeeper mockBookKeeper = Mockito.mock(BookKeeper.class);
        final ManagedCursorImpl cursor = new ManagedCursorImpl(mockBookKeeper, new ManagedLedgerConfig(), ml, cursorName);

        // NOTE(review): the assertions only run if recover() invokes the callback; nothing
        // here checks that it was invoked at all — confirm that recovery completes on the
        // calling thread with these mocks.
        cursor.recover(new ManagedCursorImpl.VoidCallback(){

            @Override
            public void operationComplete() {
                Assert.assertEquals((Object)cursor.getMarkDeletedPosition(), (Object)lastPosition);
                Assert.assertEquals((Object)cursor.getReadPosition(), (Object)nextPosition);
                Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)0L);
            }

            @Override
            public void operationFailed(ManagedLedgerException exception) {
                Assert.fail((String)"Cursor recovery should not fail");
            }
        });
    }
}

