/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import com.google.common.base.Charsets;
import com.google.common.collect.Iterables;
import java.nio.charset.Charset;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class NonDurableCursorTest
extends MockedBookKeeperTestCase {
    private static final Charset Encoding = Charsets.UTF_8;
    private static final Logger log = LoggerFactory.getLogger(NonDurableCursorTest.class);

    @Test(timeOut=20000L)
    void readFromEmptyLedger() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.newNonDurableCursor((Position)PositionImpl.earliest);
        List entries = c1.readEntries(10);
        Assert.assertEquals((int)entries.size(), (int)0);
        entries.forEach(e -> e.release());
        ledger.addEntry("test".getBytes(Encoding));
        entries = c1.readEntries(10);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        entries = c1.readEntries(10);
        Assert.assertEquals((int)entries.size(), (int)0);
        entries.forEach(e -> e.release());
        Assert.assertEquals((String)c1.toString(), (String)"NonDurableCursorImpl{ledger=my_test_ledger, ackPos=3:-1, readPos=3:1}");
    }

    @Test(timeOut=20000L)
    void testZNodeBypassed() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.newNonDurableCursor((Position)PositionImpl.earliest);
        Assert.assertTrue((boolean)Iterables.isEmpty((Iterable)ledger.getCursors()));
        c1.close();
        ledger.close();
        ManagedLedger ledger2 = this.factory.open("my_test_ledger");
        Assert.assertTrue((boolean)Iterables.isEmpty((Iterable)ledger2.getCursors()));
    }

    @Test(timeOut=20000L)
    void readTwice() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1L));
        ManagedCursor c1 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ManagedCursor c2 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));
        List entries = c1.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)2);
        entries.forEach(e -> e.release());
        entries = c1.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)0);
        entries.forEach(e -> e.release());
        entries = c2.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)2);
        entries.forEach(e -> e.release());
        entries = c2.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)0);
        entries.forEach(e -> e.release());
    }

    @Test(timeOut=20000L)
    void readWithCacheDisabled() throws Exception {
        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
        config.setMaxCacheSize(0L);
        this.factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle(), config);
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1L));
        ManagedCursor c1 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ManagedCursor c2 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));
        List entries = c1.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)2);
        Assert.assertEquals((String)new String(((Entry)entries.get(0)).getData(), Encoding), (String)"entry-1");
        Assert.assertEquals((String)new String(((Entry)entries.get(1)).getData(), Encoding), (String)"entry-2");
        entries.forEach(e -> e.release());
        entries = c1.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)0);
        entries.forEach(e -> e.release());
        entries = c2.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)2);
        entries.forEach(e -> e.release());
        entries = c2.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)0);
        entries.forEach(e -> e.release());
    }

    @Test(timeOut=20000L)
    void readFromClosedLedger() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1L));
        ManagedCursor c1 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ledger.close();
        try {
            c1.readEntries(2);
            Assert.fail((String)"ledger is closed, should fail");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
    }

    @Test(timeOut=20000L)
    void testNumberOfEntries() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2).setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1L));
        ManagedCursor c1 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ManagedCursor c2 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ManagedCursor c3 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        ManagedCursor c4 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        ManagedCursor c5 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)4L);
        Assert.assertEquals((boolean)c1.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)c2.getNumberOfEntries(), (long)3L);
        Assert.assertEquals((boolean)c2.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)c3.getNumberOfEntries(), (long)2L);
        Assert.assertEquals((boolean)c3.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)c4.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((boolean)c4.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)c5.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((boolean)c5.hasMoreEntries(), (boolean)false);
        List entries = c1.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)2);
        c1.markDelete(((Entry)entries.get(1)).getPosition());
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)2L);
        entries.forEach(e -> e.release());
    }

    @Test(timeOut=20000L)
    void testNumberOfEntriesInBacklog() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2).setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1L));
        ManagedCursor c1 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ManagedCursor c2 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ManagedCursor c3 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        ManagedCursor c4 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        ManagedCursor c5 = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)4L);
        Assert.assertEquals((long)c2.getNumberOfEntriesInBacklog(), (long)3L);
        Assert.assertEquals((long)c3.getNumberOfEntriesInBacklog(), (long)2L);
        Assert.assertEquals((long)c4.getNumberOfEntriesInBacklog(), (long)1L);
        Assert.assertEquals((long)c5.getNumberOfEntriesInBacklog(), (long)0L);
        List entries = c1.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)2);
        entries.forEach(e -> e.release());
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)2L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)4L);
        c1.markDelete(p1);
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)2L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)3L);
        c1.delete(p3);
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)2L);
        c1.markDelete(p4);
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)0L);
    }

    @Test(timeOut=20000L)
    void markDeleteWithErrors() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        List entries = cursor.readEntries(100);
        this.stopBookKeeper();
        Assert.assertEquals((int)entries.size(), (int)1);
        cursor.markDelete(((Entry)entries.get(0)).getPosition());
        entries.forEach(e -> e.release());
    }

    @Test(timeOut=20000L)
    void markDeleteAcrossLedgers() throws Exception {
        ManagedLedger ml1 = this.factory.open("my_test_ledger");
        ManagedCursor mc1 = ml1.openCursor("c1");
        ml1.close();
        mc1.close();
        this.factory.close(ml1);
        ManagedLedger ml2 = this.factory.open("my_test_ledger");
        ManagedCursor mc2 = ml2.openCursor("c1");
        Position pos = ml2.addEntry("dummy-entry-1".getBytes(Encoding));
        List entries = mc2.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        Assert.assertEquals((String)new String(((Entry)entries.get(0)).getData(), Encoding), (String)"dummy-entry-1");
        entries.forEach(e -> e.release());
        mc2.delete(pos);
        Assert.assertEquals((Object)mc2.getMarkDeletedPosition(), (Object)pos);
        Assert.assertEquals((Object)mc2.getMarkDeletedPosition().getNext(), (Object)mc2.getReadPosition());
    }

    @Test(timeOut=20000L)
    void testResetCursor() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_move_cursor_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
        ManagedCursor cursor = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        PositionImpl lastPosition = (PositionImpl)ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        AtomicBoolean moveStatus = new AtomicBoolean(false);
        PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2L);
        try {
            cursor.resetCursor((Position)resetPosition);
            moveStatus.set(true);
        }
        catch (Exception e) {
            log.warn("error in reset cursor", e.getCause());
        }
        Assert.assertTrue((boolean)moveStatus.get());
        Assert.assertTrue((boolean)cursor.getReadPosition().equals(resetPosition));
        cursor.close();
        ledger.close();
    }

    @Test(timeOut=20000L)
    void testasyncResetCursor() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_move_cursor_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
        ManagedCursor cursor = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        PositionImpl lastPosition = (PositionImpl)ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        final AtomicBoolean moveStatus = new AtomicBoolean(false);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        PositionImpl resetPosition = new PositionImpl(lastPosition.getLedgerId(), lastPosition.getEntryId() - 2L);
        cursor.asyncResetCursor((Position)resetPosition, new AsyncCallbacks.ResetCursorCallback(){

            public void resetComplete(Object ctx) {
                moveStatus.set(true);
                countDownLatch.countDown();
            }

            public void resetFailed(ManagedLedgerException exception, Object ctx) {
                moveStatus.set(false);
                countDownLatch.countDown();
            }
        });
        countDownLatch.await();
        Assert.assertTrue((boolean)moveStatus.get());
        Assert.assertTrue((boolean)cursor.getReadPosition().equals(resetPosition));
        cursor.close();
        ledger.close();
    }

    @Test(timeOut=20000L)
    void rewind() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2).setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1L));
        ManagedCursor c1 = ledger.newNonDurableCursor((Position)PositionImpl.earliest);
        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        Position p3 = ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        Position p4 = ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        log.debug("p1: {}", (Object)p1);
        log.debug("p2: {}", (Object)p2);
        log.debug("p3: {}", (Object)p3);
        log.debug("p4: {}", (Object)p4);
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)4L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)4L);
        c1.markDelete(p1);
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)3L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)3L);
        List entries = c1.readEntries(10);
        Assert.assertEquals((int)entries.size(), (int)3);
        entries.forEach(e -> e.release());
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)3L);
        c1.rewind();
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)3L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)3L);
        c1.markDelete(p2);
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)2L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)2L);
        entries = c1.readEntries(10);
        Assert.assertEquals((int)entries.size(), (int)2);
        entries.forEach(e -> e.release());
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)2L);
        c1.rewind();
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)2L);
        c1.markDelete(p4);
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)0L);
        c1.rewind();
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)0L);
        ledger.addEntry("dummy-entry-5".getBytes(Encoding));
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)1L);
        ledger.addEntry("dummy-entry-6".getBytes(Encoding));
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)2L);
    }

    @Test(timeOut=20000L)
    void markDeleteSkippingMessage() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(10));
        ManagedCursor cursor = ledger.newNonDurableCursor((Position)PositionImpl.earliest);
        Position p1 = ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Position p2 = ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
        PositionImpl p4 = (PositionImpl)ledger.addEntry("dummy-entry-4".getBytes(Encoding));
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)4L);
        cursor.markDelete(p1);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)3L);
        Assert.assertEquals((Object)cursor.getReadPosition(), (Object)p2);
        List entries = cursor.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        Assert.assertEquals((String)new String(((Entry)entries.get(0)).getData(), Encoding), (String)"dummy-entry-2");
        entries.forEach(e -> e.release());
        cursor.markDelete((Position)p4);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((Object)cursor.getReadPosition(), (Object)new PositionImpl(p4.getLedgerId(), p4.getEntryId() + 1L));
    }

    @Test(timeOut=20000L)
    public void asyncMarkDeleteBlocking() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMetadataMaxEntriesPerLedger(5);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        final ManagedCursor c1 = ledger.openCursor("c1");
        final AtomicReference lastPosition = new AtomicReference();
        int N = 100;
        final CountDownLatch latch = new CountDownLatch(100);
        for (int i = 0; i < 100; ++i) {
            ledger.asyncAddEntry("entry".getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

                public void addFailed(ManagedLedgerException exception, Object ctx) {
                }

                public void addComplete(Position position, Object ctx) {
                    lastPosition.set(position);
                    c1.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback(){

                        public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                        }

                        public void markDeleteComplete(Object ctx) {
                            latch.countDown();
                        }
                    }, null);
                }
            }, null);
        }
        latch.await();
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)0L);
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ledger = factory2.open("my_test_ledger");
        ManagedCursor c2 = ledger.openCursor("c1");
        Assert.assertEquals((Object)c2.getMarkDeletedPosition(), lastPosition.get());
        factory2.shutdown();
    }

    @Test(timeOut=20000L)
    void unorderedMarkDelete() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        Position p1 = ledger.addEntry("entry-1".getBytes(Encoding));
        Position p2 = ledger.addEntry("entry-2".getBytes(Encoding));
        c1.markDelete(p2);
        try {
            c1.markDelete(p1);
            Assert.fail((String)"Should have thrown exception");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        Assert.assertEquals((Object)c1.getMarkDeletedPosition(), (Object)p2);
    }

    @Test(timeOut=20000L)
    void testSingleDelete() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(3).setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1L));
        ManagedCursor cursor = ledger.newNonDurableCursor((Position)PositionImpl.latest);
        Position p1 = ledger.addEntry("entry1".getBytes());
        Position p2 = ledger.addEntry("entry2".getBytes());
        Position p3 = ledger.addEntry("entry3".getBytes());
        Position p4 = ledger.addEntry("entry4".getBytes());
        Position p5 = ledger.addEntry("entry5".getBytes());
        Position p6 = ledger.addEntry("entry6".getBytes());
        Position p0 = cursor.getMarkDeletedPosition();
        cursor.delete(p4);
        Assert.assertEquals((Object)cursor.getMarkDeletedPosition(), (Object)p0);
        cursor.delete(p1);
        Assert.assertEquals((Object)cursor.getMarkDeletedPosition(), (Object)p1);
        cursor.delete(p3);
        cursor.delete(p3);
        Assert.assertEquals((Object)cursor.getMarkDeletedPosition(), (Object)p1);
        cursor.delete(p2);
        Assert.assertEquals((Object)cursor.getMarkDeletedPosition(), (Object)p4);
        cursor.delete(p5);
        Assert.assertEquals((Object)cursor.getMarkDeletedPosition(), (Object)p5);
        cursor.close();
        try {
            cursor.delete(p6);
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
    }

    @Test(timeOut=20000L)
    void subscribeToEarliestPositionWithImmediateDeletion() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        ledger.addEntry("entry-1".getBytes());
        ledger.addEntry("entry-2".getBytes());
        Position p3 = ledger.addEntry("entry-3".getBytes());
        Thread.sleep(300L);
        ManagedCursor c1 = ledger.newNonDurableCursor((Position)PositionImpl.earliest);
        Assert.assertEquals((Object)c1.getReadPosition(), (Object)p3);
        Assert.assertEquals((Object)c1.getMarkDeletedPosition(), (Object)new PositionImpl(5L, -1L));
    }

    @Test
    void subscribeToEarliestPositionWithDeferredDeletion() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(1, TimeUnit.HOURS).setRetentionSizeInMB(1L));
        Position p1 = ledger.addEntry("entry-1".getBytes());
        Position p2 = ledger.addEntry("entry-2".getBytes());
        ledger.addEntry("entry-3".getBytes());
        ledger.addEntry("entry-4".getBytes());
        ledger.addEntry("entry-5".getBytes());
        ledger.addEntry("entry-6".getBytes());
        ManagedCursor c1 = ledger.newNonDurableCursor((Position)PositionImpl.earliest);
        Assert.assertEquals((Object)c1.getReadPosition(), (Object)p1);
        Assert.assertEquals((Object)c1.getMarkDeletedPosition(), (Object)new PositionImpl(3L, -1L));
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)6L);
        Assert.assertEquals((long)c1.getNumberOfEntriesInBacklog(), (long)6L);
        ManagedCursor c2 = ledger.newNonDurableCursor(p1);
        Assert.assertEquals((Object)c2.getReadPosition(), (Object)p2);
        Assert.assertEquals((Object)c2.getMarkDeletedPosition(), (Object)p1);
        Assert.assertEquals((long)c2.getNumberOfEntries(), (long)5L);
        Assert.assertEquals((long)c2.getNumberOfEntriesInBacklog(), (long)5L);
    }
}

