package org.apache.bookkeeper.bookie.storage.ldb;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.bookie.Bookie;
import org.apache.bookkeeper.bookie.BookieException;
import org.apache.bookkeeper.bookie.CheckpointSource;
import org.apache.bookkeeper.bookie.EntryLogger;
import org.apache.bookkeeper.bookie.GarbageCollectorThread;
import org.apache.bookkeeper.bookie.LedgerDirsManager;
import org.apache.bookkeeper.bookie.LedgerStorage;
import org.apache.bookkeeper.bookie.storage.ldb.DbLedgerStorageDataFormats;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorage;
import org.apache.bookkeeper.bookie.storage.ldb.KeyValueStorageFactory;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.jmx.BKMBeanInfo;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.pulsar.shade.com.google.common.annotations.VisibleForTesting;
import org.apache.pulsar.shade.com.google.common.base.Preconditions;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.com.google.protobuf.ByteString;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.pulsar.shade.org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage.class */
public class DbLedgerStorage implements GarbageCollectorThread.CompactableLedgerStorage {
    // Entry log that entry payloads are appended to during checkpoint() and read back from
    // in getEntry()/getLastEntry(); created in initialize().
    private EntryLogger entryLogger;
    // Per-ledger metadata (exists / fenced / master key) index; created in initialize().
    private LedgerMetadataIndex ledgerIndex;
    // Index from (ledger id, entry id) to the location value returned by the entry logger.
    private EntryLocationIndex entryLocationIndex;
    // Garbage collector thread; started in start() and stopped in shutdown().
    private GarbageCollectorThread gcThread;
    // Cache that addEntry() inserts into.
    protected WriteCache writeCache;
    // Cache currently being written to storage; swapped with writeCache at the start of
    // checkpoint() and cleared once the flush completes.
    protected WriteCache writeCacheBeingFlushed;
    // Cache populated by storage reads and by the read-ahead in fillReadAheadCache().
    private ReadCache readCache;
    // Configuration keys read in initialize().
    static final String WRITE_CACHE_MAX_SIZE_MB = "dbStorage_writeCacheMaxSizeMb";
    static final String READ_AHEAD_CACHE_BATCH_SIZE = "dbStorage_readAheadCacheBatchSize";
    static final String READ_AHEAD_CACHE_MAX_SIZE_MB = "dbStorage_readAheadCacheMaxSizeMb";
    // NOTE(review): the three DEFAULT_* constants and MB are not referenced anywhere in this class;
    // initialize() uses the literals 16L / 100 and FileUtils.ONE_MB instead.
    private static final long DEFAULT_WRITE_CACHE_MAX_SIZE_MB = 16;
    private static final long DEFAULT_READ_CACHE_MAX_SIZE_MB = 16;
    private static final int DEFAULT_READ_AHEAD_CACHE_BATCH_SIZE = 100;
    private static final int MB = 1048576;
    // Total write cache budget in bytes; each of the two write caches is created with half of it.
    private long writeCacheMaxSize;
    // Read cache budget in bytes.
    private long readCacheMaxSize;
    // Maximum number of entries read ahead by a single fillReadAheadCache() call.
    private int readAheadCacheBatchSize;
    // Stats logger supplied to initialize(); the loggers below are obtained from it in registerStats().
    private StatsLogger stats;
    private OpStatsLogger addEntryStats;
    private OpStatsLogger readEntryStats;
    private OpStatsLogger readCacheHitStats;
    private OpStatsLogger readCacheMissStats;
    private OpStatsLogger readAheadBatchCountStats;
    private OpStatsLogger readAheadBatchSizeStats;
    private OpStatsLogger flushStats;
    private OpStatsLogger flushSizeStats;
    // Ledger id -> entry id cache backing getLastAddConfirmed(); updated after each flush in
    // checkpoint() and removed in deleteLedger(). Static, so it is shared by every instance
    // of this class in the JVM.
    private static final Map<Long, Long> lastAddCompletedMap = new ConcurrentHashMap();
    private static final Logger log = LoggerFactory.getLogger(DbLedgerStorage.class);
    // Read lock: taken around accesses to the write caches. Write lock: taken to swap the two
    // write caches in checkpoint() and while a writer waits for that swap in triggerFlushAndAddEntry().
    private final ReentrantReadWriteLock writeCacheMutex = new ReentrantReadWriteLock();
    // Signalled by checkpoint() right after the caches are swapped; awaited by triggerFlushAndAddEntry().
    private final Condition flushWriteCacheCondition = this.writeCacheMutex.writeLock().newCondition();
    // Set to true when a writer requests a background flush; reset to false when checkpoint() swaps the caches.
    protected final AtomicBoolean hasFlushBeenTriggered = new AtomicBoolean(false);
    // True from the cache swap until the flush phase of checkpoint() ends.
    private final AtomicBoolean isFlushOngoing = new AtomicBoolean(false);
    // Runs the background flush requested by triggerFlushAndAddEntry().
    private final ExecutorService executor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage"));
    // Runs the removal of deleted ledgers from the indexes after each checkpoint.
    private final ExecutorService cleanupExecutor = Executors.newSingleThreadExecutor(new DefaultThreadFactory("db-storage-cleanup"));
    // Listeners notified by deleteLedger().
    private final CopyOnWriteArrayList<LedgerStorage.LedgerDeletionListener> ledgerDeletionListeners = Lists.newCopyOnWriteArrayList();
    // Source of new checkpoints; assigned in initialize().
    private CheckpointSource checkpointSource = null;
    // Checkpoint produced by the last successful checkpoint() call.
    private CheckpointSource.Checkpoint lastCheckpoint = CheckpointSource.Checkpoint.MIN;

    /* loaded from: input_file:org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage$DbLedgerStorageBean.class */
    /**
     * Bean descriptor returned by {@code getJMXBean()}: it reports a fixed name and is
     * always flagged as hidden.
     */
    public static class DbLedgerStorageBean implements DbLedgerStorageMXBean, BKMBeanInfo {
        /**
         * @return the constant bean name {@code "DbLedgerStorage"}
         */
        @Override
        public String getName() {
            return "DbLedgerStorage";
        }

        /**
         * @return always {@code true}
         */
        @Override
        public boolean isHidden() {
            return true;
        }
    }

    /* loaded from: input_file:org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage$DbLedgerStorageMXBean.class */
    /**
     * Management interface implemented by {@code DbLedgerStorageBean}. It currently declares
     * no attributes or operations.
     */
    public interface DbLedgerStorageMXBean {
    }

    /* loaded from: input_file:org/apache/bookkeeper/bookie/storage/ldb/DbLedgerStorage$LedgerLoggerProcessor.class */
    /**
     * Callback used by {@code readLedgerIndexEntries}, which invokes it once for every entry
     * of the requested ledger whose stored location is greater than zero.
     */
    public interface LedgerLoggerProcessor {
        /**
         * Receives one index record.
         *
         * @param j  the entry id
         * @param j2 the upper 32 bits of the stored location ({@code location >> 32});
         *           presumably the entry log id -- TODO confirm
         * @param j3 the lower 32 bits of the stored location ({@code location & 0xFFFFFFFF});
         *           presumably the offset inside that entry log -- TODO confirm
         */
        void process(long j, long j2, long j3);
    }

    /**
     * Reads the cache settings from the configuration and builds the caches, the two
     * indexes, the entry logger and the garbage collector thread, then registers the stats.
     *
     * @param serverConfiguration   configuration providing the cache sizes and read-ahead batch size
     * @param ledgerManagerProvider handed to the garbage collector thread
     * @param ledgerDirsManager     must expose exactly one ledger directory, used as the storage location
     * @param ledgerDirsManager2    not used by this implementation
     * @param checkpointSource      source of the checkpoints created in {@code checkpoint}
     * @param statsLogger           logger under which gauges and op stats are registered
     * @throws IOException if one of the underlying components fails to initialize
     * @throws IllegalArgumentException if more than one ledger directory is configured
     */
    @Override
    public void initialize(ServerConfiguration serverConfiguration, GarbageCollectorThread.LedgerManagerProvider ledgerManagerProvider, LedgerDirsManager ledgerDirsManager, LedgerDirsManager ledgerDirsManager2, CheckpointSource checkpointSource, StatsLogger statsLogger) throws IOException {
        Preconditions.checkArgument(ledgerDirsManager.getAllLedgerDirs().size() == 1, "Db implementation only allows for one storage dir");
        final String baseDir = ledgerDirsManager.getAllLedgerDirs().get(0).toString();

        // Write path: the configured budget is split evenly between the active cache and
        // the one being flushed.
        final long writeCacheMb = serverConfiguration.getLong(WRITE_CACHE_MAX_SIZE_MB, 16L);
        this.writeCacheMaxSize = writeCacheMb * FileUtils.ONE_MB;
        final long perCacheSize = this.writeCacheMaxSize / 2;
        this.writeCache = new WriteCache(perCacheSize);
        this.writeCacheBeingFlushed = new WriteCache(perCacheSize);
        this.checkpointSource = checkpointSource;

        // Read path.
        final long readCacheMb = serverConfiguration.getLong(READ_AHEAD_CACHE_MAX_SIZE_MB, 16L);
        this.readCacheMaxSize = readCacheMb * FileUtils.ONE_MB;
        this.readAheadCacheBatchSize = serverConfiguration.getInt(READ_AHEAD_CACHE_BATCH_SIZE, 100);
        this.readCache = new ReadCache(this.readCacheMaxSize);
        this.stats = statsLogger;

        log.info("Started Db Ledger Storage");
        log.info(" - Write cache size: {} MB", this.writeCacheMaxSize / FileUtils.ONE_MB);
        log.info(" - Read Cache: {} MB", this.readCacheMaxSize / FileUtils.ONE_MB);
        log.info(" - Read Ahead Batch size: : {}", this.readAheadCacheBatchSize);

        this.ledgerIndex = new LedgerMetadataIndex(serverConfiguration, KeyValueStorageRocksDB.factory, baseDir, this.stats);
        this.entryLocationIndex = new EntryLocationIndex(serverConfiguration, KeyValueStorageRocksDB.factory, baseDir, this.stats);
        this.entryLogger = new EntryLogger(serverConfiguration, ledgerDirsManager);
        this.gcThread = new GarbageCollectorThread(serverConfiguration, ledgerManagerProvider, this);
        registerStats();
    }

    /**
     * Registers the cache gauges and obtains the operation stats loggers from the
     * configured stats logger.
     */
    public void registerStats() {
        registerLongGauge("write-cache-size", () -> this.writeCache.size() + this.writeCacheBeingFlushed.size());
        registerLongGauge("write-cache-count", () -> this.writeCache.count() + this.writeCacheBeingFlushed.count());
        registerLongGauge("read-cache-size", () -> this.readCache.size());
        registerLongGauge("read-cache-count", () -> this.readCache.count());

        this.addEntryStats = this.stats.getOpStatsLogger("add-entry");
        this.readEntryStats = this.stats.getOpStatsLogger("read-entry");
        this.readCacheHitStats = this.stats.getOpStatsLogger("read-cache-hits");
        this.readCacheMissStats = this.stats.getOpStatsLogger("read-cache-misses");
        this.readAheadBatchCountStats = this.stats.getOpStatsLogger("readahead-batch-count");
        this.readAheadBatchSizeStats = this.stats.getOpStatsLogger("readahead-batch-size");
        this.flushStats = this.stats.getOpStatsLogger("flush");
        this.flushSizeStats = this.stats.getOpStatsLogger("flush-size");
    }

    /**
     * Registers a gauge whose default value is zero and whose sample is evaluated lazily,
     * each time the gauge is read, through the given supplier.
     */
    private void registerLongGauge(String name, java.util.function.LongSupplier sampler) {
        this.stats.registerGauge(name, new Gauge<Long>() {
            @Override
            public Long getDefaultValue() {
                return 0L;
            }

            @Override
            public Long getSample() {
                return Long.valueOf(sampler.getAsLong());
            }
        });
    }

    /**
     * Starts the garbage collector thread created in {@code initialize}.
     */
    @Override
    public void start() {
        gcThread.start();
    }

    /**
     * Flushes pending writes and then stops or closes every component owned by this storage.
     *
     * <p>An {@link IOException} raised by any step is logged and not rethrown.
     * NOTE(review): because all steps share one {@code try} block, a failure in an early
     * step (for example the initial flush) skips every later close/shutdown call.
     *
     * @throws InterruptedException propagated from the shutdown/await calls below
     */
    @Override // org.apache.bookkeeper.bookie.LedgerStorage
    public void shutdown() throws InterruptedException {
        try {
            // Persist whatever is still sitting in the write cache before tearing things down.
            flush();
            this.gcThread.shutdown();
            this.entryLogger.shutdown();
            // Give the index cleanup task up to one second to finish before the indexes are closed.
            this.cleanupExecutor.shutdown();
            this.cleanupExecutor.awaitTermination(1L, TimeUnit.SECONDS);
            this.ledgerIndex.close();
            this.entryLocationIndex.close();
            this.writeCache.close();
            this.writeCacheBeingFlushed.close();
            this.readCache.close();
            this.executor.shutdown();
        } catch (IOException e) {
            log.error("Error closing db storage", e);
        }
    }

    /**
     * Returns the last known entry id of a ledger, consulting the in-memory map first and
     * falling back to the entry location index.
     *
     * @param j the ledger id
     * @return the cached or indexed last entry id, or {@code -1} when the index reports {@code -1}
     * @throws IOException if the index lookup fails
     */
    @Override
    public long getLastAddConfirmed(long j) throws IOException {
        Long cached = lastAddCompletedMap.get(Long.valueOf(j));
        if (cached != null) {
            return cached.longValue();
        }

        long lastInIndex = this.entryLocationIndex.getLastEntryInLedger(j);
        if (lastInIndex == -1) {
            // Nothing indexed for this ledger; do not cache the miss.
            return -1L;
        }
        lastAddCompletedMap.put(Long.valueOf(j), Long.valueOf(lastInIndex));
        return lastInIndex;
    }

    /**
     * Tells whether the metadata index marks the ledger as existing.
     *
     * @param j the ledger id
     * @return the {@code exists} flag of the stored metadata, or {@code false} when the
     *         index has no record for the ledger
     * @throws IOException if the index lookup fails for another reason
     */
    @Override
    public boolean ledgerExists(long j) throws IOException {
        final DbLedgerStorageDataFormats.LedgerData metadata;
        try {
            metadata = this.ledgerIndex.get(j);
        } catch (Bookie.NoLedgerException e) {
            return false;
        }
        boolean exists = metadata.getExists();
        if (log.isDebugEnabled()) {
            log.debug("Ledger exists. ledger: {} : {}", j, exists);
        }
        return exists;
    }

    /**
     * Reads the fenced flag from the ledger's metadata record.
     *
     * @param j the ledger id
     * @return the {@code fenced} flag stored in the metadata index
     * @throws IOException if the metadata cannot be read
     */
    @Override
    public boolean isFenced(long j) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("isFenced. ledger: {}", j);
        }
        boolean fenced = ledgerIndex.get(j).getFenced();
        return fenced;
    }

    /**
     * Marks the ledger as fenced in the metadata index.
     *
     * @param j the ledger id
     * @return the value returned by the metadata index
     * @throws IOException if the metadata cannot be updated
     */
    @Override
    public boolean setFenced(long j) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("Set fenced. ledger: {}", j);
        }
        boolean result = ledgerIndex.setFenced(j);
        return result;
    }

    /**
     * Stores the master key of a ledger in the metadata index.
     *
     * @param j    the ledger id
     * @param bArr the master key bytes
     * @throws IOException if the metadata cannot be updated
     */
    @Override
    public void setMasterKey(long j, byte[] bArr) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("Set master key. ledger: {}", j);
        }
        ledgerIndex.setMasterKey(j, bArr);
    }

    /**
     * Returns a copy of the master key stored in the ledger's metadata record.
     *
     * @param j the ledger id
     * @return the master key as a byte array
     * @throws IOException     if the metadata cannot be read
     * @throws BookieException declared by the storage interface
     */
    @Override
    public byte[] readMasterKey(long j) throws IOException, BookieException {
        if (log.isDebugEnabled()) {
            log.debug("Read master key. ledger: {}", j);
        }
        byte[] masterKey = ledgerIndex.get(j).getMasterKey().toByteArray();
        return masterKey;
    }

    /**
     * Adds an entry to the write cache, triggering a flush and retrying when the cache is full.
     *
     * <p>The first two longs of the buffer are read as the ledger id and the entry id, after
     * which the reader index is reset.
     *
     * @param byteBuf the serialized entry
     * @return the entry id read from the buffer
     * @throws IOException if the entry cannot be inserted even after a flush was requested
     */
    @Override
    public long addEntry(ByteBuf byteBuf) throws IOException {
        long startTime = MathUtils.nowInNano();
        long ledgerId = byteBuf.readLong();
        long entryId = byteBuf.readLong();
        byteBuf.resetReaderIndex();
        if (log.isDebugEnabled()) {
            log.debug("Add entry. {}@{}", Long.valueOf(ledgerId), Long.valueOf(entryId));
        }

        // The read lock guards only the cache insertion. It must be released exactly once,
        // before the slow path below, which takes the write lock itself and may throw.
        boolean inserted;
        this.writeCacheMutex.readLock().lock();
        try {
            inserted = this.writeCache.put(ledgerId, entryId, byteBuf);
        } finally {
            this.writeCacheMutex.readLock().unlock();
        }

        if (!inserted) {
            triggerFlushAndAddEntry(ledgerId, entryId, byteBuf);
        }
        recordSuccessfulEvent(this.addEntryStats, startTime);
        return entryId;
    }

    /**
     * Slow path of {@code addEntry}: requests a background flush if none is pending, waits up
     * to 100 ms for the write caches to be swapped, then retries the insertion.
     *
     * @param j       the ledger id
     * @param j2      the entry id
     * @param byteBuf the serialized entry
     * @throws IOException if the swap does not happen within the timeout, if the retry still
     *                     fails, or if the waiting thread is interrupted
     */
    private void triggerFlushAndAddEntry(long j, long j2, ByteBuf byteBuf) throws IOException {
        this.writeCacheMutex.writeLock().lock();
        try {
            // Only one writer schedules the flush; skip it when a flush is already running
            // or has already been requested.
            if (!this.isFlushOngoing.get() && this.hasFlushBeenTriggered.compareAndSet(false, true)) {
                log.info("Write cache is full, triggering flush");
                this.executor.execute(() -> {
                    try {
                        flush();
                    } catch (IOException e) {
                        log.error("Error during flush", e);
                    }
                });
            }

            // checkpoint() clears the flag and signals the condition once the caches are swapped.
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(100L);
            while (this.hasFlushBeenTriggered.get()) {
                if (remainingNanos <= 0) {
                    throw new IOException("Write cache was not trigger within the timeout, cannot add entry " + j + "@" + j2);
                }
                remainingNanos = this.flushWriteCacheCondition.awaitNanos(remainingNanos);
            }

            if (!this.writeCache.put(j, j2, byteBuf)) {
                throw new IOException("Error while inserting entry in write cache" + j + "@" + j2);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status for callers and keep the cause for diagnostics.
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted when adding entry " + j + "@" + j2, e);
        } finally {
            this.writeCacheMutex.writeLock().unlock();
        }
    }

    /**
     * Reads an entry, looking in the active write cache, the write cache being flushed, the
     * read cache and finally the entry log, in that order.
     *
     * <p>An entry id of {@code -1} is delegated to {@code getLastEntry}. An entry read from
     * the entry log is inserted into the read cache and followed by a read-ahead.
     *
     * @param j  the ledger id
     * @param j2 the entry id, or {@code -1} for the last entry of the ledger
     * @return the entry buffer
     * @throws Bookie.NoEntryException if the location index returns {@code 0} for the entry
     * @throws IOException             if reading from the index or the entry log fails
     */
    @Override
    public ByteBuf getEntry(long j, long j2) throws IOException {
        long startTime = MathUtils.nowInNano();
        if (log.isDebugEnabled()) {
            log.debug("Get Entry: {}@{}", Long.valueOf(j), Long.valueOf(j2));
        }
        if (j2 == -1) {
            return getLastEntry(j);
        }

        // The read lock covers only the two write-cache lookups and is released exactly
        // once, so failures on the storage path below propagate unchanged.
        ByteBuf cached;
        this.writeCacheMutex.readLock().lock();
        try {
            cached = this.writeCache.get(j, j2);
            if (cached == null) {
                cached = this.writeCacheBeingFlushed.get(j, j2);
            }
        } finally {
            this.writeCacheMutex.readLock().unlock();
        }
        if (cached == null) {
            cached = this.readCache.get(j, j2);
        }
        if (cached != null) {
            recordSuccessfulEvent(this.readCacheHitStats, startTime);
            recordSuccessfulEvent(this.readEntryStats, startTime);
            return cached;
        }

        try {
            long location = this.entryLocationIndex.getLocation(j, j2);
            if (location == 0) {
                throw new Bookie.NoEntryException(j, j2);
            }
            ByteBuf entry = this.entryLogger.readEntry(j, j2, location);
            this.readCache.put(j, j2, entry);
            // The read-ahead starts at this location plus 4 bytes plus the entry length.
            fillReadAheadCache(j, j2 + 1, location + 4 + entry.readableBytes());
            recordSuccessfulEvent(this.readCacheMissStats, startTime);
            recordSuccessfulEvent(this.readEntryStats, startTime);
            return entry;
        } catch (Bookie.NoEntryException e) {
            recordFailedEvent(this.readEntryStats, startTime);
            throw e;
        }
    }

    /**
     * Best-effort read-ahead: reads consecutive entries from the entry log starting at the
     * given location and inserts those belonging to the ledger into the read cache.
     *
     * <p>It stops after {@code readAheadCacheBatchSize} entries, when the upper 32 bits of
     * the location change, or when an entry of a different ledger is found. In that last
     * case it returns without recording the batch stats. Any exception is logged at debug
     * level and swallowed.
     *
     * @param j  the ledger id being read
     * @param j2 the id of the first entry expected (not used by the implementation)
     * @param j3 the location to start reading from
     */
    private void fillReadAheadCache(long j, long j2, long j3) {
        try {
            long firstLogId = j3 >> 32;
            long currentLogId = firstLogId;
            long currentLocation = j3;
            int count = 0;
            long size = 0;
            while (count < this.readAheadCacheBatchSize && currentLogId == firstLogId) {
                ByteBuf entry = this.entryLogger.internalReadEntry(j, -1L, currentLocation);
                try {
                    long entryLedgerId = entry.getLong(0);
                    long entryId = entry.getLong(8);
                    if (entryLedgerId != j) {
                        // Reached data that belongs to another ledger: stop reading ahead.
                        return;
                    }
                    this.readCache.put(j, entryId, entry);
                    count++;
                    size += entry.readableBytes();
                    currentLocation += 4 + entry.readableBytes();
                    currentLogId = currentLocation >> 32;
                } finally {
                    // Release exactly once on every path, including when an exception is thrown.
                    entry.release();
                }
            }
            this.readAheadBatchCountStats.registerSuccessfulValue(count);
            this.readAheadBatchSizeStats.registerSuccessfulValue(size);
        } catch (Exception e) {
            if (log.isDebugEnabled()) {
                log.debug("Exception during read ahead for ledger: {}: e", Long.valueOf(j), e);
            }
        }
    }

    /**
     * Returns the last entry of a ledger, looking in the active write cache, then the write
     * cache being flushed, and finally the location index and entry log.
     *
     * @param j the ledger id
     * @return the buffer of the last entry
     * @throws IOException if reading from the index or the entry log fails
     */
    public ByteBuf getLastEntry(long j) throws IOException {
        long startTime = MathUtils.nowInNano();

        // The read lock covers only the write-cache lookups and is released exactly once,
        // so failures on the storage path below propagate unchanged.
        ByteBuf cached;
        this.writeCacheMutex.readLock().lock();
        try {
            cached = this.writeCache.getLastEntry(j);
            if (cached != null) {
                if (log.isDebugEnabled()) {
                    long foundLedgerId = cached.readLong();
                    long foundEntryId = cached.readLong();
                    cached.resetReaderIndex();
                    log.debug("Found last entry for ledger {} in write cache: {}@{}", new Object[]{Long.valueOf(j), Long.valueOf(foundLedgerId), Long.valueOf(foundEntryId)});
                }
            } else {
                cached = this.writeCacheBeingFlushed.getLastEntry(j);
                if (cached != null && log.isDebugEnabled()) {
                    cached.readLong(); // skip the ledger id
                    long foundEntryId = cached.readLong();
                    cached.resetReaderIndex();
                    log.debug("Found last entry for ledger {} in write cache being flushed: {}", Long.valueOf(j), Long.valueOf(foundEntryId));
                }
            }
        } finally {
            this.writeCacheMutex.readLock().unlock();
        }
        if (cached != null) {
            recordSuccessfulEvent(this.readCacheHitStats, startTime);
            recordSuccessfulEvent(this.readEntryStats, startTime);
            return cached;
        }

        // Not in either write cache: resolve the last entry id through the index.
        long lastEntryId = this.entryLocationIndex.getLastEntryInLedger(j);
        if (log.isDebugEnabled()) {
            log.debug("Found last entry for ledger {} in db: {}", Long.valueOf(j), Long.valueOf(lastEntryId));
        }
        long location = this.entryLocationIndex.getLocation(j, lastEntryId);
        ByteBuf entry = this.entryLogger.readEntry(j, lastEntryId, location);
        recordSuccessfulEvent(this.readCacheMissStats, startTime);
        recordSuccessfulEvent(this.readEntryStats, startTime);
        return entry;
    }

    /**
     * Reports whether the active write cache holds any entries.
     *
     * @return {@code true} when the active write cache is not empty
     */
    @VisibleForTesting
    boolean isFlushRequired() {
        final ReentrantReadWriteLock.ReadLock readLock = this.writeCacheMutex.readLock();
        readLock.lock();
        try {
            boolean empty = this.writeCache.isEmpty();
            return !empty;
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Flushes the write cache to the entry log and the location index.
     *
     * <p>The two write caches are swapped under the write lock so that writers can continue
     * against an empty cache. The swapped-out cache is then written to the entry log and the
     * collected locations are committed to the index in one batch. After that the ledger
     * metadata index is flushed, the last-entry map is updated, index cleanup is scheduled
     * and the flushed cache is cleared.
     *
     * @param checkpoint the checkpoint requested by the caller
     * @return the last completed checkpoint; when it is already newer than the requested
     *         one, no flush is performed
     * @throws IOException if writing to the entry log or the indexes fails; unchecked
     *                     exceptions raised during the flush are wrapped in an {@code IOException}
     */
    @Override
    public synchronized CheckpointSource.Checkpoint checkpoint(CheckpointSource.Checkpoint checkpoint) throws IOException {
        CheckpointSource.Checkpoint thisCheckpoint = this.checkpointSource.newCheckpoint();
        if (this.lastCheckpoint.compareTo(checkpoint) > 0) {
            return this.lastCheckpoint;
        }
        long startTime = MathUtils.nowInNano();

        // Phase 1: swap the caches. The write lock is held for this phase only and is
        // released exactly once.
        this.writeCacheMutex.writeLock().lock();
        try {
            WriteCache previouslyFlushed = this.writeCacheBeingFlushed;
            this.writeCacheBeingFlushed = this.writeCache;
            this.writeCache = previouslyFlushed;
            // The active cache is empty again: writers waiting in triggerFlushAndAddEntry may proceed.
            this.hasFlushBeenTriggered.set(false);
            this.flushWriteCacheCondition.signalAll();
            this.isFlushOngoing.set(true);
        } finally {
            this.writeCacheMutex.writeLock().unlock();
        }

        // Phase 2: persist the swapped-out cache. The flag is cleared on every exit path so
        // that a failed flush does not prevent later flushes from being triggered.
        try {
            long sizeToFlush = this.writeCacheBeingFlushed.size();
            if (log.isDebugEnabled()) {
                log.debug("Flushing entries. count: {} -- size {} Mb", Long.valueOf(this.writeCacheBeingFlushed.count()), Double.valueOf((sizeToFlush / 1024.0d) / 1024.0d));
            }

            Map<Long, Long> lastEntryPerLedger = new HashMap<>();
            KeyValueStorage.Batch batch = this.entryLocationIndex.newBatch();
            long batchFlushStart;
            try {
                this.writeCacheBeingFlushed.forEach((ledgerId, entryId, entry) -> {
                    try {
                        this.entryLocationIndex.addLocation(batch, ledgerId, entryId, this.entryLogger.addEntry(ledgerId, entry, true));
                        lastEntryPerLedger.put(Long.valueOf(ledgerId), Long.valueOf(entryId));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
                this.entryLogger.flush();
                batchFlushStart = System.nanoTime();
                batch.flush();
            } finally {
                // Close the batch even when writing or flushing failed.
                batch.close();
            }
            if (log.isDebugEnabled()) {
                log.debug("DB batch flushed time : {} s", Double.valueOf(MathUtils.elapsedNanos(batchFlushStart) / (double) TimeUnit.SECONDS.toNanos(1L)));
            }
            this.ledgerIndex.flush();

            // Only ever move the recorded last entry of a ledger forward.
            for (Map.Entry<Long, Long> ledgerLast : lastEntryPerLedger.entrySet()) {
                lastAddCompletedMap.merge(ledgerLast.getKey(), ledgerLast.getValue(), Long::max);
            }

            this.cleanupExecutor.execute(() -> {
                try {
                    if (log.isDebugEnabled()) {
                        log.debug("Removing deleted ledgers from db indexes");
                    }
                    this.entryLocationIndex.removeOffsetFromDeletedLedgers();
                    this.ledgerIndex.removeDeletedLedgers();
                } catch (Throwable th) {
                    log.warn("Failed to cleanup db indexes", th);
                }
            });

            this.lastCheckpoint = thisCheckpoint;
            this.writeCacheBeingFlushed.clear();

            // Floating-point division: sub-second flushes must not be reported as 0 s.
            double flushTimeSeconds = MathUtils.elapsedNanos(startTime) / (double) TimeUnit.SECONDS.toNanos(1L);
            double flushThroughput = ((sizeToFlush / 1024.0d) / 1024.0d) / flushTimeSeconds;
            if (log.isDebugEnabled()) {
                log.debug("Flushing done time {} s -- Written {} MB/s", Double.valueOf(flushTimeSeconds), Double.valueOf(flushThroughput));
            }
            recordSuccessfulEvent(this.flushStats, startTime);
            this.flushSizeStats.registerSuccessfulValue(sizeToFlush);
            return this.lastCheckpoint;
        } catch (IOException e) {
            throw e;
        } catch (RuntimeException e) {
            throw new IOException(e);
        } finally {
            this.isFlushOngoing.set(false);
        }
    }

    /**
     * Forces a flush by requesting a checkpoint at {@code Checkpoint.MAX}; the returned
     * checkpoint is ignored.
     *
     * @throws IOException if the checkpoint fails
     */
    @Override
    public synchronized void flush() throws IOException {
        final CheckpointSource.Checkpoint upperBound = CheckpointSource.Checkpoint.MAX;
        checkpoint(upperBound);
    }

    /**
     * Deletes a ledger from the active write cache, the last-entry map and both indexes,
     * then notifies every registered deletion listener. All of this runs while holding the
     * write-cache read lock.
     *
     * @param j the ledger id
     * @throws IOException if one of the index deletions fails
     */
    @Override
    public void deleteLedger(long j) throws IOException {
        if (log.isDebugEnabled()) {
            log.debug("Deleting ledger {}", j);
        }

        final ReentrantReadWriteLock.ReadLock readLock = this.writeCacheMutex.readLock();
        readLock.lock();
        try {
            this.writeCache.deleteLedger(j);
            lastAddCompletedMap.remove(Long.valueOf(j));
            this.entryLocationIndex.delete(j);
            this.ledgerIndex.delete(j);

            for (int idx = 0, total = this.ledgerDeletionListeners.size(); idx < total; idx++) {
                LedgerStorage.LedgerDeletionListener listener = this.ledgerDeletionListeners.get(idx);
                listener.ledgerDeleted(j);
            }
        } finally {
            readLock.unlock();
        }
    }

    /**
     * Delegates to the ledger metadata index.
     *
     * @param j  first bound of the ledger id range, passed through unchanged
     * @param j2 second bound of the ledger id range, passed through unchanged
     * @return the ledger ids reported by the metadata index for that range
     * @throws IOException if the index lookup fails
     */
    @Override
    public Iterable<Long> getActiveLedgersInRange(long j, long j2) throws IOException {
        Iterable<Long> activeLedgers = ledgerIndex.getActiveLedgersInRange(j, j2);
        return activeLedgers;
    }

    /**
     * Flushes pending writes and then applies the given location updates to the entry
     * location index.
     *
     * @param iterable the new entry locations
     * @throws IOException if the flush or the index update fails
     */
    @Override
    public void updateEntriesLocations(Iterable<GarbageCollectorThread.CompactableLedgerStorage.EntryLocation> iterable) throws IOException {
        // Flush first, so that the update runs against an index that already holds the cached entries.
        flush();
        entryLocationIndex.updateLocations(iterable);
    }

    /**
     * @return a new {@code DbLedgerStorageBean} on every call
     */
    @Override
    public BKMBeanInfo getJMXBean() {
        DbLedgerStorageBean bean = new DbLedgerStorageBean();
        return bean;
    }

    /**
     * @return the entry logger created in {@code initialize}
     */
    @Override
    public EntryLogger getEntryLogger() {
        return entryLogger;
    }

    /**
     * Writes a ledger's metadata record and all of the supplied entry locations into the indexes.
     *
     * @param j        the ledger id
     * @param z        whether the ledger is recorded as fenced
     * @param bArr     the master key of the ledger
     * @param iterable maps from entry id to location; every mapping is added to the location index
     * @return the number of entry locations added
     * @throws Exception if writing to either index fails; an {@code IOException} raised while
     *                   adding a location surfaces wrapped in a {@code RuntimeException}
     */
    public long addLedgerToIndex(long j, boolean z, byte[] bArr, Iterable<SortedMap<Long, Long>> iterable) throws Exception {
        this.ledgerIndex.set(j, DbLedgerStorageDataFormats.LedgerData.newBuilder().setExists(true).setFenced(z).setMasterKey(ByteString.copyFrom(bArr)).build());
        AtomicLong added = new AtomicLong();
        KeyValueStorage.Batch batch = this.entryLocationIndex.newBatch();
        try {
            iterable.forEach(locations -> {
                locations.forEach((entryId, location) -> {
                    try {
                        this.entryLocationIndex.addLocation(batch, j, entryId.longValue(), location.longValue());
                        added.incrementAndGet();
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
            });
            batch.flush();
        } finally {
            // Close the batch even when adding a location or flushing failed.
            batch.close();
        }
        return added.get();
    }

    /**
     * Adds a listener that {@code deleteLedger} notifies for every deleted ledger.
     *
     * @param ledgerDeletionListener the listener to register
     */
    @Override
    public void registerLedgerDeletionListener(LedgerStorage.LedgerDeletionListener ledgerDeletionListener) {
        ledgerDeletionListeners.add(ledgerDeletionListener);
    }

    /**
     * @return the entry location index created in {@code initialize}
     */
    public EntryLocationIndex getEntryLocationIndex() {
        return entryLocationIndex;
    }

    /**
     * Registers a successful event whose latency is the time elapsed since {@code j}.
     *
     * @param opStatsLogger the logger to record on
     * @param j             the start time in nanoseconds, as returned by {@code MathUtils.nowInNano()}
     */
    private void recordSuccessfulEvent(OpStatsLogger opStatsLogger, long j) {
        long elapsedNanos = MathUtils.elapsedNanos(j);
        opStatsLogger.registerSuccessfulEvent(elapsedNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Registers a failed event whose latency is the time elapsed since {@code j}.
     *
     * @param opStatsLogger the logger to record on
     * @param j             the start time in nanoseconds, as returned by {@code MathUtils.nowInNano()}
     */
    private void recordFailedEvent(OpStatsLogger opStatsLogger, long j) {
        long elapsedNanos = MathUtils.elapsedNanos(j);
        opStatsLogger.registerFailedEvent(elapsedNanos, TimeUnit.NANOSECONDS);
    }

    /**
     * Opens the entry location index of the first configured ledger directory and reports
     * every indexed entry of the given ledger to the processor.
     *
     * <p>Entry ids from {@code 0} up to the ledger's last indexed entry are looked up; those
     * with a location greater than zero are passed on as (entry id, upper 32 bits of the
     * location, lower 32 bits of the location). The index is closed before returning.
     *
     * @param j                     the ledger id
     * @param serverConfiguration   configuration used to locate and open the index; must not be null
     * @param ledgerLoggerProcessor callback receiving each index record; must not be null
     * @throws IOException if the index cannot be opened or read
     */
    public static void readLedgerIndexEntries(long j, ServerConfiguration serverConfiguration, LedgerLoggerProcessor ledgerLoggerProcessor) throws IOException {
        Preconditions.checkNotNull(serverConfiguration, "ServerConfiguration can't be null");
        Preconditions.checkNotNull(ledgerLoggerProcessor, "LedgerLoggger info processor can't null");

        String baseDir = new LedgerDirsManager(serverConfiguration, serverConfiguration.getLedgerDirs()).getAllLedgerDirs().get(0).toString();
        // The factory ignores the requested config type and always uses the Small profile.
        EntryLocationIndex index = new EntryLocationIndex(
                serverConfiguration,
                (path, requestedType, conf) -> new KeyValueStorageRocksDB(path, KeyValueStorageFactory.DbConfigType.Small, conf, true),
                baseDir,
                NullStatsLogger.INSTANCE);
        try {
            long lastEntryId = index.getLastEntryInLedger(j);
            long entryId = 0;
            while (entryId <= lastEntryId) {
                long location = index.getLocation(j, entryId);
                if (location > 0) {
                    ledgerLoggerProcessor.process(entryId, location >> 32, location & 0xFFFFFFFFL);
                }
                entryId++;
            }
        } finally {
            index.close();
        }
    }
}
