/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl;

import java.util.Enumeration;
import java.util.List;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.shade.com.google.common.collect.BoundType;
import org.apache.pulsar.shade.com.google.common.collect.Range;
import org.apache.pulsar.shade.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.AsyncCallback;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BKException;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeper;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.BookKeeperAdmin;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerEntry;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.LedgerHandle;
import org.apache.pulsar.shade.org.apache.bookkeeper.client.api.DigestType;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.MetaStore;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.util.Errors;
import org.apache.pulsar.shade.org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ManagedLedgerOfflineBacklog {
    private final byte[] password;
    private final BookKeeper.DigestType digestType;
    private static final int META_READ_TIMEOUT_SECONDS = 60;
    private boolean accurate = false;
    private String brokerName;
    private static final Logger log = LoggerFactory.getLogger(ManagedLedgerOfflineBacklog.class);

    public ManagedLedgerOfflineBacklog(DigestType digestType, byte[] password, String brokerName, boolean accurate) {
        this.digestType = BookKeeper.DigestType.fromApiDigestType(digestType);
        this.password = password;
        this.accurate = accurate;
        this.brokerName = brokerName;
    }

    private long getNumberOfEntries(Range<PositionImpl> range, NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) {
        boolean toIncluded;
        PositionImpl fromPosition = range.lowerEndpoint();
        boolean fromIncluded = range.lowerBoundType() == BoundType.CLOSED;
        PositionImpl toPosition = range.upperEndpoint();
        boolean bl = toIncluded = range.upperBoundType() == BoundType.CLOSED;
        if (fromPosition.getLedgerId() == toPosition.getLedgerId()) {
            long count = toPosition.getEntryId() - fromPosition.getEntryId() - 1L;
            count += fromIncluded ? 1L : 0L;
            return count += toIncluded ? 1L : 0L;
        }
        long count = 0L;
        count += toPosition.getEntryId();
        count += toIncluded ? 1L : 0L;
        MLDataFormats.ManagedLedgerInfo.LedgerInfo li = (MLDataFormats.ManagedLedgerInfo.LedgerInfo)ledgers.get(fromPosition.getLedgerId());
        if (li != null) {
            count += li.getEntries() - (fromPosition.getEntryId() + 1L);
            count += fromIncluded ? 1L : 0L;
        }
        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers.subMap(fromPosition.getLedgerId(), false, toPosition.getLedgerId(), false).values()) {
            count += ls.getEntries();
        }
        return count;
    }

    public PersistentOfflineTopicStats getEstimatedUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory, String managedLedgerName) throws Exception {
        return this.estimateUnloadedTopicBacklog(factory, TopicName.get("persistent://" + managedLedgerName));
    }

    public PersistentOfflineTopicStats estimateUnloadedTopicBacklog(ManagedLedgerFactoryImpl factory, TopicName topicName) throws Exception {
        String managedLedgerName = topicName.getPersistenceNamingEncoding();
        long numberOfEntries = 0L;
        long totalSize = 0L;
        ConcurrentSkipListMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers = new ConcurrentSkipListMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo>();
        PersistentOfflineTopicStats offlineTopicStats = new PersistentOfflineTopicStats(managedLedgerName, this.brokerName);
        this.readLedgerMeta(factory, topicName, ledgers);
        for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : ledgers.values()) {
            numberOfEntries += ls.getEntries();
            totalSize += ls.getSize();
            if (!this.accurate) continue;
            offlineTopicStats.addLedgerDetails(ls.getEntries(), ls.getTimestamp(), ls.getSize(), ls.getLedgerId());
        }
        offlineTopicStats.totalMessages = numberOfEntries;
        offlineTopicStats.storageSize = totalSize;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Total number of entries - {} and size - {}", new Object[]{managedLedgerName, numberOfEntries, totalSize});
        }
        this.calculateCursorBacklogs(factory, topicName, ledgers, offlineTopicStats);
        offlineTopicStats.statGeneratedAt.setTime(System.currentTimeMillis());
        return offlineTopicStats;
    }

    private void readLedgerMeta(ManagedLedgerFactoryImpl factory, TopicName topicName, final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers) throws Exception {
        final String managedLedgerName = topicName.getPersistenceNamingEncoding();
        MetaStore store = factory.getMetaStore();
        final BookKeeper bk = factory.getBookKeeper();
        final CountDownLatch mlMetaCounter = new CountDownLatch(1);
        store.getManagedLedgerInfo(managedLedgerName, false, new MetaStore.MetaStoreCallback<MLDataFormats.ManagedLedgerInfo>(){

            @Override
            public void operationComplete(MLDataFormats.ManagedLedgerInfo mlInfo, Stat stat) {
                for (MLDataFormats.ManagedLedgerInfo.LedgerInfo ls : mlInfo.getLedgerInfoList()) {
                    ledgers.put(ls.getLedgerId(), ls);
                }
                if (ledgers.size() > 0) {
                    long id = (Long)ledgers.lastKey();
                    AsyncCallback.OpenCallback opencb = (rc, lh, ctx1) -> {
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Opened ledger {}: {}", new Object[]{managedLedgerName, id, BKException.getMessage(rc)});
                        }
                        if (rc == 0) {
                            MLDataFormats.ManagedLedgerInfo.LedgerInfo info = MLDataFormats.ManagedLedgerInfo.LedgerInfo.newBuilder().setLedgerId(id).setEntries(lh.getLastAddConfirmed() + 1L).setSize(lh.getLength()).setTimestamp(System.currentTimeMillis()).build();
                            ledgers.put(id, info);
                            mlMetaCounter.countDown();
                        } else if (Errors.isNoSuchLedgerExistsException(rc)) {
                            log.warn("[{}] Ledger not found: {}", (Object)managedLedgerName, ledgers.lastKey());
                            ledgers.remove(ledgers.lastKey());
                            mlMetaCounter.countDown();
                        } else {
                            log.error("[{}] Failed to open ledger {}: {}", new Object[]{managedLedgerName, id, BKException.getMessage(rc)});
                            mlMetaCounter.countDown();
                        }
                    };
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Opening ledger {}", (Object)managedLedgerName, (Object)id);
                    }
                    try {
                        bk.asyncOpenLedgerNoRecovery(id, ManagedLedgerOfflineBacklog.this.digestType, ManagedLedgerOfflineBacklog.this.password, opencb, null);
                    }
                    catch (Exception e) {
                        log.warn("[{}] Failed to open ledger {}: {}", new Object[]{managedLedgerName, id, e});
                        mlMetaCounter.countDown();
                    }
                } else {
                    log.warn("[{}] Ledger list empty", (Object)managedLedgerName);
                    mlMetaCounter.countDown();
                }
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                log.warn("[{}] Unable to obtain managed ledger metadata - {}", (Object)managedLedgerName, (Object)e);
                mlMetaCounter.countDown();
            }
        });
        if (this.accurate) {
            mlMetaCounter.await();
        } else {
            mlMetaCounter.await(60L, TimeUnit.SECONDS);
        }
    }

    private void calculateCursorBacklogs(ManagedLedgerFactoryImpl factory, TopicName topicName, final NavigableMap<Long, MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgers, final PersistentOfflineTopicStats offlineTopicStats) throws Exception {
        if (ledgers.size() == 0) {
            return;
        }
        final String managedLedgerName = topicName.getPersistenceNamingEncoding();
        final MetaStore store = factory.getMetaStore();
        final BookKeeper bk = factory.getBookKeeper();
        final CountDownLatch allCursorsCounter = new CountDownLatch(1);
        long errorInReadingCursor = -1L;
        final ConcurrentOpenHashMap<String, Long> ledgerRetryMap = new ConcurrentOpenHashMap<String, Long>();
        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = ledgers.lastEntry().getValue();
        final PositionImpl lastLedgerPosition = new PositionImpl(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1L);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Last ledger position {}", (Object)managedLedgerName, (Object)lastLedgerPosition);
        }
        store.getCursors(managedLedgerName, new MetaStore.MetaStoreCallback<List<String>>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void operationComplete(List<String> cursors, Stat v) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Found {} cursors", (Object)managedLedgerName, (Object)cursors.size());
                }
                if (cursors.isEmpty()) {
                    allCursorsCounter.countDown();
                    return;
                }
                final CountDownLatch cursorCounter = new CountDownLatch(cursors.size());
                for (final String cursorName : cursors) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Loading cursor {}", (Object)managedLedgerName, (Object)cursorName);
                    }
                    final AsyncCallback.OpenCallback cursorLedgerOpenCb = (rc, lh, ctx1) -> {
                        final long ledgerId = lh.getId();
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Opened cursor ledger {} for cursor {}. rc={}", new Object[]{managedLedgerName, ledgerId, cursorName, rc});
                        }
                        if (rc != 0) {
                            log.warn("[{}] Error opening metadata ledger {} for cursor {}: {}", new Object[]{managedLedgerName, ledgerId, cursorName, BKException.getMessage(rc)});
                            cursorCounter.countDown();
                            return;
                        }
                        long lac = lh.getLastAddConfirmed();
                        if (log.isDebugEnabled()) {
                            log.debug("[{}] Cursor {} LAC {} read from ledger {}", new Object[]{managedLedgerName, cursorName, lac, ledgerId});
                        }
                        if (lac == -1L) {
                            ledgerRetryMap.put(cursorName, ledgerId);
                            log.info("[{}] Cursor {} LAC {} read from ledger {}", new Object[]{managedLedgerName, cursorName, lac, ledgerId});
                            cursorCounter.countDown();
                            return;
                        }
                        final long entryId = lac;
                        lh.asyncReadEntries(entryId, entryId, new AsyncCallback.ReadCallback(){

                            /*
                             * WARNING - Removed try catching itself - possible behaviour change.
                             */
                            @Override
                            public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq, Object ctx) {
                                block10: {
                                    try {
                                        MLDataFormats.PositionInfo positionInfo;
                                        if (log.isDebugEnabled()) {
                                            log.debug("readComplete rc={} entryId={}", (Object)rc, (Object)entryId);
                                        }
                                        if (rc != 0) {
                                            log.warn("[{}] Error reading from metadata ledger {} for cursor {}: {}", new Object[]{managedLedgerName, ledgerId, cursorName, BKException.getMessage(rc)});
                                            offlineTopicStats.addCursorDetails(cursorName, -1L, lh.getId());
                                            break block10;
                                        }
                                        LedgerEntry entry = seq.nextElement();
                                        try {
                                            positionInfo = MLDataFormats.PositionInfo.parseFrom(entry.getEntry());
                                        }
                                        catch (InvalidProtocolBufferException e) {
                                            log.warn("[{}] Error reading position from metadata ledger {} for cursor {}: {}", new Object[]{managedLedgerName, ledgerId, cursorName, e});
                                            offlineTopicStats.addCursorDetails(cursorName, -1L, lh.getId());
                                            cursorCounter.countDown();
                                            return;
                                        }
                                        PositionImpl lastAckedMessagePosition = new PositionImpl(positionInfo);
                                        if (log.isDebugEnabled()) {
                                            log.debug("[{}] Cursor {} MD {} read last ledger position {}", new Object[]{managedLedgerName, cursorName, lastAckedMessagePosition, lastLedgerPosition});
                                        }
                                        Range<PositionImpl> range = Range.openClosed(lastAckedMessagePosition, lastLedgerPosition);
                                        if (log.isDebugEnabled()) {
                                            log.debug("[{}] Calculating backlog for cursor {} using range {}", new Object[]{managedLedgerName, cursorName, range});
                                        }
                                        long cursorBacklog = ManagedLedgerOfflineBacklog.this.getNumberOfEntries(range, ledgers);
                                        offlineTopicStats.messageBacklog += cursorBacklog;
                                        offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, lh.getId());
                                    }
                                    finally {
                                        cursorCounter.countDown();
                                    }
                                }
                            }
                        }, null);
                    };
                    store.asyncGetCursorInfo(managedLedgerName, cursorName, new MetaStore.MetaStoreCallback<MLDataFormats.ManagedCursorInfo>(){

                        @Override
                        public void operationComplete(MLDataFormats.ManagedCursorInfo info, Stat stat) {
                            long cursorLedgerId = info.getCursorsLedgerId();
                            if (log.isDebugEnabled()) {
                                log.debug("[{}] Cursor {} meta-data read ledger id {}", new Object[]{managedLedgerName, cursorName, cursorLedgerId});
                            }
                            if (cursorLedgerId != -1L) {
                                bk.asyncOpenLedgerNoRecovery(cursorLedgerId, ManagedLedgerOfflineBacklog.this.digestType, ManagedLedgerOfflineBacklog.this.password, cursorLedgerOpenCb, null);
                            } else {
                                PositionImpl lastAckedMessagePosition = new PositionImpl(info.getMarkDeleteLedgerId(), info.getMarkDeleteEntryId());
                                Range<PositionImpl> range = Range.openClosed(lastAckedMessagePosition, lastLedgerPosition);
                                if (log.isDebugEnabled()) {
                                    log.debug("[{}] Calculating backlog for cursor {} using range {}", new Object[]{managedLedgerName, cursorName, range});
                                }
                                long cursorBacklog = ManagedLedgerOfflineBacklog.this.getNumberOfEntries(range, ledgers);
                                offlineTopicStats.messageBacklog += cursorBacklog;
                                offlineTopicStats.addCursorDetails(cursorName, cursorBacklog, cursorLedgerId);
                                cursorCounter.countDown();
                            }
                        }

                        @Override
                        public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                            log.warn("[{}] Unable to obtain cursor ledger for cursor {}: {}", new Object[]{managedLedgerName, cursorName, e});
                            cursorCounter.countDown();
                        }
                    });
                }
                try {
                    if (ManagedLedgerOfflineBacklog.this.accurate) {
                        cursorCounter.await();
                    } else {
                        cursorCounter.await(60L, TimeUnit.SECONDS);
                    }
                }
                catch (Exception e) {
                    log.warn("[{}] Error reading subscription positions{}", (Object)managedLedgerName, (Object)e);
                }
                finally {
                    allCursorsCounter.countDown();
                }
            }

            @Override
            public void operationFailed(ManagedLedgerException.MetaStoreException e) {
                log.warn("[{}] Failed to get the cursors list", (Object)managedLedgerName, (Object)e);
                allCursorsCounter.countDown();
            }
        });
        if (this.accurate) {
            allCursorsCounter.await();
        } else {
            allCursorsCounter.await(60L, TimeUnit.SECONDS);
        }
        if (this.accurate && ledgerRetryMap.size() > 0L) {
            ledgerRetryMap.forEach((cursorName, ledgerId) -> {
                PositionImpl lastAckedMessagePosition;
                if (log.isDebugEnabled()) {
                    log.debug("Cursor {} Ledger {} Trying to obtain MD from BkAdmin", cursorName, ledgerId);
                }
                if ((lastAckedMessagePosition = this.tryGetMDPosition(bk, (long)ledgerId, (String)cursorName)) == null) {
                    log.warn("[{}] Cursor {} read from ledger {}. Unable to determine cursor position", new Object[]{managedLedgerName, cursorName, ledgerId});
                } else {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Cursor {} read from ledger using bk admin {}. position {}", new Object[]{managedLedgerName, cursorName, ledgerId, lastAckedMessagePosition});
                    }
                    Range<PositionImpl> range = Range.openClosed(lastAckedMessagePosition, lastLedgerPosition);
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] Calculating backlog for cursor {} using range {}", new Object[]{managedLedgerName, cursorName, range});
                    }
                    long cursorBacklog = this.getNumberOfEntries(range, ledgers);
                    offlineTopicStats.messageBacklog += cursorBacklog;
                    offlineTopicStats.addCursorDetails((String)cursorName, cursorBacklog, (long)ledgerId);
                }
            });
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Loose catch block
     * Enabled force condition propagation
     * Lifted jumps to return sites
     */
    private PositionImpl tryGetMDPosition(BookKeeper bookKeeper, long ledgerId, String cursorName) {
        BookKeeperAdmin bookKeeperAdmin = null;
        long lastEntry = -1L;
        PositionImpl lastAckedMessagePosition = null;
        bookKeeperAdmin = new BookKeeperAdmin(bookKeeper);
        for (LedgerEntry ledgerEntry : bookKeeperAdmin.readEntries(ledgerId, 0L, lastEntry)) {
            lastEntry = ledgerEntry.getEntryId();
            if (log.isDebugEnabled()) {
                log.debug(" Read entry {} from ledger {} for cursor {}", new Object[]{lastEntry, ledgerId, cursorName});
            }
            MLDataFormats.PositionInfo positionInfo = MLDataFormats.PositionInfo.parseFrom(ledgerEntry.getEntry());
            lastAckedMessagePosition = new PositionImpl(positionInfo);
            if (!log.isDebugEnabled()) continue;
            log.debug("Cursor {} read position {}", (Object)cursorName, (Object)lastAckedMessagePosition);
        }
        if (bookKeeperAdmin == null) return lastAckedMessagePosition;
        try {
            bookKeeperAdmin.close();
            return lastAckedMessagePosition;
        }
        catch (Exception e) {
            log.warn("Unable to close bk admin for ledgerId {} for cursor {}", new Object[]{ledgerId, cursorName, e});
        }
        return lastAckedMessagePosition;
        catch (Exception e) {
            try {
                log.warn("Unable to determine LAC for ledgerId {} for cursor {}: {}", new Object[]{ledgerId, cursorName, e});
                if (bookKeeperAdmin == null) return lastAckedMessagePosition;
            }
            catch (Throwable throwable) {
                if (bookKeeperAdmin == null) throw throwable;
                try {
                    bookKeeperAdmin.close();
                    throw throwable;
                }
                catch (Exception e2) {
                    log.warn("Unable to close bk admin for ledgerId {} for cursor {}", new Object[]{ledgerId, cursorName, e2});
                }
                throw throwable;
            }
            try {
                bookKeeperAdmin.close();
                return lastAckedMessagePosition;
            }
            catch (Exception e3) {
                log.warn("Unable to close bk admin for ledgerId {} for cursor {}", new Object[]{ledgerId, cursorName, e3});
            }
            return lastAckedMessagePosition;
        }
    }
}

