/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import com.google.common.base.Charsets;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.lang.reflect.Field;
import java.nio.ReadOnlyBufferException;
import java.nio.charset.Charset;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.EnsemblePlacementPolicy;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.PulsarMockBookKeeper;
import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.EntryCache;
import org.apache.bookkeeper.mledger.impl.EntryCacheImpl;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.bookkeeper.mledger.impl.MetaStoreImpl;
import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader;
import org.apache.bookkeeper.mledger.impl.OpAddEntry;
import org.apache.bookkeeper.mledger.impl.OpReadEntry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.metadata.api.MetadataStore;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class ManagedLedgerTest
extends MockedBookKeeperTestCase {
    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerTest.class);
    private static final Charset Encoding = Charsets.UTF_8;

    @DataProvider(name="checkOwnershipFlag")
    public Object[][] checkOwnershipFlagProvider() {
        return new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}};
    }

    @Test
    public void managedLedgerApi() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        for (int i = 0; i < 100; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        while (cursor.hasMoreEntries()) {
            List entries = cursor.readEntries(20);
            log.debug("Read {} entries", (Object)entries.size());
            Entry lastEntry = (Entry)entries.get(entries.size() - 1);
            cursor.markDelete(lastEntry.getPosition());
            for (Entry entry : entries) {
                log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
                entry.release();
            }
            log.info("-----------------------");
        }
        log.info("Finished reading entries");
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void simple() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getNumberOfActiveEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)0L);
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)ledger.getNumberOfActiveEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)"dummy-entry-1".getBytes(Encoding).length);
        ManagedCursor cursor = ledger.openCursor("c1");
        Assert.assertFalse((boolean)cursor.hasMoreEntries());
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(false), (long)0L);
        Assert.assertEquals((Collection)cursor.readEntries(100), new ArrayList());
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(false), (long)1L);
        Assert.assertEquals((long)ledger.getNumberOfActiveEntries(), (long)1L);
        List entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)0);
        ledger.close();
        this.factory.shutdown();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=20000L)
    public void closeAndReopen() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        ledger.close();
        log.info("Closing ledger and reopening");
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ledger = factory2.open("my_test_ledger");
            cursor = ledger.openCursor("c1");
            Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)2L);
            Assert.assertEquals((long)ledger.getTotalSize(), (long)("dummy-entry-1".getBytes(Encoding).length * 2));
            List entries = cursor.readEntries(100);
            Assert.assertEquals((int)entries.size(), (int)1);
            entries.forEach(e -> e.release());
            ledger.close();
        }
        finally {
            if (Collections.singletonList(factory2).get(0) != null) {
                factory2.shutdown();
            }
        }
    }

    @Test(timeOut=20000L)
    public void acknowledge1() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        List entries = cursor.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)2);
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(false), (long)2L);
        Assert.assertFalse((boolean)cursor.hasMoreEntries());
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)2L);
        Assert.assertEquals((long)ledger.getNumberOfActiveEntries(), (long)2L);
        cursor.markDelete(((Entry)entries.get(0)).getPosition());
        entries.forEach(e -> e.release());
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(false), (long)1L);
        Assert.assertFalse((boolean)cursor.hasMoreEntries());
        Assert.assertEquals((long)ledger.getNumberOfActiveEntries(), (long)1L);
        ledger.close();
        ledger = this.factory.open("my_test_ledger");
        cursor = ledger.openCursor("c1");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)2L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)("dummy-entry-1".getBytes(Encoding).length * 2));
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)cursor.getNumberOfEntriesInBacklog(false), (long)1L);
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void asyncAPI() throws Throwable {
        final CountDownLatch counter = new CountDownLatch(1);
        this.factory.asyncOpen("my_test_ledger", new ManagedLedgerConfig(), new AsyncCallbacks.OpenLedgerCallback(){

            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                ledger.asyncOpenCursor("test-cursor", new AsyncCallbacks.OpenCursorCallback(){

                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                        ManagedLedger ledger = (ManagedLedger)ctx;
                        ledger.asyncAddEntry("test".getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

                            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                                Pair pair = (Pair)ctx;
                                ManagedLedger ledger = (ManagedLedger)pair.getLeft();
                                ManagedCursor cursor = (ManagedCursor)pair.getRight();
                                Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
                                Assert.assertEquals((long)ledger.getTotalSize(), (long)"test".getBytes(Encoding).length);
                                cursor.asyncReadEntries(2, new AsyncCallbacks.ReadEntriesCallback(){

                                    public void readEntriesComplete(List<Entry> entries, Object ctx) {
                                        ManagedCursor cursor = (ManagedCursor)ctx;
                                        Assert.assertEquals((int)entries.size(), (int)1);
                                        Entry entry = entries.get(0);
                                        Position position = entry.getPosition();
                                        Assert.assertEquals((String)new String(entry.getDataAndRelease(), Encoding), (String)"test");
                                        log.debug("Mark-Deleting to position {}", (Object)position);
                                        cursor.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback(){

                                            public void markDeleteComplete(Object ctx) {
                                                log.debug("Mark delete complete");
                                                ManagedCursor cursor = (ManagedCursor)ctx;
                                                Assert.assertFalse((boolean)cursor.hasMoreEntries());
                                                counter.countDown();
                                            }

                                            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                                                Assert.fail((String)exception.getMessage());
                                            }
                                        }, (Object)cursor);
                                    }

                                    public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                                        Assert.fail((String)exception.getMessage());
                                    }
                                }, (Object)cursor, PositionImpl.latest);
                            }

                            public void addFailed(ManagedLedgerException exception, Object ctx) {
                                Assert.fail((String)exception.getMessage());
                            }
                        }, (Object)Pair.of((Object)ledger, (Object)cursor));
                    }

                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                        Assert.fail((String)exception.getMessage());
                    }
                }, (Object)ledger);
            }

            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail((String)exception.getMessage());
            }
        }, null, null);
        counter.await();
        log.info("Test completed");
    }

    @Test(timeOut=20000L)
    public void spanningMultipleLedgers() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)0L);
        ManagedCursor cursor = ledger.openCursor("c1");
        for (int i = 0; i < 11; ++i) {
            ledger.addEntry(("dummy-entry-" + i).getBytes(Encoding));
        }
        List entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)11);
        Assert.assertFalse((boolean)cursor.hasMoreEntries());
        PositionImpl first = (PositionImpl)((Entry)entries.get(0)).getPosition();
        PositionImpl last = (PositionImpl)((Entry)entries.get(entries.size() - 1)).getPosition();
        entries.forEach(e -> e.release());
        log.info("First={} Last={}", (Object)first, (Object)last);
        Assert.assertTrue((first.getLedgerId() < last.getLedgerId() ? 1 : 0) != 0);
        Assert.assertEquals((long)first.getEntryId(), (long)0L);
        Assert.assertEquals((long)last.getEntryId(), (long)0L);
        entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)0);
        Assert.assertFalse((boolean)cursor.hasMoreEntries());
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void spanningMultipleLedgersWithSize() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1000000);
        config.setMaxSizePerLedgerMb(1);
        config.setEnsembleSize(1);
        config.setWriteQuorumSize(1).setAckQuorumSize(1);
        config.setMetadataWriteQuorumSize(1).setMetadataAckQuorumSize(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)0L);
        ManagedCursor cursor = ledger.openCursor("c1");
        byte[] content = new byte[1047552];
        for (int i = 0; i < 3; ++i) {
            ledger.addEntry(content);
        }
        List entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)3);
        Assert.assertFalse((boolean)cursor.hasMoreEntries());
        PositionImpl first = (PositionImpl)((Entry)entries.get(0)).getPosition();
        PositionImpl last = (PositionImpl)((Entry)entries.get(entries.size() - 1)).getPosition();
        entries.forEach(e -> e.release());
        entries = cursor.readEntries(100);
        Assert.assertEquals((int)entries.size(), (int)0);
        Assert.assertFalse((boolean)cursor.hasMoreEntries());
        entries.forEach(e -> e.release());
        log.info("First={} Last={}", (Object)first, (Object)last);
        Assert.assertTrue((first.getLedgerId() < last.getLedgerId() ? 1 : 0) != 0);
        Assert.assertEquals((long)first.getEntryId(), (long)0L);
        Assert.assertEquals((long)last.getEntryId(), (long)0L);
        ledger.close();
    }

    @Test(expectedExceptions={IllegalArgumentException.class})
    public void invalidReadEntriesArg1() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("entry".getBytes());
        cursor.readEntries(-1);
        Assert.fail((String)"Should have thrown an exception in the above line");
    }

    @Test(expectedExceptions={IllegalArgumentException.class})
    public void invalidReadEntriesArg2() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("entry".getBytes());
        cursor.readEntries(0);
        Assert.fail((String)"Should have thrown an exception in the above line");
    }

    @Test(timeOut=20000L)
    public void deleteAndReopen() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.close();
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.delete();
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void deleteAndReopenWithCursors() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.close();
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.delete();
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        ManagedCursor cursor = ledger.openCursor("test-cursor");
        Assert.assertFalse((boolean)cursor.hasMoreEntries());
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void asyncDeleteWithError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.close();
        ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        final CountDownLatch counter = new CountDownLatch(1);
        this.stopBookKeeper();
        this.stopMetadataStore();
        this.factory.open("my_test_ledger", new ManagedLedgerConfig()).asyncDelete(new AsyncCallbacks.DeleteLedgerCallback(){

            public void deleteLedgerComplete(Object ctx) {
                Assert.assertNull((Object)ctx);
                Assert.fail((String)"The async-call should have failed");
            }

            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
                counter.countDown();
            }
        }, null);
        counter.await();
    }

    private byte[] copyBytesFromByteBuf(ByteBuf buf) {
        int index = buf.readerIndex();
        byte[] bytes = new byte[buf.readableBytes()];
        buf.getBytes(index, bytes);
        buf.readerIndex(index);
        return bytes;
    }

    @Test(timeOut=20000L)
    public void asyncAddEntryWithoutError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ledger.openCursor("test-cursor");
        int count = 4;
        final CountDownLatch counter = new CountDownLatch(4);
        final byte[] bytes = "dummy-entry-1".getBytes(Encoding);
        AsyncCallbacks.AddEntryCallback callback = new AsyncCallbacks.AddEntryCallback(){

            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                Assert.assertNull((Object)ctx);
                Assert.assertEquals((byte[])ManagedLedgerTest.this.copyBytesFromByteBuf(entryData), (byte[])bytes);
                try {
                    entryData.array();
                }
                catch (Exception e) {
                    Assert.assertTrue((boolean)(e instanceof ReadOnlyBufferException));
                }
                counter.countDown();
            }

            public void addFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail((String)exception.getMessage());
            }
        };
        for (int i = 0; i < 4; ++i) {
            ledger.asyncAddEntry(bytes, callback, null);
        }
        counter.await();
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)4L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)("dummy-entry-1".getBytes(Encoding).length * 4));
    }

    @Test(timeOut=20000L)
    public void doubleAsyncAddEntryWithoutError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        final CountDownLatch done = new CountDownLatch(10);
        for (int i = 0; i < 10; ++i) {
            final String content = "dummy-entry-" + i;
            ledger.asyncAddEntry(content.getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                    Assert.assertNotNull((Object)ctx);
                    Assert.assertEquals((byte[])ManagedLedgerTest.this.copyBytesFromByteBuf(entryData), (byte[])content.getBytes(Encoding));
                    log.info("Successfully added {}", (Object)content);
                    done.countDown();
                }

                public void addFailed(ManagedLedgerException exception, Object ctx) {
                    Assert.fail((String)exception.getMessage());
                }
            }, (Object)this);
        }
        done.await();
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)10L);
    }

    @Test(timeOut=20000L)
    public void asyncAddEntryWithError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        final CountDownLatch counter = new CountDownLatch(1);
        this.stopBookKeeper();
        this.stopMetadataStore();
        ledger.asyncAddEntry("dummy-entry-1".getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                Assert.fail((String)"Should have failed");
            }

            public void addFailed(ManagedLedgerException exception, Object ctx) {
                counter.countDown();
            }
        }, null);
        counter.await();
    }

    @Test(timeOut=20000L)
    public void asyncCloseWithoutError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test-cursor");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        final CountDownLatch counter = new CountDownLatch(1);
        ledger.asyncClose(new AsyncCallbacks.CloseCallback(){

            public void closeComplete(Object ctx) {
                Assert.assertNull((Object)ctx);
                counter.countDown();
            }

            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail((String)exception.getMessage());
            }
        }, null);
        counter.await();
    }

    @Test(timeOut=20000L)
    public void asyncOpenCursorWithoutError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        final CountDownLatch counter = new CountDownLatch(1);
        ledger.asyncOpenCursor("test-cursor", new AsyncCallbacks.OpenCursorCallback(){

            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                Assert.assertNull((Object)ctx);
                Assert.assertNotNull((Object)cursor);
                counter.countDown();
            }

            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail((String)exception.getMessage());
            }
        }, null);
        counter.await();
    }

    @Test(timeOut=20000L)
    public void asyncOpenCursorWithError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        final CountDownLatch counter = new CountDownLatch(1);
        this.stopBookKeeper();
        this.stopMetadataStore();
        ledger.asyncOpenCursor("test-cursor", new AsyncCallbacks.OpenCursorCallback(){

            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                Assert.fail((String)"The async-call should have failed");
            }

            public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                counter.countDown();
            }
        }, null);
        counter.await();
    }

    @Test(timeOut=20000L)
    public void readFromOlderLedger() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("test");
        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
    }

    @Test(timeOut=20000L)
    public void readFromOlderLedgers() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("test");
        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));
        ledger.addEntry("entry-3".getBytes(Encoding));
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        cursor.readEntries(1).forEach(e -> e.release());
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        cursor.readEntries(1).forEach(e -> e.release());
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        cursor.readEntries(1).forEach(e -> e.release());
        Assert.assertFalse((boolean)cursor.hasMoreEntries());
    }

    @Test(timeOut=20000L)
    public void triggerLedgerDeletion() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("test");
        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));
        ledger.addEntry("entry-3".getBytes(Encoding));
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        List entries = cursor.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)3L);
        entries.forEach(e -> e.release());
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        entries = cursor.readEntries(1);
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        cursor.markDelete(((Entry)entries.get(0)).getPosition());
        entries.forEach(e -> e.release());
    }

    @Test(timeOut=20000L)
    public void testEmptyManagedLedgerContent() throws Exception {
        this.metadataStore.put("/managed-ledger/my_test_ledger", " ".getBytes(), Optional.empty()).join();
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("test");
        ledger.addEntry("entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
    }

    @Test(timeOut=20000L)
    public void testProducerAndNoConsumer() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        ledger.addEntry("entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.addEntry("entry-2".getBytes(Encoding));
        while (ledger.getNumberOfEntries() > 1L) {
            log.debug("entries={}", (Object)ledger.getNumberOfEntries());
            Thread.sleep(100L);
        }
        ledger.addEntry("entry-3".getBytes(Encoding));
        while (ledger.getNumberOfEntries() > 1L) {
            log.debug("entries={}", (Object)ledger.getNumberOfEntries());
            Thread.sleep(100L);
        }
    }

    @Test(timeOut=20000L)
    public void testTrimmer() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("c1");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        ledger.addEntry("entry-1".getBytes(Encoding));
        ledger.addEntry("entry-2".getBytes(Encoding));
        ledger.addEntry("entry-3".getBytes(Encoding));
        ledger.addEntry("entry-4".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)4L);
        cursor.readEntries(1).forEach(e -> e.release());
        cursor.readEntries(1).forEach(e -> e.release());
        List entries = cursor.readEntries(1);
        Position lastPosition = ((Entry)entries.get(0)).getPosition();
        entries.forEach(e -> e.release());
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)4L);
        cursor.markDelete(lastPosition);
        while (ledger.getNumberOfEntries() != 2L) {
            Thread.sleep(10L);
        }
    }

    @Test(timeOut=20000L)
    public void testAsyncAddEntryAndSyncClose() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(10);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ledger.openCursor("c1");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        final CountDownLatch counter = new CountDownLatch(100);
        for (int i = 0; i < 100; ++i) {
            String content = "entry-" + i;
            ledger.asyncAddEntry(content.getBytes(Encoding), new AsyncCallbacks.AddEntryCallback(){

                public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                    counter.countDown();
                }

                public void addFailed(ManagedLedgerException exception, Object ctx) {
                    Assert.fail((String)exception.getMessage());
                }
            }, null);
        }
        counter.await();
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)100L);
    }

    @Test(timeOut=20000L)
    public void moveCursorToNextLedger() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(1);
        ManagedLedger ledger = this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("test");
        ledger.addEntry("entry-1".getBytes(Encoding));
        log.debug("Added 1st message");
        List entries = cursor.readEntries(1);
        log.debug("read message ok");
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        ledger.addEntry("entry-2".getBytes(Encoding));
        log.debug("Added 2nd message");
        ledger.addEntry("entry-3".getBytes(Encoding));
        log.debug("Added 3nd message");
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)2L);
        entries = cursor.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)2);
        entries.forEach(e -> e.release());
        entries = cursor.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)0);
        entries = cursor.readEntries(2);
        Assert.assertEquals((int)entries.size(), (int)0);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=20000L)
    public void differentSessions() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)0L);
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)"dummy-entry-1".getBytes(Encoding).length);
        Assert.assertTrue((boolean)cursor.hasMoreEntries());
        Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)1L);
        ledger.close();
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ledger = factory2.open("my_test_ledger");
            Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
            Assert.assertEquals((long)ledger.getTotalSize(), (long)"dummy-entry-1".getBytes(Encoding).length);
            cursor = ledger.openCursor("c1");
            Assert.assertTrue((boolean)cursor.hasMoreEntries());
            Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)1L);
            ledger.addEntry("dummy-entry-2".getBytes(Encoding));
            Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)2L);
            Assert.assertEquals((long)ledger.getTotalSize(), (long)("dummy-entry-1".getBytes(Encoding).length * 2));
            Assert.assertTrue((boolean)cursor.hasMoreEntries());
            Assert.assertEquals((long)cursor.getNumberOfEntries(), (long)2L);
            ledger.close();
        }
        finally {
            if (Collections.singletonList(factory2).get(0) != null) {
                factory2.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(enabled=false)
    public void fenceManagedLedger() throws Exception {
        ManagedLedgerFactoryImpl factory1 = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ManagedLedger ledger1 = factory1.open("my_test_ledger");
            ManagedCursor cursor1 = ledger1.openCursor("c1");
            ledger1.addEntry("entry-1".getBytes(Encoding));
            ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
            try {
                ManagedLedger ledger2 = factory2.open("my_test_ledger");
                ManagedCursor cursor2 = ledger2.openCursor("c1");
                try {
                    ledger1.addEntry("entry-1".getBytes(Encoding));
                    Assert.fail((String)"Expecting exception");
                }
                catch (ManagedLedgerException.ManagedLedgerFencedException managedLedgerFencedException) {
                    // empty catch block
                }
                try {
                    ledger1.addEntry("entry-2".getBytes(Encoding));
                    Assert.fail((String)"Expecting exception");
                }
                catch (ManagedLedgerException.ManagedLedgerFencedException managedLedgerFencedException) {
                    // empty catch block
                }
                try {
                    cursor1.readEntries(10);
                    Assert.fail((String)"Expecting exception");
                }
                catch (ManagedLedgerException.ManagedLedgerFencedException managedLedgerFencedException) {
                    // empty catch block
                }
                try {
                    ledger1.openCursor("new cursor");
                    Assert.fail((String)"Expecting exception");
                }
                catch (ManagedLedgerException.ManagedLedgerFencedException managedLedgerFencedException) {
                    // empty catch block
                }
                ledger2.addEntry("entry-2".getBytes(Encoding));
                Assert.assertEquals((long)cursor2.getNumberOfEntries(), (long)2L);
            }
            finally {
                if (Collections.singletonList(factory2).get(0) != null) {
                    factory2.shutdown();
                }
            }
        }
        finally {
            if (Collections.singletonList(factory1).get(0) != null) {
                factory1.shutdown();
            }
        }
    }

    @Test
    public void forceCloseLedgers() throws Exception {
        ManagedLedger ledger1 = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        ledger1.openCursor("c1");
        ManagedCursor c2 = ledger1.openCursor("c2");
        ledger1.addEntry("entry-1".getBytes(Encoding));
        ledger1.addEntry("entry-2".getBytes(Encoding));
        ledger1.addEntry("entry-3".getBytes(Encoding));
        c2.readEntries(1).forEach(e -> e.release());
        c2.readEntries(1).forEach(e -> e.release());
        c2.readEntries(1).forEach(e -> e.release());
        ledger1.close();
        try {
            ledger1.addEntry("entry-3".getBytes(Encoding));
            Assert.fail((String)"should not have reached this point");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        try {
            ledger1.openCursor("new-cursor");
            Assert.fail((String)"should not have reached this point");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
    }

    @Test
    public void closeLedgerWithError() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("entry-1".getBytes(Encoding));
        this.stopMetadataStore();
        this.stopBookKeeper();
        try {
            ledger.close();
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
    }

    @Test(timeOut=20000L)
    public void deleteWithErrors1() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        PositionImpl position = (PositionImpl)ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        this.bkc.deleteLedger(position.getLedgerId());
        ledger.delete();
    }

    @Test(timeOut=20000L)
    public void deleteWithErrors2() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        this.stopMetadataStore();
        try {
            ledger.delete();
            Assert.fail((String)"should have failed");
        }
        catch (ManagedLedgerException managedLedgerException) {
        }
        catch (RejectedExecutionException rejectedExecutionException) {
            // empty catch block
        }
    }

    @Test(timeOut=20000L)
    public void readWithErrors1() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        ManagedCursor cursor = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
        this.stopMetadataStore();
        this.stopBookKeeper();
        try {
            cursor.readEntries(10);
            Assert.fail((String)"should have failed");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        try {
            ledger.addEntry("dummy-entry-3".getBytes(Encoding));
            Assert.fail((String)"should have failed");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
    }

    @Test(timeOut=20000L, enabled=false)
    void concurrentAsyncOpen() throws Exception {
        final CountDownLatch counter = new CountDownLatch(2);
        class Result {
            ManagedLedger instance1 = null;
            ManagedLedger instance2 = null;

            Result() {
            }
        }
        final Result result = new Result();
        this.factory.asyncOpen("my-test-ledger", new AsyncCallbacks.OpenLedgerCallback(){
            {
            }

            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                result.instance1 = ledger;
                counter.countDown();
            }

            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
            }
        }, null);
        this.factory.asyncOpen("my-test-ledger", new AsyncCallbacks.OpenLedgerCallback(){
            {
            }

            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                result.instance2 = ledger;
                counter.countDown();
            }

            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
            }
        }, null);
        counter.await();
        Assert.assertEquals((Object)result.instance1, (Object)result.instance2);
        Assert.assertNotNull((Object)result.instance1);
    }

    @Test
    public void asyncOpenClosedLedger() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my-closed-ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        c1.close();
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.setFenced();
        final CountDownLatch counter = new CountDownLatch(1);
        class Result {
            ManagedLedger instance1 = null;

            Result() {
            }
        }
        final Result result = new Result();
        this.factory.asyncOpen("my-closed-ledger", new AsyncCallbacks.OpenLedgerCallback(){
            {
            }

            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                result.instance1 = ledger;
                counter.countDown();
            }

            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
            }
        }, null);
        counter.await();
        Assert.assertNotNull((Object)result.instance1);
        ManagedCursor c2 = result.instance1.openCursor("c1");
        List entries = c2.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
    }

    @Test
    public void getCursors() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ManagedCursor c2 = ledger.openCursor("c2");
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)ledger.getCursors()), (Set)Sets.newHashSet((Object[])new ManagedCursor[]{c1, c2}));
        c1.close();
        ledger.deleteCursor("c1");
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)ledger.getCursors()), (Set)Sets.newHashSet((Object[])new ManagedCursor[]{c2}));
        c2.close();
        ledger.deleteCursor("c2");
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)ledger.getCursors()), (Set)Sets.newHashSet());
    }

    @Test
    public void testUpdateProperties() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        HashMap<String, String> properties = new HashMap<String, String>();
        properties.put("key1", "value1");
        properties.put("key2", "value2");
        properties.put("key3", "value3");
        ledger.setProperties(properties);
        Assert.assertEquals((Map)ledger.getProperties(), properties);
        properties.put("key4", "value4");
        ledger.setProperty("key4", "value4");
        Assert.assertEquals((Map)ledger.getProperties(), properties);
        ledger.deleteProperty("key4");
        properties.remove("key4");
        Assert.assertEquals((Map)ledger.getProperties(), properties);
        HashMap<String, String> newProperties = new HashMap<String, String>();
        newProperties.put("key5", "value5");
        newProperties.put("key1", "value6");
        newProperties.putAll(properties);
        ledger.setProperties(newProperties);
        Assert.assertEquals((Map)ledger.getProperties(), newProperties);
    }

    @Test
    public void testAsyncUpdateProperties() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        final HashMap<String, String> prop = new HashMap<String, String>();
        prop.put("key1", "value1");
        prop.put("key2", "value2");
        prop.put("key3", "value3");
        final CountDownLatch latch1 = new CountDownLatch(1);
        ledger.asyncSetProperties(prop, new AsyncCallbacks.UpdatePropertiesCallback(){

            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
                Assert.assertEquals((Map)prop, properties);
                latch1.countDown();
            }

            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
            }
        }, null);
        Assert.assertTrue((boolean)latch1.await(5L, TimeUnit.SECONDS));
        final CountDownLatch latch2 = new CountDownLatch(1);
        ledger.asyncSetProperty("key4", "value4", new AsyncCallbacks.UpdatePropertiesCallback(){

            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
                Assert.assertNotNull((Object)properties.get("key4"));
                Assert.assertEquals((String)"value4", (String)properties.get("key4"));
                latch2.countDown();
            }

            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
            }
        }, null);
        Assert.assertTrue((boolean)latch2.await(5L, TimeUnit.SECONDS));
        prop.remove("key1");
        final CountDownLatch latch3 = new CountDownLatch(1);
        ledger.asyncDeleteProperty("key1", new AsyncCallbacks.UpdatePropertiesCallback(){

            public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
                Assert.assertNull((Object)properties.get("key1"));
                latch3.countDown();
            }

            public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
            }
        }, null);
        Assert.assertTrue((boolean)latch3.await(5L, TimeUnit.SECONDS));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentAsyncSetProperties() throws Exception {
        final CountDownLatch latch = new CountDownLatch(1000);
        ManagedLedger ledger = this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(1));
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            int i = 0;
            while (i < 1000) {
                int finalI = i++;
                executor.execute(() -> {
                    final HashMap<String, String> newProperties = new HashMap<String, String>();
                    newProperties.put("key0", String.valueOf(finalI));
                    newProperties.put("key1", "value1");
                    newProperties.put("key2", "value2");
                    newProperties.put("key3", "value3");
                    ledger.asyncSetProperties(newProperties, new AsyncCallbacks.UpdatePropertiesCallback(){

                        public void updatePropertiesComplete(Map<String, String> properties, Object ctx) {
                            Assert.assertEquals(properties, (Map)newProperties);
                            latch.countDown();
                        }

                        public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx) {
                        }
                    }, null);
                });
            }
            try {
                for (i = 0; i < 100; ++i) {
                    ledger.addEntry("data".getBytes(Encoding));
                    Thread.sleep(300L);
                }
            }
            catch (Exception e) {
                Assert.fail((String)e.getMessage());
            }
            Assert.assertTrue((boolean)latch.await(300L, TimeUnit.SECONDS));
            this.factory.shutdown();
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    @Test
    public void ledgersList() throws Exception {
        MetaStore store = this.factory.getMetaStore();
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)store.getManagedLedgers()), (Set)Sets.newHashSet());
        ManagedLedger ledger1 = this.factory.open("ledger1");
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)store.getManagedLedgers()), (Set)Sets.newHashSet((Object[])new String[]{"ledger1"}));
        ManagedLedger ledger2 = this.factory.open("ledger2");
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)store.getManagedLedgers()), (Set)Sets.newHashSet((Object[])new String[]{"ledger1", "ledger2"}));
        ledger1.delete();
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)store.getManagedLedgers()), (Set)Sets.newHashSet((Object[])new String[]{"ledger2"}));
        ledger2.delete();
        Assert.assertEquals((Set)Sets.newHashSet((Iterable)store.getManagedLedgers()), (Set)Sets.newHashSet());
    }

    @Test
    public void testCleanup() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes(Encoding));
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)2);
        ledger.delete();
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)0);
    }

    @Test(timeOut=20000L)
    public void testAsyncCleanup() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes(Encoding));
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)2);
        final CountDownLatch latch = new CountDownLatch(1);
        ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback(){

            public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
                Assert.fail((String)"should have succeeded");
            }

            public void deleteLedgerComplete(Object ctx) {
                latch.countDown();
            }
        }, null);
        latch.await();
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)0);
    }

    @Test(timeOut=20000L)
    public void testReopenAndCleanup() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes(Encoding));
        ledger.close();
        Thread.sleep(100L);
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)1);
        this.factory.shutdown();
        this.factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        ledger = this.factory.open("my_test_ledger");
        ledger.openCursor("c1");
        Thread.sleep(100L);
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)2);
        ledger.close();
        this.factory.open("my_test_ledger", new ManagedLedgerConfig()).delete();
        Thread.sleep(100L);
        Assert.assertEquals((int)this.bkc.getLedgers().size(), (int)0);
        this.factory.shutdown();
    }

    @Test(timeOut=20000L)
    public void doubleOpen() throws Exception {
        ManagedLedger ledger1 = this.factory.open("my_test_ledger");
        ManagedLedger ledger2 = this.factory.open("my_test_ledger");
        Assert.assertSame((Object)ledger1, (Object)ledger2);
    }

    @Test
    public void compositeNames() throws Exception {
        this.factory.open("my/test/ledger");
    }

    @Test
    public void previousPosition() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ManagedCursor cursor = ledger.openCursor("my_cursor");
        Position p0 = cursor.getMarkDeletedPosition();
        Assert.assertEquals((Object)ledger.getPreviousPosition((PositionImpl)p0), (Object)p0);
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        PositionImpl pBeforeWriting = ledger.getLastPosition();
        PositionImpl p1 = (PositionImpl)ledger.addEntry("entry".getBytes());
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", new ManagedLedgerConfig().setMaxEntriesPerLedger(2));
        Position p2 = ledger.addEntry("entry".getBytes());
        Position p3 = ledger.addEntry("entry".getBytes());
        Position p4 = ledger.addEntry("entry".getBytes());
        Assert.assertEquals((Object)ledger.getPreviousPosition(p1), (Object)pBeforeWriting);
        Assert.assertEquals((Object)ledger.getPreviousPosition((PositionImpl)p2), (Object)p1);
        Assert.assertEquals((Object)ledger.getPreviousPosition((PositionImpl)p3), (Object)p2);
        Assert.assertEquals((Object)ledger.getPreviousPosition((PositionImpl)p4), (Object)p3);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeOut=20000L)
    public void testOpenRaceCondition() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setEnsembleSize(2).setAckQuorumSize(2).setMetadataEnsembleSize(2);
        ManagedLedger ledger = this.factory.open("my-ledger", config);
        ManagedCursor c1 = ledger.openCursor("c1");
        int N = 1000;
        Position position = ledger.addEntry("entry-0".getBytes());
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            CountDownLatch counter = new CountDownLatch(2);
            executor.execute(() -> {
                try {
                    for (int i = 0; i < 1000; ++i) {
                        c1.markDelete(position);
                    }
                    counter.countDown();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            });
            executor.execute(() -> {
                try {
                    for (int i = 0; i < 1000; ++i) {
                        ledger.openCursor("cursor-" + i);
                    }
                    counter.countDown();
                }
                catch (Exception e) {
                    e.printStackTrace();
                }
            });
            counter.await();
        }
        finally {
            if (Collections.singletonList(executor).get(0) != null) {
                executor.shutdownNow();
            }
        }
    }

    @Test
    public void invalidateConsumedEntriesFromCache() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        EntryCacheManager cacheManager = this.factory.getEntryCacheManager();
        EntryCache entryCache = ledger.entryCache;
        ManagedCursorImpl c1 = (ManagedCursorImpl)ledger.openCursor("c1");
        ManagedCursorImpl c2 = (ManagedCursorImpl)ledger.openCursor("c2");
        PositionImpl p1 = (PositionImpl)ledger.addEntry("entry-1".getBytes());
        PositionImpl p2 = (PositionImpl)ledger.addEntry("entry-2".getBytes());
        PositionImpl p3 = (PositionImpl)ledger.addEntry("entry-3".getBytes());
        PositionImpl p4 = (PositionImpl)ledger.addEntry("entry-4".getBytes());
        Assert.assertEquals((long)entryCache.getSize(), (long)28L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        c2.setReadPosition((Position)p3);
        ledger.discardEntriesFromCache(c2, p2);
        Assert.assertEquals((long)entryCache.getSize(), (long)28L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        c1.setReadPosition((Position)p2);
        ledger.discardEntriesFromCache(c1, p2);
        Assert.assertEquals((long)entryCache.getSize(), (long)21L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        c1.setReadPosition((Position)p3);
        ledger.discardEntriesFromCache(c1, p3);
        Assert.assertEquals((long)entryCache.getSize(), (long)21L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        ledger.deactivateCursor((ManagedCursor)c1);
        Assert.assertEquals((long)entryCache.getSize(), (long)21L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        c2.setReadPosition((Position)p4);
        ledger.discardEntriesFromCache(c2, p4);
        Assert.assertEquals((long)entryCache.getSize(), (long)7L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
        ledger.deactivateCursor((ManagedCursor)c2);
        Assert.assertEquals((long)entryCache.getSize(), (long)0L);
        Assert.assertEquals((long)cacheManager.getSize(), (long)entryCache.getSize());
    }

    @Test
    public void discardEmptyLedgersOnClose() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("entry".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        c1.close();
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        c1.close();
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
    }

    @Test
    public void discardEmptyLedgersOnError() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        this.bkc.failNow(-3);
        this.metadataStore.failConditional(new MetadataStoreException("error"), (op, path) -> path.equals("/managed-ledgers/my_test_ledger") && op == FaultInjectionMetadataStore.OperationType.PUT);
        try {
            ledger.addEntry("entry".getBytes());
            Assert.fail((String)"Should have received exception");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)0);
        this.bkc.failNow(-3);
        try {
            ledger.addEntry("entry".getBytes());
            Assert.fail((String)"Should have received exception");
        }
        catch (ManagedLedgerException managedLedgerException) {
            // empty catch block
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)0);
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)0L);
    }

    @Test
    public void cursorReadsWithDiscardedEmptyLedgers() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        Position p1 = c1.getReadPosition();
        c1.close();
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        c1 = ledger.openCursor("c1");
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)0L);
        Assert.assertFalse((boolean)c1.hasMoreEntries());
        ledger.addEntry("entry".getBytes());
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)1L);
        Assert.assertTrue((boolean)c1.hasMoreEntries());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        List entries = c1.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        Assert.assertFalse((boolean)c1.hasMoreEntries());
        Assert.assertEquals((int)c1.readEntries(1).size(), (int)0);
        c1.seek(p1);
        Assert.assertTrue((boolean)c1.hasMoreEntries());
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)1L);
        entries = c1.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        Assert.assertEquals((int)c1.readEntries(1).size(), (int)0);
    }

    @Test
    public void cursorReadsWithDiscardedEmptyLedgersStillListed() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("entry-1".getBytes());
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        c1 = ledger.openCursor("c1");
        ledger.addEntry("entry-2".getBytes());
        final MLDataFormats.ManagedLedgerInfo.LedgerInfo l1info = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0);
        final MLDataFormats.ManagedLedgerInfo.LedgerInfo l2info = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1);
        ledger.close();
        final CountDownLatch counter = new CountDownLatch(1);
        final MetaStore store = this.factory.getMetaStore();
        store.getManagedLedgerInfo("my_test_ledger", false, (MetaStore.MetaStoreCallback)new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>(){

            public void operationComplete(MLDataFormats.ManagedLedgerInfo result, Stat version) {
                MLDataFormats.ManagedLedgerInfo.Builder info = MLDataFormats.ManagedLedgerInfo.newBuilder((MLDataFormats.ManagedLedgerInfo)result);
                info.clearLedgerInfo();
                info.addLedgerInfo(MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().setLedgerId(l1info.getLedgerId()).build());
                info.addLedgerInfo(l2info);
                store.asyncUpdateLedgerIds("my_test_ledger", info.build(), version, (MetaStore.MetaStoreCallback)new MetaStore.MetaStoreCallback<Void>(){

                    public void operationComplete(Void result, Stat version) {
                        counter.countDown();
                    }

                    public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                        counter.countDown();
                    }
                });
            }

            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                counter.countDown();
            }
        });
        counter.await();
        this.bkc.deleteLedger(l1info.getLedgerId());
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        c1 = ledger.openCursor("c1");
        Assert.assertEquals((long)c1.getNumberOfEntries(), (long)1L);
        Assert.assertTrue((boolean)c1.hasMoreEntries());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        List entries = c1.readEntries(10);
        Assert.assertEquals((int)entries.size(), (int)1);
        entries.forEach(e -> e.release());
        Assert.assertFalse((boolean)c1.hasMoreEntries());
        entries = c1.readEntries(1);
        Assert.assertEquals((int)entries.size(), (int)0);
        entries.forEach(e -> e.release());
    }

    @Test
    public void addEntryWithOffset() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry("012345678".getBytes(), 2, 3);
        List entries = c1.readEntries(1);
        Assert.assertEquals((int)((Entry)entries.get(0)).getLength(), (int)3);
        Entry entry = (Entry)entries.get(0);
        Assert.assertEquals((String)new String(entry.getData()), (String)"234");
        entry.release();
    }

    @Test
    public void totalSizeTest() throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", conf);
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry(new byte[10], 1, 8);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)8L);
        PositionImpl p2 = (PositionImpl)ledger.addEntry(new byte[12], 2, 5);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)13L);
        c1.markDelete((Position)new PositionImpl(p2.getLedgerId(), -1L));
        Thread.sleep(400L);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)5L);
    }

    @Test
    public void testMinimumRolloverTime() throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        conf.setMinimumRolloverTime(1, TimeUnit.SECONDS);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", conf);
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        Thread.sleep(1000L);
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
    }

    @Test
    public void testMaximumRolloverTime() throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(5);
        conf.setMinimumRolloverTime(1, TimeUnit.SECONDS);
        conf.setMaximumRolloverTime(1, TimeUnit.SECONDS);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_maxtime_ledger", conf);
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        Thread.sleep(2000L);
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
    }

    @Test
    public void testNoRolloverIfNoMetadataSession() throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        conf.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testNoRolloverIfNoMetadataSession", conf);
        ledger.openCursor("c1");
        this.metadataStore.triggerSessionEvent(SessionEvent.SessionLost);
        for (int i = 1; i < 10; ++i) {
            ledger.addEntry("data".getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        this.metadataStore.triggerSessionEvent(SessionEvent.SessionReestablished);
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
    }

    @Test
    public void testNoRolloverIfNoMetadataSessionWithExistingData() throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(2);
        conf.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testNoRolloverIfNoMetadataSession", conf);
        ledger.openCursor("c1");
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        this.metadataStore.triggerSessionEvent(SessionEvent.SessionLost);
        for (int i = 1; i < 10; ++i) {
            ledger.addEntry("data".getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        this.metadataStore.triggerSessionEvent(SessionEvent.SessionReestablished);
        ledger.addEntry("data".getBytes());
        ledger.addEntry("data".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetention() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ManagedLedgerConfig config = new ManagedLedgerConfig();
            config.setRetentionSizeInMB(10L);
            config.setMaxEntriesPerLedger(1);
            config.setRetentionTime(1, TimeUnit.HOURS);
            ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("retention_test_ledger", config);
            ManagedCursor c1 = ml.openCursor("c1");
            ml.addEntry("iamaverylongmessagethatshouldberetained".getBytes());
            c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            ml.close();
            ml = (ManagedLedgerImpl)factory.open("retention_test_ledger", config);
            c1 = ml.openCursor("c1");
            ml.addEntry("shortmessage".getBytes());
            c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            ml.close();
            Assert.assertTrue((ml.getLedgersInfoAsList().size() > 1 ? 1 : 0) != 0);
            Assert.assertTrue((ml.getTotalSize() > (long)"shortmessage".getBytes().length ? 1 : 0) != 0);
        }
        finally {
            if (Collections.singletonList(factory).get(0) != null) {
                factory.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(enabled=true)
    public void testNoRetention() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ManagedLedgerConfig config = new ManagedLedgerConfig();
            config.setRetentionSizeInMB(0L);
            config.setMaxEntriesPerLedger(1);
            ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("noretention_test_ledger", config);
            ManagedCursor c1 = ml.openCursor("c1noretention");
            ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
            c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            ml.close();
            ml = (ManagedLedgerImpl)factory.open("noretention_test_ledger", config);
            c1 = ml.openCursor("c1noretention");
            ml.addEntry("shortmessage".getBytes());
            c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            Thread.sleep(1000L);
            ml.close();
            Assert.assertTrue((ml.getLedgersInfoAsList().size() <= 1 ? 1 : 0) != 0);
            Assert.assertTrue((ml.getTotalSize() <= (long)"shortmessage".getBytes().length ? 1 : 0) != 0);
        }
        finally {
            if (Collections.singletonList(factory).get(0) != null) {
                factory.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDeletionAfterRetention() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ManagedLedgerConfig config = new ManagedLedgerConfig();
            config.setRetentionSizeInMB(0L);
            config.setMaxEntriesPerLedger(1);
            config.setRetentionTime(1, TimeUnit.SECONDS);
            ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("deletion_after_retention_test_ledger", config);
            ManagedCursor c1 = ml.openCursor("c1noretention");
            ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
            c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            ml.close();
            ml = (ManagedLedgerImpl)factory.open("deletion_after_retention_test_ledger", config);
            c1 = ml.openCursor("c1noretention");
            ml.addEntry("shortmessage".getBytes());
            c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            Thread.sleep(1000L);
            ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
            Assert.assertTrue((ml.getLedgersInfoAsList().size() <= 1 ? 1 : 0) != 0);
            Assert.assertTrue((ml.getTotalSize() <= (long)"shortmessage".getBytes().length ? 1 : 0) != 0);
            ml.close();
        }
        finally {
            if (Collections.singletonList(factory).get(0) != null) {
                factory.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDeletionAfterLedgerClosedAndRetention() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ManagedLedgerConfig config = new ManagedLedgerConfig();
            config.setRetentionSizeInMB(0L);
            config.setMaxEntriesPerLedger(1);
            config.setRetentionTime(1, TimeUnit.SECONDS);
            config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
            ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("deletion_after_retention_test_ledger", config);
            ManagedCursor c1 = ml.openCursor("testCursor1");
            ManagedCursor c2 = ml.openCursor("testCursor2");
            ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
            c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            c2.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            ml.rollCurrentLedgerIfFull();
            Thread.sleep(1500L);
            ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
            Assert.assertTrue((ml.getLedgersInfoAsList().size() <= 1 ? 1 : 0) != 0);
            Assert.assertEquals((long)ml.getTotalSize(), (long)0L);
            ml.close();
        }
        finally {
            if (Collections.singletonList(factory).get(0) != null) {
                factory.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetention0WithEmptyLedger() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ManagedLedgerConfig config = new ManagedLedgerConfig();
            config.setRetentionTime(0, TimeUnit.MINUTES);
            config.setMaxEntriesPerLedger(1);
            ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("deletion_after_retention_test_ledger", config);
            ManagedCursor c1 = ml.openCursor("c1noretention");
            ml.addEntry("message1".getBytes());
            c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            ml.close();
            ml = (ManagedLedgerImpl)factory.open("deletion_after_retention_test_ledger", config);
            c1 = ml.openCursor("c1noretention");
            ml.deleteCursor(c1.getName());
            ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
            Assert.assertTrue((ml.getFirstPosition().ledgerId <= ml.lastConfirmedEntry.ledgerId ? 1 : 0) != 0);
            ml.close();
        }
        finally {
            if (Collections.singletonList(factory).get(0) != null) {
                factory.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetention0WithEmptyLedgerWithoutCursors() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ManagedLedgerConfig config = new ManagedLedgerConfig();
            config.setRetentionTime(0, TimeUnit.MINUTES);
            config.setMaxEntriesPerLedger(1);
            ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("deletion_after_retention_test_ledger", config);
            ml.addEntry("message1".getBytes());
            ml.close();
            ml = (ManagedLedgerImpl)factory.open("deletion_after_retention_test_ledger", config);
            ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
            Assert.assertTrue((ml.getFirstPosition().ledgerId <= ml.lastConfirmedEntry.ledgerId ? 1 : 0) != 0);
            Assert.assertFalse((boolean)ml.getLedgersInfo().containsKey(ml.lastConfirmedEntry.ledgerId), (String)"the ledger at lastConfirmedEntry has not been trimmed!");
            ml.close();
        }
        finally {
            if (Collections.singletonList(factory).get(0) != null) {
                factory.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testInfiniteRetention() throws Exception {
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ManagedLedgerConfig config = new ManagedLedgerConfig();
            config.setRetentionSizeInMB(-1L);
            config.setRetentionTime(-1, TimeUnit.HOURS);
            config.setMaxEntriesPerLedger(1);
            ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("retention_test_ledger", config);
            ManagedCursor c1 = ml.openCursor("c1");
            ml.addEntry("iamaverylongmessagethatshouldberetained".getBytes());
            c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            ml.close();
            ml = (ManagedLedgerImpl)factory.open("retention_test_ledger", config);
            c1 = ml.openCursor("c1");
            ml.addEntry("shortmessage".getBytes());
            c1.skipEntries(1, ManagedCursor.IndividualDeletedEntries.Exclude);
            ml.close();
            Assert.assertTrue((ml.getLedgersInfoAsList().size() > 1 ? 1 : 0) != 0);
            Assert.assertTrue((ml.getTotalSize() > (long)"shortmessage".getBytes().length ? 1 : 0) != 0);
        }
        finally {
            if (Collections.singletonList(factory).get(0) != null) {
                factory.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRetentionSize() throws Exception {
        int retentionSizeInMB = 5;
        int totalMessage = 10;
        int messageSize = 0x100000;
        char[] data = new char[0x100000];
        Arrays.fill(data, 'a');
        byte[] message = new String(data).getBytes(Encoding);
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            ManagedLedgerConfig config = new ManagedLedgerConfig();
            config.setRetentionSizeInMB(5L);
            config.setMaxEntriesPerLedger(1);
            config.setRetentionTime(1, TimeUnit.HOURS);
            ManagedLedgerImpl ml = (ManagedLedgerImpl)factory.open("retention_size_ledger", config);
            ManagedCursor c1 = ml.openCursor("c1");
            Position position = null;
            for (int i = 0; i < 10; ++i) {
                position = ml.addEntry(message);
            }
            Assert.assertEquals((int)ml.getLedgersInfoAsList().size(), (int)10);
            List entryList = c1.readEntries(10);
            if (null != position) {
                c1.markDelete(position);
            }
            entryList.forEach(entry -> {
                log.info("Read entry position {}:{}", (Object)entry.getLedgerId(), (Object)entry.getEntryId());
                entry.release();
            });
            Awaitility.await().untilAsserted(() -> {
                Assert.assertTrue((ml.getTotalSize() <= 0x500000L ? 1 : 0) != 0);
                Assert.assertEquals((int)ml.getLedgersInfoAsList().size(), (int)5);
            });
        }
        finally {
            if (Collections.singletonList(factory).get(0) != null) {
                factory.shutdown();
            }
        }
    }

    @Test
    public void testTimestampOnWorkingLedger() throws Exception {
        MLDataFormats.ManagedLedgerInfo.LedgerInfo i;
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        conf.setRetentionSizeInMB(10L);
        conf.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedgerImpl ml = (ManagedLedgerImpl)this.factory.open("my_test_ledger", conf);
        ml.openCursor("c1");
        ml.addEntry("msg1".getBytes());
        Iterator iter = ml.getLedgersInfoAsList().iterator();
        long ts = -1L;
        while (iter.hasNext()) {
            i = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)iter.next();
            if (iter.hasNext()) {
                Assert.assertTrue((ts <= i.getTimestamp() ? 1 : 0) != 0, (String)i.toString());
                ts = i.getTimestamp();
                continue;
            }
            Assert.assertTrue((i.getTimestamp() == 0L || ts <= i.getTimestamp() ? 1 : 0) != 0, (String)i.toString());
        }
        ml.addEntry("msg02".getBytes());
        ml.close();
        iter = ml.getLedgersInfoAsList().iterator();
        ts = -1L;
        while (iter.hasNext()) {
            i = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)iter.next();
            if (iter.hasNext()) {
                Assert.assertTrue((ts <= i.getTimestamp() ? 1 : 0) != 0, (String)i.toString());
                ts = i.getTimestamp();
                continue;
            }
            Assert.assertTrue((i.getTimestamp() > 0L ? 1 : 0) != 0, (String)"well closed LedgerInfo should set a timestamp > 0");
        }
    }

    @Test
    public void testBackwardCompatiblityForMeta() throws Exception {
        final MLDataFormats.ManagedLedgerInfo[] storedMLInfo = new MLDataFormats.ManagedLedgerInfo[3];
        final Stat[] versions = new Stat[1];
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        conf.setRetentionSizeInMB(10L);
        conf.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedger ml = this.factory.open("backward_test_ledger", conf);
        ml.openCursor("c1");
        ml.addEntry("msg1".getBytes());
        ml.addEntry("msg2".getBytes());
        ml.close();
        MetaStoreImpl store = new MetaStoreImpl((MetadataStore)this.metadataStore, (OrderedExecutor)this.executor);
        final CountDownLatch l1 = new CountDownLatch(1);
        store.getManagedLedgerInfo("backward_test_ledger", false, (MetaStore.MetaStoreCallback)new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>(){

            public void operationComplete(MLDataFormats.ManagedLedgerInfo result, Stat version) {
                storedMLInfo[0] = result;
                versions[0] = version;
                l1.countDown();
            }

            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                Assert.fail((String)"on get ManagedLedgerInfo backward_test_ledger");
            }
        });
        l1.await();
        MLDataFormats.ManagedLedgerInfo.Builder builder1 = MLDataFormats.ManagedLedgerInfo.newBuilder();
        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo info : storedMLInfo[0].getLedgerInfoList()) {
            MLDataFormats.ManagedLedgerInfo.LedgerInfo noTimestamp = MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().mergeFrom(info).clearTimestamp().build();
            Assert.assertFalse((boolean)noTimestamp.hasTimestamp(), (String)"expected old version info with no timestamp");
            builder1.addLedgerInfo(noTimestamp);
        }
        storedMLInfo[1] = builder1.build();
        final CountDownLatch l2 = new CountDownLatch(1);
        store.asyncUpdateLedgerIds("backward_test_ledger", storedMLInfo[1], versions[0], (MetaStore.MetaStoreCallback)new MetaStore.MetaStoreCallback<Void>(){

            public void operationComplete(Void result, Stat version) {
                l2.countDown();
            }

            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                Assert.fail((String)"on asyncUpdateLedgerIds");
            }
        });
        ManagedLedgerImpl newVersionLedger = (ManagedLedgerImpl)this.factory.open("backward_test_ledger", conf);
        List mlInfo = newVersionLedger.getLedgersInfoAsList();
        Assert.assertTrue((boolean)mlInfo.stream().allMatch(ledgerInfo -> ledgerInfo.hasTimestamp()));
    }

    @Test
    public void testGetPositionAfterN() throws Exception {
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setMaxEntriesPerLedger(5);
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)this.factory.open("testGetPositionAfterN", managedLedgerConfig);
        ManagedCursorImpl managedCursor = (ManagedCursorImpl)managedLedger.openCursor("cursor");
        Position positionMarkDelete = null;
        for (int i = 0; i < 10; ++i) {
            if (i == 3) {
                positionMarkDelete = managedLedger.addEntry(("entry-" + i).getBytes(Encoding));
                continue;
            }
            managedLedger.addEntry(("entry-" + i).getBytes(Encoding));
        }
        managedCursor.markDelete(positionMarkDelete);
        managedLedger.rollCurrentLedgerIfFull();
        Awaitility.await().untilAsserted(() -> Assert.assertEquals((int)managedLedger.getLedgersInfo().size(), (int)3));
        Assert.assertEquals((long)5L, (long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)managedLedger.getLedgersInfoAsList().get(0)).getEntries());
        Assert.assertEquals((long)5L, (long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)managedLedger.getLedgersInfoAsList().get(1)).getEntries());
        Assert.assertEquals((long)0L, (long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)managedLedger.getLedgersInfoAsList().get(2)).getEntries());
        log.info("### ledgers {}", (Object)managedLedger.getLedgersInfo());
        long firstLedger = (Long)managedLedger.getLedgersInfo().firstKey();
        long secondLedger = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)managedLedger.getLedgersInfoAsList().get(1)).getLedgerId();
        PositionImpl startPosition = new PositionImpl(firstLedger, 0L);
        PositionImpl targetPosition = managedLedger.getPositionAfterN(startPosition, 1L, ManagedLedgerImpl.PositionBound.startExcluded);
        Assert.assertEquals((long)targetPosition.getLedgerId(), (long)firstLedger);
        Assert.assertEquals((long)targetPosition.getEntryId(), (long)1L);
        targetPosition = managedLedger.getPositionAfterN(startPosition, 4L, ManagedLedgerImpl.PositionBound.startExcluded);
        Assert.assertEquals((long)targetPosition.getLedgerId(), (long)firstLedger);
        Assert.assertEquals((long)targetPosition.getEntryId(), (long)4L);
        PositionImpl searchPosition = managedLedger.getNextValidPosition((PositionImpl)managedCursor.getMarkDeletedPosition());
        long length = managedCursor.getNumberOfEntriesInStorage();
        targetPosition = managedLedger.getPositionAfterN(searchPosition, length, ManagedLedgerImpl.PositionBound.startExcluded);
        log.info("Target position is {}", (Object)targetPosition);
        Assert.assertEquals((long)targetPosition.getLedgerId(), (long)secondLedger);
        Assert.assertEquals((long)targetPosition.getEntryId(), (long)4L);
    }

    @Test
    public void testEstimatedBacklogSize() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testEstimatedBacklogSize");
        ManagedCursor c1 = ledger.openCursor("c1");
        ledger.addEntry(new byte[1024]);
        Position position2 = ledger.addEntry(new byte[1024]);
        ledger.addEntry(new byte[1024]);
        ledger.addEntry(new byte[1024]);
        Position lastPosition = ledger.addEntry(new byte[1024]);
        long backlog = ledger.getEstimatedBacklogSize();
        Assert.assertEquals((long)backlog, (long)5120L);
        List entries = c1.readEntries(2);
        entries.forEach(Entry::release);
        c1.markDelete(position2);
        backlog = ledger.getEstimatedBacklogSize();
        Assert.assertEquals((long)backlog, (long)3072L);
        entries = c1.readEntries(3);
        entries.forEach(Entry::release);
        c1.markDelete(lastPosition);
        backlog = ledger.getEstimatedBacklogSize();
        Assert.assertEquals((long)backlog, (long)0L);
    }

    @Test
    public void testGetNextValidPosition() throws Exception {
        ManagedLedgerConfig conf = new ManagedLedgerConfig();
        conf.setMaxEntriesPerLedger(1);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testGetNextValidPosition", conf);
        ManagedCursor c1 = ledger.openCursor("c1");
        PositionImpl p1 = (PositionImpl)ledger.addEntry("entry1".getBytes());
        PositionImpl p2 = (PositionImpl)ledger.addEntry("entry2".getBytes());
        PositionImpl p3 = (PositionImpl)ledger.addEntry("entry3".getBytes());
        Assert.assertEquals((Object)ledger.getNextValidPosition((PositionImpl)c1.getMarkDeletedPosition()), (Object)p1);
        Assert.assertEquals((Object)ledger.getNextValidPosition(p1), (Object)p2);
        Assert.assertEquals((Object)ledger.getNextValidPosition(p3), (Object)PositionImpl.get((long)p3.getLedgerId(), (long)(p3.getEntryId() + 1L)));
        Assert.assertEquals((Object)ledger.getNextValidPosition(PositionImpl.get((long)p3.getLedgerId(), (long)(p3.getEntryId() + 1L))), (Object)PositionImpl.get((long)p3.getLedgerId(), (long)(p3.getEntryId() + 1L)));
        Assert.assertEquals((Object)ledger.getNextValidPosition(PositionImpl.get((long)(p3.getLedgerId() + 1L), (long)(p3.getEntryId() + 1L))), (Object)PositionImpl.get((long)p3.getLedgerId(), (long)(p3.getEntryId() + 1L)));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testActiveDeactiveCursorWithDiscardEntriesFromCache() throws Exception {
        ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
        conf.setCacheEvictionFrequency(0.1);
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc, conf);
        try {
            ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("cache_eviction_ledger");
            ManagedCursor cursor1 = ledger.openCursor("c1");
            ManagedCursor cursor2 = ledger.openCursor("c2");
            HashSet activeCursors = Sets.newHashSet();
            activeCursors.add(cursor1);
            activeCursors.add(cursor2);
            Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
            cacheField.setAccessible(true);
            EntryCacheImpl entryCache = (EntryCacheImpl)cacheField.get(ledger);
            Iterator activeCursor = ledger.getActiveCursors().iterator();
            activeCursors.remove(activeCursor.next());
            activeCursors.remove(activeCursor.next());
            Assert.assertTrue((boolean)activeCursors.isEmpty());
            Assert.assertFalse((boolean)activeCursor.hasNext());
            int totalInsertedEntries = 50;
            for (int i = 0; i < 50; ++i) {
                String content = "entry";
                ledger.addEntry(content.getBytes());
            }
            Assert.assertEquals((long)250L, (long)entryCache.getSize());
            int readEntries = 20;
            List entries1 = cursor1.readEntries(20);
            cursor1.markDelete(((Entry)entries1.get(entries1.size() - 1)).getPosition());
            for (Object entry : entries1) {
                log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
                entry.release();
            }
            Thread.sleep(1000L);
            List entries2 = cursor2.readEntries(20);
            cursor2.markDelete(((Entry)entries2.get(entries2.size() - 1)).getPosition());
            for (Entry entry : entries2) {
                log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
                entry.release();
            }
            log.info("expected, found : {}, {}", (Object)250, (Object)entryCache.getSize());
            Assert.assertEquals((long)250L, (long)entryCache.getSize());
            int remainingEntries = 30;
            entries1 = cursor1.readEntries(30);
            cursor1.markDelete(((Entry)entries1.get(entries1.size() - 1)).getPosition());
            for (Entry entry : entries1) {
                log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
                entry.release();
            }
            Assert.assertEquals((long)250L, (long)entryCache.getSize());
            ledger.deactivateCursor(cursor1);
            ledger.deactivateCursor(cursor2);
            Assert.assertEquals((long)entryCache.getSize(), (long)0L);
            log.info("Finished reading entries");
            ledger.close();
        }
        finally {
            if (Collections.singletonList(factory).get(0) != null) {
                factory.shutdown();
            }
        }
    }

    @Test
    public void testActiveDeactiveCursor() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("cache_eviction_ledger");
        Field cacheField = ManagedLedgerImpl.class.getDeclaredField("entryCache");
        cacheField.setAccessible(true);
        EntryCacheImpl entryCache = (EntryCacheImpl)cacheField.get(ledger);
        int totalInsertedEntries = 20;
        for (int i = 0; i < 20; ++i) {
            String content = "entry";
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((long)0L, (long)entryCache.getSize());
        ManagedCursor cursor1 = ledger.openCursor("c1");
        ManagedCursor cursor2 = ledger.openCursor("c2");
        ledger.deactivateCursor(cursor2);
        for (int i = 0; i < 20; ++i) {
            String content = "entry";
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((long)100L, (long)entryCache.getSize());
        List entries1 = cursor1.readEntries(20);
        for (Entry entry : entries1) {
            log.info("Read entry. Position={} Content='{}'", (Object)entry.getPosition(), (Object)new String(entry.getData()));
            entry.release();
        }
        ledger.deactivateCursor(cursor1);
        Assert.assertEquals((long)0L, (long)entryCache.getSize());
        ledger.close();
    }

    @Test
    public void testCursorRecoveryForEmptyLedgers() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testCursorRecoveryForEmptyLedgers");
        ManagedCursor c1 = ledger.openCursor("c1");
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        Assert.assertEquals((Object)c1.getMarkDeletedPosition(), (Object)ledger.lastConfirmedEntry);
        c1.close();
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("testCursorRecoveryForEmptyLedgers");
        c1 = ledger.openCursor("c1");
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        Assert.assertEquals((Object)c1.getMarkDeletedPosition(), (Object)ledger.lastConfirmedEntry);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLazyRecoverCursor() throws Exception {
        ManagedLedger ledger = this.factory.open("testLedger");
        ManagedCursor cursor = ledger.openCursor("testCursor");
        ledger.addEntry("entry-1".getBytes());
        Position p1 = ledger.addEntry("entry-2".getBytes());
        cursor.markDelete(p1);
        ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        try {
            CompletableFuture future = this.bkc.promiseAfter(2);
            ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DefaultThreadFactory("lazyCursorRecovery"));
            try {
                scheduledExecutorService.schedule(() -> future.complete(null), 10L, TimeUnit.SECONDS);
                ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
                managedLedgerConfig.setLazyCursorRecovery(true);
                Long startLedgerRecovery = System.currentTimeMillis();
                ledger = factory2.open("testLedger", managedLedgerConfig);
                Assert.assertTrue((System.currentTimeMillis() - startLedgerRecovery < 5000L ? 1 : 0) != 0);
                cursor = ledger.openCursor("testCursor");
                Assert.assertEquals((Object)cursor.getMarkDeletedPosition(), (Object)p1);
            }
            finally {
                if (Collections.singletonList(scheduledExecutorService).get(0) != null) {
                    scheduledExecutorService.shutdownNow();
                }
            }
        }
        finally {
            if (Collections.singletonList(factory2).get(0) != null) {
                factory2.shutdown();
            }
        }
    }

    @Test
    public void testConcurrentOpenCursor() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testConcurrentOpenCursor");
        final AtomicReference<Object> cursor1 = new AtomicReference<Object>(null);
        final AtomicReference<Object> cursor2 = new AtomicReference<Object>(null);
        CyclicBarrier barrier = new CyclicBarrier(2);
        final CountDownLatch latch = new CountDownLatch(2);
        this.cachedExecutor.execute(() -> {
            try {
                barrier.await();
            }
            catch (Exception exception) {
                // empty catch block
            }
            ledger.asyncOpenCursor("c1", new AsyncCallbacks.OpenCursorCallback(){

                public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                    latch.countDown();
                }

                public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                    cursor1.set(cursor);
                    latch.countDown();
                }
            }, null);
        });
        this.cachedExecutor.execute(() -> {
            try {
                barrier.await();
            }
            catch (Exception exception) {
                // empty catch block
            }
            ledger.asyncOpenCursor("c1", new AsyncCallbacks.OpenCursorCallback(){

                public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                    latch.countDown();
                }

                public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                    cursor2.set(cursor);
                    latch.countDown();
                }
            }, null);
        });
        latch.await();
        Assert.assertNotNull(cursor1.get());
        Assert.assertNotNull(cursor2.get());
        Assert.assertEquals(cursor1.get(), cursor2.get());
        ledger.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testConcurrentOpenCursorShouldNotHaveConcurrentAccessOfUninitializedCursors() throws Exception {
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("ConcurrentAccessOfUninitializedCursors");
        final CompletableFuture cursorFuture = new CompletableFuture();
        CompletableFuture removingFuture = new CompletableFuture();
        CompletableFuture concurrentAccessFuture = new CompletableFuture();
        TimeoutException concurrentAccessTimeout = new TimeoutException();
        this.cachedExecutor.execute(() -> {
            removingFuture.join();
            CompletableFuture<Object> lockingFuture = new CompletableFuture<Object>();
            this.cachedExecutor.execute(() -> {
                try {
                    lockingFuture.join();
                    Thread.sleep(2L);
                    concurrentAccessFuture.completeExceptionally(concurrentAccessTimeout);
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                }
            });
            lockingFuture.complete(null);
            ManagedLedgerImpl managedLedgerImpl = ledger;
            synchronized (managedLedgerImpl) {
                concurrentAccessFuture.complete(null);
            }
        });
        Map uninitializedCursors = ledger.uninitializedCursors;
        Map spyUninitializedCursors = (Map)Mockito.spy((Object)uninitializedCursors);
        ((Map)Mockito.doAnswer(mock -> {
            removingFuture.complete(null);
            try {
                concurrentAccessFuture.get();
                IllegalStateException throwable = new IllegalStateException("Detecting concurrent access of uninitializedCursors");
                cursorFuture.completeExceptionally(throwable);
            }
            catch (Exception ex) {
                Assert.assertSame((Object)ExceptionUtils.getRootCause((Throwable)ex), (Object)concurrentAccessTimeout);
            }
            return mock.callRealMethod();
        }).when((Object)spyUninitializedCursors)).remove(ArgumentMatchers.anyString());
        this.setFieldValue(ManagedLedgerImpl.class, ledger, "uninitializedCursors", spyUninitializedCursors);
        this.cachedExecutor.execute(() -> {
            try {
                ledger.asyncOpenCursor("c1", new AsyncCallbacks.OpenCursorCallback(){

                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                        cursorFuture.completeExceptionally(exception);
                    }

                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                        cursorFuture.complete(cursor);
                    }
                }, null);
            }
            catch (Exception e) {
                cursorFuture.completeExceptionally(e);
            }
        });
        try {
            ManagedCursor cursor = (ManagedCursor)cursorFuture.get();
            Assert.assertNotNull((Object)cursor);
        }
        catch (Exception ex) {
            Assert.fail((String)ExceptionUtils.getRootCauseMessage((Throwable)ex));
        }
        finally {
            ledger.close();
        }
    }

    @Test
    public void testConsumerSubscriptionInitializePosition() throws Exception {
        int MAX_ENTRY_PER_LEDGER = 2;
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMaxEntriesPerLedger(2);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("lastest_earliest_ledger", config);
        int totalInsertedEntries = 20;
        for (int i = 0; i < 20; ++i) {
            String content = "entry" + i;
            ledger.addEntry(content.getBytes());
        }
        ManagedCursor latestCursor = ledger.openCursor("c1", CommandSubscribe.InitialPosition.Latest);
        ManagedCursor earliestCursor = ledger.openCursor("c2", CommandSubscribe.InitialPosition.Earliest);
        PositionImpl p1 = (PositionImpl)latestCursor.getReadPosition();
        PositionImpl p2 = (PositionImpl)earliestCursor.getReadPosition();
        Pair latestPositionAndCounter = ledger.getLastPositionAndCounter();
        Pair earliestPositionAndCounter = ledger.getFirstPositionAndCounter();
        Assert.assertEquals((Object)((PositionImpl)latestPositionAndCounter.getLeft()).getNext(), (Object)p1);
        Assert.assertEquals((Object)((PositionImpl)earliestPositionAndCounter.getLeft()).getNext(), (Object)p2);
        Assert.assertEquals((long)((Long)latestPositionAndCounter.getRight()), (long)20L);
        Assert.assertEquals((long)((Long)earliestPositionAndCounter.getRight()), (long)(20L - earliestCursor.getNumberOfEntriesInBacklog(false)));
        ledger.close();
    }

    @Test
    public void testManagedLedgerAutoCreate() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setCreateIfMissing(true);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("test", config);
        Assert.assertNotNull((Object)ledger);
    }

    @Test
    public void testManagedLedgerWithoutAutoCreate() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setCreateIfMissing(false);
        try {
            this.factory.open("testManagedLedgerWithoutAutoCreate", config);
            Assert.fail((String)"should have thrown ManagedLedgerNotFoundException");
        }
        catch (ManagedLedgerException.ManagedLedgerNotFoundException managedLedgerNotFoundException) {
            // empty catch block
        }
        Assert.assertFalse((boolean)this.factory.getManagedLedgers().containsKey("testManagedLedgerWithoutAutoCreate"));
    }

    @Test
    public void testManagedLedgerWithCreateLedgerTimeOut() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setMetadataOperationsTimeoutSeconds(3L);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("timeout_ledger_test", config);
        BookKeeper bk = (BookKeeper)Mockito.mock(BookKeeper.class);
        ((BookKeeper)Mockito.doNothing().when((Object)bk)).asyncCreateLedger(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (BookKeeper.DigestType)ArgumentMatchers.any(), (byte[])ArgumentMatchers.any(), (AsyncCallback.CreateCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (Map)ArgumentMatchers.any());
        AtomicInteger response = new AtomicInteger(0);
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference ctxHolder = new AtomicReference();
        ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {
            response.set(rc);
            latch.countDown();
            ctxHolder.set(ctx);
        }, Collections.emptyMap());
        latch.await(config.getMetadataOperationsTimeoutSeconds() + 2L, TimeUnit.SECONDS);
        Assert.assertEquals((int)response.get(), (int)-23);
        Assert.assertTrue((boolean)(ctxHolder.get() instanceof AtomicBoolean));
        AtomicBoolean ledgerCreated = (AtomicBoolean)ctxHolder.get();
        Assert.assertFalse((boolean)ledgerCreated.get());
        ledger.close();
    }

    @Test
    public void testManagedLedgerWithReadEntryTimeOut() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setReadEntryTimeoutSeconds(1L);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("timeout_ledger_test", config);
        BookKeeper bk = (BookKeeper)Mockito.mock(BookKeeper.class);
        ((BookKeeper)Mockito.doNothing().when((Object)bk)).asyncCreateLedger(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (BookKeeper.DigestType)ArgumentMatchers.any(), (byte[])ArgumentMatchers.any(), (AsyncCallback.CreateCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (Map)ArgumentMatchers.any());
        final AtomicReference responseException1 = new AtomicReference();
        final String ctxStr = "timeoutCtx";
        CompletableFuture entriesFuture = new CompletableFuture();
        ReadHandle ledgerHandle = (ReadHandle)Mockito.mock(ReadHandle.class);
        ((ReadHandle)Mockito.doReturn(entriesFuture).when((Object)ledgerHandle)).readAsync(PositionImpl.earliest.getLedgerId(), PositionImpl.earliest.getEntryId());
        ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest, new AsyncCallbacks.ReadEntryCallback(){

            public void readEntryComplete(Entry entry, Object ctx) {
                responseException1.set(null);
            }

            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                Assert.assertEquals((String)ctxStr, (String)((String)ctx));
                responseException1.set(exception);
            }
        }, (Object)ctxStr);
        ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {}, Collections.emptyMap());
        ManagedLedgerTest.retryStrategically(test -> responseException1.get() != null, 5, 1000L);
        Assert.assertNotNull(responseException1.get());
        Assert.assertEquals((String)((ManagedLedgerException)((Object)responseException1.get())).getMessage(), (String)BKException.getMessage((int)-23));
        final AtomicReference responseException2 = new AtomicReference();
        PositionImpl readPositionRef = PositionImpl.earliest;
        ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1");
        OpReadEntry opReadEntry = OpReadEntry.create((ManagedCursorImpl)cursor, (PositionImpl)readPositionRef, (int)1, (AsyncCallbacks.ReadEntriesCallback)new AsyncCallbacks.ReadEntriesCallback(){

            public void readEntriesComplete(List<Entry> entries, Object ctx) {
            }

            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                Assert.assertEquals((String)ctxStr, (String)((String)ctx));
                responseException2.set(exception);
            }
        }, null, (PositionImpl)PositionImpl.latest);
        ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest.getEntryId(), PositionImpl.earliest.getEntryId(), false, opReadEntry, (Object)ctxStr);
        ManagedLedgerTest.retryStrategically(test -> responseException2.get() != null, 5, 1000L);
        Assert.assertNotNull(responseException2.get());
        Assert.assertEquals((String)((ManagedLedgerException)((Object)responseException2.get())).getMessage(), (String)BKException.getMessage((int)-23));
        ledger.close();
    }

    @Test(timeOut=20000L)
    public void testManagedLedgerWithAddEntryTimeOut() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig().setAddEntryTimeoutSeconds(1L);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("timeout_ledger_test", config);
        BookKeeper bk = (BookKeeper)Mockito.mock(BookKeeper.class);
        ((BookKeeper)Mockito.doNothing().when((Object)bk)).asyncCreateLedger(ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), ArgumentMatchers.anyInt(), (BookKeeper.DigestType)ArgumentMatchers.any(), (byte[])ArgumentMatchers.any(), (AsyncCallback.CreateCallback)ArgumentMatchers.any(), ArgumentMatchers.any(), (Map)ArgumentMatchers.any());
        PulsarMockBookKeeper bkClient = (PulsarMockBookKeeper)Mockito.mock(PulsarMockBookKeeper.class);
        ClientConfiguration conf = new ClientConfiguration();
        ((PulsarMockBookKeeper)Mockito.doReturn((Object)conf).when((Object)bkClient)).getConf();
        class MockLedgerHandle
        extends PulsarMockLedgerHandle {
            public MockLedgerHandle(PulsarMockBookKeeper bk, long id, BookKeeper.DigestType digest, byte[] passwd) throws GeneralSecurityException {
                super(bk, id, digest, passwd);
            }

            public void asyncAddEntry(byte[] data, AsyncCallback.AddCallback cb, Object ctx) {
            }

            public void asyncClose(AsyncCallback.CloseCallback cb, Object ctx) {
                cb.closeComplete(0, (LedgerHandle)this, ctx);
            }
        }
        MockLedgerHandle ledgerHandle = (MockLedgerHandle)((Object)Mockito.mock(MockLedgerHandle.class));
        String data = "data";
        ((MockLedgerHandle)((Object)Mockito.doNothing().when((Object)ledgerHandle))).asyncAddEntry("data".getBytes(), null, null);
        final AtomicBoolean addSuccess = new AtomicBoolean();
        this.setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", (Object)ledgerHandle);
        boolean totalAddEntries = true;
        final CountDownLatch latch = new CountDownLatch(1);
        ledger.asyncAddEntry("data".getBytes(), new AsyncCallbacks.AddEntryCallback(){

            public void addComplete(Position position, ByteBuf entryData, Object ctx) {
                addSuccess.set(true);
                latch.countDown();
            }

            public void addFailed(ManagedLedgerException exception, Object ctx) {
                latch.countDown();
            }
        }, null);
        latch.await();
        Assert.assertTrue((boolean)addSuccess.get());
        this.setFieldValue(ManagedLedgerImpl.class, ledger, "currentLedger", null);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void avoidUseSameOpAddEntryBetweenDifferentLedger() throws Exception {
        ManagedLedgerFactoryConfig config = new ManagedLedgerFactoryConfig();
        config.setMaxCacheSize(0L);
        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc, config);
        try {
            int i;
            ManagedLedgerImpl ledger = (ManagedLedgerImpl)factory.open("my_test_ledger");
            ArrayList<OpAddEntry> oldOps = new ArrayList<OpAddEntry>();
            for (i = 0; i < 10; ++i) {
                OpAddEntry op = OpAddEntry.create((ManagedLedgerImpl)ledger, (ByteBuf)ByteBufAllocator.DEFAULT.buffer(128), null, null);
                if (i > 4) {
                    op.setLedger((LedgerHandle)Mockito.mock(LedgerHandle.class));
                }
                oldOps.add(op);
                ledger.pendingAddEntries.add(op);
            }
            ledger.updateLedgersIdsComplete((Stat)Mockito.mock(Stat.class));
            for (i = 0; i < 10; ++i) {
                OpAddEntry oldOp = (OpAddEntry)oldOps.get(i);
                if (i > 4) {
                    Assert.assertEquals((Object)oldOp.getState(), (Object)OpAddEntry.State.CLOSED);
                } else {
                    Assert.assertEquals((Object)oldOp.getState(), (Object)OpAddEntry.State.INITIATED);
                }
                OpAddEntry newOp = (OpAddEntry)ledger.pendingAddEntries.poll();
                Assert.assertEquals((Object)newOp.getState(), (Object)OpAddEntry.State.INITIATED);
                if (i > 4) {
                    Assert.assertNotSame((Object)oldOp, (Object)newOp);
                    continue;
                }
                Assert.assertSame((Object)oldOp, (Object)newOp);
            }
        }
        finally {
            if (Collections.singletonList(factory).get(0) != null) {
                factory.shutdown();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(dataProvider="checkOwnershipFlag")
    public void recoverMLWithBadVersion(boolean checkOwnershipFlag) throws Exception {
        ManagedLedgerFactoryConfig conf = new ManagedLedgerFactoryConfig();
        ManagedLedgerFactoryImpl factory1 = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc, conf);
        try {
            ManagedLedgerFactoryImpl factory2 = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc, conf);
            try {
                MutableObject ledger1 = new MutableObject();
                MutableObject ledger2 = new MutableObject();
                MutableObject cursor1 = new MutableObject();
                MutableObject cursor2 = new MutableObject();
                this.createLedger(factory1, (MutableObject<ManagedLedger>)ledger1, (MutableObject<ManagedCursorImpl>)cursor1, checkOwnershipFlag);
                ((ManagedLedger)ledger1.getValue()).addEntry("test1".getBytes(Encoding));
                ((ManagedLedger)ledger1.getValue()).addEntry("test2".getBytes(Encoding));
                Entry entry = (Entry)((ManagedCursorImpl)cursor1.getValue()).readEntries(1).get(0);
                ((ManagedCursorImpl)cursor1.getValue()).delete(entry.getPosition());
                this.createLedger(factory2, (MutableObject<ManagedLedger>)ledger2, (MutableObject<ManagedCursorImpl>)cursor2, checkOwnershipFlag);
                entry = (Entry)((ManagedCursorImpl)cursor2.getValue()).readEntries(1).get(0);
                ((ManagedCursorImpl)cursor1.getValue()).close();
                boolean isFailed = this.updateCusorMetadataByCreatingMetadataLedger((MutableObject<ManagedCursorImpl>)cursor2);
                Assert.assertTrue((boolean)isFailed);
                isFailed = this.updateCusorMetadataByCreatingMetadataLedger((MutableObject<ManagedCursorImpl>)cursor2);
                if (checkOwnershipFlag) {
                    Assert.assertFalse((boolean)isFailed);
                } else {
                    Assert.assertTrue((boolean)isFailed);
                }
                log.info("Test completed");
            }
            finally {
                if (Collections.singletonList(factory2).get(0) != null) {
                    factory2.shutdown();
                }
            }
        }
        finally {
            if (Collections.singletonList(factory1).get(0) != null) {
                factory1.shutdown();
            }
        }
    }

    private boolean updateCusorMetadataByCreatingMetadataLedger(MutableObject<ManagedCursorImpl> cursor2) throws InterruptedException {
        final MutableObject failed = new MutableObject();
        failed.setValue((Object)false);
        final CountDownLatch createLedgerDoneLatch = new CountDownLatch(1);
        ((ManagedCursorImpl)cursor2.getValue()).createNewMetadataLedger(new ManagedCursorImpl.VoidCallback(){

            public void operationComplete() {
                createLedgerDoneLatch.countDown();
            }

            public void operationFailed(ManagedLedgerException exception) {
                failed.setValue((Object)true);
                createLedgerDoneLatch.countDown();
            }
        });
        createLedgerDoneLatch.await();
        return (Boolean)failed.getValue();
    }

    @Test
    public void testPropertiesForMeta() throws Exception {
        String mLName = "properties_test";
        this.factory.open("properties_test");
        MetaStoreImpl store = new MetaStoreImpl((MetadataStore)this.metadataStore, (OrderedExecutor)this.executor);
        MLDataFormats.ManagedLedgerInfo.Builder builder = MLDataFormats.ManagedLedgerInfo.newBuilder();
        builder.addProperties(MLDataFormats.KeyValue.newBuilder().setKey("key1").setValue("value1").build());
        builder.addProperties(MLDataFormats.KeyValue.newBuilder().setKey("key2").setValue("value2").build());
        final CountDownLatch l2 = new CountDownLatch(1);
        store.asyncUpdateLedgerIds("properties_test", builder.build(), new Stat("properties_test", 1L, 0L, 0L, false, true), (MetaStore.MetaStoreCallback)new MetaStore.MetaStoreCallback<Void>(){

            public void operationComplete(Void result, Stat version) {
                l2.countDown();
            }

            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                Assert.fail((String)"on asyncUpdateLedgerIds");
            }
        });
        ManagedLedgerInfo managedLedgerInfo = this.factory.getManagedLedgerInfo("properties_test");
        Map properties = managedLedgerInfo.properties;
        Assert.assertEquals((String)((String)properties.get("key1")), (String)"value1");
        Assert.assertEquals((String)((String)properties.get("key2")), (String)"value2");
        this.factory.shutdown();
        this.factory = new ManagedLedgerFactoryImpl((MetadataStoreExtended)this.metadataStore, (BookKeeper)this.bkc);
        ManagedLedger ml = this.factory.open("properties_test");
        properties = ml.getProperties();
        Assert.assertEquals((String)((String)properties.get("key1")), (String)"value1");
        Assert.assertEquals((String)((String)properties.get("key2")), (String)"value2");
    }

    private void createLedger(ManagedLedgerFactoryImpl factory, final MutableObject<ManagedLedger> ledger1, final MutableObject<ManagedCursorImpl> cursor1, boolean checkOwnershipFlag) throws Exception {
        final CountDownLatch latch = new CountDownLatch(1);
        factory.asyncOpen("my_test_ledger", new ManagedLedgerConfig(), new AsyncCallbacks.OpenLedgerCallback(){

            public void openLedgerComplete(ManagedLedger ledger, Object ctx) {
                ledger1.setValue((Object)ledger);
                ledger.asyncOpenCursor("test-cursor", new AsyncCallbacks.OpenCursorCallback(){

                    public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                        cursor1.setValue((Object)((ManagedCursorImpl)cursor));
                        latch.countDown();
                    }

                    public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
                    }
                }, null);
            }

            public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
            }
        }, checkOwnershipFlag ? () -> true : null, null);
        latch.await();
    }

    @Test
    public void deleteWithoutOpen() throws Exception {
        ManagedLedger ledger = this.factory.open("my_test_ledger");
        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
        Assert.assertEquals((long)ledger.getNumberOfEntries(), (long)1L);
        ledger.close();
        this.factory.delete("my_test_ledger");
        try {
            this.factory.open("my_test_ledger", new ManagedLedgerConfig().setCreateIfMissing(false));
            Assert.fail((String)"Should have failed");
        }
        catch (ManagedLedgerException.ManagedLedgerNotFoundException managedLedgerNotFoundException) {
            // empty catch block
        }
    }

    @Test(timeOut=10000L)
    public void testManagedLedgerWithPlacementPolicyInCustomMetadata() throws Exception {
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyClassName(MockedPlacementPolicy.class);
        managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(Collections.singletonMap("key", "value"));
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", managedLedgerConfig);
        Assert.assertFalse((boolean)ledger.createdLedgerCustomMetadata.isEmpty());
        byte[] configData = (byte[])ledger.createdLedgerCustomMetadata.get("EnsemblePlacementPolicyConfig");
        EnsemblePlacementPolicyConfig config = EnsemblePlacementPolicyConfig.decode((byte[])configData);
        Assert.assertEquals((String)config.getPolicyClass().getName(), (String)MockedPlacementPolicy.class.getName());
        Assert.assertEquals((int)config.getProperties().size(), (int)1);
        Assert.assertTrue((boolean)config.getProperties().containsKey("key"));
        Assert.assertEquals(config.getProperties().get("key"), (Object)"value");
    }

    private void setFieldValue(Class clazz, Object classObj, String fieldName, Object fieldValue) throws Exception {
        Field field = clazz.getDeclaredField(fieldName);
        field.setAccessible(true);
        field.set(classObj, fieldValue);
    }

    public static void retryStrategically(Predicate<Void> predicate, int retryCount, long intSleepTimeInMillis) throws Exception {
        for (int i = 0; i < retryCount && !predicate.test(null) && i != retryCount - 1; ++i) {
            Thread.sleep(intSleepTimeInMillis + intSleepTimeInMillis * (long)i);
        }
    }

    @Test
    public void testManagedLedgerRollOverIfFull() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionTime(1, TimeUnit.SECONDS);
        config.setMaxEntriesPerLedger(2);
        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
        config.setMaximumRolloverTime(500, TimeUnit.MILLISECONDS);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("test_managedLedger_rollOver", config);
        ManagedCursor cursor = ledger.openCursor("c1");
        int msgNum = 10;
        for (int i = 0; i < msgNum; ++i) {
            ledger.addEntry(new byte[0x100000]);
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)(msgNum / 2));
        List entries = cursor.readEntries(msgNum);
        Assert.assertEquals((int)msgNum, (int)entries.size());
        for (Entry entry : entries) {
            cursor.markDelete(entry.getPosition());
        }
        entries.forEach(e -> e.release());
        Thread.sleep(1000L);
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        Assert.assertEquals((long)ledger.getTotalSize(), (long)0L);
    }

    @Test
    public void testLedgerReachMaximumRolloverTime() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
        config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
        ManagedLedger ml = this.factory.open("ledger-reach-maximum-rollover-time", config);
        long firstLedgerId = ml.addEntry("test".getBytes()).getLedgerId();
        Awaitility.await().until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId());
    }

    @Test
    public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionTime(1, TimeUnit.SECONDS);
        config.setMaxEntriesPerLedger(2);
        config.setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
        config.setMaximumRolloverTime(500, TimeUnit.MILLISECONDS);
        ManagedLedgerImpl managedLedger = (ManagedLedgerImpl)this.factory.open("ml_restart_ledger", config);
        ManagedCursor cursor = managedLedger.openCursor("c1");
        for (int i = 0; i < 3; ++i) {
            managedLedger.addEntry(new byte[0x100000]);
        }
        Assert.assertEquals((int)managedLedger.getLedgersInfoAsList().size(), (int)2);
        List entries = cursor.readEntries(3);
        managedLedger.close();
        managedLedger = (ManagedLedgerImpl)this.factory.open("ml_restart_ledger", config);
        Assert.assertTrue((managedLedger.getLedgersInfoAsList().size() >= 2 ? 1 : 0) != 0);
        cursor = managedLedger.openCursor("c1");
        for (Entry entry : entries) {
            cursor.markDelete(entry.getPosition());
        }
        entries.forEach(Entry::release);
        managedLedger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
        managedLedger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
        ManagedLedgerImpl finalManagedLedger = managedLedger;
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((int)finalManagedLedger.getLedgersInfoAsList().size(), (int)1);
            Assert.assertEquals((long)finalManagedLedger.getTotalSize(), (long)0L);
        });
    }

    @Test(timeOut=20000L)
    public void testAsyncTruncateLedgerRetention() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setRetentionSizeInMB(50L);
        config.setRetentionTime(1, TimeUnit.DAYS);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("truncate_ledger", config);
        ManagedCursor cursor = ledger.openCursor("test-cursor");
        ledger.addEntry("test-entry-1".getBytes(Encoding));
        ledger.addEntry("test-entry-1".getBytes(Encoding));
        ledger.addEntry("test-entry-1".getBytes(Encoding));
        ledger.addEntry("test-entry-1".getBytes(Encoding));
        ledger.addEntry("test-entry-1".getBytes(Encoding));
        ledger.close();
        ManagedLedgerImpl ledger2 = (ManagedLedgerImpl)this.factory.open("truncate_ledger", config);
        ledger2.addEntry("test-entry-2".getBytes(Encoding));
        CompletableFuture future = ledger2.asyncTruncate();
        future.get();
        Assert.assertTrue((ledger2.getLedgersInfoAsList().size() <= 1 ? 1 : 0) != 0);
    }

    @Test(timeOut=20000L)
    public void testAsyncTruncateLedgerSlowestCursor() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("truncate_ledger", config);
        ManagedCursor cursor = ledger.openCursor("test-cursor");
        ManagedCursor cursor2 = ledger.openCursor("test-cursor2");
        ledger.addEntry("test-entry-1".getBytes(Encoding));
        ledger.addEntry("test-entry-1".getBytes(Encoding));
        ledger.addEntry("test-entry-1".getBytes(Encoding));
        ledger.addEntry("test-entry-1".getBytes(Encoding));
        ledger.addEntry("test-entry-1".getBytes(Encoding));
        ledger.close();
        ManagedLedgerImpl ledger2 = (ManagedLedgerImpl)this.factory.open("truncate_ledger", config);
        ledger2.addEntry("test-entry-2".getBytes(Encoding));
        ManagedCursor cursor3 = ledger2.openCursor("test-cursor");
        cursor3.resetCursor((Position)new PositionImpl(ledger2.getLastPosition()));
        CompletableFuture future = ledger2.asyncTruncate();
        future.get();
        Assert.assertTrue((ledger2.getLedgersInfoAsList().size() == 1 ? 1 : 0) != 0);
    }

    @Test
    public void testOpEntryAdd_toString_doesNotThrowNPE() {
        ManagedLedger ml = (ManagedLedger)Mockito.mock(ManagedLedger.class);
        LedgerHandle ledger = (LedgerHandle)Mockito.mock(LedgerHandle.class);
        Mockito.when((Object)ml.getName()).thenReturn(null);
        Mockito.when((Object)ledger.getId()).thenReturn((Object)124L);
        long entryId = 12L;
        long startTime = 1245L;
        int dataLength = 566;
        String test = "OpAddEntry{mlName=" + ml != null ? ml.getName() : ("null, ledgerId=" + ledger != null ? String.valueOf(ledger.getId()) : "null, entryId=" + entryId + ", startTime=" + startTime + ", dataLength=" + dataLength + '}');
    }

    @Test
    public void testInvalidateReadHandleWhenDeleteLedger() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(1);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testInvalidateReadHandleWhenDeleteLedger", config);
        ManagedCursor cursor = ledger.openCursor("test-cursor");
        ManagedCursor cursor2 = ledger.openCursor("test-cursor2");
        int entries = 3;
        for (int i = 0; i < 3; ++i) {
            ledger.addEntry(String.valueOf(i).getBytes(Encoding));
        }
        List entryList = cursor.readEntries(3);
        Assert.assertEquals((int)entryList.size(), (int)3);
        Assert.assertEquals((int)ledger.ledgers.size(), (int)3);
        Assert.assertEquals((long)ledger.ledgerCache.size(), (long)2L);
        cursor.clearBacklog();
        cursor2.clearBacklog();
        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((int)ledger.ledgers.size(), (int)1);
            Assert.assertEquals((long)ledger.ledgerCache.size(), (long)0L);
        });
        cursor.close();
        cursor2.close();
        ledger.close();
    }

    @Test
    public void testInvalidateReadHandleWhenConsumed() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(1);
        config.setRetentionSizeInMB(50L);
        config.setRetentionTime(1, TimeUnit.DAYS);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testInvalidateReadHandleWhenConsumed", config);
        ManagedCursor cursor = ledger.openCursor("test-cursor");
        ManagedCursor cursor2 = ledger.openCursor("test-cursor2");
        int entries = 3;
        for (int i = 0; i < 3; ++i) {
            ledger.addEntry(String.valueOf(i).getBytes(Encoding));
        }
        List entryList = cursor.readEntries(3);
        Assert.assertEquals((int)entryList.size(), (int)3);
        Assert.assertEquals((int)ledger.ledgers.size(), (int)3);
        Assert.assertEquals((long)ledger.ledgerCache.size(), (long)2L);
        cursor.clearBacklog();
        cursor2.clearBacklog();
        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((int)ledger.ledgers.size(), (int)3);
            Assert.assertEquals((long)ledger.ledgerCache.size(), (long)0L);
        });
        ManagedCursor cursor3 = ledger.openCursor("test-cursor3", CommandSubscribe.InitialPosition.Earliest);
        entryList = cursor3.readEntries(3);
        Assert.assertEquals((int)entryList.size(), (int)3);
        Assert.assertEquals((long)ledger.ledgerCache.size(), (long)2L);
        cursor3.clearBacklog();
        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
        Awaitility.await().untilAsserted(() -> {
            Assert.assertEquals((int)ledger.ledgers.size(), (int)3);
            Assert.assertEquals((long)ledger.ledgerCache.size(), (long)0L);
        });
        cursor.close();
        cursor2.close();
        cursor3.close();
        ledger.close();
    }

    @Test
    public void testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers() throws Exception {
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(1);
        config.setMaxSizePerLedgerMb(1);
        LedgerOffloader ledgerOffloader = (LedgerOffloader)Mockito.mock(NullLedgerOffloader.class);
        OffloadPoliciesImpl offloadPolicies = (OffloadPoliciesImpl)Mockito.mock(OffloadPoliciesImpl.class);
        Mockito.when((Object)ledgerOffloader.getOffloadPolicies()).thenReturn((Object)offloadPolicies);
        Mockito.when((Object)ledgerOffloader.getOffloadDriverName()).thenReturn((Object)"s3");
        config.setLedgerOffloader(ledgerOffloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("testDoNotGetOffloadPoliciesMultipleTimesWhenTrimLedgers", config);
        ledger.openCursor("test-cursor");
        int entries = 10;
        byte[] data = new byte[0x100000];
        for (int i = 0; i < 10; ++i) {
            ledger.addEntry(data);
        }
        Assert.assertEquals((int)ledger.ledgers.size(), (int)10);
        ledgerOffloader = (LedgerOffloader)Mockito.mock(NullLedgerOffloader.class);
        config.setLedgerOffloader(ledgerOffloader);
        ledger.internalTrimConsumedLedgers(Futures.NULL_PROMISE);
        ((LedgerOffloader)Mockito.verify((Object)ledgerOffloader, (VerificationMode)Mockito.times((int)1))).getOffloadPolicies();
    }

    @Test(timeOut=30000L)
    public void testReadOtherManagedLedgersEntry() throws Exception {
        ManagedLedgerImpl managedLedgerA = (ManagedLedgerImpl)this.factory.open("my_test_ledger_a");
        ManagedLedgerImpl managedLedgerB = (ManagedLedgerImpl)this.factory.open("my_test_ledger_b");
        PositionImpl pa = (PositionImpl)managedLedgerA.addEntry("dummy-entry-a".getBytes(Encoding));
        PositionImpl pb = (PositionImpl)managedLedgerB.addEntry("dummy-entry-b".getBytes(Encoding));
        final CompletableFuture completableFutureA = new CompletableFuture();
        managedLedgerA.asyncReadEntry(pa, new AsyncCallbacks.ReadEntryCallback(){

            public void readEntryComplete(Entry entry, Object ctx) {
                completableFutureA.complete(entry.getData());
            }

            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                completableFutureA.completeExceptionally(exception.getCause());
            }
        }, null);
        Assert.assertEquals((byte[])"dummy-entry-a".getBytes(Encoding), (byte[])((byte[])completableFutureA.get()));
        final CompletableFuture completableFutureB = new CompletableFuture();
        managedLedgerA.asyncReadEntry(pb, new AsyncCallbacks.ReadEntryCallback(){

            public void readEntryComplete(Entry entry, Object ctx) {
                completableFutureB.complete(entry.getData());
            }

            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                completableFutureB.completeExceptionally(exception);
            }
        }, null);
        try {
            completableFutureB.get();
            Assert.fail();
        }
        catch (Exception e) {
            Assert.assertEquals((String)e.getCause().getMessage(), (String)"Message not found, the ledgerId does not belong to this topic or has been deleted");
        }
        managedLedgerA.close();
        managedLedgerB.close();
    }

    private abstract class MockedPlacementPolicy
    implements EnsemblePlacementPolicy {
        private MockedPlacementPolicy() {
        }
    }
}

