package com.twitter.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Ticker;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.DLIllegalStateException;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.LockCancelledException;
import com.twitter.distributedlog.exceptions.LockingException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.injector.AsyncFailureInjector;
import com.twitter.distributedlog.lock.DistributedLock;
import com.twitter.distributedlog.lock.SessionLockFactory;
import com.twitter.distributedlog.lock.ZKSessionLockFactory;
import com.twitter.distributedlog.logsegment.LogSegmentFilter;
import com.twitter.distributedlog.logsegment.LogSegmentMetadataStore;
import com.twitter.distributedlog.readahead.ReadAheadWorker;
import com.twitter.distributedlog.stats.BroadCastStatsLogger;
import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.distributedlog.util.Utils;
import com.twitter.util.ExceptionalFunction;
import com.twitter.util.ExceptionalFunction0;
import com.twitter.util.Function;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import com.twitter.util.Return;
import com.twitter.util.Throw;
import com.twitter.util.Try;
import java.io.IOException;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.SafeRunnable;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/twitter/distributedlog/BKLogReadHandler.class */
/**
 * Log handler used on the read path.
 *
 * <p>On top of what {@link BKLogHandler} provides, an instance holds:
 * <ul>
 *   <li>a {@link ReadAheadCache} and, once {@link #startReadAhead} has been called, the
 *       {@link ReadAheadWorker} created with it;</li>
 *   <li>a {@link LedgerHandleCache} built from the handler's BookKeeper client;</li>
 *   <li>a {@link DistributedLock} on {@link #getReadLockPath()}, created lazily by
 *       {@link #lockStream()}.</li>
 * </ul>
 *
 * <p>{@code lockAcquireFuture} is read and written under this object's monitor. The other
 * mutable fields ({@code readAheadWorker}, {@code readLock}) are not consistently guarded.
 */
public class BKLogReadHandler extends BKLogHandler {
    static final Logger LOG = LoggerFactory.getLogger(BKLogReadHandler.class);

    // Not referenced anywhere inside this class.
    private static final int LAYOUT_VERSION = -1;

    /**
     * Non-ZooKeeper result code that {@link #ensureReadLockPathExist()} translates into a
     * {@link ZooKeeperClient.ZooKeeperConnectionException}. Equals -2147483646.
     */
    private static final int ZK_CONNECTION_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 2;

    /**
     * Non-ZooKeeper result code that {@link #ensureReadLockPathExist()} translates into a
     * {@link DLInterruptedException}. Equals -2147483647.
     */
    private static final int DL_INTERRUPTED_EXCEPTION_RESULT_CODE = Integer.MIN_VALUE + 1;

    private static final String LOCK_NOT_ACQUIRED_MESSAGE =
            "Attempt to check for lock before it has been acquired successfully";

    protected final ZKLogMetadataForReader logMetadataForReader;
    protected final ReadAheadCache readAheadCache;
    protected final LedgerHandleCache handleCache;
    protected final OrderedScheduler readAheadExecutor;
    protected final DynamicDistributedLogConfiguration dynConf;
    /** Null until {@link #startReadAhead} has been called. */
    protected ReadAheadWorker readAheadWorker;
    private final boolean isHandleForReading;
    private final SessionLockFactory lockFactory;
    private final OrderedScheduler lockStateExecutor;
    private final Optional<String> subscriberId;
    private final String readLockPath;
    /** Null until the task scheduled by {@link #lockStream()} has created the lock. */
    private DistributedLock readLock;
    /** Memoized result of {@link #lockStream()}; accessed under {@code synchronized (this)}. */
    private Future<Void> lockAcquireFuture;
    private final AlertStatsLogger alertStatsLogger;
    private final StatsLogger handlerStatsLogger;
    private final StatsLogger perLogStatsLogger;
    private final ReadAheadExceptionsLogger readAheadExceptionsLogger;

    /**
     * Builds the handler together with its ledger handle cache, read-ahead cache and read-lock
     * factory. No lock is acquired and no read-ahead is started here.
     *
     * @param zKLogMetadataForReader log metadata; forwarded to the superclass and used to derive
     *        the read lock path
     * @param optional subscriber id, used (with the metadata) to derive the read lock path
     * @param distributedLogConfiguration static configuration; forwarded to the superclass and
     *        read for the handle cache, read-ahead cache and lock factory settings
     * @param dynamicDistributedLogConfiguration dynamic configuration; supplies the read-ahead
     *        record limit and is later handed to the read-ahead worker
     * @param zooKeeperClientBuilder forwarded to the superclass
     * @param bookKeeperClientBuilder forwarded to the superclass
     * @param logSegmentMetadataStore forwarded to the superclass
     * @param orderedScheduler forwarded to the superclass
     * @param orderedScheduler2 executor stored as the lock-state executor and given to the
     *        session lock factory
     * @param orderedScheduler3 executor stored as the read-ahead executor
     * @param alertStatsLogger alert logger; forwarded to the superclass, the read-ahead cache
     *        and (later) the read-ahead worker
     * @param readAheadExceptionsLogger handed to the read-ahead worker when it is created
     * @param statsLogger base stats logger
     * @param statsLogger2 per-log stats logger; only used when {@code z} is true, otherwise
     *        {@link NullStatsLogger#INSTANCE} is substituted
     * @param str forwarded to the superclass as its last constructor argument
     * @param asyncNotification forwarded to the superclass and the read-ahead cache
     * @param z stored as {@code isHandleForReading}; also selects the per-log stats logger
     * @param z2 forwarded unchanged to the {@link ReadAheadCache} constructor
     *        (NOTE(review): its meaning is not visible from this class)
     */
    public BKLogReadHandler(ZKLogMetadataForReader zKLogMetadataForReader, Optional<String> optional, DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, ZooKeeperClientBuilder zooKeeperClientBuilder, BookKeeperClientBuilder bookKeeperClientBuilder, LogSegmentMetadataStore logSegmentMetadataStore, OrderedScheduler orderedScheduler, OrderedScheduler orderedScheduler2, OrderedScheduler orderedScheduler3, AlertStatsLogger alertStatsLogger, ReadAheadExceptionsLogger readAheadExceptionsLogger, StatsLogger statsLogger, StatsLogger statsLogger2, String str, AsyncNotification asyncNotification, boolean z, boolean z2) {
        super(zKLogMetadataForReader, distributedLogConfiguration, zooKeeperClientBuilder, bookKeeperClientBuilder, logSegmentMetadataStore, orderedScheduler, statsLogger, alertStatsLogger, asyncNotification, LogSegmentFilter.DEFAULT_FILTER, str);
        this.logMetadataForReader = zKLogMetadataForReader;
        this.dynConf = dynamicDistributedLogConfiguration;
        this.readAheadExecutor = orderedScheduler3;
        this.alertStatsLogger = alertStatsLogger;
        this.perLogStatsLogger = z ? statsLogger2 : NullStatsLogger.INSTANCE;
        this.handlerStatsLogger = BroadCastStatsLogger.masterslave(this.perLogStatsLogger, statsLogger);
        this.readAheadExceptionsLogger = readAheadExceptionsLogger;
        this.handleCache = LedgerHandleCache.newBuilder()
                .bkc(this.bookKeeperClient)
                .conf(distributedLogConfiguration)
                .statsLogger(statsLogger)
                .build();
        this.readAheadCache = new ReadAheadCache(
                getFullyQualifiedName(),
                this.handlerStatsLogger,
                alertStatsLogger,
                asyncNotification,
                dynamicDistributedLogConfiguration.getReadAheadMaxRecords(),
                z2,
                distributedLogConfiguration.getTraceReadAheadDeliveryLatency(),
                distributedLogConfiguration.getDataLatencyWarnThresholdMillis(),
                Ticker.systemTicker());
        this.subscriberId = optional;
        this.readLockPath = zKLogMetadataForReader.getReadLockPath(optional);
        this.lockStateExecutor = orderedScheduler2;
        this.lockFactory = new ZKSessionLockFactory(
                this.zooKeeperClient,
                getLockClientId(),
                orderedScheduler2,
                distributedLogConfiguration.getZKNumRetries(),
                distributedLogConfiguration.getLockTimeoutMilliSeconds(),
                distributedLogConfiguration.getZKRetryBackoffStartMillis(),
                statsLogger.scope("read_lock"));
        this.isHandleForReading = z;
    }

    /** @return the path of the read lock, as derived in the constructor */
    @VisibleForTesting
    String getReadLockPath() {
        return this.readLockPath;
    }

    /**
     * Completes {@code promise} with {@code r9} from a task submitted to this handler's
     * scheduler, instead of completing it on the calling thread.
     *
     * @param promise promise to complete
     * @param r9 outcome (value or failure) to complete it with
     */
    <T> void satisfyPromiseAsync(final Promise<T> promise, final Try<T> r9) {
        this.scheduler.submit((Runnable) new SafeRunnable() {
            @Override
            public void safeRun() {
                promise.update(r9);
            }
        });
    }

    /**
     * Starts acquiring the read lock, at most once per handler.
     *
     * <p>The first call makes sure the read lock path exists, then creates the
     * {@link DistributedLock} on the handler's scheduler and asks it to acquire. Every later
     * call returns the same future, whatever its state.
     *
     * <p>Visibility note: the decompiler reports this member was originally package-private.
     *
     * @return the memoized future of the lock acquisition
     */
    public synchronized Future<Void> lockStream() {
        if (null == this.lockAcquireFuture) {
            final ExceptionalFunction0<DistributedLock> createLock = new ExceptionalFunction0<DistributedLock>() {
                // Must be named applyE: that is the method the sibling ExceptionalFunction
                // implementations provide, and the name the decompiler says it renamed from.
                @Override
                public DistributedLock applyE() throws IOException {
                    BKLogReadHandler.this.readLock = new DistributedLock(
                            BKLogReadHandler.this.lockStateExecutor,
                            BKLogReadHandler.this.lockFactory,
                            BKLogReadHandler.this.readLockPath,
                            BKLogReadHandler.this.conf.getLockTimeoutMilliSeconds(),
                            BKLogReadHandler.this.statsLogger.scope("read_lock"));
                    LOG.info("acquiring readlock {} at {}", BKLogReadHandler.this.getLockClientId(), BKLogReadHandler.this.readLockPath);
                    return BKLogReadHandler.this.readLock;
                }
            };
            this.lockAcquireFuture = ensureReadLockPathExist().flatMap(new ExceptionalFunction<Void, Future<Void>>() {
                @Override
                public Future<Void> applyE(Void ignored) throws Throwable {
                    return BKLogReadHandler.this.scheduler.apply(createLock).flatMap(new ExceptionalFunction<DistributedLock, Future<Void>>() {
                        @Override
                        public Future<Void> applyE(DistributedLock distributedLock) throws IOException {
                            return BKLogReadHandler.this.acquireLockOnExecutorThread(distributedLock);
                        }
                    });
                }
            });
        }
        return this.lockAcquireFuture;
    }

    /**
     * Asks {@code distributedLock} to acquire asynchronously and adapts the outcome to a
     * {@code Future<Void>} that is completed via {@link #satisfyPromiseAsync}.
     *
     * <p>Interrupting the returned future cancels the underlying acquire future.
     *
     * @param distributedLock lock to acquire
     * @return future completed with {@code null} on success, or with the acquire failure
     * @throws LockingException declared by the signature; not thrown directly by this body
     */
    Future<Void> acquireLockOnExecutorThread(DistributedLock distributedLock) throws LockingException {
        final Future<DistributedLock> asyncAcquire = distributedLock.asyncAcquire();
        final Promise<Void> promise = new Promise<Void>();
        promise.setInterruptHandler(new Function<Throwable, BoxedUnit>() {
            @Override
            public BoxedUnit apply(Throwable th) {
                FutureUtils.cancel(asyncAcquire);
                return null;
            }
        });
        asyncAcquire.addEventListener(new FutureEventListener<DistributedLock>() {
            @Override
            public void onSuccess(DistributedLock acquired) {
                LOG.info("acquired readlock {} at {}", BKLogReadHandler.this.getLockClientId(), BKLogReadHandler.this.readLockPath);
                BKLogReadHandler.this.satisfyPromiseAsync(promise, new Return<Void>(null));
            }

            @Override
            public void onFailure(Throwable th) {
                // The trailing Throwable is passed as the last array element, after the two
                // values that fill the placeholders.
                LOG.info("failed to acquire readlock {} at {}", new Object[]{BKLogReadHandler.this.getLockClientId(), BKLogReadHandler.this.readLockPath, th});
                BKLogReadHandler.this.satisfyPromiseAsync(promise, new Throw<Void>(th));
            }
        });
        return promise;
    }

    /**
     * Verifies that this handler still owns the read lock.
     *
     * <p>Visibility note: the decompiler reports this member was originally package-private.
     *
     * @throws DLIllegalStateException if {@link #lockStream()} has not been called, its future
     *         is not yet defined, or the future completed without a lock object having been
     *         created (for example because ensuring the lock path failed)
     * @throws LockingException propagated from {@link DistributedLock#checkOwnership()}
     */
    public void checkReadLock() throws DLIllegalStateException, LockingException {
        final DistributedLock lock;
        synchronized (this) {
            if (null == this.lockAcquireFuture || !this.lockAcquireFuture.isDefined()) {
                throw new DLIllegalStateException(LOCK_NOT_ACQUIRED_MESSAGE);
            }
            lock = this.readLock;
        }
        // A defined future may carry a failure raised before the lock object was created;
        // report that as an illegal state instead of failing with a NullPointerException.
        if (null == lock) {
            throw new DLIllegalStateException(LOCK_NOT_ACQUIRED_MESSAGE);
        }
        lock.checkOwnership();
    }

    /**
     * Closes the handler: cancels a still-pending lock acquisition, closes the read-ahead
     * worker and the read lock through {@code Utils.closeSequence}, clears both caches and
     * finally delegates to the superclass.
     *
     * @return future of the whole close sequence
     */
    @Override // com.twitter.distributedlog.BKLogHandler, com.twitter.distributedlog.io.AsyncCloseable
    public Future<Void> asyncClose() {
        final DistributedLock lockToClose;
        synchronized (this) {
            if (null != this.lockAcquireFuture && !this.lockAcquireFuture.isDefined()) {
                FutureUtils.cancel(this.lockAcquireFuture);
            }
            lockToClose = this.readLock;
        }
        return Utils.closeSequence(this.scheduler, this.readAheadWorker, lockToClose).flatMap(new AbstractFunction1<Void, Future<Void>>() {
            @Override
            public Future<Void> apply(Void ignored) {
                if (null != BKLogReadHandler.this.readAheadCache) {
                    BKLogReadHandler.this.readAheadCache.clear();
                }
                if (null != BKLogReadHandler.this.handleCache) {
                    BKLogReadHandler.this.handleCache.clear();
                }
                return BKLogReadHandler.super.asyncClose();
            }
        });
    }

    /**
     * Aborting is implemented as a plain {@link #asyncClose()}.
     *
     * @return the future returned by {@link #asyncClose()}
     */
    @Override // com.twitter.distributedlog.BKLogHandler, com.twitter.distributedlog.io.AsyncAbortable
    public Future<Void> asyncAbort() {
        return asyncClose();
    }

    /**
     * Creates and starts the read-ahead worker if none exists yet; otherwise does nothing.
     *
     * @param ledgerReadPosition position handed to the new worker
     * @param asyncFailureInjector failure injector handed to the new worker
     */
    public void startReadAhead(LedgerReadPosition ledgerReadPosition, AsyncFailureInjector asyncFailureInjector) {
        if (null != this.readAheadWorker) {
            return;
        }
        this.readAheadWorker = new ReadAheadWorker(
                this.conf,
                this.dynConf,
                this.logMetadataForReader,
                this,
                this.zooKeeperClient,
                this.readAheadExecutor,
                this.handleCache,
                ledgerReadPosition,
                this.readAheadCache,
                this.isHandleForReading,
                this.readAheadExceptionsLogger,
                this.handlerStatsLogger,
                this.perLogStatsLogger,
                this.alertStatsLogger,
                asyncFailureInjector,
                this.notification);
        this.readAheadWorker.start();
    }

    /**
     * @return {@code true} only if a read-ahead worker exists and reports being caught up
     */
    public boolean isReadAheadCaughtUp() {
        return null != this.readAheadWorker && this.readAheadWorker.isCaughtUp();
    }

    /** @return the ledger handle cache built in the constructor */
    public LedgerHandleCache getHandleCache() {
        return this.handleCache;
    }

    /**
     * Asynchronously creates the read lock path (as a persistent node with empty data).
     *
     * <p>The ZooKeeper callback is re-dispatched to the handler's scheduler, where the result
     * code is translated by {@link #completeReadLockPathCreation}. Interrupting the returned
     * future fails it with a {@link LockCancelledException}.
     *
     * @return future that succeeds when the path was created or already existed
     */
    private Future<Void> ensureReadLockPathExist() {
        final Promise<Void> promise = new Promise<Void>();
        promise.setInterruptHandler(new Function<Throwable, BoxedUnit>() {
            @Override
            public BoxedUnit apply(Throwable th) {
                FutureUtils.setException(promise, new LockCancelledException(BKLogReadHandler.this.readLockPath, "Could not ensure read lock path", th));
                return null;
            }
        });
        Utils.zkAsyncCreateFullPathOptimisticRecursive(this.zooKeeperClient, this.readLockPath, Optional.of(this.logMetadata.getLogRootPath()), new byte[0], this.zooKeeperClient.getDefaultACL(), CreateMode.PERSISTENT, new AsyncCallback.StringCallback() {
            @Override
            public void processResult(final int i, final String str, Object obj, String str2) {
                BKLogReadHandler.this.scheduler.submit(new Runnable() {
                    @Override // java.lang.Runnable
                    public void run() {
                        BKLogReadHandler.this.completeReadLockPathCreation(promise, i, str);
                    }
                });
            }
        }, null);
        return promise;
    }

    /**
     * Translates the result code of the read-lock-path creation into the promise's outcome.
     *
     * @param promise promise to complete
     * @param rc result code delivered to the ZooKeeper callback
     * @param path path delivered to the ZooKeeper callback
     */
    private void completeReadLockPathCreation(Promise<Void> promise, int rc, String path) {
        if (KeeperException.Code.NONODE.intValue() == rc) {
            FutureUtils.setException(promise, new LogNotFoundException(String.format("Log %s does not exist or has been deleted", getFullyQualifiedName())));
        } else if (KeeperException.Code.OK.intValue() == rc) {
            FutureUtils.setValue(promise, null);
            LOG.trace("Created path {}.", path);
        } else if (KeeperException.Code.NODEEXISTS.intValue() == rc) {
            // An already-existing path is as good as a freshly created one.
            FutureUtils.setValue(promise, null);
            LOG.trace("Path {} is already existed.", path);
        } else if (ZK_CONNECTION_EXCEPTION_RESULT_CODE == rc) {
            FutureUtils.setException(promise, new ZooKeeperClient.ZooKeeperConnectionException(path));
        } else if (DL_INTERRUPTED_EXCEPTION_RESULT_CODE == rc) {
            FutureUtils.setException(promise, new DLInterruptedException(path));
        } else {
            FutureUtils.setException(promise, KeeperException.create(KeeperException.Code.get(rc)));
        }
    }

    /**
     * @return whatever the read-ahead cache returns as its next record
     * @throws IOException propagated from the read-ahead cache
     */
    public LogRecordWithDLSN getNextReadAheadRecord() throws IOException {
        return this.readAheadCache.getNextReadAheadRecord();
    }

    /** @return the read-ahead cache built in the constructor */
    public ReadAheadCache getReadAheadCache() {
        return this.readAheadCache;
    }

    /**
     * Disables ZooKeeper notifications on the read-ahead worker, if one has been started.
     *
     * <p>Visibility note: the decompiler reports this member was originally package-private.
     */
    @VisibleForTesting
    public void disableReadAheadZKNotification() {
        if (null != this.readAheadWorker) {
            this.readAheadWorker.disableZKNotification();
        }
    }
}
