/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import com.google.common.collect.ImmutableSet;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class OffloadPrefixTest
extends MockedBookKeeperTestCase {
    private static final Logger log = LoggerFactory.getLogger(OffloadPrefixTest.class);

    @Test
    public void testNullOffloader() throws Exception {
        String content;
        int i;
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (i = 0; i < 25; ++i) {
            String content2 = "entry-" + i;
            ledger.addEntry(content2.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        Position p = ledger.getLastConfirmedEntry();
        while (i < 45) {
            content = "entry-" + i;
            ledger.addEntry(content.getBytes());
            ++i;
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)5);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)0L);
        try {
            ledger.offloadPrefix(p);
            Assert.fail((String)"Should have thrown an exception");
        }
        catch (ManagedLedgerException e2) {
            Assert.assertEquals(e2.getCause().getClass(), CompletionException.class);
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)5);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)0L);
        while (i < 55) {
            content = "entry-" + i;
            ledger.addEntry(content.getBytes());
            ++i;
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)6);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)0L);
    }

    @Test
    public void testOffload() throws Exception {
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 25; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
    }

    @Test
    public void testPositionOutOfRange() throws Exception {
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 25; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        try {
            ledger.offloadPrefix((Position)PositionImpl.EARLIEST);
            Assert.fail((String)"Should have thrown an exception");
        }
        catch (ManagedLedgerException.InvalidCursorPositionException invalidCursorPositionException) {
            // empty catch block
        }
        try {
            ledger.offloadPrefix((Position)PositionImpl.LATEST);
            Assert.fail((String)"Should have thrown an exception");
        }
        catch (ManagedLedgerException.InvalidCursorPositionException invalidCursorPositionException) {
            // empty catch block
        }
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)0L);
        Assert.assertEquals((int)offloader.offloadedLedgers().size(), (int)0);
    }

    @Test
    public void testPositionOnEdgeOfLedger() throws Exception {
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 20; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Position p = ledger.getLastConfirmedEntry();
        ledger.addEntry("entry-blah".getBytes());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        PositionImpl firstUnoffloaded = (PositionImpl)ledger.offloadPrefix(p);
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        Assert.assertEquals((int)offloader.offloadedLedgers().size(), (int)1);
        Assert.assertTrue((boolean)offloader.offloadedLedgers().contains(((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId()));
        Assert.assertTrue((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getComplete());
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)1L);
        Assert.assertEquals((long)firstUnoffloaded.getLedgerId(), (long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId());
        Assert.assertEquals((long)firstUnoffloaded.getEntryId(), (long)0L);
        PositionImpl firstUnoffloaded2 = (PositionImpl)ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        Assert.assertEquals((int)offloader.offloadedLedgers().size(), (int)2);
        Assert.assertTrue((boolean)offloader.offloadedLedgers().contains(((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId()));
        Assert.assertTrue((boolean)offloader.offloadedLedgers().contains(((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId()));
        Assert.assertTrue((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getComplete());
        Assert.assertTrue((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getOffloadContext().getComplete());
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)2L);
        Assert.assertEquals((long)firstUnoffloaded2.getLedgerId(), (long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(2)).getLedgerId());
    }

    @Test
    public void testPositionOnLastEmptyLedger() throws Exception {
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 5; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        ledger.close();
        ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Assert.assertTrue((((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getSize() > 0L ? 1 : 0) != 0);
        Assert.assertEquals((long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getSize(), (long)0L);
        PositionImpl p = new PositionImpl(((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId(), 0L);
        PositionImpl firstUnoffloaded = (PositionImpl)ledger.offloadPrefix((Position)p);
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Assert.assertEquals((int)offloader.offloadedLedgers().size(), (int)1);
        Assert.assertTrue((boolean)offloader.offloadedLedgers().contains(((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId()));
        Assert.assertTrue((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getComplete());
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)1L);
        Assert.assertEquals((long)firstUnoffloaded.getLedgerId(), (long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId());
        Assert.assertEquals((long)firstUnoffloaded.getEntryId(), (long)0L);
    }

    @Test
    public void testTrimOccursDuringOffload() throws Exception {
        final CountDownLatch offloadStarted = new CountDownLatch(1);
        final CompletableFuture<Object> blocker = new CompletableFuture<Object>();
        MockLedgerOffloader offloader = new MockLedgerOffloader(){

            @Override
            public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) {
                offloadStarted.countDown();
                return blocker.thenCompose(f -> super.offload(ledger, uuid, extraMetadata));
            }
        };
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(0, TimeUnit.MINUTES);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("foobar");
        for (int i = 0; i < 21; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        PositionImpl startOfSecondLedger = PositionImpl.get((long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId(), (long)0L);
        PositionImpl startOfThirdLedger = PositionImpl.get((long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(2)).getLedgerId(), (long)0L);
        OffloadCallbackPromise cbPromise = new OffloadCallbackPromise();
        ledger.asyncOffloadPrefix((Position)startOfThirdLedger, (AsyncCallbacks.OffloadCallback)cbPromise, null);
        offloadStarted.await();
        cursor.markDelete((Position)startOfSecondLedger, new HashMap());
        OffloadPrefixTest.assertEventuallyTrue(() -> ledger.getLedgersInfoAsList().size() == 2);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)0L);
        blocker.complete(null);
        cbPromise.get();
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)1L);
        Assert.assertTrue((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getComplete());
        Assert.assertEquals((int)offloader.offloadedLedgers().size(), (int)1);
        Assert.assertTrue((boolean)offloader.offloadedLedgers().contains(((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId()));
    }

    @Test
    public void testTrimOccursDuringOffloadLedgerDeletedBeforeOffload() throws Exception {
        final CountDownLatch offloadStarted = new CountDownLatch(1);
        final CompletableFuture<Long> blocker = new CompletableFuture<Long>();
        MockLedgerOffloader offloader = new MockLedgerOffloader(){

            @Override
            public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) {
                offloadStarted.countDown();
                return blocker.thenCompose(trimmedLedger -> {
                    if (trimmedLedger.longValue() == ledger.getId()) {
                        CompletableFuture future = new CompletableFuture();
                        future.completeExceptionally((Throwable)new BKException.BKNoSuchLedgerExistsException());
                        return future;
                    }
                    return super.offload(ledger, uuid, extraMetadata);
                });
            }
        };
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(0, TimeUnit.MINUTES);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("foobar");
        for (int i = 0; i < 21; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        PositionImpl startOfSecondLedger = PositionImpl.get((long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId(), (long)0L);
        PositionImpl startOfThirdLedger = PositionImpl.get((long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(2)).getLedgerId(), (long)0L);
        OffloadCallbackPromise cbPromise = new OffloadCallbackPromise();
        ledger.asyncOffloadPrefix((Position)startOfThirdLedger, (AsyncCallbacks.OffloadCallback)cbPromise, null);
        offloadStarted.await();
        long trimmedLedger = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId();
        cursor.markDelete((Position)startOfSecondLedger, new HashMap());
        OffloadPrefixTest.assertEventuallyTrue(() -> ledger.getLedgersInfoAsList().size() == 2);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getLedgerId() == trimmedLedger).count(), (long)0L);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)0L);
        blocker.complete(trimmedLedger);
        cbPromise.get();
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)1L);
        Assert.assertTrue((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getComplete());
        Assert.assertEquals((int)offloader.offloadedLedgers().size(), (int)1);
        Assert.assertTrue((boolean)offloader.offloadedLedgers().contains(((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId()));
    }

    @Test
    public void testOffloadClosedManagedLedger() throws Exception {
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 21; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Position p = ledger.getLastConfirmedEntry();
        ledger.close();
        try {
            ledger.offloadPrefix(p);
            Assert.fail((String)"Should fail because ML is closed");
        }
        catch (ManagedLedgerException.ManagedLedgerAlreadyClosedException managedLedgerAlreadyClosedException) {
            // empty catch block
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)0L);
        Assert.assertEquals((int)offloader.offloadedLedgers().size(), (int)0);
    }

    @Test
    public void testOffloadSamePositionTwice() throws Exception {
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 25; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
    }

    public void offloadThreeOneFails(int failIndex) throws Exception {
        CompletableFuture<Set<Long>> promise = new CompletableFuture<Set<Long>>();
        ErroringMockLedgerOffloader offloader = new ErroringMockLedgerOffloader(promise);
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 35; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)4);
        promise.complete((Set<Long>)ImmutableSet.of((Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(failIndex)).getLedgerId()));
        try {
            ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        }
        catch (ManagedLedgerException e2) {
            Assert.assertEquals(e2.getCause().getClass(), CompletionException.class);
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)4);
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)2L);
        Assert.assertFalse((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(failIndex)).getOffloadContext().getComplete());
    }

    @Test
    public void testOffloadThreeFirstFails() throws Exception {
        this.offloadThreeOneFails(0);
    }

    @Test
    public void testOffloadThreeSecondFails() throws Exception {
        this.offloadThreeOneFails(1);
    }

    @Test
    public void testOffloadThreeThirdFails() throws Exception {
        this.offloadThreeOneFails(2);
    }

    @Test
    public void testOffloadNewML() throws Exception {
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        try {
            ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        }
        catch (ManagedLedgerException.InvalidCursorPositionException invalidCursorPositionException) {
            // empty catch block
        }
        ledger.addEntry("foobar".getBytes());
        Position p = ledger.getLastConfirmedEntry();
        Assert.assertEquals((Object)p, (Object)ledger.offloadPrefix(ledger.getLastConfirmedEntry()));
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)1);
        Assert.assertEquals((int)offloader.offloadedLedgers().size(), (int)0);
    }

    @Test
    public void testOffloadConflict() throws Exception {
        final ConcurrentHashMap.KeySetView deleted = ConcurrentHashMap.newKeySet();
        final CompletableFuture errorLedgers = new CompletableFuture();
        final ConcurrentHashMap.KeySetView failedOffloads = ConcurrentHashMap.newKeySet();
        MockLedgerOffloader offloader = new MockLedgerOffloader(){

            @Override
            public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) {
                return errorLedgers.thenCompose(errors -> {
                    if (errors.remove(ledger.getId())) {
                        failedOffloads.add(Pair.of((Object)ledger.getId(), (Object)uuid));
                        CompletableFuture future = new CompletableFuture();
                        future.completeExceptionally(new Exception("Some kind of error"));
                        return future;
                    }
                    return super.offload(ledger, uuid, extraMetadata);
                });
            }

            @Override
            public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, Map<String, String> offloadDriverMetadata) {
                deleted.add(Pair.of((Object)ledgerId, (Object)uuid));
                return super.deleteOffloaded(ledgerId, uuid, offloadDriverMetadata);
            }
        };
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 15; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        ConcurrentHashMap.KeySetView errorSet = ConcurrentHashMap.newKeySet();
        errorSet.add(((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId());
        errorLedgers.complete(errorSet);
        try {
            ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        }
        catch (ManagedLedgerException content) {
            // empty catch block
        }
        Assert.assertTrue((boolean)errorSet.isEmpty());
        Assert.assertEquals((int)failedOffloads.size(), (int)1);
        Assert.assertEquals((int)deleted.size(), (int)0);
        long expectedFailedLedger = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId();
        UUID expectedFailedUUID = new UUID(((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getUidMsb(), ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getUidLsb());
        Assert.assertEquals(failedOffloads.stream().findFirst().get(), (Object)Pair.of((Object)expectedFailedLedger, (Object)expectedFailedUUID));
        Assert.assertFalse((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getComplete());
        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((int)failedOffloads.size(), (int)1);
        Assert.assertEquals((int)deleted.size(), (int)1);
        Assert.assertEquals(deleted.stream().findFirst().get(), (Object)Pair.of((Object)expectedFailedLedger, (Object)expectedFailedUUID));
        UUID successUUID = new UUID(((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getUidMsb(), ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getUidLsb());
        Assert.assertNotEquals((Object)expectedFailedUUID, (Object)successUUID);
        Assert.assertTrue((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getComplete());
    }

    @Test
    public void testOffloadDelete() throws Exception {
        ConcurrentHashMap.KeySetView deleted = ConcurrentHashMap.newKeySet();
        CompletableFuture errorLedgers = new CompletableFuture();
        ConcurrentHashMap.KeySetView failedOffloads = ConcurrentHashMap.newKeySet();
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(0, TimeUnit.MINUTES);
        offloader.getOffloadPolicies().setManagedLedgerOffloadDeletionLagInMillis(Long.valueOf(100L));
        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(Long.valueOf(100L));
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("foobar");
        for (int i = 0; i < 15; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)1L);
        Assert.assertTrue((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().getComplete());
        long firstLedger = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId();
        long secondLedger = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId();
        cursor.markDelete(ledger.getLastConfirmedEntry());
        OffloadPrefixTest.assertEventuallyTrue(() -> ledger.getLedgersInfoAsList().size() == 1);
        Assert.assertEquals((long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId(), (long)secondLedger);
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger));
    }

    @Test
    public void testOffloadDeleteIncomplete() throws Exception {
        ConcurrentHashMap.KeySetView deleted = ConcurrentHashMap.newKeySet();
        CompletableFuture errorLedgers = new CompletableFuture();
        ConcurrentHashMap.KeySetView failedOffloads = ConcurrentHashMap.newKeySet();
        MockLedgerOffloader offloader = new MockLedgerOffloader(){

            @Override
            public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) {
                return super.offload(ledger, uuid, extraMetadata).thenCompose(res -> {
                    CompletableFuture f = new CompletableFuture();
                    f.completeExceptionally(new Exception("Fail after offload occurred"));
                    return f;
                });
            }
        };
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(0, TimeUnit.MINUTES);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        ManagedCursor cursor = ledger.openCursor("foobar");
        for (int i = 0; i < 15; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        try {
            ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        }
        catch (ManagedLedgerException i) {
            // empty catch block
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)2);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).count(), (long)0L);
        Assert.assertEquals((long)ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().hasUidMsb()).count(), (long)1L);
        Assert.assertTrue((boolean)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getOffloadContext().hasUidMsb());
        long firstLedger = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId();
        long secondLedger = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId();
        cursor.markDelete(ledger.getLastConfirmedEntry());
        OffloadPrefixTest.assertEventuallyTrue(() -> ledger.getLedgersInfoAsList().size() == 1);
        Assert.assertEquals((long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId(), (long)secondLedger);
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.deletedOffloads().contains(firstLedger));
    }

    @Test
    public void testDontOffloadEmpty() throws Exception {
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 35; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)4);
        long firstLedgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId();
        long secondLedgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId();
        long thirdLedgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(2)).getLedgerId();
        long fourthLedgerId = ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(3)).getLedgerId();
        Field ledgersField = ledger.getClass().getDeclaredField("ledgers");
        ledgersField.setAccessible(true);
        Map ledgers = (Map)ledgersField.get(ledger);
        ledgers.put(secondLedgerId, ((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledgers.get(secondLedgerId)).toBuilder().setEntries(0L).setSize(0L).build());
        PositionImpl firstUnoffloaded = (PositionImpl)ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        Assert.assertEquals((long)firstUnoffloaded.getLedgerId(), (long)fourthLedgerId);
        Assert.assertEquals((long)firstUnoffloaded.getEntryId(), (long)0L);
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)4);
        Assert.assertEquals(ledger.getLedgersInfoAsList().stream().filter(e -> e.getOffloadContext().getComplete()).map(e -> e.getLedgerId()).collect(Collectors.toSet()), offloader.offloadedLedgers());
        Assert.assertEquals(offloader.offloadedLedgers(), (Set)ImmutableSet.of((Object)firstLedgerId, (Object)thirdLedgerId));
    }

    private static byte[] buildEntry(int size, String pattern) {
        byte[] entry = new byte[size];
        byte[] patternBytes = pattern.getBytes();
        for (int i = 0; i < entry.length; ++i) {
            entry[i] = patternBytes[i % patternBytes.length];
        }
        return entry;
    }

    @Test
    public void testAutoTriggerOffload() throws Exception {
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(Long.valueOf(100L));
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 25; ++i) {
            ledger.addEntry(OffloadPrefixTest.buildEntry(10, "entry-" + i));
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
        Assert.assertEquals(offloader.offloadedLedgers(), (Set)ImmutableSet.of((Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId()));
    }

    @Test
    public void manualTriggerWhileAutoInProgress() throws Exception {
        int i;
        final CompletableFuture<Object> slowOffload = new CompletableFuture<Object>();
        final CountDownLatch offloadRunning = new CountDownLatch(1);
        MockLedgerOffloader offloader = new MockLedgerOffloader(){

            @Override
            public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) {
                offloadRunning.countDown();
                return slowOffload.thenCompose(res -> super.offload(ledger, uuid, extraMetadata));
            }
        };
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(Long.valueOf(100L));
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (i = 0; i < 25; ++i) {
            ledger.addEntry(OffloadPrefixTest.buildEntry(10, "entry-" + i));
        }
        offloadRunning.await();
        for (i = 0; i < 20; ++i) {
            ledger.addEntry(OffloadPrefixTest.buildEntry(10, "entry-" + i));
        }
        Position p = ledger.addEntry(OffloadPrefixTest.buildEntry(10, "last-entry"));
        try {
            ledger.offloadPrefix(p);
            Assert.fail((String)"Shouldn't have succeeded");
        }
        catch (ManagedLedgerException.OffloadInProgressException offloadInProgressException) {
            // empty catch block
        }
        slowOffload.complete(null);
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 3);
        Assert.assertEquals(offloader.offloadedLedgers(), (Set)ImmutableSet.of((Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId(), (Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId(), (Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(2)).getLedgerId()));
        ledger.offloadPrefix(p);
        Assert.assertEquals((int)offloader.offloadedLedgers().size(), (int)4);
        Assert.assertEquals(offloader.offloadedLedgers(), (Set)ImmutableSet.of((Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId(), (Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId(), (Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(2)).getLedgerId(), (Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(3)).getLedgerId()));
    }

    @Test
    public void autoTriggerWhileManualInProgress() throws Exception {
        final CompletableFuture<Object> slowOffload = new CompletableFuture<Object>();
        final CountDownLatch offloadRunning = new CountDownLatch(1);
        MockLedgerOffloader offloader = new MockLedgerOffloader(){

            @Override
            public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) {
                offloadRunning.countDown();
                return slowOffload.thenCompose(res -> super.offload(ledger, uuid, extraMetadata));
            }
        };
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(Long.valueOf(100L));
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 14; ++i) {
            ledger.addEntry(OffloadPrefixTest.buildEntry(10, "entry-" + i));
        }
        Position p = ledger.addEntry(OffloadPrefixTest.buildEntry(10, "trigger-entry"));
        OffloadCallbackPromise cbPromise = new OffloadCallbackPromise();
        ledger.asyncOffloadPrefix(p, (AsyncCallbacks.OffloadCallback)cbPromise, null);
        offloadRunning.await();
        for (int i = 0; i < 20; ++i) {
            ledger.addEntry(OffloadPrefixTest.buildEntry(10, "entry-" + i));
        }
        slowOffload.complete(null);
        Assert.assertEquals(cbPromise.join(), (Object)PositionImpl.get((long)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId(), (long)0L));
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
        Assert.assertEquals(offloader.offloadedLedgers(), (Set)ImmutableSet.of((Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId(), (Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId()));
    }

    @Test
    public void multipleAutoTriggers() throws Exception {
        int i;
        final CompletableFuture<Object> slowOffload = new CompletableFuture<Object>();
        final CountDownLatch offloadRunning = new CountDownLatch(1);
        MockLedgerOffloader offloader = new MockLedgerOffloader(){

            @Override
            public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) {
                offloadRunning.countDown();
                return slowOffload.thenCompose(res -> super.offload(ledger, uuid, extraMetadata));
            }
        };
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(Long.valueOf(100L));
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (i = 0; i < 25; ++i) {
            ledger.addEntry(OffloadPrefixTest.buildEntry(10, "entry-" + i));
        }
        offloadRunning.await();
        for (i = 0; i < 20; ++i) {
            ledger.addEntry(OffloadPrefixTest.buildEntry(10, "entry-" + i));
        }
        slowOffload.complete(null);
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 3);
        Assert.assertEquals(offloader.offloadedLedgers(), (Set)ImmutableSet.of((Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId(), (Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId(), (Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(2)).getLedgerId()));
    }

    @Test
    public void offloadAsSoonAsClosed() throws Exception {
        int i;
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        offloader.getOffloadPolicies().setManagedLedgerOffloadThresholdInBytes(Long.valueOf(0L));
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (i = 0; i < 11; ++i) {
            ledger.addEntry(OffloadPrefixTest.buildEntry(10, "entry-" + i));
        }
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 1);
        Assert.assertEquals(offloader.offloadedLedgers(), (Set)ImmutableSet.of((Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId()));
        for (i = 0; i < 10; ++i) {
            ledger.addEntry(OffloadPrefixTest.buildEntry(10, "entry-" + i));
        }
        OffloadPrefixTest.assertEventuallyTrue(() -> offloader.offloadedLedgers().size() == 2);
        Assert.assertEquals(offloader.offloadedLedgers(), (Set)ImmutableSet.of((Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0)).getLedgerId(), (Object)((MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(1)).getLedgerId()));
    }

    static void assertEventuallyTrue(BooleanSupplier predicate) throws Exception {
        for (int i = 0; i < 30 && !predicate.getAsBoolean(); ++i) {
            Thread.sleep(100L);
        }
        Assert.assertTrue((boolean)predicate.getAsBoolean());
    }

    @Test
    public void testFailByZk() throws Exception {
        MockLedgerOffloader offloader = new MockLedgerOffloader();
        ManagedLedgerConfig config = new ManagedLedgerConfig();
        config.setMaxEntriesPerLedger(10);
        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
        config.setRetentionTime(10, TimeUnit.MINUTES);
        config.setRetentionSizeInMB(10L);
        config.setLedgerOffloader((LedgerOffloader)offloader);
        ManagedLedgerImpl ledger = (ManagedLedgerImpl)this.factory.open("my_test_ledger", config);
        for (int i = 0; i < 25; ++i) {
            String content = "entry-" + i;
            ledger.addEntry(content.getBytes());
        }
        Assert.assertEquals((int)ledger.getLedgersInfoAsList().size(), (int)3);
        offloader.inject = () -> {
            try {
                this.stopMetadataStore();
            }
            catch (Exception e) {
                e.printStackTrace();
            }
        };
        try {
            ledger.offloadPrefix(ledger.getLastConfirmedEntry());
        }
        catch (Exception content) {
            // empty catch block
        }
        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledger.getLedgersInfoAsList().get(0);
        MLDataFormats.OffloadContext offloadContext = ledgerInfo.getOffloadContext();
        Assert.assertEquals((boolean)offloadContext.getComplete(), (boolean)false);
    }

    static class ErroringMockLedgerOffloader
    extends MockLedgerOffloader {
        CompletableFuture<Set<Long>> errorLedgers = new CompletableFuture();

        ErroringMockLedgerOffloader(CompletableFuture<Set<Long>> errorLedgers) {
            this.errorLedgers = errorLedgers;
        }

        @Override
        public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) {
            return this.errorLedgers.thenCompose(errors -> {
                if (errors.contains(ledger.getId())) {
                    CompletableFuture future = new CompletableFuture();
                    future.completeExceptionally(new Exception("Some kind of error"));
                    return future;
                }
                return super.offload(ledger, uuid, extraMetadata);
            });
        }
    }

    static class MockLedgerOffloader
    implements LedgerOffloader {
        ConcurrentHashMap<Long, UUID> offloads = new ConcurrentHashMap();
        ConcurrentHashMap<Long, UUID> deletes = new ConcurrentHashMap();
        InjectAfterOffload inject = null;
        OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create((String)"S3", (String)"", (String)"", (String)"", null, null, null, null, (Integer)0x4000000, (Integer)0x100000, (Long)OffloadPoliciesImpl.DEFAULT_OFFLOAD_THRESHOLD_IN_BYTES, (Long)OffloadPoliciesImpl.DEFAULT_OFFLOAD_DELETION_LAG_IN_MILLIS, (OffloadedReadPriority)OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY);

        MockLedgerOffloader() {
        }

        Set<Long> offloadedLedgers() {
            return this.offloads.keySet();
        }

        Set<Long> deletedOffloads() {
            return this.deletes.keySet();
        }

        public String getOffloadDriverName() {
            return "mock";
        }

        public CompletableFuture<Void> offload(ReadHandle ledger, UUID uuid, Map<String, String> extraMetadata) {
            CompletableFuture<Void> promise = new CompletableFuture<Void>();
            if (this.offloads.putIfAbsent(ledger.getId(), uuid) == null) {
                promise.complete(null);
            } else {
                promise.completeExceptionally(new Exception("Already exists exception"));
            }
            if (this.inject != null) {
                this.inject.call();
            }
            return promise;
        }

        public CompletableFuture<ReadHandle> readOffloaded(long ledgerId, UUID uuid, Map<String, String> offloadDriverMetadata) {
            CompletableFuture<ReadHandle> promise = new CompletableFuture<ReadHandle>();
            promise.completeExceptionally(new UnsupportedOperationException());
            return promise;
        }

        public CompletableFuture<Void> deleteOffloaded(long ledgerId, UUID uuid, Map<String, String> offloadDriverMetadata) {
            CompletableFuture<Void> promise = new CompletableFuture<Void>();
            if (this.offloads.remove(ledgerId, uuid)) {
                this.deletes.put(ledgerId, uuid);
                promise.complete(null);
            } else {
                promise.completeExceptionally(new Exception("Not found"));
            }
            return promise;
        }

        public OffloadPoliciesImpl getOffloadPolicies() {
            return this.offloadPolicies;
        }

        public void close() {
        }

        static interface InjectAfterOffload {
            public void call();
        }
    }

    static class OffloadCallbackPromise
    extends CompletableFuture<Position>
    implements AsyncCallbacks.OffloadCallback {
        OffloadCallbackPromise() {
        }

        public void offloadComplete(Position pos, Object ctx) {
            this.complete(pos);
        }

        public void offloadFailed(ManagedLedgerException exception, Object ctx) {
            this.completeExceptionally(exception);
        }
    }
}

