package com.twitter.distributedlog;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.StreamNotReadyException;
import com.twitter.distributedlog.exceptions.WriteCancelledException;
import com.twitter.distributedlog.exceptions.WriteException;
import com.twitter.distributedlog.feature.CoreFeatureKeys;
import com.twitter.distributedlog.stats.OpStatsListener;
import com.twitter.distributedlog.util.FailpointUtils;
import com.twitter.distributedlog.util.FutureUtils;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import com.twitter.util.Try;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function1;
import scala.Option;
import scala.runtime.AbstractFunction1;

/* loaded from: input_file:com/twitter/distributedlog/BKAsyncLogWriter.class */
public class BKAsyncLogWriter extends BKAbstractLogWriter implements AsyncLogWriter {
    /** Class logger; assigned in the static initializer. */
    static final Logger LOG;
    /** Function applied to the truncation result in {@code truncate}; assigned in the static initializer. */
    static Function1<List<LogSegmentMetadata>, Boolean> TruncationResultConverter;
    /** When true, writes arriving during a roll fail with StreamNotReadyException instead of being queued. */
    private final boolean streamFailFast;
    /** Read from the configuration; when true a cached writer whose segment is in error is still handed out. */
    private final boolean disableRollOnSegmentError;
    /** Writes queued while a roll is in progress; null when no roll is in progress. */
    private LinkedList<PendingLogRecord> pendingRequests;
    /** Set once a write fails or the writer is errored out; makes later writer lookups fail. */
    private volatile boolean encounteredError;
    /** Non-null only while a roll is in progress; completed with the new writer or with the failure. */
    private Promise<BKLogSegmentWriter> rollingFuture;
    /** Largest transaction id recorded via setLastTxId; starts at -999. */
    private long lastTxId;
    /** Stats scope "log_writer" derived from the logger passed to the constructor. */
    private final StatsLogger statsLogger;
    private final OpStatsLogger writeOpStatsLogger;
    private final OpStatsLogger markEndOfStreamOpStatsLogger;
    private final OpStatsLogger bulkWriteOpStatsLogger;
    private final OpStatsLogger getWriterOpStatsLogger;
    /** Counter incremented by the number of queued requests each time the queue is drained or errored out. */
    private final Counter pendingRequestDispatch;
    /** Feature switch consulted by shouldRollLog; when available, size/time based rolling is skipped. */
    private final Feature disableLogSegmentRollingFeature;
    /** Decompiled form of the compiler-generated assertion flag; set in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/BKAsyncLogWriter$LastPendingLogRecord.class */
    /**
     * Listener for the final write sent to the current segment writer before a roll.
     * Once that write completes, it either starts the roll (success) or fails every
     * queued request and marks the writer as errored (failure).
     */
    public class LastPendingLogRecord extends PendingLogRecord {
        LastPendingLogRecord(LogRecord rec, boolean flushAfterWrite) {
            super(rec, flushAfterWrite);
        }

        /** Completes this record's promise first, then kicks off the roll and the replay of queued writes. */
        @Override
        public void onSuccess(DLSN dlsn) {
            super.onSuccess(dlsn);
            rollLogSegmentAndIssuePendingRequests(record);
        }

        /** Fails this record's promise first, then fails everything queued behind it. */
        @Override
        public void onFailure(Throwable cause) {
            super.onFailure(cause);
            errorOutPendingRequestsAndWriter(cause);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/twitter/distributedlog/BKAsyncLogWriter$PendingLogRecord.class */
    /**
     * A write waiting for a segment writer: carries the record, its flush flag and the
     * promise handed back to the caller. It is registered as the listener on the
     * underlying write so the result is relayed into that promise.
     */
    public class PendingLogRecord implements FutureEventListener<DLSN> {
        final LogRecord record;
        final boolean flush;
        final Promise<DLSN> promise;

        PendingLogRecord(LogRecord rec, boolean flushAfterWrite) {
            this.record = rec;
            this.flush = flushAfterWrite;
            this.promise = new Promise<>();
        }

        /** Relays the assigned DLSN to the caller's promise. */
        @Override
        public void onSuccess(DLSN dlsn) {
            promise.setValue(dlsn);
        }

        /** Relays the failure to the caller's promise, then flags the enclosing writer as errored. */
        @Override
        public void onFailure(Throwable cause) {
            promise.setException(cause);
            encounteredError = true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates an async writer bound to the given write handler.
     *
     * @param distributedLogConfiguration supplies the fail-fast and roll-on-error settings read here
     * @param dynamicDistributedLogConfiguration forwarded to the superclass constructor
     * @param bKDistributedLogManager forwarded to the superclass constructor
     * @param bKLogWriteHandler handler stored in the inherited {@code writeHandler} field
     * @param featureProvider source of the "disable log segment rolling" feature switch
     * @param statsLogger parent stats logger; all stats are registered under the "log_writer" scope
     */
    public BKAsyncLogWriter(DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, BKDistributedLogManager bKDistributedLogManager, BKLogWriteHandler bKLogWriteHandler, FeatureProvider featureProvider, StatsLogger statsLogger) {
        super(distributedLogConfiguration, dynamicDistributedLogConfiguration, bKDistributedLogManager);
        this.pendingRequests = null;
        this.encounteredError = false;
        this.rollingFuture = null;
        this.lastTxId = -999L;
        this.writeHandler = bKLogWriteHandler;
        this.streamFailFast = distributedLogConfiguration.getFailFastOnStreamNotReady();
        this.disableRollOnSegmentError = distributedLogConfiguration.getDisableRollingOnLogSegmentError();
        // Locale.ROOT keeps the feature key stable: the enum name contains 'I', which the
        // default-locale toLowerCase() maps to a dotless i under e.g. a Turkish locale.
        this.disableLogSegmentRollingFeature = featureProvider.getFeature(
                CoreFeatureKeys.DISABLE_LOGSEGMENT_ROLLING.name().toLowerCase(java.util.Locale.ROOT));
        this.statsLogger = statsLogger.scope("log_writer");
        this.writeOpStatsLogger = this.statsLogger.getOpStatsLogger("write");
        this.markEndOfStreamOpStatsLogger = this.statsLogger.getOpStatsLogger("mark_end_of_stream");
        this.bulkWriteOpStatsLogger = this.statsLogger.getOpStatsLogger("bulk_write");
        this.getWriterOpStatsLogger = this.statsLogger.getOpStatsLogger("get_writer");
        this.pendingRequestDispatch = this.statsLogger.getCounter("pending_request_dispatch");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Records a transaction id. The stored value never decreases: it becomes the
     * maximum of the current value and {@code j}.
     *
     * @param j candidate transaction id
     */
    @VisibleForTesting
    public synchronized void setLastTxId(long j) {
        this.lastTxId = Math.max(this.lastTxId, j);
    }

    /**
     * Returns the largest transaction id recorded so far, read under this writer's monitor.
     *
     * @return the last transaction id (-999 if none has been recorded)
     */
    @Override // com.twitter.distributedlog.AsyncLogWriter
    public synchronized long getLastTxId() {
        return this.lastTxId;
    }

    /**
     * Marks the record as a control record and writes it through the regular write path.
     * Note that the passed record is mutated by {@code setControl()}.
     *
     * @param logRecord record to mark and write
     * @return future of the DLSN assigned to the record
     */
    public Future<DLSN> writeControlRecord(LogRecord logRecord) {
        logRecord.setControl();
        return write(logRecord);
    }

    /**
     * Looks up the currently cached segment writer.
     *
     * @return the cached writer, or null when there is none or when its segment is in
     *         error and rolling on segment error has not been disabled
     * @throws WriteException if this writer has already encountered an error
     */
    private BKLogSegmentWriter getCachedLogSegmentWriter() throws WriteException {
        if (encounteredError) {
            throw new WriteException(bkDistributedLogManager.getStreamName(), "writer has been closed due to error.");
        }
        final BKLogSegmentWriter writer = getCachedLogWriter();
        if (null == writer) {
            return null;
        }
        // A writer on a broken segment is hidden from callers so they roll to a new one,
        // unless rolling on segment error is disabled.
        final boolean hideBrokenWriter = writer.isLogSegmentInError() && !disableRollOnSegmentError;
        return hideBrokenWriter ? null : writer;
    }

    /**
     * Obtains a segment writer via {@code doGetLogSegmentWriter} and records the lookup
     * in the "get_writer" stat.
     *
     * @param j  transaction id handed to the roll logic
     * @param z  passed through to rollLogSegmentIfNecessary (meaning not visible in this file)
     * @param z2 whether an existing writer should be checked for rolling
     * @param z3 passed through to rollLogSegmentIfNecessary; true on the end-of-stream path
     * @return future of the writer to use
     */
    private Future<BKLogSegmentWriter> getLogSegmentWriter(long j, boolean z, boolean z2, boolean z3) {
        // Start timing before the lookup is issued so its synchronous part is measured too,
        // matching how markEndOfStream times its operation.
        final Stopwatch stopwatch = Stopwatch.createStarted();
        return doGetLogSegmentWriter(j, z, z2, z3).addEventListener(new OpStatsListener(this.getWriterOpStatsLogger, stopwatch));
    }

    /**
     * Resolves the segment writer to use, rolling to a new segment where required.
     *
     * @param j  transaction id handed to the roll logic
     * @param z  passed through to rollLogSegmentIfNecessary (meaning not visible in this file)
     * @param z2 when true, an existing writer is also run through rollLogSegmentIfNecessary
     * @param z3 passed through to rollLogSegmentIfNecessary; true on the end-of-stream path
     * @return future of the writer, or a failed future if this writer has encountered an error
     */
    private Future<BKLogSegmentWriter> doGetLogSegmentWriter(final long j, final boolean z, boolean z2, final boolean z3) {
        if (encounteredError) {
            return Future.exception(new WriteException(bkDistributedLogManager.getStreamName(), "writer has been closed due to error."));
        }
        final Future<BKLogSegmentWriter> existing = asyncGetLedgerWriter(!disableRollOnSegmentError);
        if (null == existing) {
            // No writer yet: let the roll logic create one.
            return rollLogSegmentIfNecessary(null, j, z, z3);
        }
        if (!z2) {
            return existing;
        }
        return existing.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<BKLogSegmentWriter>>() { // from class: com.twitter.distributedlog.BKAsyncLogWriter.2
            public Future<BKLogSegmentWriter> apply(BKLogSegmentWriter current) {
                return BKAsyncLogWriter.this.rollLogSegmentIfNecessary(current, j, z, z3);
            }
        });
    }

    /**
     * Obtains the writer used to mark end of stream: requests it with
     * {@code DistributedLogConstants.MAX_TXID}, without the roll check on an existing
     * writer, and with the last flag set to true.
     */
    private Future<BKLogSegmentWriter> getLogSegmentWriterForEndOfStream() {
        return getLogSegmentWriter(DistributedLogConstants.MAX_TXID, false, false, true);
    }

    /**
     * Convenience overload of the four-argument variant with the last flag fixed to false
     * (that flag is true only on the end-of-stream path).
     */
    private Future<BKLogSegmentWriter> getLogSegmentWriter(long j, boolean z, boolean z2) {
        return getLogSegmentWriter(j, z, z2, false);
    }

    /**
     * Appends a write to the pending queue and returns the promise that will carry its result.
     * Requires {@code pendingRequests} to be non-null, i.e. startQueueingRequests must have
     * run and the queue must not have been drained yet; otherwise this throws a
     * NullPointerException.
     *
     * @param logRecord record to queue
     * @param z         flush flag stored with the record
     * @return the queued record's promise
     */
    Future<DLSN> queueRequest(LogRecord logRecord, boolean z) {
        PendingLogRecord pendingLogRecord = new PendingLogRecord(logRecord, z);
        this.pendingRequests.add(pendingLogRecord);
        return pendingLogRecord.promise;
    }

    /**
     * Decides whether a new log segment must be started before the next write.
     *
     * @param writer the currently cached segment writer, or null if there is none
     * @return true when there is no writer, or when segment rolling is not disabled by the
     *         feature switch and {@code shouldStartNewSegment} asks for a new segment;
     *         false otherwise, including when the check fails with an IOException
     */
    boolean shouldRollLog(BKLogSegmentWriter writer) {
        if (null == writer) {
            // Nothing to write to yet: a segment has to be created.
            return true;
        }
        try {
            return !this.disableLogSegmentRollingFeature.isAvailable() && shouldStartNewSegment(writer);
        } catch (IOException e) {
            // The check failing is treated as "do not roll"; writing continues on the current writer.
            return false;
        }
    }

    /**
     * Enters the rolling state: creates an empty pending queue and a fresh rolling promise.
     * With assertions enabled, fails with AssertionError if a roll is already in progress.
     */
    void startQueueingRequests() {
        final boolean rollInProgress = null != pendingRequests || null != rollingFuture;
        if (rollInProgress && !$assertionsDisabled) {
            throw new AssertionError();
        }
        pendingRequests = new LinkedList<>();
        rollingFuture = new Promise<>();
    }

    /**
     * Issues a single write, queueing it instead when a segment roll is in progress or
     * has to be started.
     *
     * @param logRecord record to write
     * @param z         flush flag for the write
     * @return future of the assigned DLSN; on success the record's transaction id is
     *         recorded as the last transaction id. A WriteException raised inside this
     *         method is returned as a failed future.
     */
    private synchronized Future<DLSN> asyncWrite(final LogRecord logRecord, boolean z) {
        try {
            final BKLogSegmentWriter writer = getCachedLogSegmentWriter();
            final Future<DLSN> pending;
            if (null != rollingFuture) {
                // A roll is already running: either reject immediately or queue behind it.
                if (streamFailFast) {
                    pending = Future.exception(new StreamNotReadyException("Rolling log segment"));
                } else {
                    pending = queueRequest(logRecord, z);
                }
            } else if (!shouldRollLog(writer)) {
                pending = writer.asyncWrite(logRecord, z);
            } else {
                startQueueingRequests();
                if (null == writer) {
                    // No writer at all: queue this record and roll right away.
                    pending = queueRequest(logRecord, z);
                    rollLogSegmentAndIssuePendingRequests(logRecord);
                } else {
                    // Last write on the old writer (always flushed); the roll starts when it completes.
                    final LastPendingLogRecord lastOnOldSegment = new LastPendingLogRecord(logRecord, z);
                    writer.asyncWrite(logRecord, true).addEventListener(lastOnOldSegment);
                    pending = lastOnOldSegment.promise;
                }
            }
            return pending.map(new AbstractFunction1<DLSN, DLSN>() { // from class: com.twitter.distributedlog.BKAsyncLogWriter.3
                public DLSN apply(DLSN assigned) {
                    BKAsyncLogWriter.this.setLastTxId(logRecord.getTransactionId());
                    return assigned;
                }
            });
        } catch (WriteException we) {
            return Future.exception(we);
        }
    }

    /**
     * Issues the records one by one, flushing only on the last record. Stops at the first
     * write whose future has already failed and fills the remaining slots with cancelled futures.
     *
     * @param list records to write, in order
     * @return one future per input record, in the same order
     */
    private List<Future<DLSN>> asyncWriteBulk(List<LogRecord> list) {
        final List<Future<DLSN>> results = new ArrayList<Future<DLSN>>(list.size());
        final Iterator<LogRecord> records = list.iterator();
        boolean failedAlready = false;
        while (!failedAlready && records.hasNext()) {
            final LogRecord next = records.next();
            final boolean isLast = !records.hasNext();
            final Future<DLSN> written = asyncWrite(next, isLast);
            results.add(written);
            final Option<Try<DLSN>> outcome = written.poll();
            failedAlready = outcome.isDefined() && outcome.get().isThrow();
        }
        final int notIssued = list.size() - results.size();
        if (notIssued > 0) {
            appendCancelledFutures(results, notIssued);
        }
        return results;
    }

    /**
     * Appends {@code i} failed futures to {@code list}, all carrying the same
     * WriteCancelledException for this stream.
     *
     * @param list list to extend
     * @param i    number of cancelled futures to add
     */
    private void appendCancelledFutures(List<Future<DLSN>> list, int i) {
        final WriteCancelledException cancelled = new WriteCancelledException(getStreamName());
        int remaining = i;
        while (remaining > 0) {
            final Future<DLSN> failed = Future.exception(cancelled);
            list.add(failed);
            remaining--;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Obtains a (possibly new) segment writer and, once it is available, replays every
     * queued write on it and leaves the rolling state. If obtaining the writer or issuing
     * a queued write fails, all queued requests are failed and the writer is marked as errored.
     *
     * @param logRecord record whose transaction id is used for the roll
     */
    public void rollLogSegmentAndIssuePendingRequests(LogRecord logRecord) {
        final long txId = logRecord.getTransactionId();
        final Future<BKLogSegmentWriter> writerFuture = getLogSegmentWriter(txId, true, true);
        writerFuture.addEventListener(new FutureEventListener<BKLogSegmentWriter>() { // from class: com.twitter.distributedlog.BKAsyncLogWriter.4
            public void onSuccess(BKLogSegmentWriter writer) {
                try {
                    // Drain under the writer's monitor so no write can be queued mid-replay.
                    synchronized (BKAsyncLogWriter.this) {
                        for (PendingLogRecord queued : pendingRequests) {
                            FailpointUtils.checkFailPoint(FailpointUtils.FailPointName.FP_LogWriterIssuePending);
                            writer.asyncWrite(queued.record, queued.flush).addEventListener(queued);
                        }
                        if (null != rollingFuture) {
                            FutureUtils.setValue(rollingFuture, writer);
                        }
                        rollingFuture = null;
                        pendingRequestDispatch.add(pendingRequests.size());
                        pendingRequests = null;
                    }
                } catch (IOException ioe) {
                    errorOutPendingRequestsAndWriter(ioe);
                }
            }

            public void onFailure(Throwable cause) {
                errorOutPendingRequestsAndWriter(cause);
            }
        });
    }

    /**
     * Leaves the rolling state by failing everything that is waiting on it: the rolling
     * promise (if any) and every queued write receive {@code th}.
     *
     * @param th cause to fail the rolling promise and the queued writes with
     * @param z  value stored into {@code encounteredError}; true makes later writes fail
     */
    @VisibleForTesting
    void errorOutPendingRequests(Throwable th, boolean z) {
        final LinkedList<PendingLogRecord> queued;
        synchronized (this) {
            queued = this.pendingRequests;
            this.encounteredError = z;
            this.pendingRequests = null;
            if (null != this.rollingFuture) {
                FutureUtils.setException(this.rollingFuture, th);
            }
            this.rollingFuture = null;
        }
        if (null == queued) {
            // No roll was in progress, so there is no queue to fail (previously an NPE).
            return;
        }
        this.pendingRequestDispatch.add(queued.size());
        // Promises are failed outside the monitor; the queue was detached above.
        for (PendingLogRecord pending : queued) {
            pending.promise.setException(th);
        }
    }

    /**
     * Fails all queued requests with {@code th} and also marks this writer as errored
     * (passes true as the error flag to errorOutPendingRequests).
     */
    void errorOutPendingRequestsAndWriter(Throwable th) {
        errorOutPendingRequests(th, true);
    }

    /**
     * Writes a single record with flush requested and records the operation in the "write" stat.
     *
     * @param logRecord record to write
     * @return future of the DLSN assigned to the record
     */
    @Override // com.twitter.distributedlog.AsyncLogWriter
    public Future<DLSN> write(LogRecord logRecord) {
        // Start timing before the write is issued so the time spent inside asyncWrite is
        // part of the measured latency, matching how markEndOfStream times its operation.
        final Stopwatch stopwatch = Stopwatch.createStarted();
        return asyncWrite(logRecord, true).addEventListener(new OpStatsListener(this.writeOpStatsLogger, stopwatch));
    }

    /**
     * Issues all records in order and returns their individual futures wrapped in an
     * already-completed future; the operation is recorded in the "bulk_write" stat.
     *
     * @param list records to write
     * @return completed future holding one write future per input record
     */
    @Override // com.twitter.distributedlog.AsyncLogWriter
    public Future<List<Future<DLSN>>> writeBulk(List<LogRecord> list) {
        // The outer future is complete as soon as it is built, so the stopwatch must start
        // before the writes are issued for the stat to measure anything.
        final Stopwatch stopwatch = Stopwatch.createStarted();
        return Future.value(asyncWriteBulk(list)).addEventListener(new OpStatsListener(this.bulkWriteOpStatsLogger, stopwatch));
    }

    /**
     * Asks the write handler to mark log segments older than {@code dlsn} as truncated.
     *
     * @param dlsn truncation point; the {@code DLSN.InvalidDLSN} instance (compared by
     *             reference) short-circuits to a false result
     * @return false for the invalid DLSN, otherwise the handler's result mapped through
     *         {@code TruncationResultConverter}; an IOException from obtaining the handler
     *         is returned as a failed future
     */
    @Override // com.twitter.distributedlog.AsyncLogWriter
    public Future<Boolean> truncate(DLSN dlsn) {
        if (DLSN.InvalidDLSN != dlsn) {
            try {
                return getWriteHandler().setLogSegmentsOlderThanDLSNTruncated(dlsn).map(TruncationResultConverter);
            } catch (IOException ioe) {
                return Future.exception(ioe);
            }
        }
        return Future.value(false);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Flushes and commits on the writer currently in use: the writer being rolled to if a
     * roll is in progress, otherwise the cached writer.
     *
     * @return the writer's flush-and-commit result, or the last recorded transaction id
     *         when there is no writer at all
     */
    public Future<Long> flushAndCommit() {
        final Future<BKLogSegmentWriter> writerFuture;
        final long lastRecordedTxId;
        synchronized (this) {
            if (null != this.rollingFuture) {
                writerFuture = this.rollingFuture;
            } else {
                writerFuture = getCachedLogWriterFuture();
            }
            // lastTxId is guarded by this monitor (see setLastTxId/getLastTxId), so read it here.
            lastRecordedTxId = this.lastTxId;
        }
        if (null == writerFuture) {
            return Future.value(Long.valueOf(lastRecordedTxId));
        }
        return writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() { // from class: com.twitter.distributedlog.BKAsyncLogWriter.5
            public Future<Long> apply(BKLogSegmentWriter bKLogSegmentWriter) {
                return bKLogSegmentWriter.flushAndCommit();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Marks end of stream on the writer being rolled to, or, when no roll is in progress,
     * on a writer obtained for end of stream. The operation is recorded in the
     * "mark_end_of_stream" stat, timed from the start of this call.
     *
     * @return future produced by the writer's markEndOfStream
     */
    public Future<Long> markEndOfStream() {
        final Stopwatch stopwatch = Stopwatch.createStarted();
        final Promise<BKLogSegmentWriter> rolling;
        synchronized (this) {
            rolling = this.rollingFuture;
        }
        final Future<BKLogSegmentWriter> writerFuture;
        if (null != rolling) {
            writerFuture = rolling;
        } else {
            writerFuture = getLogSegmentWriterForEndOfStream();
        }
        final Future<Long> marked = writerFuture.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Long>>() { // from class: com.twitter.distributedlog.BKAsyncLogWriter.6
            public Future<Long> apply(BKLogSegmentWriter writer) {
                return writer.markEndOfStream();
            }
        });
        return marked.addEventListener(new OpStatsListener(this.markEndOfStreamOpStatsLogger, stopwatch));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Closes and completes via the superclass. If a roll is in progress, the superclass
     * call is deferred until the rolling promise succeeds.
     *
     * @return future of the close-and-complete operation
     */
    @Override // com.twitter.distributedlog.BKAbstractLogWriter
    public Future<Void> asyncCloseAndComplete() {
        final Promise<BKLogSegmentWriter> rolling;
        synchronized (this) {
            rolling = this.rollingFuture;
        }
        if (null == rolling) {
            return super.asyncCloseAndComplete();
        }
        return rolling.flatMap(new AbstractFunction1<BKLogSegmentWriter, Future<Void>>() { // from class: com.twitter.distributedlog.BKAsyncLogWriter.7
            public Future<Void> apply(BKLogSegmentWriter rolledWriter) {
                return BKAsyncLogWriter.super.asyncCloseAndComplete();
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Synchronous counterpart of asyncCloseAndComplete: hands its future to
     * {@code FutureUtils.result} (presumably blocks until completion — confirm in FutureUtils).
     *
     * @throws IOException if propagated by {@code FutureUtils.result}
     */
    @Override // com.twitter.distributedlog.BKAbstractLogWriter
    public void closeAndComplete() throws IOException {
        FutureUtils.result(asyncCloseAndComplete());
    }

    /**
     * Returns the stream name reported by the owning log manager.
     */
    @Override // com.twitter.distributedlog.AsyncLogWriter
    public String getStreamName() {
        return this.bkDistributedLogManager.getStreamName();
    }

    /** Returns "AsyncLogWriter:" followed by the stream name. */
    @Override
    public String toString() {
        return "AsyncLogWriter:" + getStreamName();
    }

    /** Forwarding method emitted by the decompiler; delegates unchanged to the superclass. */
    @Override // com.twitter.distributedlog.BKAbstractLogWriter
    @VisibleForTesting
    public /* bridge */ /* synthetic */ void setForceRecovery(boolean z) {
        super.setForceRecovery(z);
    }

    /** Forwarding method emitted by the decompiler; delegates unchanged to the superclass. */
    @Override // com.twitter.distributedlog.BKAbstractLogWriter
    @VisibleForTesting
    public /* bridge */ /* synthetic */ void overRideMinTimeStampToKeep(Long l) {
        super.overRideMinTimeStampToKeep(l);
    }

    /** Forwarding method emitted by the decompiler; delegates unchanged to the superclass. */
    @Override // com.twitter.distributedlog.BKAbstractLogWriter
    @VisibleForTesting
    public /* bridge */ /* synthetic */ void setForceRolling(boolean z) {
        super.setForceRolling(z);
    }

    /** Forwarding method emitted by the decompiler; returns the superclass's asyncAbort future. */
    @Override // com.twitter.distributedlog.BKAbstractLogWriter, com.twitter.distributedlog.io.AsyncAbortable
    public /* bridge */ /* synthetic */ Future asyncAbort() {
        return super.asyncAbort();
    }

    /** Forwarding method emitted by the decompiler; delegates unchanged to the superclass. */
    @Override // com.twitter.distributedlog.BKAbstractLogWriter, com.twitter.distributedlog.io.Abortable
    public /* bridge */ /* synthetic */ void abort() throws IOException {
        super.abort();
    }

    /** Forwarding method emitted by the decompiler; returns the superclass's asyncClose future. */
    @Override // com.twitter.distributedlog.BKAbstractLogWriter, com.twitter.distributedlog.io.AsyncCloseable
    public /* bridge */ /* synthetic */ Future asyncClose() {
        return super.asyncClose();
    }

    /** Forwarding method emitted by the decompiler; delegates unchanged to the superclass. */
    @Override // com.twitter.distributedlog.BKAbstractLogWriter, java.io.Closeable, java.lang.AutoCloseable
    public /* bridge */ /* synthetic */ void close() throws IOException {
        super.close();
    }

    // Class initialization: logger, the decompiled assertion flag, and the converter that
    // maps any truncation result list to TRUE.
    static {
        LOG = LoggerFactory.getLogger(BKAsyncLogWriter.class);
        $assertionsDisabled = !BKAsyncLogWriter.class.desiredAssertionStatus();
        TruncationResultConverter = new AbstractFunction1<List<LogSegmentMetadata>, Boolean>() { // from class: com.twitter.distributedlog.BKAsyncLogWriter.1
            public Boolean apply(List<LogSegmentMetadata> truncatedSegments) {
                return Boolean.TRUE;
            }
        };
    }
}
