/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.mledger.impl.cache;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.api.BKException;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.impl.cache.EntryCache;
import org.apache.bookkeeper.mledger.impl.cache.InflightReadsLimiter;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheManagerImpl;
import org.apache.bookkeeper.mledger.intercept.ManagedLedgerInterceptor;
import org.apache.bookkeeper.mledger.util.RangeCache;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RangeEntryCacheImpl
implements EntryCache {
    private static final long BOOKKEEPER_READ_OVERHEAD_PER_ENTRY = 64L;
    private final RangeEntryCacheManagerImpl manager;
    private final ManagedLedgerImpl ml;
    private ManagedLedgerInterceptor interceptor;
    private final RangeCache<PositionImpl, EntryImpl> entries;
    private final boolean copyEntries;
    private volatile long estimatedEntrySize = 10240L;
    private final long readEntryTimeoutMillis;
    private static final double MB = 1048576.0;
    public static final PooledByteBufAllocator ALLOCATOR = new PooledByteBufAllocator(true, 0, PooledByteBufAllocator.defaultNumDirectArena(), PooledByteBufAllocator.defaultPageSize(), PooledByteBufAllocator.defaultMaxOrder(), PooledByteBufAllocator.defaultSmallCacheSize(), PooledByteBufAllocator.defaultNormalCacheSize(), true);
    private static final Logger log = LoggerFactory.getLogger(RangeEntryCacheImpl.class);

    public RangeEntryCacheImpl(RangeEntryCacheManagerImpl manager, ManagedLedgerImpl ml, boolean copyEntries) {
        this.manager = manager;
        this.ml = ml;
        this.interceptor = ml.getManagedLedgerInterceptor();
        this.readEntryTimeoutMillis = this.getManagedLedgerConfig().getReadEntryTimeoutSeconds();
        this.entries = new RangeCache(EntryImpl::getLength, EntryImpl::getTimestamp);
        this.copyEntries = copyEntries;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Initialized managed-ledger entry cache", (Object)ml.getName());
        }
    }

    @VisibleForTesting
    ManagedLedgerConfig getManagedLedgerConfig() {
        return this.ml.getConfig();
    }

    @Override
    public String getName() {
        return this.ml.getName();
    }

    @VisibleForTesting
    InflightReadsLimiter getPendingReadsLimiter() {
        return this.manager.getInflightReadsLimiter();
    }

    @Override
    public boolean insert(EntryImpl entry) {
        ByteBuf cachedData;
        if (!this.manager.hasSpaceInCache()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Skipping cache while doing eviction: {} - size: {}", new Object[]{this.ml.getName(), entry.getPosition(), entry.getLength()});
            }
            return false;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Adding entry to cache: {} - size: {}", new Object[]{this.ml.getName(), entry.getPosition(), entry.getLength()});
        }
        if (this.copyEntries) {
            cachedData = this.copyEntry(entry);
            if (cachedData == null) {
                return false;
            }
        } else {
            cachedData = entry.getDataBuffer().retain();
        }
        PositionImpl position = entry.getPosition();
        EntryImpl cacheEntry = EntryImpl.create(position, cachedData);
        cachedData.release();
        if (this.entries.put(position, cacheEntry)) {
            this.manager.entryAdded(entry.getLength());
            return true;
        }
        cacheEntry.release();
        return false;
    }

    private ByteBuf copyEntry(EntryImpl entry) {
        int size = entry.getLength();
        ByteBuf cachedData = null;
        try {
            cachedData = ALLOCATOR.directBuffer(size, size);
        }
        catch (Throwable t) {
            log.warn("[{}] Failed to allocate buffer for entry cache: {}", (Object)this.ml.getName(), (Object)t.getMessage());
            return null;
        }
        if (size > 0) {
            ByteBuf entryBuf = entry.getDataBuffer();
            int readerIdx = entryBuf.readerIndex();
            cachedData.writeBytes(entryBuf);
            entryBuf.readerIndex(readerIdx);
        }
        return cachedData;
    }

    @Override
    public void invalidateEntries(PositionImpl lastPosition) {
        PositionImpl firstPosition = PositionImpl.get(-1L, 0L);
        if (firstPosition.compareTo(lastPosition) > 0) {
            if (log.isDebugEnabled()) {
                log.debug("Attempted to invalidate entries in an invalid range : {} ~ {}", (Object)firstPosition, (Object)lastPosition);
            }
            return;
        }
        Pair<Integer, Long> removed = this.entries.removeRange(firstPosition, lastPosition, false);
        int entriesRemoved = (Integer)removed.getLeft();
        long sizeRemoved = (Long)removed.getRight();
        if (log.isTraceEnabled()) {
            log.trace("[{}] Invalidated entries up to {} - Entries removed: {} - Size removed: {}", new Object[]{this.ml.getName(), lastPosition, entriesRemoved, sizeRemoved});
        }
        this.manager.entriesRemoved(sizeRemoved, entriesRemoved);
    }

    @Override
    public void invalidateAllEntries(long ledgerId) {
        PositionImpl firstPosition = PositionImpl.get(ledgerId, 0L);
        PositionImpl lastPosition = PositionImpl.get(ledgerId + 1L, 0L);
        Pair<Integer, Long> removed = this.entries.removeRange(firstPosition, lastPosition, false);
        int entriesRemoved = (Integer)removed.getLeft();
        long sizeRemoved = (Long)removed.getRight();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Invalidated all entries on ledger {} - Entries removed: {} - Size removed: {}", new Object[]{this.ml.getName(), ledgerId, entriesRemoved, sizeRemoved});
        }
        this.manager.entriesRemoved(sizeRemoved, entriesRemoved);
    }

    @Override
    public void asyncReadEntry(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
        try {
            this.asyncReadEntry0(lh, position, callback, ctx);
        }
        catch (Throwable t) {
            log.warn("failed to read entries for {}-{}", new Object[]{lh.getId(), position, t});
            this.invalidateAllEntries(lh.getId());
            callback.readEntryFailed(ManagedLedgerImpl.createManagedLedgerException(t), ctx);
        }
    }

    private void asyncReadEntry0(ReadHandle lh, PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
        EntryImpl entry;
        if (log.isDebugEnabled()) {
            log.debug("[{}] Reading entry ledger {}: {}", new Object[]{this.ml.getName(), lh.getId(), position.getEntryId()});
        }
        if ((entry = this.entries.get(position)) != null) {
            EntryImpl cachedEntry = EntryImpl.create(entry);
            entry.release();
            this.manager.mlFactoryMBean.recordCacheHit(cachedEntry.getLength());
            callback.readEntryComplete(cachedEntry, ctx);
        } else {
            ((CompletableFuture)lh.readAsync(position.getEntryId(), position.getEntryId()).thenAcceptAsync(ledgerEntries -> {
                try {
                    Iterator iterator = ledgerEntries.iterator();
                    if (iterator.hasNext()) {
                        LedgerEntry ledgerEntry = (LedgerEntry)iterator.next();
                        EntryImpl returnEntry = RangeEntryCacheManagerImpl.create(ledgerEntry, this.interceptor);
                        this.manager.mlFactoryMBean.recordCacheMiss(1, returnEntry.getLength());
                        this.ml.getMbean().addReadEntriesSample(1, returnEntry.getLength());
                        callback.readEntryComplete(returnEntry, ctx);
                    } else {
                        callback.readEntryFailed(new ManagedLedgerException("Could not read given position"), ctx);
                    }
                }
                finally {
                    ledgerEntries.close();
                }
            }, (Executor)this.ml.getExecutor().chooseThread((Object)this.ml.getName()))).exceptionally(exception -> {
                this.ml.invalidateLedgerHandle(lh);
                callback.readEntryFailed(ManagedLedgerImpl.createManagedLedgerException(exception), ctx);
                return null;
            });
        }
    }

    @Override
    public void asyncReadEntry(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
        try {
            this.asyncReadEntry0(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx);
        }
        catch (Throwable t) {
            log.warn("failed to read entries for {}--{}-{}", new Object[]{lh.getId(), firstEntry, lastEntry, t});
            this.invalidateAllEntries(lh.getId());
            callback.readEntriesFailed(ManagedLedgerImpl.createManagedLedgerException(t), ctx);
        }
    }

    private void asyncReadEntry0(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, AsyncCallbacks.ReadEntriesCallback callback, Object ctx) {
        this.asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, isSlowestReader, callback, ctx, null);
    }

    void asyncReadEntry0WithLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean isSlowestReader, AsyncCallbacks.ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {
        Collection<EntryImpl> cachedEntries;
        AsyncCallbacks.ReadEntriesCallback callback = this.handlePendingReadsLimits(lh, firstEntry, lastEntry, isSlowestReader, originalCallback, ctx, handle);
        if (callback == null) {
            return;
        }
        long ledgerId = lh.getId();
        int entriesToRead = (int)(lastEntry - firstEntry) + 1;
        PositionImpl firstPosition = PositionImpl.get(lh.getId(), firstEntry);
        PositionImpl lastPosition = PositionImpl.get(lh.getId(), lastEntry);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Reading entries range ledger {}: {} to {}", new Object[]{this.ml.getName(), ledgerId, firstEntry, lastEntry});
        }
        if ((cachedEntries = this.entries.getRange(firstPosition, lastPosition)).size() == entriesToRead) {
            long totalCachedSize = 0L;
            ArrayList entriesToReturn = Lists.newArrayListWithExpectedSize((int)entriesToRead);
            for (EntryImpl entry2 : cachedEntries) {
                entriesToReturn.add(EntryImpl.create(entry2));
                totalCachedSize += (long)entry2.getLength();
                entry2.release();
            }
            this.manager.mlFactoryMBean.recordCacheHits(entriesToReturn.size(), totalCachedSize);
            if (log.isDebugEnabled()) {
                log.debug("[{}] Ledger {} -- Found in cache entries: {}-{}", new Object[]{this.ml.getName(), ledgerId, firstEntry, lastEntry});
            }
            callback.readEntriesComplete(entriesToReturn, ctx);
        } else {
            if (!cachedEntries.isEmpty()) {
                cachedEntries.forEach(entry -> entry.release());
            }
            ((CompletableFuture)lh.readAsync(firstEntry, lastEntry).thenAcceptAsync(ledgerEntries -> {
                Preconditions.checkNotNull((Object)this.ml.getName());
                Preconditions.checkNotNull((Object)this.ml.getExecutor());
                try {
                    long totalSize = 0L;
                    ArrayList entriesToReturn = Lists.newArrayListWithExpectedSize((int)entriesToRead);
                    for (LedgerEntry e : ledgerEntries) {
                        EntryImpl entry = RangeEntryCacheManagerImpl.create(e, this.interceptor);
                        entriesToReturn.add(entry);
                        totalSize += (long)entry.getLength();
                    }
                    this.manager.mlFactoryMBean.recordCacheMiss(entriesToReturn.size(), totalSize);
                    this.ml.getMbean().addReadEntriesSample(entriesToReturn.size(), totalSize);
                    callback.readEntriesComplete(entriesToReturn, ctx);
                }
                finally {
                    ledgerEntries.close();
                }
            }, (Executor)this.ml.getExecutor().chooseThread((Object)this.ml.getName()))).exceptionally(exception -> {
                if (exception instanceof BKException && ((BKException)exception).getCode() == -105) {
                    callback.readEntriesFailed(ManagedLedgerImpl.createManagedLedgerException(exception), ctx);
                } else {
                    this.ml.invalidateLedgerHandle(lh);
                    ManagedLedgerException mlException = ManagedLedgerImpl.createManagedLedgerException(exception);
                    callback.readEntriesFailed(mlException, ctx);
                }
                return null;
            });
        }
    }

    private AsyncCallbacks.ReadEntriesCallback handlePendingReadsLimits(ReadHandle lh, long firstEntry, long lastEntry, boolean shouldCacheEntry, final AsyncCallbacks.ReadEntriesCallback originalCallback, Object ctx, InflightReadsLimiter.Handle handle) {
        final InflightReadsLimiter pendingReadsLimiter = this.getPendingReadsLimiter();
        if (pendingReadsLimiter.isDisabled()) {
            return originalCallback;
        }
        long estimatedReadSize = (1L + lastEntry - firstEntry) * (this.estimatedEntrySize + 64L);
        final InflightReadsLimiter.Handle newHandle = pendingReadsLimiter.acquire(estimatedReadSize, handle);
        if (!newHandle.success) {
            long now = System.currentTimeMillis();
            if (now - newHandle.creationTime > this.readEntryTimeoutMillis) {
                String message = "Time-out elapsed while acquiring enough permits on the memory limiter to read from ledger " + lh.getId() + ", " + this.getName() + ", estimated read size " + estimatedReadSize + " bytes for " + (1L + lastEntry - firstEntry) + " entries (check managedLedgerMaxReadsInFlightSizeInMB)";
                log.error(message);
                pendingReadsLimiter.release(newHandle);
                originalCallback.readEntriesFailed(new ManagedLedgerException.TooManyRequestsException(message), ctx);
                return null;
            }
            this.ml.getExecutor().submitOrdered(lh.getId(), () -> {
                this.asyncReadEntry0WithLimits(lh, firstEntry, lastEntry, shouldCacheEntry, originalCallback, ctx, newHandle);
                return null;
            });
            return null;
        }
        AsyncCallbacks.ReadEntriesCallback callback = new AsyncCallbacks.ReadEntriesCallback(){

            @Override
            public void readEntriesComplete(List<Entry> entries, Object ctx) {
                if (!entries.isEmpty()) {
                    long size = entries.get(0).getLength();
                    RangeEntryCacheImpl.this.estimatedEntrySize = size;
                    AtomicInteger remainingCount = new AtomicInteger(entries.size());
                    for (Entry entry : entries) {
                        ((EntryImpl)entry).onDeallocate(() -> {
                            if (remainingCount.decrementAndGet() <= 0) {
                                pendingReadsLimiter.release(newHandle);
                            }
                        });
                    }
                } else {
                    pendingReadsLimiter.release(newHandle);
                }
                originalCallback.readEntriesComplete(entries, ctx);
            }

            @Override
            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                pendingReadsLimiter.release(newHandle);
                originalCallback.readEntriesFailed(exception, ctx);
            }
        };
        return callback;
    }

    @Override
    public void clear() {
        Pair<Integer, Long> removedPair = this.entries.clear();
        this.manager.entriesRemoved((Long)removedPair.getRight(), (Integer)removedPair.getLeft());
    }

    @Override
    public long getSize() {
        return this.entries.getSize();
    }

    @Override
    public int compareTo(EntryCache other) {
        return Longs.compare((long)this.getSize(), (long)other.getSize());
    }

    @Override
    public Pair<Integer, Long> evictEntries(long sizeToFree) {
        Preconditions.checkArgument((sizeToFree > 0L ? 1 : 0) != 0);
        Pair<Integer, Long> evicted = this.entries.evictLeastAccessedEntries(sizeToFree);
        int evictedEntries = (Integer)evicted.getLeft();
        long evictedSize = (Long)evicted.getRight();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Doing cache eviction of at least {} Mb -- Deleted {} entries - Total size deleted: {} Mb  -- Current Size: {} Mb", new Object[]{this.ml.getName(), (double)sizeToFree / 1048576.0, evictedEntries, (double)evictedSize / 1048576.0, (double)this.entries.getSize() / 1048576.0});
        }
        this.manager.entriesRemoved(evictedSize, evictedEntries);
        return evicted;
    }

    @Override
    public void invalidateEntriesBeforeTimestamp(long timestamp) {
        Pair<Integer, Long> evictedPair = this.entries.evictLEntriesBeforeTimestamp(timestamp);
        this.manager.entriesRemoved((Long)evictedPair.getRight(), (Integer)evictedPair.getLeft());
    }
}

