package com.twitter.distributedlog.readahead;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.AsyncNotification;
import com.twitter.distributedlog.BKLogHandler;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.DistributedLogConfiguration;
import com.twitter.distributedlog.LedgerDescriptor;
import com.twitter.distributedlog.LedgerHandleCache;
import com.twitter.distributedlog.LedgerReadPosition;
import com.twitter.distributedlog.LogSegmentMetadata;
import com.twitter.distributedlog.ReadAheadCache;
import com.twitter.distributedlog.ZooKeeperClient;
import com.twitter.distributedlog.callback.ReadAheadCallback;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.AlreadyTruncatedTransactionException;
import com.twitter.distributedlog.exceptions.DLInterruptedException;
import com.twitter.distributedlog.exceptions.LogNotFoundException;
import com.twitter.distributedlog.exceptions.LogReadException;
import com.twitter.distributedlog.impl.metadata.ZKLogMetadataForReader;
import com.twitter.distributedlog.injector.AsyncFailureInjector;
import com.twitter.distributedlog.io.AsyncCloseable;
import com.twitter.distributedlog.stats.ReadAheadExceptionsLogger;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.distributedlog.util.OrderedScheduler;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.stats.AlertStatsLogger;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker.class */
public class ReadAheadWorker implements ReadAheadCallback, Runnable, Watcher, AsyncCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ReadAheadWorker.class);
    private static final int BKC_ZK_EXCEPTION_THRESHOLD_IN_SECONDS = 30;
    private static final int BKC_UNEXPECTED_EXCEPTION_THRESHOLD = 3;
    private final String fullyQualifiedName;
    private final DistributedLogConfiguration conf;
    private final DynamicDistributedLogConfiguration dynConf;
    private final ZKLogMetadataForReader logMetadata;
    private final BKLogHandler bkLedgerManager;
    private final boolean isHandleForReading;
    protected final AsyncNotification notification;
    private final ZooKeeperClient zkc;
    protected final OrderedScheduler scheduler;
    private final LedgerHandleCache handleCache;
    private final ReadAheadCache readAheadCache;
    private final int noLedgerExceptionOnReadLACThreshold;
    private final LedgerReadPosition startReadPosition;
    protected LedgerReadPosition nextReadAheadPosition;
    private final Watcher getLedgersWatcher;
    private int currentMetadataIndex;
    protected LedgerDescriptor currentLH;
    private volatile List<LogSegmentMetadata> ledgerList;
    private final AsyncFailureInjector failureInjector;
    protected final long metadataLatencyWarnThresholdMillis;
    final ReadAheadTracker tracker;
    private final boolean readAheadSkipBrokenEntries;
    private final AlertStatsLogger alertStatsLogger;
    private final StatsLogger readAheadPerStreamStatsLogger;
    private final Counter readAheadWorkerWaits;
    private final Counter readAheadEntryPiggyBackHits;
    private final Counter readAheadEntryPiggyBackMisses;
    private final Counter readAheadReadLACAndEntryCounter;
    private final Counter readAheadCacheFullCounter;
    private final Counter readAheadSkippedBrokenEntries;
    private final Counter idleReaderWarn;
    private final OpStatsLogger readAheadReadEntriesStat;
    private final OpStatsLogger readAheadCacheResumeStat;
    private final OpStatsLogger readAheadLacLagStats;
    private final OpStatsLogger longPollInterruptionStat;
    private final OpStatsLogger metadataReinitializationStat;
    private final OpStatsLogger notificationExecutionStat;
    private final ReadAheadExceptionsLogger readAheadExceptionsLogger;
    volatile boolean running = true;
    Promise<Void> stopPromise = null;
    private volatile boolean isCatchingUp = true;
    private volatile boolean logDeleted = false;
    private volatile boolean readAheadError = false;
    private volatile boolean readAheadInterrupted = false;
    private volatile boolean readingFromTruncated = false;
    volatile boolean encounteredException = false;
    private final AtomicInteger bkcZkExceptions = new AtomicInteger(0);
    private final AtomicInteger bkcUnExpectedExceptions = new AtomicInteger(0);
    private final AtomicInteger bkcNoLedgerExceptionsOnReadLAC = new AtomicInteger(0);
    volatile boolean zkNotificationDisabled = false;
    final Object notificationLock = new Object();
    AsyncNotification metadataNotification = null;
    volatile long metadataNotificationTimeMillis = -1;
    private volatile boolean reInitializeMetadata = true;
    volatile boolean inProgressChanged = false;
    private LogSegmentMetadata currentMetadata = null;
    final Phase schedulePhase = new ScheduleReadAheadPhase();
    final Phase exceptionHandler = new ExceptionHandlePhase(this.schedulePhase);
    final Phase readAheadPhase = new StoppablePhase(new CheckInProgressChangedPhase(new OpenLedgerPhase(new ReadEntriesPhase(this.schedulePhase))));
    final Stopwatch lastLedgerCloseDetected = Stopwatch.createUnstarted();
    final Stopwatch resumeStopWatch = Stopwatch.createUnstarted();

    /* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker$CheckInProgressChangedPhase.class */
    final class CheckInProgressChangedPhase extends Phase implements BookkeeperInternalCallbacks.GenericCallback<List<LogSegmentMetadata>> {
        CheckInProgressChangedPhase(Phase phase) {
            super(phase);
        }

        public void operationComplete(final int i, final List<LogSegmentMetadata> list) {
            ReadAheadWorker.this.submit(new Runnable() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.CheckInProgressChangedPhase.1
                @Override // java.lang.Runnable
                public void run() {
                    if (KeeperException.Code.OK.intValue() != i) {
                        if (KeeperException.Code.NONODE.intValue() == i) {
                            ReadAheadWorker.LOG.info("Log {} has been deleted. Set ReadAhead to error to stop reading.", ReadAheadWorker.this.logMetadata.getFullyQualifiedName());
                            ReadAheadWorker.this.logDeleted = true;
                            ReadAheadWorker.this.setReadAheadError(ReadAheadWorker.this.tracker);
                            return;
                        } else {
                            ReadAheadWorker.LOG.info("ZK Exception {} while reading ledger list", Integer.valueOf(i));
                            ReadAheadWorker.this.reInitializeMetadata = true;
                            if (-2147483647 == i) {
                                ReadAheadWorker.this.handleException(ReadAheadPhase.GET_LEDGERS, -15);
                                return;
                            } else {
                                ReadAheadWorker.this.handleException(ReadAheadPhase.GET_LEDGERS, -9);
                                return;
                            }
                        }
                    }
                    ReadAheadWorker.this.ledgerList = list;
                    boolean definitelyLessThanOrEqualTo = ReadAheadWorker.this.nextReadAheadPosition.definitelyLessThanOrEqualTo(ReadAheadWorker.this.startReadPosition);
                    int i2 = 0;
                    while (true) {
                        if (i2 >= ReadAheadWorker.this.ledgerList.size()) {
                            break;
                        }
                        LogSegmentMetadata logSegmentMetadata = (LogSegmentMetadata) ReadAheadWorker.this.ledgerList.get(i2);
                        if (!logSegmentMetadata.isTruncated() || !definitelyLessThanOrEqualTo || ReadAheadWorker.this.conf.getIgnoreTruncationStatus()) {
                            DLSN dlsn = new DLSN(ReadAheadWorker.this.nextReadAheadPosition.getLogSegmentSequenceNumber(), ReadAheadWorker.this.nextReadAheadPosition.getEntryId(), -1L);
                            boolean z = (logSegmentMetadata.getLastDLSN().compareTo(dlsn) >= 0) || ((logSegmentMetadata.isInProgress() || (null != ReadAheadWorker.this.currentMetadata && ReadAheadWorker.this.currentMetadata.isInProgress())) && logSegmentMetadata.getLogSegmentSequenceNumber() == ReadAheadWorker.this.nextReadAheadPosition.getLogSegmentSequenceNumber());
                            if (logSegmentMetadata.isPartiallyTruncated() && !definitelyLessThanOrEqualTo && logSegmentMetadata.getMinActiveDLSN().compareTo(dlsn) > 0) {
                                if (ReadAheadWorker.this.conf.getAlertWhenPositioningOnTruncated()) {
                                    ReadAheadWorker.this.alertStatsLogger.raise("Trying to position reader on {} when {} is marked partially truncated", new Object[]{ReadAheadWorker.this.nextReadAheadPosition, logSegmentMetadata});
                                }
                                if (!ReadAheadWorker.this.conf.getIgnoreTruncationStatus()) {
                                    ReadAheadWorker.LOG.error("{}: Trying to position reader on {} when {} is marked partially truncated", new Object[]{ReadAheadWorker.this.logMetadata.getFullyQualifiedName(), ReadAheadWorker.this.nextReadAheadPosition, logSegmentMetadata});
                                    ReadAheadWorker.this.setReadingFromTruncated(ReadAheadWorker.this.tracker);
                                    return;
                                }
                            }
                            if (ReadAheadWorker.LOG.isTraceEnabled()) {
                                ReadAheadWorker.LOG.trace("CheckLogSegment : newMetadata = {}, currentMetadata = {}, nextReadAheadPosition = {}", new Object[]{logSegmentMetadata, ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.nextReadAheadPosition});
                            }
                            if (z) {
                                long j = 0;
                                if (logSegmentMetadata.isPartiallyTruncated() && !ReadAheadWorker.this.conf.getIgnoreTruncationStatus()) {
                                    j = logSegmentMetadata.getMinActiveDLSN().getEntryId();
                                    ReadAheadWorker.this.readAheadCache.setMinActiveDLSN(logSegmentMetadata.getMinActiveDLSN());
                                }
                                if (logSegmentMetadata.getLogSegmentSequenceNumber() == ReadAheadWorker.this.nextReadAheadPosition.getLogSegmentSequenceNumber()) {
                                    j = Math.max(j, ReadAheadWorker.this.nextReadAheadPosition.getEntryId());
                                    if (ReadAheadWorker.this.currentMetadata != null) {
                                        ReadAheadWorker.this.inProgressChanged = ReadAheadWorker.this.currentMetadata.isInProgress() && !logSegmentMetadata.isInProgress();
                                    }
                                } else {
                                    ReadAheadWorker.LOG.trace("Positioning {} on a new ledger {}", ReadAheadWorker.this.fullyQualifiedName, logSegmentMetadata);
                                    if (!ReadAheadWorker.this.closeCurrentLedgerHandle()) {
                                        return;
                                    }
                                }
                                ReadAheadWorker.this.nextReadAheadPosition = new LedgerReadPosition(logSegmentMetadata.getLedgerId(), logSegmentMetadata.getLogSegmentSequenceNumber(), j);
                                if (ReadAheadWorker.this.conf.getTraceReadAheadMetadataChanges()) {
                                    ReadAheadWorker.LOG.info("Moved read position to {} for stream {} at {}.", new Object[]{ReadAheadWorker.this.nextReadAheadPosition, ReadAheadWorker.this.logMetadata.getFullyQualifiedName(), Long.valueOf(System.currentTimeMillis())});
                                }
                                if (logSegmentMetadata.isTruncated()) {
                                    if (ReadAheadWorker.this.conf.getAlertWhenPositioningOnTruncated()) {
                                        ReadAheadWorker.this.alertStatsLogger.raise("Trying to position reader on {} when {} is marked truncated", new Object[]{ReadAheadWorker.this.nextReadAheadPosition, logSegmentMetadata});
                                    }
                                    if (!ReadAheadWorker.this.conf.getIgnoreTruncationStatus()) {
                                        ReadAheadWorker.LOG.error("{}: Trying to position reader on {} when {} is marked truncated", new Object[]{ReadAheadWorker.this.logMetadata.getFullyQualifiedName(), ReadAheadWorker.this.nextReadAheadPosition, logSegmentMetadata});
                                        ReadAheadWorker.this.setReadingFromTruncated(ReadAheadWorker.this.tracker);
                                        return;
                                    }
                                }
                                ReadAheadWorker.this.currentMetadata = logSegmentMetadata;
                                ReadAheadWorker.this.currentMetadataIndex = i2;
                            } else if (logSegmentMetadata.isInProgress()) {
                                break;
                            }
                        }
                        i2++;
                    }
                    if (null == ReadAheadWorker.this.currentMetadata) {
                        if (ReadAheadWorker.this.isCatchingUp) {
                            ReadAheadWorker.this.isCatchingUp = false;
                            ReadAheadWorker.this.readAheadCache.setSuppressDeliveryLatency(false);
                            if (ReadAheadWorker.this.isHandleForReading) {
                                ReadAheadWorker.LOG.info("{} caught up at {}: position = {} and no log segment to position on at this point.", new Object[]{ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(System.currentTimeMillis()), ReadAheadWorker.this.nextReadAheadPosition});
                            }
                        }
                        ReadAheadWorker.this.schedule(ReadAheadWorker.this, ReadAheadWorker.this.conf.getReadAheadWaitTimeOnEndOfStream());
                        if (ReadAheadWorker.LOG.isDebugEnabled()) {
                            ReadAheadWorker.LOG.debug("No log segment to position on for {}. Backing off for {} millseconds", ReadAheadWorker.this.fullyQualifiedName, Integer.valueOf(ReadAheadWorker.this.conf.getReadAheadWaitTimeOnEndOfStream()));
                        }
                    } else {
                        if (ReadAheadWorker.LOG.isDebugEnabled()) {
                            ReadAheadWorker.LOG.debug("Initialized metadata for {}, starting reading ahead from {} : {}.", new Object[]{ReadAheadWorker.this.fullyQualifiedName, Integer.valueOf(ReadAheadWorker.this.currentMetadataIndex), ReadAheadWorker.this.currentMetadata});
                        }
                        CheckInProgressChangedPhase.this.next.process(0);
                    }
                    ReadAheadWorker.this.bkLedgerManager.reportGetSegmentStats(true);
                }
            });
        }

        @Override // com.twitter.distributedlog.readahead.ReadAheadWorker.Phase
        void process(int i) {
            if (!ReadAheadWorker.this.running) {
                ReadAheadWorker.this.setReadAheadStopped();
                return;
            }
            ReadAheadWorker.this.tracker.enterPhase(ReadAheadPhase.GET_LEDGERS);
            ReadAheadWorker.this.inProgressChanged = false;
            if (ReadAheadWorker.LOG.isTraceEnabled()) {
                ReadAheadWorker.LOG.trace("Checking {} if InProgress changed.", ReadAheadWorker.this.fullyQualifiedName);
            }
            if (!ReadAheadWorker.this.reInitializeMetadata && null != ReadAheadWorker.this.currentMetadata) {
                this.next.process(0);
                return;
            }
            ReadAheadWorker.this.reInitializeMetadata = false;
            if (ReadAheadWorker.LOG.isTraceEnabled()) {
                ReadAheadWorker.LOG.trace("Reinitializing metadata for {}.", ReadAheadWorker.this.fullyQualifiedName);
            }
            if (ReadAheadWorker.this.metadataNotificationTimeMillis > 0) {
                long currentTimeMillis = System.currentTimeMillis();
                long j = currentTimeMillis - ReadAheadWorker.this.metadataNotificationTimeMillis;
                if (j >= ReadAheadWorker.this.metadataLatencyWarnThresholdMillis) {
                    ReadAheadWorker.LOG.warn("{} reinitialize metadata at {}, which is {} millis after receiving notification at {}.", new Object[]{ReadAheadWorker.this.logMetadata.getFullyQualifiedName(), Long.valueOf(currentTimeMillis), Long.valueOf(j), Long.valueOf(ReadAheadWorker.this.metadataNotificationTimeMillis)});
                }
                ReadAheadWorker.this.metadataReinitializationStat.registerSuccessfulEvent(TimeUnit.MILLISECONDS.toMicros(j));
                ReadAheadWorker.this.metadataNotificationTimeMillis = -1L;
            }
            ReadAheadWorker.this.bkLedgerManager.asyncGetLedgerList(LogSegmentMetadata.COMPARATOR, ReadAheadWorker.this.getLedgersWatcher, this);
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker$ExceptionHandlePhase.class */
    final class ExceptionHandlePhase extends Phase {
        ExceptionHandlePhase(Phase phase) {
            super(phase);
        }

        @Override // com.twitter.distributedlog.readahead.ReadAheadWorker.Phase
        void process(int i) {
            ReadAheadWorker.this.tracker.enterPhase(ReadAheadPhase.EXCEPTION_HANDLING);
            if (-15 == i) {
                ReadAheadWorker.LOG.trace("ReadAhead Worker for {} is interrupted.", ReadAheadWorker.this.fullyQualifiedName);
                ReadAheadWorker.this.running = false;
                ReadAheadWorker.this.setReadAheadInterrupted(ReadAheadWorker.this.tracker);
                return;
            }
            if (-9 == i) {
                ReadAheadWorker.this.encounteredException = true;
                ReadAheadWorker.LOG.debug("ReadAhead Worker for {} encountered zookeeper exception : total exceptions are {}.", ReadAheadWorker.this.fullyQualifiedName, Integer.valueOf(ReadAheadWorker.this.bkcZkExceptions.incrementAndGet()));
            } else if (0 != i) {
                ReadAheadWorker.this.encounteredException = true;
                switch (i) {
                    case -13:
                    case -10:
                    case -7:
                        break;
                    default:
                        ReadAheadWorker.this.bkcUnExpectedExceptions.incrementAndGet();
                        break;
                }
                ReadAheadWorker.LOG.info("ReadAhead Worker for {} encountered exception : {}", ReadAheadWorker.this.fullyQualifiedName, BKException.create(i));
            }
            this.next.process(0);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker$InterruptibleScheduledRunnable.class */
    public class InterruptibleScheduledRunnable implements AsyncNotification, Runnable {
        final Runnable task;
        final AtomicBoolean called = new AtomicBoolean(false);
        final long startNanos = MathUtils.nowInNano();

        InterruptibleScheduledRunnable(Runnable runnable) {
            this.task = runnable;
        }

        @Override // com.twitter.distributedlog.AsyncNotification
        public void notifyOnError() {
            ReadAheadWorker.this.longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(this.startNanos));
            execute();
        }

        @Override // com.twitter.distributedlog.AsyncNotification
        public void notifyOnOperationComplete() {
            ReadAheadWorker.this.longPollInterruptionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(this.startNanos));
            execute();
        }

        @Override // java.lang.Runnable
        public void run() {
            if (this.called.compareAndSet(false, true)) {
                this.task.run();
            }
        }

        void execute() {
            if (this.called.compareAndSet(false, true)) {
                ReadAheadWorker.this.submit(this.task);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker$LongPollNotification.class */
    public abstract class LongPollNotification<T> implements AsyncNotification {
        final long lac;
        final T cb;
        final Object ctx;
        final AtomicBoolean called = new AtomicBoolean(false);
        final long startNanos = MathUtils.nowInNano();

        LongPollNotification(long j, T t, Object obj) {
            this.lac = j;
            this.cb = t;
            this.ctx = obj;
        }

        void complete(boolean z) {
            long nowInNano = MathUtils.nowInNano();
            doComplete(z);
            if (z) {
                ReadAheadWorker.this.notificationExecutionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(nowInNano));
            } else {
                ReadAheadWorker.this.notificationExecutionStat.registerFailedEvent(MathUtils.elapsedMicroSec(nowInNano));
            }
        }

        abstract void doComplete(boolean z);

        @Override // com.twitter.distributedlog.AsyncNotification
        public void notifyOnError() {
            ReadAheadWorker.this.longPollInterruptionStat.registerFailedEvent(MathUtils.elapsedMicroSec(this.startNanos));
            complete(false);
        }

        @Override // com.twitter.distributedlog.AsyncNotification
        public void notifyOnOperationComplete() {
            ReadAheadWorker.this.longPollInterruptionStat.registerSuccessfulEvent(MathUtils.elapsedMicroSec(this.startNanos));
            complete(true);
        }

        void callbackImmediately(boolean z) {
            if (z) {
                complete(true);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker$OpenLedgerPhase.class */
    public final class OpenLedgerPhase extends Phase implements BookkeeperInternalCallbacks.GenericCallback<LedgerDescriptor>, AsyncCallback.ReadLastConfirmedAndEntryCallback {
        OpenLedgerPhase(Phase phase) {
            super(phase);
        }

        private void issueReadLastConfirmedAndEntry(boolean z, final long j) {
            Object[] objArr = new Object[2];
            objArr[0] = z ? "Parallel" : "Sequential";
            objArr[1] = Long.valueOf(j);
            final String format = String.format("ReadLastConfirmedAndEntry(%s, %d)", objArr);
            final ReadLastConfirmedAndEntryCallbackWithNotification readLastConfirmedAndEntryCallbackWithNotification = new ReadLastConfirmedAndEntryCallbackWithNotification(j, this, format);
            boolean metadataNotification = ReadAheadWorker.this.setMetadataNotification(readLastConfirmedAndEntryCallbackWithNotification);
            ReadAheadWorker.this.handleCache.asyncReadLastConfirmedAndEntry(ReadAheadWorker.this.currentLH, ReadAheadWorker.this.nextReadAheadPosition.getEntryId(), ReadAheadWorker.this.conf.getReadLACLongPollTimeout(), z).addEventListener(new FutureEventListener<Pair<Long, LedgerEntry>>() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.OpenLedgerPhase.1
                public void onSuccess(Pair<Long, LedgerEntry> pair) {
                    readLastConfirmedAndEntryCallbackWithNotification.readLastConfirmedAndEntryComplete(0, ((Long) pair.getLeft()).longValue(), (LedgerEntry) pair.getRight(), format);
                }

                public void onFailure(Throwable th) {
                    readLastConfirmedAndEntryCallbackWithNotification.readLastConfirmedAndEntryComplete(FutureUtils.bkResultCode(th), j, null, format);
                }
            });
            readLastConfirmedAndEntryCallbackWithNotification.callbackImmediately(metadataNotification);
            ReadAheadWorker.this.readAheadReadLACAndEntryCounter.inc();
        }

        @Override // com.twitter.distributedlog.readahead.ReadAheadWorker.Phase
        void process(int i) {
            if (!ReadAheadWorker.this.running) {
                ReadAheadWorker.this.setReadAheadStopped();
                return;
            }
            ReadAheadWorker.this.tracker.enterPhase(ReadAheadPhase.OPEN_LEDGER);
            if (ReadAheadWorker.this.currentMetadata.isInProgress()) {
                if (null == ReadAheadWorker.this.currentLH) {
                    if (ReadAheadWorker.this.conf.getTraceReadAheadMetadataChanges()) {
                        ReadAheadWorker.LOG.info("Opening inprogress ledger of {} for {} at {}.", new Object[]{ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(System.currentTimeMillis())});
                    }
                    ReadAheadWorker.this.handleCache.asyncOpenLedger(ReadAheadWorker.this.currentMetadata, false).addEventListener(new FutureEventListener<LedgerDescriptor>() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.OpenLedgerPhase.2
                        public void onSuccess(LedgerDescriptor ledgerDescriptor) {
                            OpenLedgerPhase.this.operationComplete(0, ledgerDescriptor);
                        }

                        public void onFailure(Throwable th) {
                            OpenLedgerPhase.this.operationComplete(FutureUtils.bkResultCode(th), (LedgerDescriptor) null);
                        }
                    });
                    return;
                }
                try {
                    boolean isLedgerHandleClosed = ReadAheadWorker.this.handleCache.isLedgerHandleClosed(ReadAheadWorker.this.currentLH);
                    long lastAddConfirmed = ReadAheadWorker.this.handleCache.getLastAddConfirmed(ReadAheadWorker.this.currentLH);
                    if (lastAddConfirmed >= ReadAheadWorker.this.nextReadAheadPosition.getEntryId()) {
                        this.next.process(0);
                        return;
                    }
                    if (!isLedgerHandleClosed) {
                        ReadAheadWorker.this.lastLedgerCloseDetected.reset();
                    } else if (!ReadAheadWorker.this.lastLedgerCloseDetected.isRunning()) {
                        ReadAheadWorker.this.lastLedgerCloseDetected.reset().start();
                        if (ReadAheadWorker.this.conf.getTraceReadAheadMetadataChanges()) {
                            ReadAheadWorker.LOG.info("{} Ledger {} for inprogress segment {} closed", new Object[]{ReadAheadWorker.this.fullyQualifiedName, ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.currentLH});
                        }
                    } else if (ReadAheadWorker.this.lastLedgerCloseDetected.elapsed(TimeUnit.MILLISECONDS) > ReadAheadWorker.this.conf.getReaderIdleWarnThresholdMillis()) {
                        ReadAheadWorker.this.idleReaderWarn.inc();
                        ReadAheadWorker.LOG.info("{} Ledger {} for inprogress segment {} closed for idle reader warn threshold", new Object[]{ReadAheadWorker.this.fullyQualifiedName, ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.currentLH});
                        ReadAheadWorker.this.reInitializeMetadata = true;
                    }
                    ReadAheadWorker.this.tracker.enterPhase(ReadAheadPhase.READ_LAST_CONFIRMED);
                    if (ReadAheadWorker.this.isCatchingUp) {
                        ReadAheadWorker.this.isCatchingUp = false;
                        ReadAheadWorker.this.readAheadCache.setSuppressDeliveryLatency(false);
                        if (ReadAheadWorker.this.isHandleForReading) {
                            ReadAheadWorker.LOG.info("{} caught up at {}: lac = {}, position = {}.", new Object[]{ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(System.currentTimeMillis()), Long.valueOf(lastAddConfirmed), ReadAheadWorker.this.nextReadAheadPosition});
                        }
                    }
                    ReadAheadWorker.LOG.trace("Reading last add confirmed of {} for {}, as read poistion has moved over {} : {}", new Object[]{ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(lastAddConfirmed), ReadAheadWorker.this.nextReadAheadPosition});
                    if (ReadAheadWorker.this.nextReadAheadPosition.getEntryId() == 0 && ReadAheadWorker.this.conf.getTraceReadAheadMetadataChanges()) {
                        ReadAheadWorker.LOG.info("Reading last add confirmed for {} at {}: lac = {}, position = {}.", new Object[]{ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(System.currentTimeMillis()), Long.valueOf(lastAddConfirmed), ReadAheadWorker.this.nextReadAheadPosition});
                    } else {
                        ReadAheadWorker.LOG.trace("Reading last add confirmed for {} at {}: lac = {}, position = {}.", new Object[]{ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(System.currentTimeMillis()), Long.valueOf(lastAddConfirmed), ReadAheadWorker.this.nextReadAheadPosition});
                    }
                    issueReadLastConfirmedAndEntry(false, lastAddConfirmed);
                    return;
                } catch (BKException e) {
                    ReadAheadWorker.this.handleException(ReadAheadPhase.OPEN_LEDGER, e.getCode());
                    return;
                }
            }
            ReadAheadWorker.this.lastLedgerCloseDetected.reset();
            if (null == ReadAheadWorker.this.currentLH) {
                ReadAheadWorker.LOG.trace("Opening completed ledger of {} for {}.", ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName);
                ReadAheadWorker.this.handleCache.asyncOpenLedger(ReadAheadWorker.this.currentMetadata, true).addEventListener(new FutureEventListener<LedgerDescriptor>() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.OpenLedgerPhase.3
                    public void onSuccess(LedgerDescriptor ledgerDescriptor) {
                        OpenLedgerPhase.this.operationComplete(0, ledgerDescriptor);
                    }

                    public void onFailure(Throwable th) {
                        OpenLedgerPhase.this.operationComplete(FutureUtils.bkResultCode(th), (LedgerDescriptor) null);
                    }
                });
                return;
            }
            try {
                if (ReadAheadWorker.this.inProgressChanged) {
                    ReadAheadWorker.LOG.trace("Closing completed ledger of {} for {}.", ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName);
                    if (!ReadAheadWorker.this.closeCurrentLedgerHandle()) {
                        return;
                    } else {
                        ReadAheadWorker.this.inProgressChanged = false;
                    }
                } else {
                    long lastAddConfirmed2 = ReadAheadWorker.this.handleCache.getLastAddConfirmed(ReadAheadWorker.this.currentLH);
                    if (ReadAheadWorker.this.nextReadAheadPosition.getEntryId() > lastAddConfirmed2) {
                        boolean z = false;
                        if (lastAddConfirmed2 < ReadAheadWorker.this.currentMetadata.getLastEntryId()) {
                            ReadAheadWorker.this.alertStatsLogger.raise("Unexpected last entry id during read ahead; {} , {}", new Object[]{ReadAheadWorker.this.currentMetadata, Long.valueOf(lastAddConfirmed2)});
                            z = true;
                        }
                        if (ReadAheadWorker.this.conf.getPositionGapDetectionEnabled() && z) {
                            ReadAheadWorker.this.setReadAheadError(ReadAheadWorker.this.tracker);
                        } else {
                            if (ReadAheadWorker.this.conf.getTraceReadAheadMetadataChanges() && lastAddConfirmed2 > ReadAheadWorker.this.currentMetadata.getLastEntryId() + 1) {
                                ReadAheadWorker.LOG.warn("Potential Metadata Corruption {} for stream {}, lastAddConfirmed {}", new Object[]{ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.logMetadata.getFullyQualifiedName(), Long.valueOf(lastAddConfirmed2)});
                            }
                            ReadAheadWorker.LOG.trace("Past the last Add Confirmed {} in ledger {} for {}", new Object[]{Long.valueOf(lastAddConfirmed2), ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName});
                            if (!ReadAheadWorker.this.closeCurrentLedgerHandle()) {
                                return;
                            }
                            LogSegmentMetadata logSegmentMetadata = ReadAheadWorker.this.currentMetadata;
                            ReadAheadWorker.this.currentMetadata = null;
                            if (ReadAheadWorker.this.currentMetadataIndex + 1 < ReadAheadWorker.this.ledgerList.size()) {
                                ReadAheadWorker.this.currentMetadata = (LogSegmentMetadata) ReadAheadWorker.this.ledgerList.get(ReadAheadWorker.access$1804(ReadAheadWorker.this));
                                if (ReadAheadWorker.this.currentMetadata.getLogSegmentSequenceNumber() != logSegmentMetadata.getLogSegmentSequenceNumber() + 1) {
                                    ReadAheadWorker.this.alertStatsLogger.raise("Unexpected condition during read ahead; {} , {}", new Object[]{ReadAheadWorker.this.currentMetadata, logSegmentMetadata});
                                    ReadAheadWorker.this.setReadAheadError(ReadAheadWorker.this.tracker);
                                } else if (ReadAheadWorker.this.currentMetadata.isTruncated()) {
                                    if (ReadAheadWorker.this.conf.getAlertWhenPositioningOnTruncated()) {
                                        ReadAheadWorker.this.alertStatsLogger.raise("Trying to position reader on the log segment that is marked truncated : {}", new Object[]{ReadAheadWorker.this.currentMetadata});
                                    }
                                    if (!ReadAheadWorker.this.conf.getIgnoreTruncationStatus()) {
                                        ReadAheadWorker.LOG.error("{}: Trying to position reader on the log segment that is marked truncated : {}", ReadAheadWorker.this.logMetadata.getFullyQualifiedName(), ReadAheadWorker.this.currentMetadata);
                                        ReadAheadWorker.this.setReadingFromTruncated(ReadAheadWorker.this.tracker);
                                    }
                                } else {
                                    if (ReadAheadWorker.LOG.isTraceEnabled()) {
                                        ReadAheadWorker.LOG.trace("Moving read position to a new ledger {} for {}.", ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName);
                                    }
                                    ReadAheadWorker.this.nextReadAheadPosition.positionOnNewLogSegment(ReadAheadWorker.this.currentMetadata.getLedgerId(), ReadAheadWorker.this.currentMetadata.getLogSegmentSequenceNumber());
                                }
                            }
                        }
                    }
                }
                if (!ReadAheadWorker.this.readAheadError) {
                    this.next.process(0);
                }
            } catch (BKException e2) {
                ReadAheadWorker.LOG.debug("Exception while repositioning", e2);
                ReadAheadWorker.this.handleException(ReadAheadPhase.CLOSE_LEDGER, e2.getCode());
            }
        }

        public void operationComplete(final int i, final LedgerDescriptor ledgerDescriptor) {
            ReadAheadWorker.this.submit(new Runnable() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.OpenLedgerPhase.4
                @Override // java.lang.Runnable
                public void run() {
                    if (0 != i) {
                        ReadAheadWorker.LOG.debug("BK Exception {} while opening ledger", Integer.valueOf(i));
                        ReadAheadWorker.this.handleException(ReadAheadPhase.OPEN_LEDGER, i);
                        return;
                    }
                    ReadAheadWorker.this.currentLH = ledgerDescriptor;
                    if (ReadAheadWorker.this.conf.getTraceReadAheadMetadataChanges()) {
                        ReadAheadWorker.LOG.info("Opened ledger of {} for {} at {}.", new Object[]{ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(System.currentTimeMillis())});
                    }
                    ReadAheadWorker.this.bkcZkExceptions.set(0);
                    ReadAheadWorker.this.bkcUnExpectedExceptions.set(0);
                    ReadAheadWorker.this.bkcNoLedgerExceptionsOnReadLAC.set(0);
                    OpenLedgerPhase.this.next.process(i);
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void handleReadLastConfirmedError(int i) {
            if (-7 != i) {
                if (0 != i) {
                    ReadAheadWorker.this.handleException(ReadAheadPhase.READ_LAST_CONFIRMED, i);
                }
            } else if (ReadAheadWorker.this.bkcNoLedgerExceptionsOnReadLAC.incrementAndGet() <= ReadAheadWorker.this.noLedgerExceptionOnReadLACThreshold) {
                if (ReadAheadWorker.LOG.isTraceEnabled()) {
                    ReadAheadWorker.LOG.info("{} No entries published to ledger {} yet. Backoff reading ahead for {} ms.", new Object[]{ReadAheadWorker.this.fullyQualifiedName, ReadAheadWorker.this.currentLH, Integer.valueOf(ReadAheadWorker.this.conf.getReadAheadWaitTime())});
                }
                ReadAheadWorker.this.schedule(ReadAheadWorker.this, ReadAheadWorker.this.conf.getReadAheadWaitTime());
            } else {
                ReadAheadWorker.LOG.info("{} No entries published to ledger {} yet for {} millis.", new Object[]{ReadAheadWorker.this.fullyQualifiedName, ReadAheadWorker.this.currentLH, Integer.valueOf(ReadAheadWorker.this.conf.getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis())});
                ReadAheadWorker.this.bkcNoLedgerExceptionsOnReadLAC.set(0);
                if (ReadAheadWorker.this.closeCurrentLedgerHandle()) {
                    this.next.process(0);
                }
            }
        }

        public void readLastConfirmedAndEntryComplete(final int i, final long j, final LedgerEntry ledgerEntry, final Object obj) {
            ReadAheadWorker.this.submit(new Runnable() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.OpenLedgerPhase.5
                @Override // java.lang.Runnable
                public void run() {
                    if (0 != i) {
                        OpenLedgerPhase.this.handleReadLastConfirmedError(i);
                        return;
                    }
                    ReadAheadWorker.this.bkcZkExceptions.set(0);
                    ReadAheadWorker.this.bkcUnExpectedExceptions.set(0);
                    ReadAheadWorker.this.bkcNoLedgerExceptionsOnReadLAC.set(0);
                    if (ReadAheadWorker.LOG.isTraceEnabled()) {
                        try {
                            ReadAheadWorker.LOG.trace("Advancing Last Add Confirmed of {} for {} : {}, {}", new Object[]{ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(j), Long.valueOf(ReadAheadWorker.this.handleCache.getLastAddConfirmed(ReadAheadWorker.this.currentLH))});
                        } catch (BKException e) {
                        }
                    }
                    if (null != ledgerEntry && ReadAheadWorker.this.nextReadAheadPosition.getEntryId() == ledgerEntry.getEntryId() && ReadAheadWorker.this.nextReadAheadPosition.getLedgerId() == ledgerEntry.getLedgerId()) {
                        if (j <= 4 && ReadAheadWorker.this.conf.getTraceReadAheadMetadataChanges()) {
                            ReadAheadWorker.LOG.info("Hit readLastConfirmedAndEntry for {} at {} : entry = {}, lac = {}, position = {}.", new Object[]{ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(System.currentTimeMillis()), Long.valueOf(ledgerEntry.getEntryId()), Long.valueOf(j), ReadAheadWorker.this.nextReadAheadPosition});
                        }
                        if (!ReadAheadWorker.this.isCatchingUp) {
                            long entryId = j - ReadAheadWorker.this.nextReadAheadPosition.getEntryId();
                            if (entryId > 0) {
                                ReadAheadWorker.this.readAheadLacLagStats.registerSuccessfulEvent(entryId);
                            }
                        }
                        ReadAheadWorker.this.nextReadAheadPosition.advance();
                        ReadAheadWorker.this.readAheadCache.set(new LedgerReadPosition(ledgerEntry.getLedgerId(), ReadAheadWorker.this.currentLH.getLogSegmentSequenceNo(), ledgerEntry.getEntryId()), ledgerEntry, null != obj ? obj.toString() : "", ReadAheadWorker.this.currentMetadata.getEnvelopeEntries(), ReadAheadWorker.this.currentMetadata.getStartSequenceId());
                        if (ReadAheadWorker.LOG.isTraceEnabled()) {
                            ReadAheadWorker.LOG.trace("Reading the value received {} for {} : entryId {}", new Object[]{ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(ledgerEntry.getEntryId())});
                        }
                        ReadAheadWorker.this.readAheadEntryPiggyBackHits.inc();
                    } else {
                        if (j > ReadAheadWorker.this.nextReadAheadPosition.getEntryId()) {
                            ReadAheadWorker.LOG.info("{} : entry {} isn't piggybacked but last add confirmed already moves to {}.", new Object[]{ReadAheadWorker.this.logMetadata.getFullyQualifiedName(), Long.valueOf(ReadAheadWorker.this.nextReadAheadPosition.getEntryId()), Long.valueOf(j)});
                        }
                        ReadAheadWorker.this.readAheadEntryPiggyBackMisses.inc();
                    }
                    OpenLedgerPhase.this.next.process(i);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker$Phase.class */
    public abstract class Phase {
        final Phase next;

        Phase(Phase phase) {
            this.next = phase;
        }

        abstract void process(int i);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker$ReadEntriesPhase.class */
    public final class ReadEntriesPhase extends Phase implements Runnable {
        boolean cacheFull;
        long lastAddConfirmed;

        ReadEntriesPhase(Phase phase) {
            super(phase);
            this.cacheFull = false;
            this.lastAddConfirmed = -1L;
        }

        @Override // com.twitter.distributedlog.readahead.ReadAheadWorker.Phase
        void process(int i) {
            if (!ReadAheadWorker.this.running) {
                ReadAheadWorker.this.setReadAheadStopped();
                return;
            }
            ReadAheadWorker.this.tracker.enterPhase(ReadAheadPhase.READ_ENTRIES);
            this.cacheFull = false;
            this.lastAddConfirmed = -1L;
            if (null == ReadAheadWorker.this.currentLH) {
                complete();
                return;
            }
            try {
                this.lastAddConfirmed = ReadAheadWorker.this.handleCache.getLastAddConfirmed(ReadAheadWorker.this.currentLH);
                read();
            } catch (BKException e) {
                ReadAheadWorker.this.handleException(ReadAheadPhase.READ_LAST_CONFIRMED, e.getCode());
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void read() {
            if (this.lastAddConfirmed < ReadAheadWorker.this.nextReadAheadPosition.getEntryId()) {
                if (ReadAheadWorker.LOG.isTraceEnabled()) {
                    ReadAheadWorker.LOG.trace("Nothing to read for {} of {} : lastAddConfirmed = {}, nextReadAheadPosition = {}", new Object[]{ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(this.lastAddConfirmed), ReadAheadWorker.this.nextReadAheadPosition});
                }
                complete();
                return;
            }
            if (ReadAheadWorker.LOG.isTraceEnabled()) {
                ReadAheadWorker.LOG.trace("Reading entry {} for {} of {}.", new Object[]{ReadAheadWorker.this.nextReadAheadPosition, ReadAheadWorker.this.currentMetadata, ReadAheadWorker.this.fullyQualifiedName});
            }
            int readAheadBatchSize = ReadAheadWorker.this.dynConf.getReadAheadBatchSize();
            final long entryId = ReadAheadWorker.this.nextReadAheadPosition.getEntryId();
            final long min = Math.min(this.lastAddConfirmed, (ReadAheadWorker.this.nextReadAheadPosition.getEntryId() + readAheadBatchSize) - 1);
            if (min <= readAheadBatchSize && ReadAheadWorker.this.conf.getTraceReadAheadMetadataChanges()) {
                ReadAheadWorker.LOG.info("Reading entries ({} - {}) for {} at {} : lac = {}, nextReadAheadPosition = {}.", new Object[]{Long.valueOf(entryId), Long.valueOf(min), ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(System.currentTimeMillis()), Long.valueOf(this.lastAddConfirmed), ReadAheadWorker.this.nextReadAheadPosition});
            }
            final String format = String.format("ReadEntries(%d-%d)", Long.valueOf(entryId), Long.valueOf(min));
            ReadAheadWorker.this.handleCache.asyncReadEntries(ReadAheadWorker.this.currentLH, entryId, min).addEventListener(new FutureEventListener<Enumeration<LedgerEntry>>() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.ReadEntriesPhase.1
                public void onSuccess(Enumeration<LedgerEntry> enumeration) {
                    int i = 0;
                    if (ReadAheadWorker.this.failureInjector.shouldInjectCorruption() && ReadEntriesPhase.this.rangeContainsSimulatedBrokenEntry(entryId, min)) {
                        i = -5;
                    }
                    ReadEntriesPhase.this.readComplete(i, null, enumeration, format, entryId, min);
                }

                public void onFailure(Throwable th) {
                    ReadEntriesPhase.this.readComplete(FutureUtils.bkResultCode(th), null, null, format, entryId, min);
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean rangeContainsSimulatedBrokenEntry(long j, long j2) {
            long j3 = j;
            while (true) {
                long j4 = j3;
                if (j4 > j2) {
                    return false;
                }
                if (j4 % 10 == 0) {
                    return true;
                }
                j3 = j4 + 1;
            }
        }

        public void readComplete(final int i, LedgerHandle ledgerHandle, final Enumeration<LedgerEntry> enumeration, final Object obj, final long j, final long j2) {
            ReadAheadWorker.this.submit(new Runnable() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.ReadEntriesPhase.2
                @Override // java.lang.Runnable
                public void run() {
                    long j3 = (j2 - j) + 1;
                    if (-5 == i && ReadAheadWorker.this.readAheadSkipBrokenEntries) {
                        ReadAheadWorker.this.readAheadReadEntriesStat.registerFailedEvent(0L);
                        ReadAheadWorker.LOG.error("BK DigestMatchException while reading entries {}-{} in stream {}, entry {} discarded", new Object[]{Long.valueOf(j), Long.valueOf(j2), ReadAheadWorker.this.fullyQualifiedName, Long.valueOf(j)});
                        ReadAheadWorker.this.bkcZkExceptions.set(0);
                        ReadAheadWorker.this.bkcUnExpectedExceptions.set(0);
                        ReadAheadWorker.this.readAheadSkippedBrokenEntries.inc();
                        ReadAheadWorker.this.nextReadAheadPosition.advance();
                    } else {
                        if (0 != i) {
                            ReadAheadWorker.this.readAheadReadEntriesStat.registerFailedEvent(0L);
                            ReadAheadWorker.LOG.debug("BK Exception {} while reading entry", Integer.valueOf(i));
                            ReadAheadWorker.this.handleException(ReadAheadPhase.READ_ENTRIES, i);
                            return;
                        }
                        int i2 = 0;
                        while (enumeration.hasMoreElements()) {
                            ReadAheadWorker.this.bkcZkExceptions.set(0);
                            ReadAheadWorker.this.bkcUnExpectedExceptions.set(0);
                            ReadAheadWorker.this.nextReadAheadPosition.advance();
                            LedgerEntry ledgerEntry = (LedgerEntry) enumeration.nextElement();
                            LedgerReadPosition ledgerReadPosition = new LedgerReadPosition(ledgerEntry.getLedgerId(), ReadAheadWorker.this.currentMetadata.getLogSegmentSequenceNumber(), ledgerEntry.getEntryId());
                            ReadAheadWorker.this.readAheadCache.set(ledgerReadPosition, ledgerEntry, null != obj ? obj.toString() : "", ReadAheadWorker.this.currentMetadata.getEnvelopeEntries(), ReadAheadWorker.this.currentMetadata.getStartSequenceId());
                            i2++;
                            if (ReadAheadWorker.LOG.isDebugEnabled()) {
                                ReadAheadWorker.LOG.debug("Read entry {} of {}.", ledgerReadPosition, ReadAheadWorker.this.fullyQualifiedName);
                            }
                        }
                        ReadAheadWorker.this.readAheadReadEntriesStat.registerSuccessfulEvent(i2);
                    }
                    if (!ReadAheadWorker.this.readAheadCache.isCacheFull()) {
                        ReadEntriesPhase.this.read();
                    } else {
                        ReadEntriesPhase.this.cacheFull = true;
                        ReadEntriesPhase.this.complete();
                    }
                }
            });
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void complete() {
            if (!this.cacheFull) {
                run();
                return;
            }
            ReadAheadWorker.LOG.trace("Cache for {} is full. Backoff reading until notified", ReadAheadWorker.this.fullyQualifiedName);
            ReadAheadWorker.this.readAheadCacheFullCounter.inc();
            ReadAheadWorker.this.resumeStopWatch.reset().start();
            ReadAheadWorker.this.stopPromise = null;
            ReadAheadWorker.this.readAheadCache.setReadAheadCallback(ReadAheadWorker.this);
        }

        @Override // java.lang.Runnable
        public void run() {
            this.next.process(0);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker$ReadLastConfirmedAndEntryCallbackWithNotification.class */
    public class ReadLastConfirmedAndEntryCallbackWithNotification extends LongPollNotification<AsyncCallback.ReadLastConfirmedAndEntryCallback> implements AsyncCallback.ReadLastConfirmedAndEntryCallback {
        ReadLastConfirmedAndEntryCallbackWithNotification(long j, AsyncCallback.ReadLastConfirmedAndEntryCallback readLastConfirmedAndEntryCallback, Object obj) {
            super(j, readLastConfirmedAndEntryCallback, obj);
        }

        public void readLastConfirmedAndEntryComplete(int i, long j, LedgerEntry ledgerEntry, Object obj) {
            if (this.called.compareAndSet(false, true)) {
                synchronized (ReadAheadWorker.this.notificationLock) {
                    ReadAheadWorker.this.metadataNotification = null;
                }
                ((AsyncCallback.ReadLastConfirmedAndEntryCallback) this.cb).readLastConfirmedAndEntryComplete(i, j, ledgerEntry, obj);
            }
        }

        @Override // com.twitter.distributedlog.readahead.ReadAheadWorker.LongPollNotification
        void doComplete(boolean z) {
            readLastConfirmedAndEntryComplete(0, this.lac, null, this.ctx);
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker$ScheduleReadAheadPhase.class */
    final class ScheduleReadAheadPhase extends Phase {
        ScheduleReadAheadPhase() {
            super(null);
        }

        @Override // com.twitter.distributedlog.readahead.ReadAheadWorker.Phase
        void process(int i) {
            if (!ReadAheadWorker.this.running) {
                ReadAheadWorker.this.setReadAheadStopped();
                return;
            }
            ReadAheadWorker.this.tracker.enterPhase(ReadAheadPhase.SCHEDULE_READAHEAD);
            boolean shouldInjectErrors = ReadAheadWorker.this.failureInjector.shouldInjectErrors();
            if (!ReadAheadWorker.this.encounteredException && !shouldInjectErrors) {
                if (ReadAheadWorker.LOG.isTraceEnabled()) {
                    ReadAheadWorker.LOG.trace("Scheduling read ahead for {} now.", ReadAheadWorker.this.fullyQualifiedName);
                }
                ReadAheadWorker.this.submit(ReadAheadWorker.this);
                return;
            }
            if (ReadAheadWorker.this.bkcZkExceptions.get() > DistributedLogConfiguration.BKDL_READER_IDLE_WARN_THRESHOLD_MILLIS_DEFAULT / ReadAheadWorker.this.conf.getReadAheadWaitTime() || shouldInjectErrors) {
                ReadAheadWorker.LOG.error("{} : BookKeeper Client used by the ReadAhead Thread has encountered {} zookeeper exceptions : simulate = {}", new Object[]{ReadAheadWorker.this.fullyQualifiedName, Integer.valueOf(ReadAheadWorker.this.bkcZkExceptions.get()), Boolean.valueOf(shouldInjectErrors)});
                ReadAheadWorker.this.running = false;
                ReadAheadWorker.this.setReadAheadError(ReadAheadWorker.this.tracker);
            } else if (ReadAheadWorker.this.bkcUnExpectedExceptions.get() > 3) {
                ReadAheadWorker.LOG.error("{} : ReadAhead Thread has encountered {} unexpected BK exceptions.", ReadAheadWorker.this.fullyQualifiedName, Integer.valueOf(ReadAheadWorker.this.bkcUnExpectedExceptions.get()));
                ReadAheadWorker.this.running = false;
                ReadAheadWorker.this.setReadAheadError(ReadAheadWorker.this.tracker);
            } else {
                ReadAheadWorker.this.reInitializeMetadata = true;
                ReadAheadWorker.this.encounteredException = false;
                if (ReadAheadWorker.LOG.isTraceEnabled()) {
                    ReadAheadWorker.LOG.trace("Scheduling read ahead for {} after {} ms.", ReadAheadWorker.this.fullyQualifiedName, Integer.valueOf(ReadAheadWorker.this.conf.getReadAheadWaitTime() / 4));
                }
                ReadAheadWorker.this.schedule(ReadAheadWorker.this, ReadAheadWorker.this.conf.getReadAheadWaitTime() / 4);
            }
        }
    }

    /* loaded from: input_file:com/twitter/distributedlog/readahead/ReadAheadWorker$StoppablePhase.class */
    final class StoppablePhase extends Phase {
        StoppablePhase(Phase phase) {
            super(phase);
        }

        @Override // com.twitter.distributedlog.readahead.ReadAheadWorker.Phase
        void process(int i) {
            if (!ReadAheadWorker.this.running) {
                ReadAheadWorker.this.setReadAheadStopped();
                return;
            }
            if (null == ReadAheadWorker.this.stopPromise) {
                ReadAheadWorker.this.stopPromise = new Promise<>();
            }
            this.next.process(0);
        }
    }

    public ReadAheadWorker(DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, ZKLogMetadataForReader zKLogMetadataForReader, BKLogHandler bKLogHandler, ZooKeeperClient zooKeeperClient, OrderedScheduler orderedScheduler, LedgerHandleCache ledgerHandleCache, LedgerReadPosition ledgerReadPosition, ReadAheadCache readAheadCache, boolean z, ReadAheadExceptionsLogger readAheadExceptionsLogger, StatsLogger statsLogger, StatsLogger statsLogger2, AlertStatsLogger alertStatsLogger, AsyncFailureInjector asyncFailureInjector, AsyncNotification asyncNotification) {
        this.fullyQualifiedName = zKLogMetadataForReader.getFullyQualifiedName();
        this.conf = distributedLogConfiguration;
        this.dynConf = dynamicDistributedLogConfiguration;
        this.logMetadata = zKLogMetadataForReader;
        this.bkLedgerManager = bKLogHandler;
        this.isHandleForReading = z;
        this.notification = asyncNotification;
        this.zkc = zooKeeperClient;
        this.scheduler = orderedScheduler;
        this.handleCache = ledgerHandleCache;
        this.readAheadCache = readAheadCache;
        this.startReadPosition = new LedgerReadPosition(ledgerReadPosition);
        this.nextReadAheadPosition = new LedgerReadPosition(ledgerReadPosition);
        this.getLedgersWatcher = this.zkc.getWatcherManager().registerChildWatcher(zKLogMetadataForReader.getLogSegmentsPath(), this);
        this.failureInjector = asyncFailureInjector;
        this.metadataLatencyWarnThresholdMillis = distributedLogConfiguration.getMetadataLatencyWarnThresholdMillis();
        this.noLedgerExceptionOnReadLACThreshold = distributedLogConfiguration.getReadAheadNoSuchLedgerExceptionOnReadLACErrorThresholdMillis() / distributedLogConfiguration.getReadAheadWaitTime();
        this.tracker = new ReadAheadTracker(zKLogMetadataForReader.getLogName(), readAheadCache, ReadAheadPhase.SCHEDULE_READAHEAD, statsLogger2);
        this.readAheadSkipBrokenEntries = distributedLogConfiguration.getReadAheadSkipBrokenEntries();
        this.alertStatsLogger = alertStatsLogger;
        this.readAheadPerStreamStatsLogger = statsLogger2;
        StatsLogger scope = statsLogger.scope("readahead_worker");
        this.readAheadWorkerWaits = scope.getCounter("wait");
        this.readAheadEntryPiggyBackHits = scope.getCounter("entry_piggy_back_hits");
        this.readAheadEntryPiggyBackMisses = scope.getCounter("entry_piggy_back_misses");
        this.readAheadReadEntriesStat = scope.getOpStatsLogger("read_entries");
        this.readAheadReadLACAndEntryCounter = scope.getCounter("read_lac_and_entry_counter");
        this.readAheadCacheFullCounter = scope.getCounter("cache_full");
        this.readAheadSkippedBrokenEntries = scope.getCounter("skipped_broken_entries");
        this.readAheadCacheResumeStat = scope.getOpStatsLogger("resume");
        this.readAheadLacLagStats = scope.getOpStatsLogger("read_lac_lag");
        this.longPollInterruptionStat = scope.getOpStatsLogger("long_poll_interruption");
        this.notificationExecutionStat = scope.getOpStatsLogger("notification_execution");
        this.metadataReinitializationStat = scope.getOpStatsLogger("metadata_reinitialization");
        this.idleReaderWarn = scope.getCounter("idle_reader_warn");
        this.readAheadExceptionsLogger = readAheadExceptionsLogger;
    }

    @VisibleForTesting
    public LedgerReadPosition getNextReadAheadPosition() {
        return this.nextReadAheadPosition;
    }

    public LedgerDescriptor getCurrentLedgerDescriptor() {
        return this.currentLH;
    }

    void setReadAheadError(ReadAheadTracker readAheadTracker) {
        LOG.error("Read Ahead for {} is set to error.", this.logMetadata.getFullyQualifiedName());
        this.readAheadError = true;
        readAheadTracker.enterPhase(ReadAheadPhase.ERROR);
        if (null != this.notification) {
            this.notification.notifyOnError();
        }
        if (null != this.stopPromise) {
            FutureUtils.setValue(this.stopPromise, null);
        }
    }

    void setReadAheadInterrupted(ReadAheadTracker readAheadTracker) {
        this.readAheadInterrupted = true;
        readAheadTracker.enterPhase(ReadAheadPhase.INTERRUPTED);
        if (null != this.notification) {
            this.notification.notifyOnError();
        }
        if (null != this.stopPromise) {
            FutureUtils.setValue(this.stopPromise, null);
        }
    }

    void setReadingFromTruncated(ReadAheadTracker readAheadTracker) {
        this.readingFromTruncated = true;
        readAheadTracker.enterPhase(ReadAheadPhase.TRUNCATED);
        if (null != this.notification) {
            this.notification.notifyOnError();
        }
        if (null != this.stopPromise) {
            FutureUtils.setValue(this.stopPromise, null);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setReadAheadStopped() {
        this.tracker.enterPhase(ReadAheadPhase.STOPPED);
        if (null != this.stopPromise) {
            FutureUtils.setValue(this.stopPromise, null);
        }
        LOG.info("Stopped ReadAheadWorker for {}", this.fullyQualifiedName);
    }

    public void checkClosedOrInError() throws LogNotFoundException, LogReadException, DLInterruptedException, AlreadyTruncatedTransactionException {
        if (this.logDeleted) {
            throw new LogNotFoundException(this.logMetadata.getFullyQualifiedName() + " is already deleted.");
        }
        if (this.readingFromTruncated) {
            throw new AlreadyTruncatedTransactionException(String.format("%s: Trying to position read ahead a segment that is marked truncated", this.logMetadata.getFullyQualifiedName()));
        }
        if (this.readAheadInterrupted) {
            throw new DLInterruptedException(String.format("%s: ReadAhead Thread was interrupted", this.logMetadata.getFullyQualifiedName()));
        }
        if (this.readAheadError) {
            throw new LogReadException(String.format("%s: ReadAhead Thread encountered exceptions", this.logMetadata.getFullyQualifiedName()));
        }
    }

    public boolean isCaughtUp() {
        return !this.isCatchingUp;
    }

    public void start() {
        LOG.debug("Starting ReadAhead Worker for {}", this.fullyQualifiedName);
        this.running = true;
        this.schedulePhase.process(0);
    }

    @Override // com.twitter.distributedlog.io.AsyncCloseable
    public Future<Void> asyncClose() {
        AsyncNotification asyncNotification;
        LOG.info("Stopping Readahead worker for {}", this.fullyQualifiedName);
        this.running = false;
        this.zkc.getWatcherManager().unregisterChildWatcher(this.logMetadata.getLogSegmentsPath(), this);
        synchronized (this.notificationLock) {
            asyncNotification = this.metadataNotification;
            this.metadataNotification = null;
        }
        if (null != asyncNotification) {
            asyncNotification.notifyOnOperationComplete();
        }
        return null == this.stopPromise ? Future.Void() : FutureUtils.ignore(FutureUtils.within(this.stopPromise, 2 * this.conf.getReadAheadWaitTime(), TimeUnit.MILLISECONDS, new TimeoutException("Timeout on waiting for ReadAhead worker to stop " + this.fullyQualifiedName), this.scheduler, this.fullyQualifiedName));
    }

    public String toString() {
        return "Running:" + this.running + ", NextReadAheadPosition:" + this.nextReadAheadPosition + ", BKZKExceptions:" + this.bkcZkExceptions.get() + ", BKUnexpectedExceptions:" + this.bkcUnExpectedExceptions.get() + ", EncounteredException:" + this.encounteredException + ", readAheadError:" + this.readAheadError + ", readAheadInterrupted" + this.readAheadInterrupted + ", CurrentMetadata:" + (null != this.currentMetadata ? this.currentMetadata : "NONE") + ", FailureInjector:" + this.failureInjector;
    }

    @Override // com.twitter.distributedlog.callback.ReadAheadCallback
    public void resumeReadAhead() {
        try {
            this.readAheadCacheResumeStat.registerSuccessfulEvent(this.resumeStopWatch.stop().elapsed(TimeUnit.MICROSECONDS));
        } catch (IllegalStateException e) {
            LOG.error("Encountered illegal state when stopping resume stop watch for {} : ", this.logMetadata.getFullyQualifiedName(), e);
        }
        submit(this);
    }

    Runnable addRTEHandler(final Runnable runnable) {
        return new Runnable() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    runnable.run();
                } catch (RuntimeException e) {
                    ReadAheadWorker.LOG.error("ReadAhead on stream {} encountered runtime exception", ReadAheadWorker.this.logMetadata.getFullyQualifiedName(), e);
                    ReadAheadWorker.this.setReadAheadError(ReadAheadWorker.this.tracker);
                    throw e;
                }
            }
        };
    }

    <T> Function1<T, BoxedUnit> submit(final Function1<T, BoxedUnit> function1) {
        return new AbstractFunction1<T, BoxedUnit>() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.2
            public BoxedUnit apply(final T t) {
                ReadAheadWorker.this.submit(new Runnable() { // from class: com.twitter.distributedlog.readahead.ReadAheadWorker.2.1
                    @Override // java.lang.Runnable
                    public void run() {
                        function1.apply(t);
                    }
                });
                return BoxedUnit.UNIT;
            }

            /* JADX WARN: Multi-variable type inference failed */
            /* renamed from: apply, reason: collision with other method in class */
            public /* bridge */ /* synthetic */ Object m102apply(Object obj) {
                return apply((AnonymousClass2<T>) obj);
            }
        };
    }

    void submit(Runnable runnable) {
        if (this.failureInjector.shouldInjectStops()) {
            LOG.warn("Error injected: read ahead for stream {} is going to stall.", this.logMetadata.getFullyQualifiedName());
        } else {
            if (this.failureInjector.shouldInjectDelays()) {
                schedule(runnable, this.failureInjector.getInjectedDelayMs());
                return;
            }
            try {
                this.scheduler.submit(addRTEHandler(runnable));
            } catch (RejectedExecutionException e) {
                setReadAheadError(this.tracker);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void schedule(Runnable runnable, long j) {
        try {
            InterruptibleScheduledRunnable interruptibleScheduledRunnable = new InterruptibleScheduledRunnable(runnable);
            if (setMetadataNotification(interruptibleScheduledRunnable)) {
                this.scheduler.submit(addRTEHandler(interruptibleScheduledRunnable));
            } else {
                this.scheduler.schedule(addRTEHandler(interruptibleScheduledRunnable), j, TimeUnit.MILLISECONDS);
                this.readAheadWorkerWaits.inc();
            }
        } catch (RejectedExecutionException e) {
            setReadAheadError(this.tracker);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleException(ReadAheadPhase readAheadPhase, int i) {
        this.readAheadExceptionsLogger.getBKExceptionStatsLogger(readAheadPhase.name()).getExceptionCounter(i).inc();
        this.exceptionHandler.process(i);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean closeCurrentLedgerHandle() {
        if (this.currentLH == null) {
            return true;
        }
        boolean z = false;
        LedgerDescriptor ledgerDescriptor = this.currentLH;
        try {
            this.handleCache.closeLedger(ledgerDescriptor);
            this.currentLH = null;
            z = true;
        } catch (BKException e) {
            LOG.debug("BK Exception during closing {} : ", ledgerDescriptor, e);
            handleException(ReadAheadPhase.CLOSE_LEDGER, e.getCode());
        }
        return z;
    }

    @Override // java.lang.Runnable
    public void run() {
        if (this.running) {
            this.readAheadPhase.process(0);
        } else {
            setReadAheadStopped();
        }
    }

    public void process(WatchedEvent watchedEvent) {
        AsyncNotification asyncNotification;
        if (this.zkNotificationDisabled) {
            return;
        }
        if (watchedEvent.getType() == Watcher.Event.EventType.None && watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
            LOG.debug("Reconnected ...");
            return;
        }
        if ((watchedEvent.getType() != Watcher.Event.EventType.None || watchedEvent.getState() != Watcher.Event.KeeperState.Expired) && watchedEvent.getType() != Watcher.Event.EventType.NodeChildrenChanged) {
            if (watchedEvent.getType() == Watcher.Event.EventType.NodeDeleted) {
                this.logDeleted = true;
                setReadAheadError(this.tracker);
                return;
            }
            return;
        }
        synchronized (this.notificationLock) {
            this.reInitializeMetadata = true;
            LOG.debug("{} Read ahead node changed", this.fullyQualifiedName);
            asyncNotification = this.metadataNotification;
            this.metadataNotification = null;
        }
        this.metadataNotificationTimeMillis = System.currentTimeMillis();
        if (null != asyncNotification) {
            asyncNotification.notifyOnOperationComplete();
        }
    }

    @VisibleForTesting
    public void disableZKNotification() {
        LOG.info("{} ZK Notification was disabled", this.fullyQualifiedName);
        this.zkNotificationDisabled = true;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean setMetadataNotification(AsyncNotification asyncNotification) {
        boolean z;
        synchronized (this.notificationLock) {
            this.metadataNotification = asyncNotification;
            z = this.reInitializeMetadata;
        }
        return z;
    }

    @VisibleForTesting
    public AsyncNotification getMetadataNotification() {
        AsyncNotification asyncNotification;
        synchronized (this.notificationLock) {
            asyncNotification = this.metadataNotification;
        }
        return asyncNotification;
    }

    static /* synthetic */ int access$1804(ReadAheadWorker readAheadWorker) {
        int i = readAheadWorker.currentMetadataIndex + 1;
        readAheadWorker.currentMetadataIndex = i;
        return i;
    }
}
