package com.twitter.distributedlog;

import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.io.IOException;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/distributedlog/LedgerHandleCache.class */
public class LedgerHandleCache {
    static final Logger LOG = LoggerFactory.getLogger(LedgerHandleCache.class);
    final ConcurrentHashMap<LedgerDescriptor, RefCountedLedgerHandle> handlesMap;
    private final BookKeeperClient bkc;
    private final String digestpw;
    private final OpStatsLogger openStats;
    private final OpStatsLogger openNoRecoveryStats;

    /* loaded from: input_file:com/twitter/distributedlog/LedgerHandleCache$Builder.class */
    public static class Builder {
        private BookKeeperClient bkc;
        private String digestpw;
        private StatsLogger statsLogger;

        private Builder() {
            this.statsLogger = NullStatsLogger.INSTANCE;
        }

        public Builder bkc(BookKeeperClient bookKeeperClient) {
            this.bkc = bookKeeperClient;
            return this;
        }

        public Builder conf(DistributedLogConfiguration distributedLogConfiguration) {
            this.digestpw = distributedLogConfiguration.getBKDigestPW();
            return this;
        }

        public Builder statsLogger(StatsLogger statsLogger) {
            this.statsLogger = statsLogger;
            return this;
        }

        public LedgerHandleCache build() {
            Preconditions.checkNotNull(this.bkc, "No bookkeeper client is provided");
            Preconditions.checkNotNull(this.digestpw, "No bookkeeper digest password is provided");
            Preconditions.checkNotNull(this.statsLogger, "No stats logger is provided");
            return new LedgerHandleCache(this.bkc, this.digestpw, this.statsLogger);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/LedgerHandleCache$RefCountedLedgerHandle.class */
    public static class RefCountedLedgerHandle {
        public final LedgerHandle handle;
        final AtomicLong refcount = new AtomicLong(0);

        RefCountedLedgerHandle(LedgerHandle ledgerHandle) {
            this.handle = ledgerHandle;
            addRef();
        }

        long getRefCount() {
            return this.refcount.get();
        }

        public void addRef() {
            this.refcount.incrementAndGet();
        }

        public boolean removeRef() {
            return this.refcount.decrementAndGet() == 0;
        }

        public void forceClose() {
            try {
                this.handle.close();
            } catch (Exception e) {
                LedgerHandleCache.LOG.warn("Exception while closing ledger {}", this.handle, e);
            } catch (BKException.BKLedgerClosedException e2) {
            }
        }
    }

    public static Builder newBuilder() {
        return new Builder();
    }

    private LedgerHandleCache(BookKeeperClient bookKeeperClient, String str, StatsLogger statsLogger) {
        this.handlesMap = new ConcurrentHashMap<>();
        this.bkc = bookKeeperClient;
        this.digestpw = str;
        this.openStats = statsLogger.getOpStatsLogger("open_ledger");
        this.openNoRecoveryStats = statsLogger.getOpStatsLogger("open_ledger_no_recovery");
    }

    private void asyncOpenLedger(LedgerDescriptor ledgerDescriptor, AsyncCallback.OpenCallback openCallback, Object obj) {
        try {
            if (ledgerDescriptor.isFenced()) {
                this.bkc.get().asyncOpenLedger(ledgerDescriptor.getLedgerId(), BookKeeper.DigestType.CRC32, this.digestpw.getBytes(Charsets.UTF_8), openCallback, obj);
            } else {
                this.bkc.get().asyncOpenLedgerNoRecovery(ledgerDescriptor.getLedgerId(), BookKeeper.DigestType.CRC32, this.digestpw.getBytes(Charsets.UTF_8), openCallback, obj);
            }
        } catch (IOException e) {
            openCallback.openComplete(-8, (LedgerHandle) null, obj);
        }
    }

    public Future<LedgerDescriptor> asyncOpenLedger(LogSegmentMetadata logSegmentMetadata, boolean z) {
        final Stopwatch createStarted = Stopwatch.createStarted();
        final OpStatsLogger opStatsLogger = z ? this.openStats : this.openNoRecoveryStats;
        final Promise promise = new Promise();
        final LedgerDescriptor ledgerDescriptor = new LedgerDescriptor(logSegmentMetadata.getLedgerId(), logSegmentMetadata.getLogSegmentSequenceNumber(), z);
        RefCountedLedgerHandle refCountedLedgerHandle = this.handlesMap.get(ledgerDescriptor);
        if (null == refCountedLedgerHandle) {
            asyncOpenLedger(ledgerDescriptor, new AsyncCallback.OpenCallback() { // from class: com.twitter.distributedlog.LedgerHandleCache.1
                public void openComplete(int i, LedgerHandle ledgerHandle, Object obj) {
                    if (0 != i) {
                        promise.setException(BKException.create(i));
                        return;
                    }
                    RefCountedLedgerHandle refCountedLedgerHandle2 = new RefCountedLedgerHandle(ledgerHandle);
                    RefCountedLedgerHandle putIfAbsent = LedgerHandleCache.this.handlesMap.putIfAbsent(ledgerDescriptor, refCountedLedgerHandle2);
                    if (null != putIfAbsent) {
                        putIfAbsent.addRef();
                        if (refCountedLedgerHandle2.removeRef()) {
                            refCountedLedgerHandle2.handle.asyncClose(new AsyncCallback.CloseCallback() { // from class: com.twitter.distributedlog.LedgerHandleCache.1.1
                                public void closeComplete(int i2, LedgerHandle ledgerHandle2, Object obj2) {
                                }
                            }, (Object) null);
                        }
                    }
                    promise.setValue(ledgerDescriptor);
                }
            }, null);
        } else {
            refCountedLedgerHandle.addRef();
            promise.setValue(ledgerDescriptor);
        }
        return promise.addEventListener(new FutureEventListener<LedgerDescriptor>() { // from class: com.twitter.distributedlog.LedgerHandleCache.2
            public void onSuccess(LedgerDescriptor ledgerDescriptor2) {
                opStatsLogger.registerSuccessfulEvent(createStarted.elapsed(TimeUnit.MICROSECONDS));
            }

            public void onFailure(Throwable th) {
                opStatsLogger.registerFailedEvent(createStarted.elapsed(TimeUnit.MICROSECONDS));
            }
        });
    }

    public LedgerDescriptor openLedger(LogSegmentMetadata logSegmentMetadata, boolean z) throws BKException {
        return (LedgerDescriptor) FutureUtils.bkResult(asyncOpenLedger(logSegmentMetadata, z));
    }

    private RefCountedLedgerHandle getLedgerHandle(LedgerDescriptor ledgerDescriptor) {
        if (null == ledgerDescriptor) {
            return null;
        }
        return this.handlesMap.get(ledgerDescriptor);
    }

    public Future<Void> asyncCloseLedger(LedgerDescriptor ledgerDescriptor) {
        final Promise promise = new Promise();
        RefCountedLedgerHandle ledgerHandle = getLedgerHandle(ledgerDescriptor);
        if (null == ledgerHandle || !ledgerHandle.removeRef()) {
            promise.setValue((Object) null);
        } else {
            RefCountedLedgerHandle remove = this.handlesMap.remove(ledgerDescriptor);
            if (remove.getRefCount() > 0) {
                this.handlesMap.putIfAbsent(ledgerDescriptor, remove);
                promise.setValue((Object) null);
            } else {
                remove.handle.asyncClose(new AsyncCallback.CloseCallback() { // from class: com.twitter.distributedlog.LedgerHandleCache.3
                    public void closeComplete(int i, LedgerHandle ledgerHandle2, Object obj) {
                        if (0 == i) {
                            promise.setValue((Object) null);
                        } else {
                            promise.setException(BKException.create(i));
                        }
                    }
                }, (Object) null);
            }
        }
        return promise;
    }

    public void closeLedger(LedgerDescriptor ledgerDescriptor) throws BKException {
        FutureUtils.bkResult(asyncCloseLedger(ledgerDescriptor));
    }

    public long getLastAddConfirmed(LedgerDescriptor ledgerDescriptor) throws BKException {
        RefCountedLedgerHandle ledgerHandle = getLedgerHandle(ledgerDescriptor);
        if (null != ledgerHandle) {
            return ledgerHandle.handle.getLastAddConfirmed();
        }
        LOG.error("Accessing ledger {} without opening.", ledgerDescriptor);
        throw BKException.create(-999);
    }

    public boolean isLedgerHandleClosed(LedgerDescriptor ledgerDescriptor) throws BKException {
        RefCountedLedgerHandle ledgerHandle = getLedgerHandle(ledgerDescriptor);
        if (null != ledgerHandle) {
            return ledgerHandle.handle.isClosed();
        }
        LOG.error("Accessing ledger {} without opening.", ledgerDescriptor);
        throw BKException.create(-999);
    }

    public Future<Long> asyncTryReadLastConfirmed(LedgerDescriptor ledgerDescriptor) {
        RefCountedLedgerHandle refCountedLedgerHandle = this.handlesMap.get(ledgerDescriptor);
        if (null == refCountedLedgerHandle) {
            LOG.error("Accessing ledger {} without opening.", ledgerDescriptor);
            return Future.exception(BKException.create(-999));
        }
        final Promise promise = new Promise();
        refCountedLedgerHandle.handle.asyncTryReadLastConfirmed(new AsyncCallback.ReadLastConfirmedCallback() { // from class: com.twitter.distributedlog.LedgerHandleCache.4
            public void readLastConfirmedComplete(int i, long j, Object obj) {
                if (0 == i) {
                    promise.setValue(Long.valueOf(j));
                } else {
                    promise.setException(BKException.create(i));
                }
            }
        }, (Object) null);
        return promise;
    }

    public long tryReadLastConfirmed(LedgerDescriptor ledgerDescriptor) throws BKException {
        return ((Long) FutureUtils.bkResult(asyncTryReadLastConfirmed(ledgerDescriptor))).longValue();
    }

    public Future<Pair<Long, LedgerEntry>> asyncReadLastConfirmedAndEntry(LedgerDescriptor ledgerDescriptor, long j, long j2, boolean z) {
        RefCountedLedgerHandle refCountedLedgerHandle = this.handlesMap.get(ledgerDescriptor);
        if (null == refCountedLedgerHandle) {
            LOG.error("Accessing ledger {} without opening.", ledgerDescriptor);
            return Future.exception(BKException.create(-999));
        }
        final Promise promise = new Promise();
        refCountedLedgerHandle.handle.asyncReadLastConfirmedAndEntry(j, j2, z, new AsyncCallback.ReadLastConfirmedAndEntryCallback() { // from class: com.twitter.distributedlog.LedgerHandleCache.5
            public void readLastConfirmedAndEntryComplete(int i, long j3, LedgerEntry ledgerEntry, Object obj) {
                if (0 == i) {
                    promise.setValue(Pair.of(Long.valueOf(j3), ledgerEntry));
                } else {
                    promise.setException(BKException.create(i));
                }
            }
        }, (Object) null);
        return promise;
    }

    public Future<Enumeration<LedgerEntry>> asyncReadEntries(LedgerDescriptor ledgerDescriptor, long j, long j2) {
        RefCountedLedgerHandle refCountedLedgerHandle = this.handlesMap.get(ledgerDescriptor);
        if (null == refCountedLedgerHandle) {
            LOG.error("Accessing ledger {} without opening.", ledgerDescriptor);
            return Future.exception(BKException.create(-999));
        }
        final Promise promise = new Promise();
        refCountedLedgerHandle.handle.asyncReadEntries(j, j2, new AsyncCallback.ReadCallback() { // from class: com.twitter.distributedlog.LedgerHandleCache.6
            public void readComplete(int i, LedgerHandle ledgerHandle, Enumeration<LedgerEntry> enumeration, Object obj) {
                if (0 == i) {
                    promise.setValue(enumeration);
                } else {
                    promise.setException(BKException.create(i));
                }
            }
        }, (Object) null);
        return promise;
    }

    public Enumeration<LedgerEntry> readEntries(LedgerDescriptor ledgerDescriptor, long j, long j2) throws BKException {
        return (Enumeration) FutureUtils.bkResult(asyncReadEntries(ledgerDescriptor, j, j2));
    }

    public long getLength(LedgerDescriptor ledgerDescriptor) throws BKException {
        RefCountedLedgerHandle ledgerHandle = getLedgerHandle(ledgerDescriptor);
        if (null != ledgerHandle) {
            return ledgerHandle.handle.getLength();
        }
        LOG.error("Accessing ledger {} without opening.", ledgerDescriptor);
        throw BKException.create(-999);
    }

    public void clear() {
        if (null != this.handlesMap) {
            Iterator<Map.Entry<LedgerDescriptor, RefCountedLedgerHandle>> it = this.handlesMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<LedgerDescriptor, RefCountedLedgerHandle> next = it.next();
                it.remove();
                next.getValue().forceClose();
            }
        }
    }
}
