/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryCache;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.bookkeeper.mledger.impl.MetaStoreImplZookeeper;
import org.apache.bookkeeper.mledger.impl.OpReadEntry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.ByteBufPair;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class ManagedLedgerTest
extends MockedBookKeeperTestCase {
    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class);
    private static final Charset Encoding = Charsets.UTF_8;

    @Test
    public void managedLedgerApi() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        for (int i = 0; i < 100; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        while (cursor.hasMoreEntries()) {
            List entries = cursor.readEntries(20);
            log.debug("Read {} entries", (Object)entries.size());
            Entry lastEntry = (Entry)entries.get(entries.size() - 1);
            cursor.markDelete(lastEntry.getPosition());
            for (Entry entry : entries) {
                log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
                entry.release();
            }
            log.info("-----------------------");
        }
        log.info("Finished reading entries");
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void simple() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getNumberOfActiveEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)0L);
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)ledger.getNumberOfActiveEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)"dummy-entry-1".getBytes(Encoding).length);
        ManagedCursor cursor = ledger.openCursor("c1");
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(), (long)0L);
        Assert.assertEquals((Collection)cursor.readEntries(100), new ArrayList());
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(), (long)1L);
        Assert.assertEquals((long)ledger.getNumberOfActiveEntries(), (long)1L);
        List entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)0);
        ledger.close();
        this.factory.shutdown();
    }

    @Test(timeOut=20000L)
    public void closeAndReopen() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ledger.close();
        log.info("Closing ledger and reopening");
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ledger = factory2.open("my_test_ledger");
        cursor = ledger.openCursor("c1");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)2L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)("dummy-entry-1".getBytes(Encoding).length * 2));
        List entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        ledger.close();
        factory2.shutdown();
    }

    @Test(timeOut=20000L)
    public void acknowledge1() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        List entries = cursor.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)2);
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(), (long)2L);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)2L);
        Assert.assertEquals((long)ledger.getNumberOfActiveEntries(), (long)2L);
        cursor.markDelete(((Entry)entries.get(0)).getPosition());
        entries.forEach(e -> e.release());
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(), (long)1L);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
        Assert.assertEquals((long)ledger.getNumberOfActiveEntries(), (long)1L);
        ledger.close();
        ledger = this.factory.open("my_test_ledger");
        cursor = ledger.openCursor("c1");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)2L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)("dummy-entry-1".getBytes(Encoding).length * 2));
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(), (long)1L);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void asyncAPI() throws Throwable {
        final CountDownLatch counter = new CountDownLatch(1);
        this.factory.asyncOpen("my_test_ledger", new ManagedLedgerConfig(), new AsyncCallbacks.OpenLedgerCallback(){

            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                ledger.asyncOpenCursor("test-cursor", new AsyncCallbacks.OpenCursorCallback(){

                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                        ManagedLedger ledger = (ManagedLedger)ctx;
                        ledger.asyncAddEntry("test".getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

                            public void addComplete(Position position, Object ctx) {
                                Pair pair = (Pair)ctx;
                                ManagedLedger ledger = (ManagedLedger)pair.getLeft();
                                ManagedCursor cursor = (ManagedCursor)pair.getRight();
                                Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
                                Assert.assertEquals((long)ledger.getTotalSize(), (long)"test".getBytes(Encoding).length);
                                cursor.asyncReadEntries(2, new AsyncCallbacks.ReadEntriesCallback(){

                                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
                                        ManagedCursor cursor = (ManagedCursor)ctx;
                                        Assert.assertEquals((int)entries.size(), (int)1);
                                        Entry entry = entries.get(0);
                                        Position position = entry.getPosition();
                                        Assert.assertEquals((String)new String(entry.getDataAndRelease(), Encoding), (String)"test");
                                        log.debug("Mark-Deleting to position {}", (Object)position);
                                        cursor.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback(){

                                            public void markDeleteComplete(Object ctx) {
                                                log.debug("Mark delete complete");
                                                ManagedCursor cursor = (ManagedCursor)ctx;
                                                Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
                                                counter.countDown();
                                            }

                                            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                                                Assert.fail((String)exception.getMessage());
                                            }
                                        }, (Object)cursor);
                                    }

                                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                                        Assert.fail((String)exception.getMessage());
                                    }
                                }, (Object)cursor);
                            }

                            public void addFailed(ManagedLedgerException exception, Object ctx) {
                                Assert.fail((String)exception.getMessage());
                            }
                        }, (Object)Pair.of((Object)ledger, (Object)cursor));
                    }

                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                        Assert.fail((String)exception.getMessage());
                    }
                }, (Object)ledger);
            }

            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail((String)exception.getMessage());
            }
        }, null);
        counter.await();
        log.info("Test completed");
    }

    @Test(timeOut=20000L)
    public void spanningMultipleLedgers() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)0L);
        ManagedCursor cursor = ledger.openCursor("c1");
        for (int i = 0; i < 11; ++i) {
            ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
        }
        List entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)11);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
        PositionImpl first = (PositionImpl)((Entry)entries.get(0)).getPosition();
        PositionImpl last = (PositionImpl)((Entry)entries.get(entries.size() - 1)).getPosition();
        entries.forEach(e -> e.release());
        log.info("First={} Last={}", (Object)first, (Object)last);
        Assert.assertTrue((first.getLedgerId() < last.getLedgerId() ? 1 : 0) != 0);
        Assert.assertEquals((long)first.getEntryId(), (long)0L);
        Assert.assertEquals((long)last.getEntryId(), (long)0L);
        entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)0);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void spanningMultipleLedgersWithSize() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000);
        config.setMaxSizePerLedgerMb(1);
        config.setEnsembleSize(1);
        config.setWriteQuorumSize(1).setAckQuorumSize(1);
        config.setMetadataWriteQuorumSize(1).setMetadataAckQuorumSize(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)0L);
        ManagedCursor cursor = ledger.openCursor("c1");
        byte[] content = new byte[1047552];
        for (int i = 0; i < 3; ++i) {
            ledger.addEntry(content);
        }
        List entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)3);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
        PositionImpl first = (PositionImpl)((Entry)entries.get(0)).getPosition();
        PositionImpl last = (PositionImpl)((Entry)entries.get(entries.size() - 1)).getPosition();
        entries.forEach(e -> e.release());
        entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)0);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
        entries.forEach(e -> e.release());
        log.info("First={} Last={}", (Object)first, (Object)last);
        Assert.assertTrue((first.getLedgerId() < last.getLedgerId() ? 1 : 0) != 0);
        Assert.assertEquals((long)first.getEntryId(), (long)0L);
        Assert.assertEquals((long)last.getEntryId(), (long)0L);
        ledger.close();
    }

    @Test(expectedExceptions={IllegalArgumentException.class})
    public void invalidReadEntriesArg1() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("entry".getBytes());
        cursor.readEntries(-1);
        Assert.fail((String)"Should have thrown an exception in the above line");
    }

    @Test(expectedExceptions={IllegalArgumentException.class})
    public void invalidReadEntriesArg2() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("entry".getBytes());
        cursor.readEntries(0);
        Assert.fail((String)"Should have thrown an exception in the above line");
    }

    @Test(timeOut=20000L)
    public void deleteAndReopen() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.close();
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.delete();
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void deleteAndReopenWithCursors() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.close();
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.delete();
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        ManagedCursor cursor = ledger.openCursor("test-cursor");
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void asyncDeleteWithError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.close();
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        final CountDownLatch counter = new CountDownLatch(1);
        this.stopBookKeeper();
        this.stopZooKeeper();
        this.factory.open("my_test_ledger", new ManagedLedgerConfig()).asyncDelete(new AsyncCallbacks.DeleteLedgerCallback(){

            public void deleteLedgerComplete(Object ctx) {
                Assert.assertNull((Object)ctx);
                Assert.fail((String)"The async-call should have failed");
            }

            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
                counter.countDown();
            }
        }, null);
        counter.await();
    }

    @Test(timeOut=20000L)
    public void asyncAddEntryWithoutError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        final CountDownLatch counter = new CountDownLatch(1);
        ledger.asyncAddEntry("dummy-entry-1".getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

            public void addComplete(Position position, Object ctx) {
                Assert.assertNull((Object)ctx);
                counter.countDown();
            }

            public void addFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail((String)exception.getMessage());
            }
        }, null);
        counter.await();
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)"dummy-entry-1".getBytes(Encoding).length);
    }

    @Test(timeOut=20000L)
    public void doubleAsyncAddEntryWithoutError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        final CountDownLatch done = new CountDownLatch(10);
        for (int i = 0; i < 10; ++i) {
            final String content = "dummy-entry-" + i;
            ledger.asyncAddEntry(content.getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

                public void addComplete(Position position, Object ctx) {
                    Assert.assertNotNull((Object)ctx);
                    log.info("Successfully added {}", (Object)content);
                    done.countDown();
                }

                public void addFailed(ManagedLedgerException exception, Object ctx) {
                    Assert.fail((String)exception.getMessage());
                }
            }, (Object)this);
        }
        done.await();
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)10L);
    }

    @Test(timeOut=20000L)
    public void asyncAddEntryWithError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        final CountDownLatch counter = new CountDownLatch(1);
        this.stopBookKeeper();
        this.stopZooKeeper();
        ledger.asyncAddEntry("dummy-entry-1".getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

            public void addComplete(Position position, Object ctx) {
                Assert.fail((String)"Should have failed");
            }

            public void addFailed(ManagedLedgerException exception, Object ctx) {
                counter.countDown();
            }
        }, null);
        counter.await();
    }

    @Test(timeOut=20000L)
    public void asyncCloseWithoutError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        final CountDownLatch counter = new CountDownLatch(1);
        ledger.asyncClose(new AsyncCallbacks.CloseCallback(){

            public void closeComplete(Object ctx) {
                Assert.assertNull((Object)ctx);
                counter.countDown();
            }

            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail((String)exception.getMessage());
            }
        }, null);
        counter.await();
    }

    @Test(timeOut=20000L)
    public void asyncOpenCursorWithoutError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        final CountDownLatch counter = new CountDownLatch(1);
        ledger.asyncOpenCursor("test-cursor", new AsyncCallbacks.OpenCursorCallback(){

            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                Assert.assertNull((Object)ctx);
                Assert.assertNotNull((Object)cursor);
                counter.countDown();
            }

            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail((String)exception.getMessage());
            }
        }, null);
        counter.await();
    }

    @Test(timeOut=20000L)
    public void asyncOpenCursorWithError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        final CountDownLatch counter = new CountDownLatch(1);
        this.stopBookKeeper();
        this.stopZooKeeper();
        ledger.asyncOpenCursor("test-cursor", new AsyncCallbacks.OpenCursorCallback(){

            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                Assert.fail((String)"The async-call should have failed");
            }

            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                counter.countDown();
            }
        }, null);
        counter.await();
    }

    @Test(timeOut=20000L)
    public void readFromOlderLedger() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("test");
        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
    }

    @Test(timeOut=20000L)
    public void readFromOlderLedgers() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("test");
        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));
        ledger.addEntry("entry-3".getBytes(Encoding));
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        cursor.readEntries(1).forEach(e -> e.release());
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        cursor.readEntries(1).forEach(e -> e.release());
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        cursor.readEntries(1).forEach(e -> e.release());
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)false);
    }

    @Test(timeOut=20000L)
    public void triggerLedgerDeletion() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("test");
        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));
        ledger.addEntry("entry-3".getBytes(Encoding));
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        List entries = cursor.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)3L);
        entries.forEach(e -> e.release());
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        entries = cursor.readEntries(1);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        cursor.markDelete(((Entry)entries.get(0)).getPosition());
        entries.forEach(e -> e.release());
    }

    @Test(timeOut=20000L)
    public void testEmptyManagedLedgerContent() throws Exception {
        ZooKeeper zk = this.bkc.getZkHandle();
        zk.create("/managed-ledger", new byte[0], (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        zk.create("/managed-ledger/my_test_ledger", " ".getBytes(), (List)ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test");
        ledger.addEntry("entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
    }

    @Test(timeOut=20000L)
    public void testProducerAndNoConsumer() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        ledger.addEntry("entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.addEntry("entry-2".getBytes(Encoding));
        while (ledger.getNumberOfEntries() > 1L) {
            log.debug("entries={}", (Object)ledger.getNumberOfEntries());
            Thread.sleep(100L);
        }
        ledger.addEntry("entry-3".getBytes(Encoding));
        while (ledger.getNumberOfEntries() > 1L) {
            log.debug("entries={}", (Object)ledger.getNumberOfEntries());
            Thread.sleep(100L);
        }
    }

    @Test(timeOut=20000L)
    public void testTrimmer() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("c1");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));
        ledger.addEntry("entry-3".getBytes(Encoding));
        ledger.addEntry("entry-4".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)4L);
        cursor.readEntries(1).forEach(e -> e.release());
        cursor.readEntries(1).forEach(e -> e.release());
        List entries = cursor.readEntries(1);
        Position lastPosition = ((Entry)entries.get(0)).getPosition();
        entries.forEach(e -> e.release());
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)4L);
        cursor.markDelete(lastPosition);
        while (ledger.getNumberOfEntries() != 2L) {
            Thread.sleep(10L);
        }
    }

    @Test(timeOut=20000L)
    public void testAsyncAddEntryAndSyncClose() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ledger.openCursor("c1");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        final CountDownLatch counter = new CountDownLatch(100);
        for (int i = 0; i < 100; ++i) {
            String content = "entry-" + i;
            ledger.asyncAddEntry(content.getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

                public void addComplete(Position position, Object ctx) {
                    counter.countDown();
                }

                public void addFailed(ManagedLedgerException exception, Object ctx) {
                    Assert.fail((String)exception.getMessage());
                }
            }, null);
        }
        counter.await();
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)100L);
    }

    @Test(timeOut=20000L)
    public void moveCursorToNextLedger() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("test");
        ledger.addEntry("entry-1".getBytes(Encoding));
        log.debug("Added 1st message");
        List entries = cursor.readEntries(1);
        log.debug("read message ok");
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        ledger.addEntry("entry-2".getBytes(Encoding));
        log.debug("Added 2nd message");
        ledger.addEntry("entry-3".getBytes(Encoding));
        log.debug("Added 3nd message");
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)2L);
        entries = cursor.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)2);
        entries.forEach(e -> e.release());
        entries = cursor.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)0);
        entries = cursor.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)0);
    }

    @Test(timeOut=20000L)
    public void differentSessions() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)0L);
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)"dummy-entry-1".getBytes(Encoding).length);
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)1L);
        ledger.close();
        this.factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, (ZooKeeper)this.zkc);
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)"dummy-entry-1".getBytes(Encoding).length);
        cursor = ledger.openCursor("c1");
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)1L);
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)2L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)("dummy-entry-1".getBytes(Encoding).length * 2));
        Assert.assertEquals((boolean)cursor.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)2L);
        ledger.close();
    }

    @Test(enabled=false)
    public void fenceManagedLedger() throws Exception {
        ManagedLedgerFactoryImpl factory1 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedger ledger1 = factory1.open("my_test_ledger");
        ManagedCursor cursor1 = ledger1.openCursor("c1");
        ledger1.addEntry("entry-1".getBytes(Encoding));
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedger ledger2 = factory2.open("my_test_ledger");
        ManagedCursor cursor2 = ledger2.openCursor("c1");
        try {
            ledger1.addEntry("entry-1".getBytes(Encoding));
            Assert.fail((String)"Expecting exception");
        }
        catch (ManagedLedgerException.ManagedLedgerFencedException managedLedgerFencedException) {
            // empty catch block
        }
        try {
            ledger1.addEntry("entry-2".getBytes(Encoding));
            Assert.fail((String)"Expecting exception");
        }
        catch (ManagedLedgerException.ManagedLedgerFencedException managedLedgerFencedException) {
            // empty catch block
        }
        try {
            cursor1.readEntries(10);
            Assert.fail((String)"Expecting exception");
        }
        catch (ManagedLedgerException.ManagedLedgerFencedException managedLedgerFencedException) {
            // empty catch block
        }
        try {
            ledger1.openCursor("new cursor");
            Assert.fail((String)"Expecting exception");
        }
        catch (ManagedLedgerException.ManagedLedgerFencedException managedLedgerFencedException) {
            // empty catch block
        }
        ledger2.addEntry("entry-2".getBytes(Encoding));
        Assert.assertEquals((long)cursor2.getNumberOfEntries(), (long)2L);
        factory1.shutdown();
        factory2.shutdown();
    }

    @Test
    public void forceCloseLedgers() throws Exception {
        ManagedLedger ledger1 = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        ledger1.openCursor("c1");
        ManagedCursor c2 = ledger1.openCursor("c2");
        ledger1.addEntry("entry-1".getBytes(Encoding));
        ledger1.addEntry("entry-2".getBytes(Encoding));
        ledger1.addEntry("entry-3".getBytes(Encoding));
        c2.readEntries(1).forEach(e -> e.release());
        c2.readEntries(1).forEach(e -> e.release());
        c2.readEntries(1).forEach(e -> e.release());
        ledger1.close();
        try {
            ledger1.addEntry("entry-3".getBytes(Encoding));
            Assert.fail((String)"should not have reached this point");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        try {
            ledger1.openCursor("new-cursor");
            Assert.fail((String)"should not have reached this point");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
    }

    @Test
    public void closeLedgerWithError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("entry-1".getBytes(Encoding));
        this.stopZooKeeper();
        this.stopBookKeeper();
        try {
            ledger.close();
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
    }

    @Test(timeOut=20000L)
    public void deleteWithErrors1() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        PositionImpl position = (PositionImpl)ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        this.bkc.deleteLedger(position.getLedgerId());
        ledger.delete();
    }

    @Test(timeOut=20000L)
    public void deleteWithErrors2() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        this.stopZooKeeper();
        try {
            ledger.delete();
            Assert.fail((String)"should have failed");
        }
        catch (ManagedLedgerException managedLedgerException) {
        }
        catch (RejectedExecutionException rejectedExecutionException) {
            // empty catch block
        }
    }

    @Test(timeOut=20000L)
    public void readWithErrors1() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        this.stopZooKeeper();
        this.stopBookKeeper();
        try {
            cursor.readEntries(10);
            Assert.fail((String)"should have failed");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        try {
            ledger.addEntry("dummy-entry-3".getBytes(Encoding));
            Assert.fail((String)"should have failed");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
    }

    @Test(timeOut=20000L, enabled=false)
    void concurrentAsyncOpen() throws Exception {
        final CountDownLatch counter = new CountDownLatch(2);
        class Result {
            ManagedLedger instance1 = null;
            ManagedLedger instance2 = null;

            Result() {
            }
        }
        final Result result = new Result();
        this.factory.asyncOpen("my-test-ledger", new AsyncCallbacks.OpenLedgerCallback(){
            {
            }

            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                result.instance1 = ledger;
                counter.countDown();
            }

            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
            }
        }, null);
        this.factory.asyncOpen("my-test-ledger", new AsyncCallbacks.OpenLedgerCallback(){
            {
            }

            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                result.instance2 = ledger;
                counter.countDown();
            }

            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
            }
        }, null);
        counter.await();
        Assert.assertEquals((Object)result.instance1, (Object)result.instance2);
        Assert.assertNotNull((Object)result.instance1);
    }

    @Test
    public void asyncOpenClosedLedger() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my-closed-ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        c1.close();
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.setFenced();
        final CountDownLatch counter = new CountDownLatch(1);
        class Result {
            ManagedLedger instance1 = null;

            Result() {
            }
        }
        final Result result = new Result();
        this.factory.asyncOpen("my-closed-ledger", new AsyncCallbacks.OpenLedgerCallback(){
            {
            }

            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                result.instance1 = ledger;
                counter.countDown();
            }

            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
            }
        }, null);
        counter.await();
        Assert.assertNotNull((Object)result.instance1);
        ManagedCursor c2 = result.instance1.openCursor("c1");
        List entries = c2.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
    }

    @Test
    public void getCursors() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ManagedCursor c2 = ledger.openCursor("c2");
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)ledger.getCursors()), (Set)Sets.newHashSet((Object[])new ManagedCursor[]{c1, c2}));
        c1.close();
        ledger.deleteCursor("c1");
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)ledger.getCursors()), (Set)Sets.newHashSet((Object[])new ManagedCursor[]{c2}));
        c2.close();
        ledger.deleteCursor("c2");
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)ledger.getCursors()), (Set)Sets.newHashSet());
    }

    @Test
    public void ledgersList() throws Exception {
        MetaStore store = this.factory.getMetaStore();
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)store.getManagedLedgers()), (Set)Sets.newHashSet());
        ManagedLedger ledger1 = this.factory.open("ledger1");
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)store.getManagedLedgers()), (Set)Sets.newHashSet((Object[])new String[]{"ledger1"}));
        ManagedLedger ledger2 = this.factory.open("ledger2");
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)store.getManagedLedgers()), (Set)Sets.newHashSet((Object[])new String[]{"ledger1", "ledger2"}));
        ledger1.delete();
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)store.getManagedLedgers()), (Set)Sets.newHashSet((Object[])new String[]{"ledger2"}));
        ledger2.delete();
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)store.getManagedLedgers()), (Set)Sets.newHashSet());
    }

    @Test
    public void testCleanup() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes(Encoding));
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)2);
        ledger.delete();
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)0);
    }

    @Test(timeOut=20000L)
    public void testAsyncCleanup() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes(Encoding));
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)2);
        final CountDownLatch latch = new CountDownLatch(1);
        ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback(){

            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail((String)"should have succeeded");
            }

            public void deleteLedgerComplete(Object ctx) {
                latch.countDown();
            }
        }, null);
        latch.await();
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)0);
    }

    @Test(timeOut=20000L)
    public void testReopenAndCleanup() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes(Encoding));
        ledger.close();
        Thread.sleep(100L);
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)1);
        this.factory.shutdown();
        this.factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("c1");
        Thread.sleep(100L);
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)2);
        ledger.close();
        this.factory.open("my_test_ledger", new ManagedLedgerConfig()).delete();
        Thread.sleep(100L);
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)0);
        this.factory.shutdown();
    }

    @Test(timeOut=20000L)
    public void doubleOpen() throws Exception {
        ManagedLedger ledger2;
        ManagedLedger ledger1 = this.factory.open("my_test_ledger");
        Assert.assertTrue((ledger1 == (ledger2 = this.factory.open("my_test_ledger")) ? 1 : 0) != 0);
    }

    @Test
    public void compositeNames() throws Exception {
        this.factory.open("my/test/ledger");
    }

    @Test
    public void previousPosition() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ManagedCursor cursor = ledger.openCursor("my_cursor");
        Position p0 = cursor.getMarkDeletedPosition();
        Assert.assertEquals((Object)ledger.getPreviousPosition((PositionImpl)p0), (Object)p0);
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        PositionImpl pBeforeWriting = ledger.getLastPosition();
        PositionImpl p1 = (PositionImpl)ledger.addEntry("entry".getBytes());
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        Position p2 = ledger.addEntry("entry".getBytes());
        Position p3 = ledger.addEntry("entry".getBytes());
        Position p4 = ledger.addEntry("entry".getBytes());
        Assert.assertEquals((Object)ledger.getPreviousPosition(p1), (Object)pBeforeWriting);
        Assert.assertEquals((Object)ledger.getPreviousPosition((PositionImpl)p2), (Object)p1);
        Assert.assertEquals((Object)ledger.getPreviousPosition((PositionImpl)p3), (Object)p2);
        Assert.assertEquals((Object)ledger.getPreviousPosition((PositionImpl)p4), (Object)p3);
    }

    @Test(timeOut=20000L)
    public void testOpenRaceCondition() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
        final ManagedLedger ledger = this.factory.open("my-ledger", config);
        final ManagedCursor c1 = ledger.openCursor("c1");
        int N = 1000;
        final Position position = ledger.addEntry("entry-0".getBytes());
        ExecutorService executor = Executors.newCachedThreadPool();
        final CountDownLatch counter = new CountDownLatch(2);
        executor.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    for (int i = 0; i < 1000; ++i) {
                        c1.markDelete(position);
                    }
                    counter.countDown();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        executor.execute(new Runnable(){

            @Override
            public void run() {
                try {
                    for (int i = 0; i < 1000; ++i) {
                        ledger.openCursor("cursor-" + i);
                    }
                    counter.countDown();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        counter.await();
    }

    @Test
    public void invalidateConsumedEntriesFromCache() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        EntryCacheManager cacheManager = this.factory.getEntryCacheManager();
        EntryCache entryCache = ledger.entryCache;
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        ManagedCursorImpl c2 = (ManagedCursorImpl)ledger.openCursor("c2");
        PositionImpl p1 = (PositionImpl)ledger.addEntry("entry-1".getBytes());
        PositionImpl p2 = (PositionImpl)ledger.addEntry("entry-2".getBytes());
        PositionImpl p3 = (PositionImpl)ledger.addEntry("entry-3".getBytes());
        PositionImpl p4 = (PositionImpl)ledger.addEntry("entry-4".getBytes());
        Assert.assertEquals((long)entryCache.getSize(), (long)28L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        c2.setReadPosition((Position)p3);
        ledger.discardEntriesFromCache(c2, p2);
        Assert.assertEquals((long)entryCache.getSize(), (long)28L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        c1.setReadPosition((Position)p2);
        ledger.discardEntriesFromCache(c1, p1);
        Assert.assertEquals((long)entryCache.getSize(), (long)21L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        c1.setReadPosition((Position)p3);
        ledger.discardEntriesFromCache(c1, p2);
        Assert.assertEquals((long)entryCache.getSize(), (long)14L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        ledger.deactivateCursor((ManagedCursor)c1);
        Assert.assertEquals((long)entryCache.getSize(), (long)14L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        c2.setReadPosition((Position)p4);
        ledger.discardEntriesFromCache(c2, p3);
        Assert.assertEquals((long)entryCache.getSize(), (long)7L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        ledger.deactivateCursor((ManagedCursor)c2);
        Assert.assertEquals((long)entryCache.getSize(), (long)0L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
    }

    @Test
    public void discardEmptyLedgersOnClose() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("entry".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        c1.close();
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        c1.close();
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
    }

    @Test
    public void discardEmptyLedgersOnError() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        this.bkc.failNow(-3);
        this.zkc.failNow(KeeperException.Code.CONNECTIONLOSS);
        try {
            ledger.addEntry("entry".getBytes());
            Assert.fail((String)"Should have received exception");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)0);
        try {
            ledger.addEntry("entry".getBytes());
            Assert.fail((String)"Should have received exception");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)0);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
    }

    @Test
    public void cursorReadsWithDiscardedEmptyLedgers() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        Position p1 = c1.getReadPosition();
        c1.close();
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        c1 = ledger.openCursor("c1");
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((boolean)c1.hasMoreEntries(), (boolean)false);
        ledger.addEntry("entry".getBytes());
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((boolean)c1.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        List entries = c1.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        Assert.assertEquals((boolean)c1.hasMoreEntries(), (boolean)false);
        Assert.assertEquals((int)c1.readEntries(1).size(), (int)0);
        c1.seek(p1);
        Assert.assertEquals((boolean)c1.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)1L);
        entries = c1.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        Assert.assertEquals((int)c1.readEntries(1).size(), (int)0);
    }

    @Test
    public void cursorReadsWithDiscardedEmptyLedgersStillListed() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("entry-1".getBytes());
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        c1 = ledger.openCursor("c1");
        ledger.addEntry("entry-2".getBytes());
        final MLDataFormats.ManagedLedgerInfo.LedgerInfo l1info = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0);
        final MLDataFormats.ManagedLedgerInfo.LedgerInfo l2info = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1);
        ledger.close();
        final CountDownLatch counter = new CountDownLatch(1);
        final MetaStore store = this.factory.getMetaStore();
        store.getManagedLedgerInfo("my_test_ledger", false, (MetaStore.MetaStoreCallback)new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>(){

            public void operationComplete(MLDataFormats.ManagedLedgerInfo result, MetaStore.Stat version) {
                MLDataFormats.ManagedLedgerInfo.Builder info = MLDataFormats.ManagedLedgerInfo.newBuilder((MLDataFormats.ManagedLedgerInfo)result);
                info.clearLedgerInfo();
                info.addLedgerInfo(MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().setLedgerId(l1info.getLedgerId()).build());
                info.addLedgerInfo(l2info);
                store.asyncUpdateLedgerIds("my_test_ledger", info.build(), version, (MetaStore.MetaStoreCallback)new MetaStore.MetaStoreCallback<Void>(){

                    public void operationComplete(Void result, MetaStore.Stat version) {
                        counter.countDown();
                    }

                    public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                        counter.countDown();
                    }
                });
            }

            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                counter.countDown();
            }
        });
        counter.await();
        this.bkc.deleteLedger(l1info.getLedgerId());
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        c1 = ledger.openCursor("c1");
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((boolean)c1.hasMoreEntries(), (boolean)true);
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        List entries = c1.readEntries(10);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        Assert.assertEquals((boolean)c1.hasMoreEntries(), (boolean)false);
        entries = c1.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)0);
        entries.forEach(e -> e.release());
    }

    @Test
    public void addEntryWithOffset() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("012345678".getBytes(), 2, 3);
        List entries = c1.readEntries(1);
        Assert.assertEquals((int)((Entry)entries.get(0)).getLength(), (int)3);
        Entry entry = (Entry)entries.get(0);
        Assert.assertEquals((String)new String(entry.getData()), (String)"234");
        entry.release();
    }

    @Test
    public void totalSizeTest() throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", conf);
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry(new byte[10], 1, 8);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)8L);
        PositionImpl p2 = (PositionImpl)ledger.addEntry(new byte[12], 2, 5);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)13L);
        c1.markDelete((Position)new PositionImpl(p2.getLedgerId(), -1L));
        Thread.sleep(400L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)5L);
    }

    @Test
    public void testMinimumRolloverTime() throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        conf.setMinimumRolloverTime(1, TimeUnit.SECONDS);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", conf);
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        Thread.sleep(1000L);
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
    }

    @Test
    public void testMaximumRolloverTime() throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(5);
        conf.setMinimumRolloverTime(1, TimeUnit.SECONDS);
        conf.setMaximumRolloverTime(1, TimeUnit.SECONDS);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_maxtime_ledger", conf);
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        Thread.sleep(2000L);
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
    }

    @Test
    public void testRetention() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(10L);
        config.setMaxEntriesPerLedger(1);
        config.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("retention_test_ledger", config);
        ManagedCursor c1 = ml.openCursor("c1");
        ml.addEntry("iamaverylongmessagethatshouldberetained".getBytes());
        c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        ml.close();
        ml = (ManagedLedgerImpl)factory.open("retention_test_ledger", config);
        c1 = ml.openCursor("c1");
        ml.addEntry("shortmessage".getBytes());
        c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        ml.close();
        Assert.assertTrue((ml.getLedgersInfoAsList().size() > 1 ? 1 : 0) != 0);
        Assert.assertTrue((ml.getTotalSize() > (long)"shortmessage".getBytes().length ? 1 : 0) != 0);
    }

    @Test(enabled=true)
    public void testNoRetention() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(0L);
        config.setMaxEntriesPerLedger(1);
        ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("noretention_test_ledger", config);
        ManagedCursor c1 = ml.openCursor("c1noretention");
        ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
        c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        ml.close();
        ml = (ManagedLedgerImpl)factory.open("noretention_test_ledger", config);
        c1 = ml.openCursor("c1noretention");
        ml.addEntry("shortmessage".getBytes());
        c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        Thread.sleep(1000L);
        ml.close();
        Assert.assertTrue((ml.getLedgersInfoAsList().size() <= 1 ? 1 : 0) != 0);
        Assert.assertTrue((ml.getTotalSize() <= (long)"shortmessage".getBytes().length ? 1 : 0) != 0);
    }

    @Test
    public void testDeletionAfterRetention() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(0L);
        config.setMaxEntriesPerLedger(1);
        config.setRetentionTime(1, TimeUnit.SECONDS);
        ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("deletion_after_retention_test_ledger", config);
        ManagedCursor c1 = ml.openCursor("c1noretention");
        ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
        c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        ml.close();
        ml = (ManagedLedgerImpl)factory.open("deletion_after_retention_test_ledger", config);
        c1 = ml.openCursor("c1noretention");
        ml.addEntry("shortmessage".getBytes());
        c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        Thread.sleep(1000L);
        ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
        Assert.assertTrue((ml.getLedgersInfoAsList().size() <= 1 ? 1 : 0) != 0);
        Assert.assertTrue((ml.getTotalSize() <= (long)"shortmessage".getBytes().length ? 1 : 0) != 0);
        ml.close();
    }

    @Test
    public void testInfiniteRetention() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(-1L);
        config.setRetentionTime(-1, TimeUnit.HOURS);
        config.setMaxEntriesPerLedger(1);
        ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("retention_test_ledger", config);
        ManagedCursor c1 = ml.openCursor("c1");
        ml.addEntry("iamaverylongmessagethatshouldberetained".getBytes());
        c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        ml.close();
        ml = (ManagedLedgerImpl)factory.open("retention_test_ledger", config);
        c1 = ml.openCursor("c1");
        ml.addEntry("shortmessage".getBytes());
        c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
        ml.close();
        Assert.assertTrue((ml.getLedgersInfoAsList().size() > 1 ? 1 : 0) != 0);
        Assert.assertTrue((ml.getTotalSize() > (long)"shortmessage".getBytes().length ? 1 : 0) != 0);
    }

    @Test
    public void testTimestampOnWorkingLedger() throws Exception {
        MLDataFormats.ManagedLedgerInfo.LedgerInfo i;
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        conf.setRetentionSizeInMB(10L);
        conf.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("my_test_ledger", conf);
        ml.openCursor("c1");
        ml.addEntry("msg1".getBytes());
        Iterator iter = ml.getLedgersInfoAsList().iterator();
        long ts = -1L;
        while (iter.hasNext()) {
            i = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)iter.next();
            if (iter.hasNext()) {
                Assert.assertTrue((ts <= i.getTimestamp() ? 1 : 0) != 0, (String)i.toString());
                ts = i.getTimestamp();
                continue;
            }
            Assert.assertTrue((i.getTimestamp() == 0L || ts <= i.getTimestamp() ? 1 : 0) != 0, (String)i.toString());
        }
        ml.addEntry("msg02".getBytes());
        ml.close();
        iter = ml.getLedgersInfoAsList().iterator();
        ts = -1L;
        while (iter.hasNext()) {
            i = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)iter.next();
            if (iter.hasNext()) {
                Assert.assertTrue((ts <= i.getTimestamp() ? 1 : 0) != 0, (String)i.toString());
                ts = i.getTimestamp();
                continue;
            }
            Assert.assertTrue((i.getTimestamp() > 0L ? 1 : 0) != 0, (String)"well closed LedgerInfo should set a timestamp > 0");
        }
    }

    @Test
    public void testBackwardCompatiblityForMeta() throws Exception {
        final MLDataFormats.ManagedLedgerInfo[] storedMLInfo = new MLDataFormats.ManagedLedgerInfo[3];
        final MetaStore.Stat[] versions = new MetaStore.Stat[1];
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((BookKeeper)this.bkc, this.bkc.getZkHandle());
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        conf.setRetentionSizeInMB(10L);
        conf.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedger ml = factory.open("backward_test_ledger", conf);
        ml.openCursor("c1");
        ml.addEntry("msg1".getBytes());
        ml.addEntry("msg2".getBytes());
        ml.close();
        MetaStoreImplZookeeper store = new MetaStoreImplZookeeper((ZooKeeper)this.zkc, (OrderedExecutor)this.executor);
        final CountDownLatch l1 = new CountDownLatch(1);
        store.getManagedLedgerInfo("backward_test_ledger", false, (MetaStore.MetaStoreCallback)new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>(){

            public void operationComplete(MLDataFormats.ManagedLedgerInfo result, MetaStore.Stat version) {
                storedMLInfo[0] = result;
                versions[0] = version;
                l1.countDown();
            }

            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                Assert.fail((String)"on get ManagedLedgerInfo backward_test_ledger");
            }
        });
        l1.await();
        MLDataFormats.ManagedLedgerInfo.Builder builder1 = MLDataFormats.ManagedLedgerInfo.newBuilder();
        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo info : storedMLInfo[0].getLedgerInfoList()) {
            MLDataFormats.ManagedLedgerInfo.LedgerInfo noTimestamp = MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().mergeFrom(info).clearTimestamp().build();
            Assert.assertFalse((boolean)noTimestamp.hasTimestamp(), (String)"expected old version info with no timestamp");
            builder1.addLedgerInfo(noTimestamp);
        }
        storedMLInfo[1] = builder1.build();
        final CountDownLatch l2 = new CountDownLatch(1);
        store.asyncUpdateLedgerIds("backward_test_ledger", storedMLInfo[1], versions[0], (MetaStore.MetaStoreCallback)new MetaStore.MetaStoreCallback<Void>(){

            public void operationComplete(Void result, MetaStore.Stat version) {
                l2.countDown();
            }

            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                Assert.fail((String)"on asyncUpdateLedgerIds");
            }
        });
        ManagedLedgerImpl newVersionLedger = (ManagedLedgerImpl)factory.open("backward_test_ledger", conf);
        List mlInfo = newVersionLedger.getLedgersInfoAsList();
        Assert.assertTrue((boolean)mlInfo.stream().allMatch(new Predicate<MLDataFormats.ManagedLedgerInfo.LedgerInfo>(){

            @Override
            public boolean test(MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo) {
                return ledgerInfo.hasTimestamp();
            }
        }));
    }

    @Test
    public void testEstimatedBacklogSize() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testEstimatedBacklogSize");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry(new byte[1024]);
        Position position2 = ledger.addEntry(new byte[1024]);
        ledger.addEntry(new byte[1024]);
        ledger.addEntry(new byte[1024]);
        Position lastPosition = ledger.addEntry(new byte[1024]);
        long backlog = ledger.getEstimatedBacklogSize();
        Assert.assertEquals((long)backlog, (long)5120L);
        List entries = c1.readEntries(2);
        entries.forEach(Entry::release);
        c1.markDelete(position2);
        backlog = ledger.getEstimatedBacklogSize();
        Assert.assertEquals((long)backlog, (long)3072L);
        entries = c1.readEntries(3);
        entries.forEach(Entry::release);
        c1.markDelete(lastPosition);
        backlog = ledger.getEstimatedBacklogSize();
        Assert.assertEquals((long)backlog, (long)0L);
    }

    @Test
    public void testGetNextValidPosition() throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testGetNextValidPosition", conf);
        ManagedCursor c1 = ledger.openCursor("c1");
        PositionImpl p1 = (PositionImpl)ledger.addEntry("entry1".getBytes());
        PositionImpl p2 = (PositionImpl)ledger.addEntry("entry2".getBytes());
        PositionImpl p3 = (PositionImpl)ledger.addEntry("entry3".getBytes());
        Assert.assertEquals((Object)ledger.getNextValidPosition((PositionImpl)c1.getMarkDeletedPosition()), (Object)p1);
        Assert.assertEquals((Object)ledger.getNextValidPosition(p1), (Object)p2);
        Assert.assertEquals((Object)ledger.getNextValidPosition(p3), (Object)PositionImpl.get((long)p3.getLedgerId(), (long)(p3.getEntryId() + 1L)));
    }

    @Test
    public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("cache_eviction_ledger");
        ManagedCursor cursor1 = ledger.openCursor("c1");
        ManagedCursor cursor2 = ledger.openCursor("c2");
        HashSet activeCursors = Sets.newHashSet();
        activeCursors.add(cursor1);
        activeCursors.add(cursor2);
        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
        cacheField.setAccessible(true);
        EntryCacheImpl entryCache = (EntryCacheImpl)cacheField.get(ledger);
        Iterator activeCursor = ledger.getActiveCursors().iterator();
        activeCursors.remove(activeCursor.next());
        activeCursors.remove(activeCursor.next());
        Assert.assertTrue((boolean)activeCursors.isEmpty());
        Assert.assertFalse((boolean)activeCursor.hasNext());
        int totalInsertedEntries = 50;
        for (int i = 0; i < 50; ++i) {
            String content = "entry";
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((long)250L, (long)entryCache.getSize());
        int readEntries = 20;
        List entries1 = cursor1.readEntries(20);
        cursor1.markDelete(((Entry)entries1.get(entries1.size() - 1)).getPosition());
        for (Object entry : entries1) {
            log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
            entry.release();
        }
        Thread.sleep(1000L);
        List entries2 = cursor2.readEntries(20);
        cursor2.markDelete(((Entry)entries2.get(entries2.size() - 1)).getPosition());
        for (Entry entry : entries2) {
            log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
            entry.release();
        }
        log.info("expected, found : {}, {}", (Object)150, (Object)entryCache.getSize());
        Assert.assertEquals((long)150L, (long)entryCache.getSize());
        int remainingEntries = 30;
        entries1 = cursor1.readEntries(30);
        cursor1.markDelete(((Entry)entries1.get(entries1.size() - 1)).getPosition());
        for (Entry entry : entries1) {
            log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
            entry.release();
        }
        Assert.assertEquals((long)150L, (long)entryCache.getSize());
        ledger.deactivateCursor(cursor2);
        Assert.assertEquals((long)0L, (long)entryCache.getSize());
        log.info("Finished reading entries");
        ledger.close();
    }

    @Test
    public void testActiveDeactiveCursor() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("cache_eviction_ledger");
        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
        cacheField.setAccessible(true);
        EntryCacheImpl entryCache = (EntryCacheImpl)cacheField.get(ledger);
        int totalInsertedEntries = 20;
        for (int i = 0; i < 20; ++i) {
            String content = "entry";
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((long)0L, (long)entryCache.getSize());
        ManagedCursor cursor1 = ledger.openCursor("c1");
        ManagedCursor cursor2 = ledger.openCursor("c2");
        ledger.deactivateCursor(cursor2);
        for (int i = 0; i < 20; ++i) {
            String content = "entry";
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((long)100L, (long)entryCache.getSize());
        List entries1 = cursor1.readEntries(20);
        for (Entry entry : entries1) {
            log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
            entry.release();
        }
        Assert.assertEquals((long)0L, (long)entryCache.getSize());
        ledger.close();
    }

    @Test
    public void testCursorRecoveryForEmptyLedgers() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testCursorRecoveryForEmptyLedgers");
        ManagedCursor c1 = ledger.openCursor("c1");
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        Assert.assertEquals((Object)c1.getMarkDeletedPosition(), (Object)ledger.lastConfirmedEntry);
        c1.close();
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("testCursorRecoveryForEmptyLedgers");
        c1 = ledger.openCursor("c1");
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        Assert.assertEquals((Object)c1.getMarkDeletedPosition(), (Object)ledger.lastConfirmedEntry);
    }

    @Test
    public void testBacklogCursor() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("cache_backlog_ledger");
        long maxMessageCacheRetentionTimeMillis = 100L;
        Field field = ManagedLedgerImpl.class.getDeclaredField("maxMessageCacheRetentionTimeMillis");
        field.setAccessible(true);
        Field modifiersField = Field.class.getDeclaredField("modifiers");
        modifiersField.setAccessible(true);
        modifiersField.setInt(field, field.getModifiers() & 0xFFFFFFEF);
        field.set(ledger, 100L);
        Field backlogThresholdField = ManagedLedgerImpl.class.getDeclaredField("maxActiveCursorBacklogEntries");
        backlogThresholdField.setAccessible(true);
        long maxActiveCursorBacklogEntries = (Long)backlogThresholdField.get(ledger);
        ManagedCursor cursor1 = ledger.openCursor("c1");
        ManagedCursor cursor2 = ledger.openCursor("c2");
        int totalBacklogSizeEntries = (int)maxActiveCursorBacklogEntries;
        final CountDownLatch latch = new CountDownLatch(totalBacklogSizeEntries);
        for (int i = 0; i < totalBacklogSizeEntries + 1; ++i) {
            String content = "entry";
            final ByteBuf entry = this.getMessageWithMetadata(content.getBytes());
            ledger.asyncAddEntry(entry, new AsyncCallbacks.AddEntryCallback(){

                public void addComplete(Position position, Object ctx) {
                    latch.countDown();
                    entry.release();
                }

                public void addFailed(ManagedLedgerException exception, Object ctx) {
                    latch.countDown();
                    entry.release();
                }
            }, null);
        }
        latch.await();
        Assert.assertTrue((boolean)cursor1.isActive());
        Assert.assertTrue((boolean)cursor2.isActive());
        Thread.sleep(200L);
        ledger.checkBackloggedCursors();
        Thread.sleep(100L);
        Assert.assertFalse((boolean)cursor1.isActive());
        Assert.assertFalse((boolean)cursor2.isActive());
        List entries1 = cursor1.readEntries(50);
        for (final ByteBuf entry : entries1) {
            log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
            entry.release();
        }
        ledger.checkBackloggedCursors();
        Assert.assertTrue((boolean)cursor1.isActive());
        Assert.assertFalse((boolean)cursor2.isActive());
        ledger.close();
    }

    @Test
    public void testConcurrentOpenCursor() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testConcurrentOpenCursor");
        final AtomicReference<Object> cursor1 = new AtomicReference<Object>(null);
        final AtomicReference<Object> cursor2 = new AtomicReference<Object>(null);
        CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch latch = new CountDownLatch(2);
        this.cachedExecutor.execute(() -> {
            try {
                barrier.await();
            }
            catch (Exception exception) {
                // empty catch block
            }
            ledger.asyncOpenCursor("c1", new AsyncCallbacks.OpenCursorCallback(){

                public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                    latch.countDown();
                }

                public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                    cursor1.set(cursor);
                    latch.countDown();
                }
            }, null);
        });
        this.cachedExecutor.execute(() -> {
            try {
                barrier.await();
            }
            catch (Exception exception) {
                // empty catch block
            }
            ledger.asyncOpenCursor("c1", new AsyncCallbacks.OpenCursorCallback(){

                public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                    latch.countDown();
                }

                public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                    cursor2.set(cursor);
                    latch.countDown();
                }
            }, null);
        });
        latch.await();
        Assert.assertNotNull(cursor1.get());
        Assert.assertNotNull(cursor2.get());
        Assert.assertEquals(cursor1.get(), cursor2.get());
        ledger.close();
    }

    public ByteBuf getMessageWithMetadata(byte[] data) throws IOException {
        PulsarApi.MessageMetadata messageData = PulsarApi.MessageMetadata.newBuilder().setPublishTime(System.currentTimeMillis()).setProducerName("prod-name").setSequenceId(0L).build();
        ByteBuf payload = Unpooled.wrappedBuffer((byte[])data, (int)0, (int)data.length);
        int msgMetadataSize = messageData.getSerializedSize();
        int headersSize = 4 + msgMetadataSize;
        ByteBuf headers = PooledByteBufAllocator.DEFAULT.buffer(headersSize, headersSize);
        ByteBufCodedOutputStream outStream = ByteBufCodedOutputStream.get((ByteBuf)headers);
        headers.writeInt(msgMetadataSize);
        messageData.writeTo(outStream);
        outStream.recycle();
        return ByteBufPair.coalesce((ByteBufPair)ByteBufPair.get((ByteBuf)headers, (ByteBuf)payload));
    }

    @Test
    public void testConsumerSubscriptionInitializePosition() throws Exception {
        int MAX_ENTRY_PER_LEDGER = 2;
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(2);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("lastest_earliest_ledger", config);
        int totalInsertedEntries = 20;
        for (int i = 0; i < 20; ++i) {
            String content = "entry" + i;
            ledger.addEntry(content.getBytes());
        }
        ManagedCursor latestCursor = ledger.openCursor("c1", PulsarApi.CommandSubscribe.InitialPosition.Latest);
        ManagedCursor earliestCursor = ledger.openCursor("c2", PulsarApi.CommandSubscribe.InitialPosition.Earliest);
        PositionImpl p1 = (PositionImpl)latestCursor.getReadPosition();
        PositionImpl p2 = (PositionImpl)earliestCursor.getReadPosition();
        Pair latestPositionAndCounter = ledger.getLastPositionAndCounter();
        Pair earliestPositionAndCounter = ledger.getFirstPositionAndCounter();
        Assert.assertEquals((Object)((PositionImpl)latestPositionAndCounter.getLeft()).getNext(), (Object)p1);
        Assert.assertEquals((Object)((PositionImpl)earliestPositionAndCounter.getLeft()).getNext(), (Object)p2);
        Assert.assertEquals((long)((Long)latestPositionAndCounter.getRight()), (long)20L);
        Assert.assertEquals((long)((Long)earliestPositionAndCounter.getRight()), (long)(20L - earliestCursor.getNumberOfEntriesInBacklog()));
        ledger.close();
    }

    @Test
    public void testManagedLedgerAutoCreate() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setCreateIfMissing(true);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("test", config);
        Assert.assertNotNull((Object)ledger);
    }

    @Test
    public void testManagedLedgerWithoutAutoCreate() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setCreateIfMissing(false);
        try {
            this.factory.open("testManagedLedgerWithoutAutoCreate", config);
            Assert.fail((String)"should have thrown ManagedLedgerNotFoundException");
        }
        catch (ManagedLedgerException.ManagedLedgerNotFoundException managedLedgerNotFoundException) {
            // empty catch block
        }
        Assert.assertFalse((boolean)this.factory.getManagedLedgers().containsKey("testManagedLedgerWithoutAutoCreate"));
    }

    @Test
    public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMetadataOperationsTimeoutSeconds(3L);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("timeout_ledger_test", config);
        BookKeeper bk = (BookKeeper)Mockito.mock(BookKeeper.class);
        ((BookKeeper)Mockito.doNothing().when((Object)bk)).asyncCreateLedger(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt(), (BookKeeper.DigestType)Matchers.any(), (byte[])Matchers.any(), (AsyncCallback.CreateCallback)Matchers.any(), Matchers.any(), (Map)Matchers.any());
        final AtomicInteger response = new AtomicInteger(0);
        final CountDownLatch latch = new CountDownLatch(1);
        ledger.asyncCreateLedger(bk, config, null, new AsyncCallback.CreateCallback(){

            public void createComplete(int rc, LedgerHandle lh, Object ctx) {
                response.set(rc);
                latch.countDown();
            }
        }, Collections.emptyMap());
        latch.await(config.getMetadataOperationsTimeoutSeconds() + 2L, TimeUnit.SECONDS);
        Assert.assertEquals((int)response.get(), (int)-23);
        ledger.close();
    }

    @Test
    public void testManagedLedgerWithReadEntryTimeOut() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setReadEntryTimeoutSeconds(1L);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("timeout_ledger_test", config);
        BookKeeper bk = (BookKeeper)Mockito.mock(BookKeeper.class);
        ((BookKeeper)Mockito.doNothing().when((Object)bk)).asyncCreateLedger(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt(), (BookKeeper.DigestType)Matchers.any(), (byte[])Matchers.any(), (AsyncCallback.CreateCallback)Matchers.any(), Matchers.any(), (Map)Matchers.any());
        final AtomicReference responseException1 = new AtomicReference();
        final CountDownLatch latch1 = new CountDownLatch(1);
        CompletableFuture entriesFuture = new CompletableFuture();
        ReadHandle ledgerHandle = (ReadHandle)Mockito.mock(ReadHandle.class);
        ((ReadHandle)Mockito.doReturn(entriesFuture).when((Object)ledgerHandle)).readAsync(PositionImpl.earliest.getLedgerId(), PositionImpl.earliest.getEntryId());
        ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest, new AsyncCallbacks.ReadEntryCallback(){

            public void readEntryComplete(Entry entry, Object ctx) {
                responseException1.set(null);
                latch1.countDown();
            }

            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                responseException1.set(exception);
                latch1.countDown();
            }
        }, null);
        ledger.asyncCreateLedger(bk, config, null, new AsyncCallback.CreateCallback(){

            public void createComplete(int rc, LedgerHandle lh, Object ctx) {
            }
        }, Collections.emptyMap());
        latch1.await(config.getReadEntryTimeoutSeconds() + 2L, TimeUnit.SECONDS);
        Assert.assertNotNull(responseException1.get());
        Assert.assertEquals((String)((ManagedLedgerException)((Object)responseException1.get())).getMessage(), (String)BKException.getMessage((int)-23));
        final CountDownLatch latch2 = new CountDownLatch(1);
        final AtomicReference responseException2 = new AtomicReference();
        PositionImpl readPositionRef = PositionImpl.earliest;
        ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1");
        OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl)cursor, (PositionImpl)readPositionRef, (int)1, (AsyncCallbacks.ReadEntriesCallback)new AsyncCallbacks.ReadEntriesCallback(){

            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                latch2.countDown();
            }

            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                responseException2.set(exception);
                latch2.countDown();
            }
        }, null);
        ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest.getEntryId(), PositionImpl.earliest.getEntryId(), false, opReadEntry, null);
        latch2.await(config.getReadEntryTimeoutSeconds() + 2L, TimeUnit.SECONDS);
        Assert.assertNotNull(responseException2.get());
        Assert.assertEquals((String)((ManagedLedgerException)((Object)responseException2.get())).getMessage(), (String)BKException.getMessage((int)-23));
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void testManagedLedgerWithAddEntryTimeOut() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setAddEntryTimeoutSeconds(1L);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("timeout_ledger_test", config);
        BookKeeper bk = (BookKeeper)Mockito.mock(BookKeeper.class);
        ((BookKeeper)Mockito.doNothing().when((Object)bk)).asyncCreateLedger(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt(), (BookKeeper.DigestType)Matchers.any(), (byte[])Matchers.any(), (AsyncCallback.CreateCallback)Matchers.any(), Matchers.any(), (Map)Matchers.any());
        PulsarMockBookKeeper bkClient = (PulsarMockBookKeeper)((Object)Mockito.mock(PulsarMockBookKeeper.class));
        ClientConfiguration conf = new ClientConfiguration();
        ((PulsarMockBookKeeper)((Object)Mockito.doReturn((Object)conf).when((Object)bkClient))).getConf();
        class MockLedgerHandle
        extends PulsarMockLedgerHandle {
            public MockLedgerHandle(PulsarMockBookKeeper bk, long id, BookKeeper.DigestType digest, byte[] passwd) throws GeneralSecurityException {
                super(bk, id, digest, passwd);
            }

            @Override
            public void asyncAddEntry(byte[] data, AsyncCallback.AddCallback cb, Object ctx) {
            }

            @Override
            public void asyncClose(AsyncCallback.CloseCallback cb, Object ctx) {
                cb.closeComplete(0, (LedgerHandle)this, ctx);
            }
        }
        MockLedgerHandle ledgerHandle = (MockLedgerHandle)((Object)Mockito.mock(MockLedgerHandle.class));
        String data = "data";
        ((MockLedgerHandle)((Object)Mockito.doNothing().when((Object)ledgerHandle))).asyncAddEntry("data".getBytes(), null, null);
        final AtomicBoolean addSuccess = new AtomicBoolean();
        this.setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", (Object)ledgerHandle);
        boolean totalAddEntries = true;
        final CountDownLatch latch = new CountDownLatch(1);
        ledger.asyncAddEntry("data".getBytes(), new AsyncCallbacks.AddEntryCallback(){

            public void addComplete(Position position, Object ctx) {
                addSuccess.set(true);
                latch.countDown();
            }

            public void addFailed(ManagedLedgerException exception, Object ctx) {
                latch.countDown();
            }
        }, null);
        latch.await();
        Assert.assertTrue((boolean)addSuccess.get());
        this.setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", null);
    }

    private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
        Field field = clazz.getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(classObj, fieldValue);
    }
}

