/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl;

import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.RangeSet;
import com.google.common.collect.Sets;
import com.google.common.collect.TreeRangeSet;
import com.google.common.util.concurrent.RateLimiter;
import com.google.protobuf.InvalidProtocolBufferException;
import java.time.Clock;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.client.api.DigestType;
import org.apache.bookkeeper.common.util.SafeRunnable;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.bookkeeper.mledger.impl.OpFindNewest;
import org.apache.bookkeeper.mledger.impl.OpReadEntry;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManagedCursorImpl
implements ManagedCursor {
    protected final BookKeeper bookkeeper;
    protected final ManagedLedgerConfig config;
    protected final ManagedLedgerImpl ledger;
    private final String name;
    private final BookKeeper.DigestType digestType;
    protected volatile PositionImpl markDeletePosition;
    protected volatile PositionImpl readPosition;
    private volatile MarkDeleteEntry lastMarkDeleteEntry;
    protected static final AtomicReferenceFieldUpdater<ManagedCursorImpl, OpReadEntry> WAITING_READ_OP_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, OpReadEntry.class, "waitingReadOp");
    private volatile OpReadEntry waitingReadOp = null;
    private static final int FALSE = 0;
    private static final int TRUE = 1;
    private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> RESET_CURSOR_IN_PROGRESS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "resetCursorInProgress");
    private volatile int resetCursorInProgress = 0;
    private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_READ_OPS_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingReadOps");
    private volatile int pendingReadOps = 0;
    protected volatile long messagesConsumedCounter;
    private volatile LedgerHandle cursorLedger;
    private volatile MetaStore.Stat cursorLedgerStat;
    private final RangeSet<PositionImpl> individualDeletedMessages = TreeRangeSet.create();
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private RateLimiter markDeleteLimiter;
    private final ArrayDeque<MarkDeleteEntry> pendingMarkDeleteOps = new ArrayDeque();
    private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount");
    private volatile int pendingMarkDeletedSubmittedCount = 0;
    private long lastLedgerSwitchTimestamp;
    private final Clock clock;
    private long lastActive;
    private static final AtomicReferenceFieldUpdater<ManagedCursorImpl, State> STATE_UPDATER = AtomicReferenceFieldUpdater.newUpdater(ManagedCursorImpl.class, State.class, "state");
    private volatile State state = null;
    private static final Logger log = LoggerFactory.getLogger(ManagedCursorImpl.class);

    ManagedCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName) {
        this.bookkeeper = bookkeeper;
        this.config = config;
        this.ledger = ledger;
        this.name = cursorName;
        this.digestType = BookKeeper.DigestType.fromApiDigestType((DigestType)config.getDigestType());
        STATE_UPDATER.set(this, State.Uninitialized);
        PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.set(this, 0);
        PENDING_READ_OPS_UPDATER.set(this, 0);
        RESET_CURSOR_IN_PROGRESS_UPDATER.set(this, 0);
        WAITING_READ_OP_UPDATER.set(this, null);
        this.clock = config.getClock();
        this.lastActive = this.clock.millis();
        this.lastLedgerSwitchTimestamp = this.clock.millis();
        this.markDeleteLimiter = config.getThrottleMarkDelete() > 0.0 ? RateLimiter.create((double)config.getThrottleMarkDelete()) : null;
    }

    @Override
    public Map<String, Long> getProperties() {
        return this.lastMarkDeleteEntry.properties;
    }

    void recover(final VoidCallback callback) {
        log.info("[{}] Recovering from bookkeeper ledger cursor: {}", (Object)this.ledger.getName(), (Object)this.name);
        this.ledger.getStore().asyncGetCursorInfo(this.ledger.getName(), this.name, new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>(){

            @Override
            public void operationComplete(MLDataFormats.ManagedCursorInfo info, MetaStore.Stat stat) {
                ManagedCursorImpl.this.cursorLedgerStat = stat;
                ManagedCursorImpl.this.lastActive = info.getLastActive() != 0L ? info.getLastActive() : ManagedCursorImpl.this.lastActive;
                if (info.getCursorsLedgerId() == -1L) {
                    PositionImpl recoveredPosition = new PositionImpl(info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
                    if (info.getIndividualDeletedMessagesCount() > 0) {
                        ManagedCursorImpl.this.recoverIndividualDeletedMessages(info.getIndividualDeletedMessagesList());
                    }
                    HashMap recoveredProperties = Collections.emptyMap();
                    if (info.getPropertiesCount() > 0) {
                        recoveredProperties = Maps.newHashMap();
                        for (int i = 0; i < info.getPropertiesCount(); ++i) {
                            MLDataFormats.LongProperty property = info.getProperties(i);
                            recoveredProperties.put(property.getName(), property.getValue());
                        }
                    }
                    ManagedCursorImpl.this.recoveredCursor(recoveredPosition, recoveredProperties, null);
                    callback.operationComplete();
                } else {
                    log.info("[{}] Consumer {} meta-data recover from ledger {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, info.getCursorsLedgerId()});
                    ManagedCursorImpl.this.recoverFromLedger(info, callback);
                }
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                callback.operationFailed(e);
            }
        });
    }

    protected void recoverFromLedger(MLDataFormats.ManagedCursorInfo info, VoidCallback callback) {
        this.ledger.mbean.startCursorLedgerOpenOp();
        long ledgerId = info.getCursorsLedgerId();
        this.bookkeeper.asyncOpenLedger(ledgerId, this.digestType, this.config.getPassword(), (rc, lh, ctx) -> {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Opened ledger {} for consumer {}. rc={}", new Object[]{this.ledger.getName(), ledgerId, this.name, rc});
            }
            if (ManagedCursorImpl.isBkErrorNotRecoverable(rc)) {
                log.error("[{}] Error opening metadata ledger {} for consumer {}: {}", new Object[]{this.ledger.getName(), ledgerId, this.name, BKException.getMessage((int)rc)});
                this.initialize(this.getRollbackPosition(info), callback);
                return;
            }
            if (rc != 0) {
                log.warn("[{}] Error opening metadata ledger {} for consumer {}: {}", new Object[]{this.ledger.getName(), ledgerId, this.name, BKException.getMessage((int)rc)});
                callback.operationFailed(new ManagedLedgerException(BKException.getMessage((int)rc)));
                return;
            }
            long lastEntryInLedger = lh.getLastAddConfirmed();
            lh.asyncReadEntries(lastEntryInLedger, lastEntryInLedger, (rc1, lh1, seq, ctx1) -> {
                MLDataFormats.PositionInfo positionInfo;
                if (log.isDebugEnabled()) {
                    log.debug("[{}} readComplete rc={} entryId={}", new Object[]{this.ledger.getName(), rc1, lh1.getLastAddConfirmed()});
                }
                if (ManagedCursorImpl.isBkErrorNotRecoverable(rc1)) {
                    log.error("[{}] Error reading from metadata ledger {} for consumer {}: {}", new Object[]{this.ledger.getName(), ledgerId, this.name, BKException.getMessage((int)rc1)});
                    this.initialize(this.getRollbackPosition(info), callback);
                    return;
                }
                if (rc1 != 0) {
                    log.warn("[{}] Error reading from metadata ledger {} for consumer {}: {}", new Object[]{this.ledger.getName(), ledgerId, this.name, BKException.getMessage((int)rc1)});
                    callback.operationFailed(ManagedLedgerImpl.createManagedLedgerException(rc1));
                    return;
                }
                LedgerEntry entry = (LedgerEntry)seq.nextElement();
                try {
                    positionInfo = MLDataFormats.PositionInfo.parseFrom(entry.getEntry());
                }
                catch (InvalidProtocolBufferException e) {
                    callback.operationFailed(new ManagedLedgerException(e));
                    return;
                }
                HashMap recoveredProperties = Collections.emptyMap();
                if (positionInfo.getPropertiesCount() > 0) {
                    recoveredProperties = Maps.newHashMap();
                    for (int i = 0; i < positionInfo.getPropertiesCount(); ++i) {
                        MLDataFormats.LongProperty property = positionInfo.getProperties(i);
                        recoveredProperties.put(property.getName(), property.getValue());
                    }
                }
                PositionImpl position = new PositionImpl(positionInfo);
                if (positionInfo.getIndividualDeletedMessagesCount() > 0) {
                    this.recoverIndividualDeletedMessages(positionInfo.getIndividualDeletedMessagesList());
                }
                this.recoveredCursor(position, recoveredProperties, lh);
                callback.operationComplete();
            }, null);
        }, null);
    }

    private void recoverIndividualDeletedMessages(List<MLDataFormats.MessageRange> individualDeletedMessagesList) {
        this.lock.writeLock().lock();
        try {
            this.individualDeletedMessages.clear();
            individualDeletedMessagesList.forEach(messageRange -> this.individualDeletedMessages.add(Range.openClosed((Comparable)new PositionImpl(messageRange.getLowerEndpoint()), (Comparable)new PositionImpl(messageRange.getUpperEndpoint()))));
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    private void recoveredCursor(PositionImpl position, Map<String, Long> properties, LedgerHandle recoveredFromCursorLedger) {
        if (!this.ledger.ledgerExists(position.getLedgerId())) {
            Long nextExistingLedger = this.ledger.getNextValidLedger(position.getLedgerId());
            if (nextExistingLedger == null) {
                log.info("[{}-{}] Couldn't find next next valid ledger for recovery {}", new Object[]{this.ledger.getName(), this.name, position});
            }
            position = nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, -1L) : position;
        }
        log.info("[{}] Cursor {} recovered to position {}", new Object[]{this.ledger.getName(), this.name, position});
        this.messagesConsumedCounter = -this.getNumberOfEntries((Range<PositionImpl>)Range.openClosed((Comparable)position, (Comparable)this.ledger.getLastPosition()));
        this.markDeletePosition = position;
        this.readPosition = this.ledger.getNextValidPosition(position);
        this.lastMarkDeleteEntry = new MarkDeleteEntry(this.markDeletePosition, properties, null, null);
        this.cursorLedger = recoveredFromCursorLedger;
        STATE_UPDATER.set(this, State.NoLedger);
    }

    void initialize(PositionImpl position, final VoidCallback callback) {
        this.recoveredCursor(position, Collections.emptyMap(), null);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Consumer {} cursor initialized with counters: consumed {} mdPos {} rdPos {}", new Object[]{this.ledger.getName(), this.name, this.messagesConsumedCounter, this.markDeletePosition, this.readPosition});
        }
        this.createNewMetadataLedger(new VoidCallback(){

            @Override
            public void operationComplete() {
                STATE_UPDATER.set(ManagedCursorImpl.this, State.Open);
                callback.operationComplete();
            }

            @Override
            public void operationFailed(ManagedLedgerException exception) {
                callback.operationFailed(exception);
            }
        });
    }

    @Override
    public List<Entry> readEntries(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException {
        Preconditions.checkArgument((numberOfEntriesToRead > 0 ? 1 : 0) != 0);
        final CountDownLatch counter = new CountDownLatch(1);
        class Result {
            ManagedLedgerException exception = null;
            List<Entry> entries = null;

            Result() {
            }
        }
        final Result result = new Result();
        this.asyncReadEntries(numberOfEntriesToRead, new AsyncCallbacks.ReadEntriesCallback(){
            {
            }

            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                result.entries = entries;
                counter.countDown();
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                counter.countDown();
            }
        }, null);
        counter.await();
        if (result.exception != null) {
            throw result.exception;
        }
        return result.entries;
    }

    @Override
    public void asyncReadEntries(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
        Preconditions.checkArgument((numberOfEntriesToRead > 0 ? 1 : 0) != 0);
        if (STATE_UPDATER.get(this) == State.Closed) {
            callback.readEntriesFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
            return;
        }
        PENDING_READ_OPS_UPDATER.incrementAndGet(this);
        OpReadEntry op = OpReadEntry.create(this, PositionImpl.get(this.readPosition), numberOfEntriesToRead, callback, ctx);
        this.ledger.asyncReadEntries(op);
    }

    @Override
    public Entry getNthEntry(int n, ManagedCursor.IndividualDeletedEntries deletedEntries) throws InterruptedException, ManagedLedgerException {
        final CountDownLatch counter = new CountDownLatch(1);
        class Result {
            ManagedLedgerException exception = null;
            Entry entry = null;

            Result() {
            }
        }
        final Result result = new Result();
        this.asyncGetNthEntry(n, deletedEntries, new AsyncCallbacks.ReadEntryCallback(){
            {
            }

            @Override
            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                counter.countDown();
            }

            @Override
            public void readEntryComplete(Entry entry, Object ctx) {
                result.entry = entry;
                counter.countDown();
            }
        }, null);
        counter.await();
        if (result.exception != null) {
            throw result.exception;
        }
        return result.entry;
    }

    @Override
    public void asyncGetNthEntry(int n, ManagedCursor.IndividualDeletedEntries deletedEntries, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
        PositionImpl endPosition;
        Preconditions.checkArgument((n > 0 ? 1 : 0) != 0);
        if (STATE_UPDATER.get(this) == State.Closed) {
            callback.readEntryFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
            return;
        }
        PositionImpl startPosition = this.ledger.getNextValidPosition(this.markDeletePosition);
        if (startPosition.compareTo(endPosition = this.ledger.getLastPosition()) <= 0) {
            long numOfEntries = this.getNumberOfEntries((Range<PositionImpl>)Range.closed((Comparable)startPosition, (Comparable)endPosition));
            if (numOfEntries >= (long)n) {
                long deletedMessages = 0L;
                if (deletedEntries == ManagedCursor.IndividualDeletedEntries.Exclude) {
                    deletedMessages = this.getNumIndividualDeletedEntriesToSkip(n);
                }
                PositionImpl positionAfterN = this.ledger.getPositionAfterN(this.markDeletePosition, (long)n + deletedMessages, ManagedLedgerImpl.PositionBound.startExcluded);
                this.ledger.asyncReadEntry(positionAfterN, callback, ctx);
            } else {
                callback.readEntryComplete(null, ctx);
            }
        } else {
            callback.readEntryComplete(null, ctx);
        }
    }

    @Override
    public List<Entry> readEntriesOrWait(int numberOfEntriesToRead) throws InterruptedException, ManagedLedgerException {
        Preconditions.checkArgument((numberOfEntriesToRead > 0 ? 1 : 0) != 0);
        final CountDownLatch counter = new CountDownLatch(1);
        class Result {
            ManagedLedgerException exception = null;
            List<Entry> entries = null;

            Result() {
            }
        }
        final Result result = new Result();
        this.asyncReadEntriesOrWait(numberOfEntriesToRead, new AsyncCallbacks.ReadEntriesCallback(){
            {
            }

            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                result.entries = entries;
                counter.countDown();
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                counter.countDown();
            }
        }, null);
        counter.await();
        if (result.exception != null) {
            throw result.exception;
        }
        return result.entries;
    }

    @Override
    public void asyncReadEntriesOrWait(int numberOfEntriesToRead, AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
        Preconditions.checkArgument((numberOfEntriesToRead > 0 ? 1 : 0) != 0);
        if (STATE_UPDATER.get(this) == State.Closed) {
            callback.readEntriesFailed(new ManagedLedgerException.CursorAlreadyClosedException("Cursor was already closed"), ctx);
            return;
        }
        if (this.hasMoreEntries()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Read entries immediately", (Object)this.ledger.getName(), (Object)this.name);
            }
            this.asyncReadEntries(numberOfEntriesToRead, callback, ctx);
        } else {
            OpReadEntry op = OpReadEntry.create(this, PositionImpl.get(this.readPosition), numberOfEntriesToRead, callback, ctx);
            if (!WAITING_READ_OP_UPDATER.compareAndSet(this, null, op)) {
                callback.readEntriesFailed(new ManagedLedgerException("We can only have a single waiting callback"), ctx);
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Deferring retry of read at position {}", new Object[]{this.ledger.getName(), this.name, op.readPosition});
            }
            this.ledger.getScheduledExecutor().schedule((SafeRunnable)SafeRun.safeRun(() -> {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Re-trying the read at position {}", new Object[]{this.ledger.getName(), this.name, op.readPosition});
                }
                if (!this.hasMoreEntries()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Still no entries available. Register for notification", (Object)this.ledger.getName(), (Object)this.name);
                    }
                    this.ledger.waitingCursors.add(this);
                } else if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Skip notification registering since we do have entries available", (Object)this.ledger.getName(), (Object)this.name);
                }
                if (this.hasMoreEntries()) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Found more entries", (Object)this.ledger.getName(), (Object)this.name);
                    }
                    if (WAITING_READ_OP_UPDATER.compareAndSet(this, op, null)) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] [{}] Cancelled notification and scheduled read at {}", new Object[]{this.ledger.getName(), this.name, op.readPosition});
                        }
                        PENDING_READ_OPS_UPDATER.incrementAndGet(this);
                        this.ledger.asyncReadEntries(op);
                    } else if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] notification was already cancelled", (Object)this.ledger.getName(), (Object)this.name);
                    }
                } else if (this.ledger.isTerminated()) {
                    callback.readEntriesFailed(new ManagedLedgerException.NoMoreEntriesToReadException("Topic was terminated"), ctx);
                }
            }), 10L, TimeUnit.MILLISECONDS);
        }
    }

    @Override
    public boolean cancelPendingReadRequest() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Cancel pending read request", (Object)this.ledger.getName(), (Object)this.name);
        }
        return WAITING_READ_OP_UPDATER.getAndSet(this, null) != null;
    }

    public boolean hasPendingReadRequest() {
        return WAITING_READ_OP_UPDATER.get(this) != null;
    }

    @Override
    public boolean hasMoreEntries() {
        PositionImpl writerPosition = this.ledger.getLastPosition();
        if (writerPosition.getEntryId() != -1L) {
            return this.readPosition.compareTo(writerPosition) <= 0;
        }
        return this.getNumberOfEntries() > 0L;
    }

    @Override
    public long getNumberOfEntries() {
        return this.getNumberOfEntries((Range<PositionImpl>)Range.closedOpen((Comparable)this.readPosition, (Comparable)this.ledger.getLastPosition().getNext()));
    }

    @Override
    public long getNumberOfEntriesSinceFirstNotAckedMessage() {
        return this.ledger.getNumberOfEntries((Range<PositionImpl>)Range.openClosed((Comparable)this.markDeletePosition, (Comparable)this.readPosition));
    }

    @Override
    public int getTotalNonContiguousDeletedMessagesRange() {
        return this.individualDeletedMessages.asRanges().size();
    }

    @Override
    public long getNumberOfEntriesInBacklog() {
        long backlog;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Consumer {} cursor ml-entries: {} -- deleted-counter: {} other counters: mdPos {} rdPos {}", new Object[]{this.ledger.getName(), this.name, ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(this.ledger), this.messagesConsumedCounter, this.markDeletePosition, this.readPosition});
        }
        if ((backlog = ManagedLedgerImpl.ENTRIES_ADDED_COUNTER_UPDATER.get(this.ledger) - this.messagesConsumedCounter) < 0L) {
            backlog = this.getNumberOfEntries((Range<PositionImpl>)Range.closed((Comparable)this.markDeletePosition, (Comparable)this.ledger.getLastPosition()));
        }
        return backlog;
    }

    public long getNumberOfEntriesInStorage() {
        return this.ledger.getNumberOfEntries((Range<PositionImpl>)Range.openClosed((Comparable)this.markDeletePosition, (Comparable)this.ledger.getLastPosition().getNext()));
    }

    @Override
    public Position findNewestMatching(Predicate<Entry> condition) throws InterruptedException, ManagedLedgerException {
        final CountDownLatch counter = new CountDownLatch(1);
        class Result {
            ManagedLedgerException exception = null;
            Position position = null;

            Result() {
            }
        }
        final Result result = new Result();
        this.asyncFindNewestMatching(ManagedCursor.FindPositionConstraint.SearchActiveEntries, condition, new AsyncCallbacks.FindEntryCallback(){
            {
            }

            @Override
            public void findEntryComplete(Position position, Object ctx) {
                result.position = position;
                counter.countDown();
            }

            @Override
            public void findEntryFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                counter.countDown();
            }
        }, null);
        counter.await();
        if (result.exception != null) {
            throw result.exception;
        }
        return result.position;
    }

    @Override
    public void asyncFindNewestMatching(ManagedCursor.FindPositionConstraint constraint, Predicate<Entry> condition, AsyncCallbacks.FindEntryCallback callback, Object ctx) {
        PositionImpl startPosition = null;
        long max = 0L;
        switch (constraint) {
            case SearchAllAvailableEntries: {
                startPosition = (PositionImpl)this.getFirstPosition();
                max = this.ledger.getNumberOfEntries() - 1L;
                break;
            }
            case SearchActiveEntries: {
                startPosition = this.ledger.getNextValidPosition(this.markDeletePosition);
                max = this.getNumberOfEntriesInStorage();
                break;
            }
            default: {
                callback.findEntryFailed(new ManagedLedgerException("Unknown position constraint"), ctx);
                return;
            }
        }
        if (startPosition == null) {
            callback.findEntryFailed(new ManagedLedgerException("Couldn't find start position"), ctx);
            return;
        }
        OpFindNewest op = new OpFindNewest(this, startPosition, condition, max, callback, ctx);
        op.find();
    }

    @Override
    public void setActive() {
        this.ledger.activateCursor(this);
    }

    @Override
    public boolean isActive() {
        return this.ledger.isCursorActive(this);
    }

    @Override
    public void setInactive() {
        this.ledger.deactivateCursor(this);
    }

    @Override
    public Position getFirstPosition() {
        Long firstLedgerId = (Long)this.ledger.getLedgersInfo().firstKey();
        return firstLedgerId == null ? null : new PositionImpl(firstLedgerId, 0L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void internalResetCursor(PositionImpl position, AsyncCallbacks.ResetCursorCallback resetCursorCallback) {
        if (position.equals(PositionImpl.earliest)) {
            position = this.ledger.getFirstPosition();
        } else if (position.equals(PositionImpl.latest)) {
            position = this.ledger.getLastPosition().getNext();
        }
        log.info("[{}] Initiate reset position to {} on cursor {}", new Object[]{this.ledger.getName(), position, this.name});
        ArrayDeque<MarkDeleteEntry> arrayDeque = this.pendingMarkDeleteOps;
        synchronized (arrayDeque) {
            if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(this, 0, 1)) {
                log.error("[{}] reset requested - position [{}], previous reset in progress - cursor {}", new Object[]{this.ledger.getName(), position, this.name});
                resetCursorCallback.resetFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("reset already in progress"), position);
            }
        }
        final AsyncCallbacks.ResetCursorCallback callback = resetCursorCallback;
        final PositionImpl newPosition = position;
        final VoidCallback finalCallback = new VoidCallback(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void operationComplete() {
                ManagedCursorImpl.this.lock.writeLock().lock();
                try {
                    PositionImpl newMarkDeletePosition = ManagedCursorImpl.this.ledger.getPreviousPosition(newPosition);
                    ManagedCursorImpl.this.messagesConsumedCounter = ManagedCursorImpl.this.markDeletePosition.compareTo(newMarkDeletePosition) >= 0 ? (ManagedCursorImpl.this.messagesConsumedCounter -= ManagedCursorImpl.this.getNumberOfEntries((Range<PositionImpl>)Range.closedOpen((Comparable)newMarkDeletePosition, (Comparable)ManagedCursorImpl.this.markDeletePosition))) : (ManagedCursorImpl.this.messagesConsumedCounter += ManagedCursorImpl.this.getNumberOfEntries((Range<PositionImpl>)Range.closedOpen((Comparable)ManagedCursorImpl.this.markDeletePosition, (Comparable)newMarkDeletePosition)));
                    ManagedCursorImpl.this.markDeletePosition = newMarkDeletePosition;
                    ManagedCursorImpl.this.lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(), null, null);
                    ManagedCursorImpl.this.individualDeletedMessages.clear();
                    PositionImpl oldReadPosition = ManagedCursorImpl.this.readPosition;
                    if (oldReadPosition.compareTo(newPosition) >= 0) {
                        log.info("[{}] reset position to {} before current read position {} on cursor {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), newPosition, oldReadPosition, ManagedCursorImpl.this.name});
                    } else {
                        log.info("[{}] reset position to {} skipping from current read position {} on cursor {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), newPosition, oldReadPosition, ManagedCursorImpl.this.name});
                    }
                    ManagedCursorImpl.this.readPosition = newPosition;
                }
                finally {
                    ManagedCursorImpl.this.lock.writeLock().unlock();
                }
                ArrayDeque arrayDeque = ManagedCursorImpl.this.pendingMarkDeleteOps;
                synchronized (arrayDeque) {
                    ManagedCursorImpl.this.pendingMarkDeleteOps.clear();
                    if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, 1, 0)) {
                        log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), newPosition, ManagedCursorImpl.this.name});
                    }
                }
                callback.resetComplete(newPosition);
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void operationFailed(ManagedLedgerException exception) {
                ArrayDeque arrayDeque = ManagedCursorImpl.this.pendingMarkDeleteOps;
                synchronized (arrayDeque) {
                    if (!RESET_CURSOR_IN_PROGRESS_UPDATER.compareAndSet(ManagedCursorImpl.this, 1, 0)) {
                        log.error("[{}] expected reset position [{}], but another reset in progress on cursor {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), newPosition, ManagedCursorImpl.this.name});
                    }
                }
                callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException("unable to persist position for cursor reset " + newPosition.toString()), newPosition);
            }
        };
        this.internalAsyncMarkDelete(newPosition, Collections.emptyMap(), new AsyncCallbacks.MarkDeleteCallback(){

            @Override
            public void markDeleteComplete(Object ctx) {
                finalCallback.operationComplete();
            }

            @Override
            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                finalCallback.operationFailed(exception);
            }
        }, null);
    }

    @Override
    public void asyncResetCursor(Position newPos, AsyncCallbacks.ResetCursorCallback callback) {
        Preconditions.checkArgument((boolean)(newPos instanceof PositionImpl));
        PositionImpl newPosition = (PositionImpl)newPos;
        this.ledger.getExecutor().executeOrdered((Object)this.ledger.getName(), (SafeRunnable)SafeRun.safeRun(() -> {
            if (this.ledger.isValidPosition(newPosition) || newPosition.equals(PositionImpl.earliest) || newPosition.equals(PositionImpl.latest)) {
                this.internalResetCursor(newPosition, callback);
            } else {
                callback.resetFailed(new ManagedLedgerException.InvalidCursorPositionException(newPosition.toString()), newPosition);
            }
        }));
    }

    @Override
    public void resetCursor(Position newPos) throws ManagedLedgerException, InterruptedException {
        class Result {
            ManagedLedgerException exception = null;

            Result() {
            }
        }
        final Result result = new Result();
        final CountDownLatch counter = new CountDownLatch(1);
        this.asyncResetCursor(newPos, new AsyncCallbacks.ResetCursorCallback(){
            {
            }

            @Override
            public void resetComplete(Object ctx) {
                counter.countDown();
            }

            @Override
            public void resetFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                counter.countDown();
            }
        });
        if (!counter.await(30L, TimeUnit.SECONDS)) {
            if (result.exception != null) {
                log.warn("[{}] Reset cursor to {} on cursor {} timed out with exception {}", new Object[]{this.ledger.getName(), newPos, this.name, result.exception});
            }
            throw new ManagedLedgerException("Timeout during reset cursor");
        }
        if (result.exception != null) {
            throw result.exception;
        }
    }

    @Override
    public List<Entry> replayEntries(Set<? extends Position> positions) throws InterruptedException, ManagedLedgerException {
        final CountDownLatch counter = new CountDownLatch(1);
        class Result {
            ManagedLedgerException exception = null;
            List<Entry> entries = null;

            Result() {
            }
        }
        final Result result = new Result();
        this.asyncReplayEntries(positions, new AsyncCallbacks.ReadEntriesCallback(){
            {
            }

            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                result.entries = entries;
                counter.countDown();
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                counter.countDown();
            }
        }, null);
        counter.await();
        if (result.exception != null) {
            throw result.exception;
        }
        return result.entries;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions, final AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
        final ArrayList entries = Lists.newArrayListWithExpectedSize((int)positions.size());
        if (positions.isEmpty()) {
            callback.readEntriesComplete(entries, ctx);
        }
        HashSet alreadyAcknowledgedPositions = Sets.newHashSet();
        this.lock.readLock().lock();
        try {
            positions.stream().filter(position -> this.individualDeletedMessages.contains((Comparable)((PositionImpl)position)) || ((PositionImpl)position).compareTo(this.markDeletePosition) < 0).forEach(alreadyAcknowledgedPositions::add);
        }
        finally {
            this.lock.readLock().unlock();
        }
        final int totalValidPositions = positions.size() - alreadyAcknowledgedPositions.size();
        final AtomicReference exception = new AtomicReference();
        AsyncCallbacks.ReadEntryCallback cb = new AsyncCallbacks.ReadEntryCallback(){
            int pendingCallbacks;
            {
                this.pendingCallbacks = totalValidPositions;
            }

            @Override
            public synchronized void readEntryComplete(Entry entry, Object ctx) {
                if (exception.get() != null) {
                    entry.release();
                    if (--this.pendingCallbacks == 0) {
                        callback.readEntriesFailed((ManagedLedgerException)exception.get(), ctx);
                    }
                } else {
                    entries.add(entry);
                    if (--this.pendingCallbacks == 0) {
                        callback.readEntriesComplete(entries, ctx);
                    }
                }
            }

            @Override
            public synchronized void readEntryFailed(ManagedLedgerException mle, Object ctx) {
                log.warn("[{}][{}] Error while replaying entries", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, mle});
                if (exception.compareAndSet(null, mle)) {
                    entries.forEach(Entry::release);
                }
                if (--this.pendingCallbacks == 0) {
                    callback.readEntriesFailed((ManagedLedgerException)exception.get(), ctx);
                }
            }
        };
        positions.stream().filter(position -> !alreadyAcknowledgedPositions.contains(position)).forEach(p -> this.ledger.asyncReadEntry((PositionImpl)p, cb, ctx));
        return alreadyAcknowledgedPositions;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long getNumberOfEntries(Range<PositionImpl> range) {
        long allEntries = this.ledger.getNumberOfEntries(range);
        if (log.isDebugEnabled()) {
            log.debug("[{}] getNumberOfEntries. {} allEntries: {}", new Object[]{this.ledger.getName(), range, allEntries});
        }
        long deletedEntries = 0L;
        this.lock.readLock().lock();
        try {
            for (Range r : this.individualDeletedMessages.asRanges()) {
                if (!r.isConnected(range)) continue;
                Range commonEntries = r.intersection(range);
                long commonCount = this.ledger.getNumberOfEntries((Range<PositionImpl>)commonEntries);
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] Discounting {} entries for already deleted range {}", new Object[]{this.ledger.getName(), this.name, commonCount, commonEntries});
                }
                deletedEntries += commonCount;
            }
        }
        finally {
            this.lock.readLock().unlock();
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Found {} entries - deleted: {}", new Object[]{this.ledger.getName(), allEntries - deletedEntries, deletedEntries});
        }
        return allEntries - deletedEntries;
    }

    @Override
    public void markDelete(Position position) throws InterruptedException, ManagedLedgerException {
        this.markDelete(position, Collections.emptyMap());
    }

    @Override
    public void markDelete(Position position, Map<String, Long> properties) throws InterruptedException, ManagedLedgerException {
        Preconditions.checkNotNull((Object)position);
        Preconditions.checkArgument((boolean)(position instanceof PositionImpl));
        class Result {
            ManagedLedgerException exception = null;

            Result() {
            }
        }
        final Result result = new Result();
        final CountDownLatch counter = new CountDownLatch(1);
        this.asyncMarkDelete(position, properties, new AsyncCallbacks.MarkDeleteCallback(){
            {
            }

            @Override
            public void markDeleteComplete(Object ctx) {
                counter.countDown();
            }

            @Override
            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                counter.countDown();
            }
        }, null);
        if (!counter.await(30L, TimeUnit.SECONDS)) {
            throw new ManagedLedgerException("Timeout during mark-delete operation");
        }
        if (result.exception != null) {
            throw result.exception;
        }
    }

    @Override
    public void clearBacklog() throws InterruptedException, ManagedLedgerException {
        class Result {
            ManagedLedgerException exception = null;

            Result() {
            }
        }
        final Result result = new Result();
        final CountDownLatch counter = new CountDownLatch(1);
        this.asyncClearBacklog(new AsyncCallbacks.ClearBacklogCallback(){
            {
            }

            @Override
            public void clearBacklogComplete(Object ctx) {
                counter.countDown();
            }

            @Override
            public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                counter.countDown();
            }
        }, null);
        if (!counter.await(30L, TimeUnit.SECONDS)) {
            throw new ManagedLedgerException("Timeout during clear backlog operation");
        }
        if (result.exception != null) {
            throw result.exception;
        }
    }

    @Override
    public void asyncClearBacklog(final AsyncCallbacks.ClearBacklogCallback callback, Object ctx) {
        this.asyncMarkDelete(this.ledger.getLastPosition(), new AsyncCallbacks.MarkDeleteCallback(){

            @Override
            public void markDeleteComplete(Object ctx) {
                callback.clearBacklogComplete(ctx);
            }

            @Override
            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                if (exception.getCause() instanceof IllegalArgumentException) {
                    callback.clearBacklogComplete(ctx);
                } else {
                    callback.clearBacklogFailed(exception, ctx);
                }
            }
        }, ctx);
    }

    @Override
    public void skipEntries(int numEntriesToSkip, ManagedCursor.IndividualDeletedEntries deletedEntries) throws InterruptedException, ManagedLedgerException {
        class Result {
            ManagedLedgerException exception = null;

            Result() {
            }
        }
        final Result result = new Result();
        final CountDownLatch counter = new CountDownLatch(1);
        this.asyncSkipEntries(numEntriesToSkip, deletedEntries, new AsyncCallbacks.SkipEntriesCallback(){
            {
            }

            @Override
            public void skipEntriesComplete(Object ctx) {
                counter.countDown();
            }

            @Override
            public void skipEntriesFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                counter.countDown();
            }
        }, null);
        if (!counter.await(30L, TimeUnit.SECONDS)) {
            throw new ManagedLedgerException("Timeout during skip messages operation");
        }
        if (result.exception != null) {
            throw result.exception;
        }
    }

    @Override
    public void asyncSkipEntries(final int numEntriesToSkip, ManagedCursor.IndividualDeletedEntries deletedEntries, final AsyncCallbacks.SkipEntriesCallback callback, Object ctx) {
        log.info("[{}] Skipping {} entries on cursor {}", new Object[]{this.ledger.getName(), numEntriesToSkip, this.name});
        long numDeletedMessages = 0L;
        if (deletedEntries == ManagedCursor.IndividualDeletedEntries.Exclude) {
            numDeletedMessages = this.getNumIndividualDeletedEntriesToSkip(numEntriesToSkip);
        }
        this.asyncMarkDelete(this.ledger.getPositionAfterN(this.markDeletePosition, (long)numEntriesToSkip + numDeletedMessages, ManagedLedgerImpl.PositionBound.startExcluded), new AsyncCallbacks.MarkDeleteCallback(){

            @Override
            public void markDeleteComplete(Object ctx) {
                callback.skipEntriesComplete(ctx);
            }

            @Override
            public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                if (exception.getCause() instanceof IllegalArgumentException) {
                    callback.skipEntriesComplete(ctx);
                } else {
                    log.error("[{}] Skip {} entries failed for cursor {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), numEntriesToSkip, ManagedCursorImpl.this.name, exception});
                    callback.skipEntriesFailed(exception, ctx);
                }
            }
        }, ctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    long getNumIndividualDeletedEntriesToSkip(long numEntries) {
        long totalEntriesToSkip = 0L;
        long deletedMessages = 0L;
        this.lock.readLock().lock();
        try {
            PositionImpl startPosition = this.markDeletePosition;
            PositionImpl endPosition = null;
            for (Range r : this.individualDeletedMessages.asRanges()) {
                endPosition = (PositionImpl)r.lowerEndpoint();
                if (startPosition.compareTo(endPosition) <= 0) {
                    Range range = Range.openClosed((Comparable)startPosition, (Comparable)endPosition);
                    long entries = this.ledger.getNumberOfEntries((Range<PositionImpl>)range);
                    if (totalEntriesToSkip + entries >= numEntries) {
                        break;
                    }
                    totalEntriesToSkip += entries;
                    deletedMessages += this.ledger.getNumberOfEntries((Range<PositionImpl>)r);
                    startPosition = (PositionImpl)r.upperEndpoint();
                    continue;
                }
                if (!log.isDebugEnabled()) continue;
                log.debug("[{}] deletePosition {} moved ahead without clearing deleteMsgs {} for cursor {}", new Object[]{this.ledger.getName(), this.markDeletePosition, r.lowerEndpoint(), this.name});
            }
        }
        finally {
            this.lock.readLock().unlock();
        }
        return deletedMessages;
    }

    boolean hasMoreEntries(PositionImpl position) {
        PositionImpl lastPositionInLedger = this.ledger.getLastPosition();
        if (position.compareTo(lastPositionInLedger) <= 0) {
            return this.getNumberOfEntries((Range<PositionImpl>)Range.closed((Comparable)position, (Comparable)lastPositionInLedger)) > 0L;
        }
        return false;
    }

    void initializeCursorPosition(Pair<PositionImpl, Long> lastPositionCounter) {
        this.readPosition = this.ledger.getNextValidPosition((PositionImpl)lastPositionCounter.getLeft());
        this.markDeletePosition = (PositionImpl)lastPositionCounter.getLeft();
        this.messagesConsumedCounter = (Long)lastPositionCounter.getRight();
    }

    PositionImpl setAcknowledgedPosition(PositionImpl newMarkDeletePosition) {
        if (newMarkDeletePosition.compareTo(this.markDeletePosition) < 0) {
            throw new IllegalArgumentException("Mark deleting an already mark-deleted position");
        }
        PositionImpl oldMarkDeletePosition = this.markDeletePosition;
        if (!newMarkDeletePosition.equals(oldMarkDeletePosition)) {
            long skippedEntries = 0L;
            skippedEntries = newMarkDeletePosition.getLedgerId() == oldMarkDeletePosition.getLedgerId() && newMarkDeletePosition.getEntryId() == oldMarkDeletePosition.getEntryId() + 1L ? (this.individualDeletedMessages.contains((Comparable)newMarkDeletePosition) ? 0L : 1L) : this.getNumberOfEntries((Range<PositionImpl>)Range.openClosed((Comparable)oldMarkDeletePosition, (Comparable)newMarkDeletePosition));
            PositionImpl positionAfterNewMarkDelete = this.ledger.getNextValidPosition(newMarkDeletePosition);
            if (this.individualDeletedMessages.contains((Comparable)positionAfterNewMarkDelete)) {
                Range rangeToBeMarkDeleted = this.individualDeletedMessages.rangeContaining((Comparable)positionAfterNewMarkDelete);
                newMarkDeletePosition = (PositionImpl)rangeToBeMarkDeleted.upperEndpoint();
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Moved ack position from: {} to: {} -- skipped: {}", new Object[]{this.ledger.getName(), oldMarkDeletePosition, newMarkDeletePosition, skippedEntries});
            }
            this.messagesConsumedCounter += skippedEntries;
        }
        this.markDeletePosition = PositionImpl.get(newMarkDeletePosition);
        this.individualDeletedMessages.remove(Range.atMost((Comparable)this.markDeletePosition));
        if (this.readPosition.compareTo(newMarkDeletePosition) <= 0) {
            PositionImpl oldReadPosition = this.readPosition;
            this.readPosition = this.ledger.getNextValidPosition(newMarkDeletePosition);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Moved read position from: {} to: {}, and new mark-delete position {}", new Object[]{this.ledger.getName(), oldReadPosition, this.readPosition, this.markDeletePosition});
            }
        }
        return newMarkDeletePosition;
    }

    @Override
    public void asyncMarkDelete(Position position, AsyncCallbacks.MarkDeleteCallback callback, Object ctx) {
        this.asyncMarkDelete(position, Collections.emptyMap(), callback, ctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void asyncMarkDelete(Position position, Map<String, Long> properties, AsyncCallbacks.MarkDeleteCallback callback, Object ctx) {
        Preconditions.checkNotNull((Object)position);
        Preconditions.checkArgument((boolean)(position instanceof PositionImpl));
        if (STATE_UPDATER.get(this) == State.Closed) {
            callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
            return;
        }
        if (RESET_CURSOR_IN_PROGRESS_UPDATER.get(this) == 1) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] cursor reset in progress - ignoring mark delete on position [{}] for cursor [{}]", new Object[]{this.ledger.getName(), position, this.name});
            }
            callback.markDeleteFailed(new ManagedLedgerException("Reset cursor in progress - unable to mark delete position " + ((PositionImpl)position).toString()), ctx);
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Mark delete cursor {} up to position: {}", new Object[]{this.ledger.getName(), this.name, position});
        }
        PositionImpl newPosition = (PositionImpl)position;
        if (((PositionImpl)this.ledger.getLastConfirmedEntry()).compareTo(newPosition) < 0) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]", new Object[]{this.ledger.getName(), position, this.ledger.getLastConfirmedEntry(), this.name});
            }
            callback.markDeleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
            return;
        }
        this.lock.writeLock().lock();
        try {
            newPosition = this.setAcknowledgedPosition(newPosition);
        }
        catch (IllegalArgumentException e) {
            callback.markDeleteFailed(ManagedLedgerException.getManagedLedgerException(e), ctx);
            return;
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (this.markDeleteLimiter != null && !this.markDeleteLimiter.tryAcquire()) {
            this.lastMarkDeleteEntry = new MarkDeleteEntry(newPosition, properties, null, null);
            callback.markDeleteComplete(ctx);
            return;
        }
        this.internalAsyncMarkDelete(newPosition, properties, callback, ctx);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void internalAsyncMarkDelete(PositionImpl newPosition, Map<String, Long> properties, AsyncCallbacks.MarkDeleteCallback callback, Object ctx) {
        this.ledger.mbean.addMarkDeleteOp();
        MarkDeleteEntry mdEntry = new MarkDeleteEntry(newPosition, properties, callback, ctx);
        ArrayDeque<MarkDeleteEntry> arrayDeque = this.pendingMarkDeleteOps;
        synchronized (arrayDeque) {
            switch (STATE_UPDATER.get(this)) {
                case Closed: {
                    callback.markDeleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
                    return;
                }
                case NoLedger: {
                    this.startCreatingNewMetadataLedger();
                }
                case SwitchingLedger: {
                    this.pendingMarkDeleteOps.add(mdEntry);
                    break;
                }
                case Open: {
                    if (PENDING_READ_OPS_UPDATER.get(this) > 0) {
                        this.pendingMarkDeleteOps.add(mdEntry);
                        break;
                    }
                    this.internalMarkDelete(mdEntry);
                    break;
                }
                default: {
                    log.error("[{}][{}] Invalid cursor state: {}", new Object[]{this.ledger.getName(), this.name, this.state});
                    callback.markDeleteFailed(new ManagedLedgerException("Cursor was in invalid state: " + (Object)((Object)this.state)), ctx);
                }
            }
        }
    }

    void internalMarkDelete(final MarkDeleteEntry mdEntry) {
        PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.incrementAndGet(this);
        this.lastMarkDeleteEntry = mdEntry;
        this.persistPositionToLedger(this.cursorLedger, mdEntry, new VoidCallback(){

            @Override
            public void operationComplete() {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Mark delete cursor {} to position {} succeeded", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, mdEntry.newPosition});
                }
                ManagedCursorImpl.this.lock.writeLock().lock();
                try {
                    ManagedCursorImpl.this.individualDeletedMessages.remove(Range.atMost((Comparable)mdEntry.newPosition));
                }
                finally {
                    ManagedCursorImpl.this.lock.writeLock().unlock();
                }
                ManagedCursorImpl.this.ledger.updateCursor(ManagedCursorImpl.this, mdEntry.newPosition);
                ManagedCursorImpl.this.decrementPendingMarkDeleteCount();
                if (mdEntry.callbackGroup != null) {
                    for (MarkDeleteEntry e : mdEntry.callbackGroup) {
                        e.callback.markDeleteComplete(e.ctx);
                    }
                } else {
                    mdEntry.callback.markDeleteComplete(mdEntry.ctx);
                }
            }

            @Override
            public void operationFailed(ManagedLedgerException exception) {
                log.warn("[{}] Failed to mark delete position for cursor={} position={}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this, mdEntry.newPosition});
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Consumer {} cursor mark delete failed with counters: consumed {} mdPos {} rdPos {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, ManagedCursorImpl.this.messagesConsumedCounter, ManagedCursorImpl.this.markDeletePosition, ManagedCursorImpl.this.readPosition});
                }
                ManagedCursorImpl.this.decrementPendingMarkDeleteCount();
                if (mdEntry.callbackGroup != null) {
                    for (MarkDeleteEntry e : mdEntry.callbackGroup) {
                        e.callback.markDeleteFailed(exception, e.ctx);
                    }
                } else {
                    mdEntry.callback.markDeleteFailed(exception, mdEntry.ctx);
                }
            }
        });
    }

    @Override
    public void delete(Position position) throws InterruptedException, ManagedLedgerException {
        this.delete(Collections.singletonList(position));
    }

    @Override
    public void asyncDelete(Position pos, AsyncCallbacks.DeleteCallback callback, Object ctx) {
        this.asyncDelete(Collections.singletonList(pos), callback, ctx);
    }

    @Override
    public void delete(final Iterable<Position> positions) throws InterruptedException, ManagedLedgerException {
        Preconditions.checkNotNull(positions);
        class Result {
            ManagedLedgerException exception = null;

            Result() {
            }
        }
        final Result result = new Result();
        final CountDownLatch counter = new CountDownLatch(1);
        final AtomicBoolean timeout = new AtomicBoolean(false);
        this.asyncDelete(positions, new AsyncCallbacks.DeleteCallback(){
            {
            }

            @Override
            public void deleteComplete(Object ctx) {
                if (timeout.get()) {
                    log.warn("[{}] [{}] Delete operation timeout. Callback deleteComplete at position {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, positions});
                }
                counter.countDown();
            }

            @Override
            public void deleteFailed(ManagedLedgerException exception, Object ctx) {
                result.exception = exception;
                if (timeout.get()) {
                    log.warn("[{}] [{}] Delete operation timeout. Callback deleteFailed at position {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, positions});
                }
                counter.countDown();
            }
        }, null);
        if (!counter.await(30L, TimeUnit.SECONDS)) {
            timeout.set(true);
            log.warn("[{}] [{}] Delete operation timeout. No callback was triggered at position {}", new Object[]{this.ledger.getName(), this.name, positions});
            throw new ManagedLedgerException("Timeout during delete operation");
        }
        if (result.exception != null) {
            throw result.exception;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void asyncDelete(Iterable<Position> positions, final AsyncCallbacks.DeleteCallback callback, Object ctx) {
        if (this.state == State.Closed) {
            callback.deleteFailed(new ManagedLedgerException("Cursor was already closed"), ctx);
            return;
        }
        PositionImpl newMarkDeletePosition = null;
        this.lock.writeLock().lock();
        try {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Deleting individual messages at {}. Current status: {} - md-position: {}", new Object[]{this.ledger.getName(), this.name, positions, this.individualDeletedMessages, this.markDeletePosition});
            }
            for (Position pos : positions) {
                PositionImpl position = (PositionImpl)Preconditions.checkNotNull((Object)pos);
                if (((PositionImpl)this.ledger.getLastConfirmedEntry()).compareTo(position) < 0) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Failed mark delete due to invalid markDelete {} is ahead of last-confirmed-entry {} for cursor [{}]", new Object[]{this.ledger.getName(), position, this.ledger.getLastConfirmedEntry(), this.name});
                    }
                    callback.deleteFailed(new ManagedLedgerException("Invalid mark deleted position"), ctx);
                    return;
                }
                if (this.individualDeletedMessages.contains((Comparable)position) || position.compareTo(this.markDeletePosition) <= 0) {
                    if (!log.isDebugEnabled()) continue;
                    log.debug("[{}] [{}] Position was already deleted {}", new Object[]{this.ledger.getName(), this.name, position});
                    continue;
                }
                PositionImpl previousPosition = this.ledger.getPreviousPosition(position);
                this.individualDeletedMessages.add(Range.openClosed((Comparable)previousPosition, (Comparable)position));
                ++this.messagesConsumedCounter;
                if (!log.isDebugEnabled()) continue;
                log.debug("[{}] [{}] Individually deleted messages: {}", new Object[]{this.ledger.getName(), this.name, this.individualDeletedMessages});
            }
            if (this.individualDeletedMessages.isEmpty()) {
                callback.deleteComplete(ctx);
                return;
            }
            Range range = (Range)this.individualDeletedMessages.asRanges().iterator().next();
            if (((PositionImpl)range.lowerEndpoint()).compareTo(this.markDeletePosition) <= 0 || this.ledger.getNumberOfEntries((Range<PositionImpl>)Range.openClosed((Comparable)this.markDeletePosition, (Comparable)range.lowerEndpoint())) <= 0L) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Found a position range to mark delete for cursor {}: {} ", new Object[]{this.ledger.getName(), this.name, range});
                }
                newMarkDeletePosition = (PositionImpl)range.upperEndpoint();
            }
            newMarkDeletePosition = newMarkDeletePosition != null ? this.setAcknowledgedPosition(newMarkDeletePosition) : this.markDeletePosition;
        }
        catch (Exception e) {
            log.warn("[{}] [{}] Error while updating individualDeletedMessages [{}]", new Object[]{this.ledger.getName(), this.name, e.getMessage(), e});
            callback.deleteFailed(ManagedLedgerException.getManagedLedgerException(e), ctx);
            return;
        }
        finally {
            this.lock.writeLock().unlock();
        }
        if (this.markDeleteLimiter != null && !this.markDeleteLimiter.tryAcquire()) {
            this.lastMarkDeleteEntry = new MarkDeleteEntry(newMarkDeletePosition, Collections.emptyMap(), null, null);
            callback.deleteComplete(ctx);
            return;
        }
        try {
            this.internalAsyncMarkDelete(newMarkDeletePosition, Collections.emptyMap(), new AsyncCallbacks.MarkDeleteCallback(){

                @Override
                public void markDeleteComplete(Object ctx) {
                    callback.deleteComplete(ctx);
                }

                @Override
                public void markDeleteFailed(ManagedLedgerException exception, Object ctx) {
                    callback.deleteFailed(exception, ctx);
                }
            }, ctx);
        }
        catch (Exception e) {
            log.warn("[{}] [{}] Error doing asyncDelete [{}]", new Object[]{this.ledger.getName(), this.name, e.getMessage(), e});
            if (log.isDebugEnabled()) {
                log.debug("[{}] Consumer {} cursor asyncDelete error, counters: consumed {} mdPos {} rdPos {}", new Object[]{this.ledger.getName(), this.name, this.messagesConsumedCounter, this.markDeletePosition, this.readPosition});
            }
            callback.deleteFailed(new ManagedLedgerException(e), ctx);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    List<Entry> filterReadEntries(List<Entry> entries) {
        this.lock.readLock().lock();
        try {
            Range entriesRange = Range.closed((Comparable)((PositionImpl)entries.get(0).getPosition()), (Comparable)((PositionImpl)entries.get(entries.size() - 1).getPosition()));
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Filtering entries {} - alreadyDeleted: {}", new Object[]{this.ledger.getName(), this.name, entriesRange, this.individualDeletedMessages});
            }
            if (this.individualDeletedMessages.isEmpty() || !entriesRange.isConnected(this.individualDeletedMessages.span())) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] [{}] No filtering needed for entries {}", new Object[]{this.ledger.getName(), this.name, entriesRange});
                }
                List<Entry> list = entries;
                return list;
            }
            ArrayList arrayList = Lists.newArrayList((Iterable)Collections2.filter(entries, entry -> {
                boolean includeEntry;
                boolean bl = includeEntry = !this.individualDeletedMessages.contains((Comparable)((PositionImpl)entry.getPosition()));
                if (!includeEntry) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Filtering entry at {} - already deleted", new Object[]{this.ledger.getName(), this.name, entry.getPosition()});
                    }
                    entry.release();
                }
                return includeEntry;
            }));
            return arrayList;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    public synchronized String toString() {
        return MoreObjects.toStringHelper((Object)this).add("ledger", (Object)this.ledger.getName()).add("name", (Object)this.name).add("ackPos", (Object)this.markDeletePosition).add("readPos", (Object)this.readPosition).toString();
    }

    @Override
    public String getName() {
        return this.name;
    }

    @Override
    public long getLastActive() {
        return this.lastActive;
    }

    @Override
    public void updateLastActive() {
        this.lastActive = System.currentTimeMillis();
    }

    @Override
    public boolean isDurable() {
        return true;
    }

    @Override
    public Position getReadPosition() {
        return PositionImpl.get(this.readPosition);
    }

    @Override
    public Position getMarkDeletedPosition() {
        return PositionImpl.get(this.markDeletePosition);
    }

    @Override
    public void rewind() {
        this.lock.writeLock().lock();
        try {
            PositionImpl newReadPosition = this.ledger.getNextValidPosition(this.markDeletePosition);
            PositionImpl oldReadPosition = this.readPosition;
            log.info("[{}-{}] Rewind from {} to {}", new Object[]{this.ledger.getName(), this.name, oldReadPosition, newReadPosition});
            this.readPosition = newReadPosition;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void seek(Position newReadPositionInt) {
        Preconditions.checkArgument((boolean)(newReadPositionInt instanceof PositionImpl));
        PositionImpl newReadPosition = (PositionImpl)newReadPositionInt;
        this.lock.writeLock().lock();
        try {
            if (newReadPosition.compareTo(this.markDeletePosition) <= 0) {
                newReadPosition = this.ledger.getNextValidPosition(this.markDeletePosition);
            }
            PositionImpl oldReadPosition = this.readPosition;
            this.readPosition = newReadPosition;
        }
        finally {
            this.lock.writeLock().unlock();
        }
    }

    @Override
    public void close() throws InterruptedException, ManagedLedgerException {
        class Result {
            ManagedLedgerException exception = null;

            Result() {
            }
        }
        final Result result = new Result();
        final CountDownLatch latch = new CountDownLatch(1);
        this.asyncClose(new AsyncCallbacks.CloseCallback(){
            {
            }

            @Override
            public void closeComplete(Object ctx) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Successfully closed ledger for cursor {}", (Object)ManagedCursorImpl.this.ledger.getName(), (Object)ManagedCursorImpl.this.name);
                }
                latch.countDown();
            }

            @Override
            public void closeFailed(ManagedLedgerException exception, Object ctx) {
                log.warn("[{}] Closing ledger failed for cursor {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, exception});
                result.exception = exception;
                latch.countDown();
            }
        }, null);
        if (!latch.await(30L, TimeUnit.SECONDS)) {
            throw new ManagedLedgerException("Timeout during close operation");
        }
        if (result.exception != null) {
            throw result.exception;
        }
    }

    private void persistPosition(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties, final AsyncCallbacks.CloseCallback callback, final Object ctx) {
        if (this.shouldPersistUnackRangesToLedger()) {
            this.persistPositionToLedger(this.cursorLedger, new MarkDeleteEntry(position, properties, null, null), new VoidCallback(){

                @Override
                public void operationComplete() {
                    log.info("[{}][{}] Updated md-position={} into cursor-ledger {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, ManagedCursorImpl.this.markDeletePosition, ManagedCursorImpl.this.cursorLedger.getId()});
                    callback.closeComplete(ctx);
                }

                @Override
                public void operationFailed(ManagedLedgerException e) {
                    log.warn("[{}][{}] Failed to persist mark-delete position into cursor-ledger{}: {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, ManagedCursorImpl.this.cursorLedger.getId(), e.getMessage()});
                    callback.closeFailed(e, ctx);
                }
            });
        } else {
            this.persistPositionMetaStore(cursorsLedgerId, position, properties, new MetaStore.MetaStoreCallback<Void>(){

                @Override
                public void operationComplete(Void result, MetaStore.Stat stat) {
                    log.info("[{}][{}] Closed cursor at md-position={}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, ManagedCursorImpl.this.markDeletePosition});
                    callback.closeComplete(ctx);
                    ManagedCursorImpl.this.asyncDeleteLedger(ManagedCursorImpl.this.cursorLedger);
                }

                @Override
                public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                    log.warn("[{}][{}] Failed to update cursor info when closing: {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, e.getMessage()});
                    callback.closeFailed(e, ctx);
                }
            }, true);
        }
    }

    private boolean shouldPersistUnackRangesToLedger() {
        return this.cursorLedger != null && this.config.getMaxUnackedRangesToPersist() > 0 && this.individualDeletedMessages.asRanges().size() > this.config.getMaxUnackedRangesToPersistInZk();
    }

    private void persistPositionMetaStore(long cursorsLedgerId, PositionImpl position, Map<String, Long> properties, final MetaStore.MetaStoreCallback<Void> callback, boolean persistIndividualDeletedMessageRanges) {
        MLDataFormats.ManagedCursorInfo.Builder info = MLDataFormats.ManagedCursorInfo.newBuilder().setCursorsLedgerId(cursorsLedgerId).setMarkDeleteLedgerId(position.getLedgerId()).setMarkDeleteEntryId(position.getEntryId()).setLastActive(this.lastActive);
        info.addAllProperties(this.buildPropertiesMap(properties));
        if (persistIndividualDeletedMessageRanges) {
            info.addAllIndividualDeletedMessages(this.buildIndividualDeletedMessageRanges());
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}]  Closing cursor at md-position: {}", new Object[]{this.ledger.getName(), this.name, position});
        }
        this.ledger.getStore().asyncUpdateCursorInfo(this.ledger.getName(), this.name, info.build(), this.cursorLedgerStat, new MetaStore.MetaStoreCallback<Void>(){

            @Override
            public void operationComplete(Void result, MetaStore.Stat stat) {
                ManagedCursorImpl.this.cursorLedgerStat = stat;
                callback.operationComplete(result, stat);
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                callback.operationFailed(e);
            }
        });
    }

    @Override
    public void asyncClose(AsyncCallbacks.CloseCallback callback, Object ctx) {
        State oldState = STATE_UPDATER.getAndSet(this, State.Closed);
        if (oldState == State.Closed) {
            log.info("[{}] [{}] State is already closed", (Object)this.ledger.getName(), (Object)this.name);
            callback.closeComplete(ctx);
            return;
        }
        this.persistPosition(-1L, this.lastMarkDeleteEntry.newPosition, this.lastMarkDeleteEntry.properties, callback, ctx);
    }

    void setReadPosition(Position newReadPositionInt) {
        Preconditions.checkArgument((boolean)(newReadPositionInt instanceof PositionImpl));
        this.readPosition = (PositionImpl)newReadPositionInt;
    }

    void startCreatingNewMetadataLedger() {
        State oldState = STATE_UPDATER.getAndSet(this, State.SwitchingLedger);
        if (oldState == State.SwitchingLedger) {
            return;
        }
        if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) == 0) {
            this.createNewMetadataLedger();
        }
    }

    void createNewMetadataLedger() {
        this.createNewMetadataLedger(new VoidCallback(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void operationComplete() {
                ArrayDeque arrayDeque = ManagedCursorImpl.this.pendingMarkDeleteOps;
                synchronized (arrayDeque) {
                    ManagedCursorImpl.this.flushPendingMarkDeletes();
                    STATE_UPDATER.set(ManagedCursorImpl.this, State.Open);
                }
            }

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void operationFailed(ManagedLedgerException exception) {
                log.error("[{}][{}] Metadata ledger creation failed", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, exception});
                ArrayDeque arrayDeque = ManagedCursorImpl.this.pendingMarkDeleteOps;
                synchronized (arrayDeque) {
                    while (!ManagedCursorImpl.this.pendingMarkDeleteOps.isEmpty()) {
                        MarkDeleteEntry entry = (MarkDeleteEntry)ManagedCursorImpl.this.pendingMarkDeleteOps.poll();
                        entry.callback.markDeleteFailed(exception, entry.ctx);
                    }
                    STATE_UPDATER.set(ManagedCursorImpl.this, State.NoLedger);
                }
            }
        });
    }

    private void flushPendingMarkDeletes() {
        if (!this.pendingMarkDeleteOps.isEmpty()) {
            this.internalFlushPendingMarkDeletes();
        }
    }

    void internalFlushPendingMarkDeletes() {
        MarkDeleteEntry lastEntry = this.pendingMarkDeleteOps.getLast();
        lastEntry.callbackGroup = Lists.newArrayList(this.pendingMarkDeleteOps);
        this.pendingMarkDeleteOps.clear();
        this.internalMarkDelete(lastEntry);
    }

    void createNewMetadataLedger(final VoidCallback callback) {
        this.ledger.mbean.startCursorLedgerCreateOp();
        this.bookkeeper.asyncCreateLedger(this.config.getMetadataEnsemblesize(), this.config.getMetadataWriteQuorumSize(), this.config.getMetadataAckQuorumSize(), this.digestType, this.config.getPassword(), (rc, lh, ctx) -> this.ledger.getExecutor().execute((Runnable)SafeRun.safeRun(() -> {
            this.ledger.mbean.endCursorLedgerCreateOp();
            if (rc != 0) {
                log.warn("[{}] Error creating ledger for cursor {}: {}", new Object[]{this.ledger.getName(), this.name, BKException.getMessage((int)rc)});
                callback.operationFailed(new ManagedLedgerException(BKException.getMessage((int)rc)));
                return;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Created ledger {} for cursor {}", new Object[]{this.ledger.getName(), lh.getId(), this.name});
            }
            final MarkDeleteEntry mdEntry = this.lastMarkDeleteEntry;
            this.persistPositionToLedger(lh, mdEntry, new VoidCallback(){

                @Override
                public void operationComplete() {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Persisted position {} for cursor {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), mdEntry.newPosition, ManagedCursorImpl.this.name});
                    }
                    ManagedCursorImpl.this.switchToNewLedger(lh, new VoidCallback(){

                        @Override
                        public void operationComplete() {
                            callback.operationComplete();
                        }

                        @Override
                        public void operationFailed(ManagedLedgerException exception) {
                            ManagedCursorImpl.this.bookkeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
                                if (rc != 0) {
                                    log.warn("[{}] Failed to delete orphan ledger {}", (Object)ManagedCursorImpl.this.ledger.getName(), (Object)lh.getId());
                                }
                            }, null);
                            callback.operationFailed(exception);
                        }
                    });
                }

                @Override
                public void operationFailed(ManagedLedgerException exception) {
                    log.warn("[{}] Failed to persist position {} for cursor {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), mdEntry.newPosition, ManagedCursorImpl.this.name});
                    ManagedCursorImpl.this.ledger.mbean.startCursorLedgerDeleteOp();
                    ManagedCursorImpl.this.bookkeeper.asyncDeleteLedger(lh.getId(), new AsyncCallback.DeleteCallback(){

                        public void deleteComplete(int rc, Object ctx) {
                            ManagedCursorImpl.this.ledger.mbean.endCursorLedgerDeleteOp();
                        }
                    }, null);
                    callback.operationFailed(exception);
                }
            });
        })), null, Collections.emptyMap());
    }

    private List<MLDataFormats.LongProperty> buildPropertiesMap(Map<String, Long> properties) {
        if (properties.isEmpty()) {
            return Collections.emptyList();
        }
        ArrayList longProperties = Lists.newArrayList();
        properties.forEach((name, value) -> {
            MLDataFormats.LongProperty lp = MLDataFormats.LongProperty.newBuilder().setName((String)name).setValue((long)value).build();
            longProperties.add(lp);
        });
        return longProperties;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private List<MLDataFormats.MessageRange> buildIndividualDeletedMessageRanges() {
        this.lock.readLock().lock();
        try {
            if (this.individualDeletedMessages.isEmpty()) {
                List<MLDataFormats.MessageRange> list = Collections.emptyList();
                return list;
            }
            MLDataFormats.NestedPositionInfo.Builder nestedPositionBuilder = MLDataFormats.NestedPositionInfo.newBuilder();
            MLDataFormats.MessageRange.Builder messageRangeBuilder = MLDataFormats.MessageRange.newBuilder();
            List<MLDataFormats.MessageRange> list = this.individualDeletedMessages.asRanges().stream().limit(this.config.getMaxUnackedRangesToPersist()).map(positionRange -> {
                PositionImpl p = (PositionImpl)positionRange.lowerEndpoint();
                nestedPositionBuilder.setLedgerId(p.getLedgerId());
                nestedPositionBuilder.setEntryId(p.getEntryId());
                messageRangeBuilder.setLowerEndpoint(nestedPositionBuilder.build());
                p = (PositionImpl)positionRange.upperEndpoint();
                nestedPositionBuilder.setLedgerId(p.getLedgerId());
                nestedPositionBuilder.setEntryId(p.getEntryId());
                messageRangeBuilder.setUpperEndpoint(nestedPositionBuilder.build());
                return messageRangeBuilder.build();
            }).collect(Collectors.toList());
            return list;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    void persistPositionToLedger(LedgerHandle lh, MarkDeleteEntry mdEntry, final VoidCallback callback) {
        final PositionImpl position = mdEntry.newPosition;
        MLDataFormats.PositionInfo pi = MLDataFormats.PositionInfo.newBuilder().setLedgerId(position.getLedgerId()).setEntryId(position.getEntryId()).addAllIndividualDeletedMessages(this.buildIndividualDeletedMessageRanges()).addAllProperties(this.buildPropertiesMap(mdEntry.properties)).build();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Cursor {} Appending to ledger={} position={}", new Object[]{this.ledger.getName(), this.name, lh.getId(), position});
        }
        Preconditions.checkNotNull((Object)lh);
        lh.asyncAddEntry(pi.toByteArray(), (rc, lh1, entryId, ctx) -> {
            if (rc == 0) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Updated cursor {} position {} in meta-ledger {}", new Object[]{this.ledger.getName(), this.name, position, lh1.getId()});
                }
                if (this.shouldCloseLedger(lh1)) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Need to create new metadata ledger for consumer {}", (Object)this.ledger.getName(), (Object)this.name);
                    }
                    this.startCreatingNewMetadataLedger();
                }
                callback.operationComplete();
            } else {
                log.warn("[{}] Error updating cursor {} position {} in meta-ledger {}: {}", new Object[]{this.ledger.getName(), this.name, position, lh1.getId(), BKException.getMessage((int)rc)});
                STATE_UPDATER.compareAndSet(this, State.Open, State.NoLedger);
                this.persistPositionMetaStore(-1L, position, mdEntry.properties, new MetaStore.MetaStoreCallback<Void>(){

                    @Override
                    public void operationComplete(Void result, MetaStore.Stat stat) {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}][{}] Updated cursor in meta store after previous failure in ledger at position {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, position});
                        }
                        callback.operationComplete();
                    }

                    @Override
                    public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                        log.warn("[{}][{}] Failed to update cursor in meta store after previous failure in ledger: {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, e.getMessage()});
                        callback.operationFailed(ManagedLedgerImpl.createManagedLedgerException(rc));
                    }
                }, true);
            }
        }, null);
    }

    boolean shouldCloseLedger(LedgerHandle lh) {
        long now = this.clock.millis();
        if ((lh.getLastAddConfirmed() >= (long)this.config.getMetadataMaxEntriesPerLedger() || this.lastLedgerSwitchTimestamp < now - (long)(this.config.getLedgerRolloverTimeout() * 1000)) && STATE_UPDATER.get(this) != State.Closed) {
            this.lastLedgerSwitchTimestamp = now;
            return true;
        }
        return false;
    }

    void switchToNewLedger(final LedgerHandle lh, final VoidCallback callback) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Switching cursor {} to ledger {}", new Object[]{this.ledger.getName(), this.name, lh.getId()});
        }
        this.persistPositionMetaStore(lh.getId(), this.lastMarkDeleteEntry.newPosition, this.lastMarkDeleteEntry.properties, new MetaStore.MetaStoreCallback<Void>(){

            @Override
            public void operationComplete(Void result, MetaStore.Stat stat) {
                log.info("[{}] Updated cursor {} with ledger id {} md-position={} rd-position={}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, lh.getId(), ManagedCursorImpl.this.markDeletePosition, ManagedCursorImpl.this.readPosition});
                LedgerHandle oldLedger = ManagedCursorImpl.this.cursorLedger;
                ManagedCursorImpl.this.cursorLedger = lh;
                ManagedCursorImpl.this.cursorLedgerStat = stat;
                callback.operationComplete();
                ManagedCursorImpl.this.asyncDeleteLedger(oldLedger);
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                log.warn("[{}] Failed to update consumer {}", new Object[]{ManagedCursorImpl.this.ledger.getName(), ManagedCursorImpl.this.name, e});
                callback.operationFailed(e);
            }
        }, false);
    }

    void notifyEntriesAvailable() {
        OpReadEntry opReadEntry;
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Received ml notification", (Object)this.ledger.getName(), (Object)this.name);
        }
        if ((opReadEntry = (OpReadEntry)WAITING_READ_OP_UPDATER.getAndSet(this, null)) != null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] [{}] Received notification of new messages persisted, reading at {} -- last: {}", new Object[]{this.ledger.getName(), this.name, opReadEntry.readPosition, this.ledger.lastConfirmedEntry});
                log.debug("[{}] Consumer {} cursor notification: other counters: consumed {} mdPos {} rdPos {}", new Object[]{this.ledger.getName(), this.name, this.messagesConsumedCounter, this.markDeletePosition, this.readPosition});
            }
            PENDING_READ_OPS_UPDATER.incrementAndGet(this);
            opReadEntry.readPosition = (PositionImpl)this.getReadPosition();
            this.ledger.asyncReadEntries(opReadEntry);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Received notification but had no pending read operation", (Object)this.ledger.getName(), (Object)this.name);
        }
    }

    void asyncCloseCursorLedger(final AsyncCallbacks.CloseCallback callback, Object ctx) {
        LedgerHandle lh = this.cursorLedger;
        this.ledger.mbean.startCursorLedgerCloseOp();
        log.info("[{}] [{}] Closing metadata ledger {}", new Object[]{this.ledger.getName(), this.name, lh.getId()});
        lh.asyncClose(new AsyncCallback.CloseCallback(){

            public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
                ManagedCursorImpl.this.ledger.mbean.endCursorLedgerCloseOp();
                if (rc == 0) {
                    callback.closeComplete(ctx);
                } else {
                    callback.closeFailed(ManagedLedgerImpl.createManagedLedgerException(rc), ctx);
                }
            }
        }, ctx);
    }

    void decrementPendingMarkDeleteCount() {
        State state;
        if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.decrementAndGet(this) == 0 && (state = STATE_UPDATER.get(this)) == State.SwitchingLedger) {
            this.createNewMetadataLedger();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    void readOperationCompleted() {
        if (PENDING_READ_OPS_UPDATER.decrementAndGet(this) == 0) {
            ArrayDeque<MarkDeleteEntry> arrayDeque = this.pendingMarkDeleteOps;
            synchronized (arrayDeque) {
                if (STATE_UPDATER.get(this) == State.Open) {
                    this.flushPendingMarkDeletes();
                } else if (PENDING_MARK_DELETED_SUBMITTED_COUNT_UPDATER.get(this) != 0) {
                    log.info("[{}] read operation completed and cursor was closed. need to call any queued cursor close", (Object)this.name);
                }
            }
        }
    }

    void asyncDeleteLedger(LedgerHandle lh) {
        this.asyncDeleteLedger(lh, 3);
    }

    private void asyncDeleteLedger(LedgerHandle lh, int retry) {
        if (lh == null || retry <= 0) {
            if (lh != null) {
                log.warn("[{}-{}] Failed to delete ledger after retries {}", new Object[]{this.ledger.getName(), this.name, lh.getId()});
            }
            return;
        }
        this.ledger.mbean.startCursorLedgerDeleteOp();
        this.bookkeeper.asyncDeleteLedger(lh.getId(), (rc, ctx) -> {
            this.ledger.mbean.endCursorLedgerDeleteOp();
            if (rc != 0) {
                log.warn("[{}] Failed to delete ledger {}: {}", new Object[]{this.ledger.getName(), lh.getId(), BKException.getMessage((int)rc)});
                if (rc != -7) {
                    this.ledger.getScheduledExecutor().schedule((SafeRunnable)SafeRun.safeRun(() -> this.asyncDeleteLedger(lh, retry - 1)), 60L, TimeUnit.SECONDS);
                }
                return;
            }
            log.info("[{}][{}] Successfully closed & deleted ledger {} in cursor", new Object[]{this.ledger.getName(), this.name, lh.getId()});
        }, null);
    }

    void asyncDeleteCursorLedger() {
        this.asyncDeleteCursorLedger(3);
    }

    private void asyncDeleteCursorLedger(int retry) {
        STATE_UPDATER.set(this, State.Closed);
        if (this.cursorLedger == null || retry <= 0) {
            if (this.cursorLedger != null) {
                log.warn("[{}-{}] Failed to delete ledger after retries {}", new Object[]{this.ledger.getName(), this.name, this.cursorLedger.getId()});
            }
            return;
        }
        this.ledger.mbean.startCursorLedgerDeleteOp();
        this.bookkeeper.asyncDeleteLedger(this.cursorLedger.getId(), (rc, ctx) -> {
            this.ledger.mbean.endCursorLedgerDeleteOp();
            if (rc == 0) {
                log.info("[{}][{}] Deleted cursor ledger {}", new Object[]{this.ledger.getName(), this.name, this.cursorLedger.getId()});
            } else {
                log.warn("[{}][{}] Failed to delete ledger {}: {}", new Object[]{this.ledger.getName(), this.name, this.cursorLedger.getId(), BKException.getMessage((int)rc)});
                if (rc != -7) {
                    this.ledger.getScheduledExecutor().schedule((SafeRunnable)SafeRun.safeRun(() -> this.asyncDeleteCursorLedger(retry - 1)), 60L, TimeUnit.SECONDS);
                }
            }
        }, null);
    }

    private static boolean isBkErrorNotRecoverable(int rc) {
        switch (rc) {
            case -13: 
            case -10: 
            case -7: 
            case -1: {
                return true;
            }
        }
        return false;
    }

    private PositionImpl getRollbackPosition(MLDataFormats.ManagedCursorInfo info) {
        PositionImpl firstPosition = this.ledger.getFirstPosition();
        PositionImpl snapshottedPosition = new PositionImpl(info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
        if (firstPosition == null) {
            return snapshottedPosition;
        }
        if (snapshottedPosition.compareTo(firstPosition) < 0) {
            return firstPosition;
        }
        return snapshottedPosition;
    }

    public int getPendingReadOpsCount() {
        return PENDING_READ_OPS_UPDATER.get(this);
    }

    public long getMessagesConsumedCounter() {
        return this.messagesConsumedCounter;
    }

    public long getCursorLedger() {
        LedgerHandle lh = this.cursorLedger;
        return lh != null ? lh.getId() : -1L;
    }

    public long getCursorLedgerLastEntry() {
        LedgerHandle lh = this.cursorLedger;
        return lh != null ? lh.getLastAddConfirmed() : -1L;
    }

    public String getIndividuallyDeletedMessages() {
        this.lock.readLock().lock();
        try {
            String string = this.individualDeletedMessages.toString();
            return string;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    public PositionImpl getNextAvailablePosition(PositionImpl position) {
        Range range = this.individualDeletedMessages.rangeContaining((Comparable)position);
        if (range != null) {
            PositionImpl nextPosition = ((PositionImpl)range.upperEndpoint()).getNext();
            return nextPosition != null && nextPosition.compareTo(position) > 0 ? nextPosition : position.getNext();
        }
        return position.getNext();
    }

    public Position getNextLedgerPosition(long currentLedgerId) {
        Long nextExistingLedger = this.ledger.getNextValidLedger(currentLedgerId);
        return nextExistingLedger != null ? PositionImpl.get(nextExistingLedger, 0L) : null;
    }

    public boolean isIndividuallyDeletedEntriesEmpty() {
        this.lock.readLock().lock();
        try {
            boolean bl = this.individualDeletedMessages.isEmpty();
            return bl;
        }
        finally {
            this.lock.readLock().unlock();
        }
    }

    public long getLastLedgerSwitchTimestamp() {
        return this.lastLedgerSwitchTimestamp;
    }

    public String getState() {
        return STATE_UPDATER.get(this).toString();
    }

    @Override
    public double getThrottleMarkDelete() {
        return this.markDeleteLimiter.getRate();
    }

    @Override
    public void setThrottleMarkDelete(double throttleMarkDelete) {
        if (throttleMarkDelete > 0.0) {
            if (this.markDeleteLimiter == null) {
                this.markDeleteLimiter = RateLimiter.create((double)throttleMarkDelete);
            } else {
                this.markDeleteLimiter.setRate(throttleMarkDelete);
            }
        } else {
            this.markDeleteLimiter = null;
        }
    }

    public static interface VoidCallback {
        public void operationComplete();

        public void operationFailed(ManagedLedgerException var1);
    }

    static enum State {
        Uninitialized,
        NoLedger,
        Open,
        SwitchingLedger,
        Closed;

    }

    class MarkDeleteEntry {
        final PositionImpl newPosition;
        final AsyncCallbacks.MarkDeleteCallback callback;
        final Object ctx;
        final Map<String, Long> properties;
        List<MarkDeleteEntry> callbackGroup;

        public MarkDeleteEntry(PositionImpl newPosition, Map<String, Long> properties, AsyncCallbacks.MarkDeleteCallback callback, Object ctx) {
            this.newPosition = PositionImpl.get(newPosition);
            this.properties = properties;
            this.callback = callback;
            this.ctx = ctx;
        }
    }
}

