package org.apache.distributedlog.service.stream;

import com.google.common.base.Stopwatch;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import com.twitter.util.Return;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.AsyncLogWriter;
import org.apache.distributedlog.exceptions.ChecksumFailedException;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.service.ResponseUtils;
import org.apache.distributedlog.thrift.service.ResponseHeader;
import org.apache.distributedlog.util.Sequencer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/distributedlog/service/stream/AbstractStreamOp.class */
public abstract class AbstractStreamOp<Response> implements StreamOp {
    private static final Logger logger = LoggerFactory.getLogger(AbstractStreamOp.class);
    protected final String stream;
    protected final OpStatsLogger opStatsLogger;
    private final Promise<Response> result = new Promise<>();
    protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
    protected final Long checksum;
    protected final Feature checksumDisabledFeature;

    public AbstractStreamOp(String str, OpStatsLogger opStatsLogger, Long l, Feature feature) {
        this.stream = str;
        this.opStatsLogger = opStatsLogger;
        this.stopwatch.reset().start();
        this.checksum = l;
        this.checksumDisabledFeature = feature;
    }

    @Override // org.apache.distributedlog.service.stream.StreamOp
    public String streamName() {
        return this.stream;
    }

    @Override // org.apache.distributedlog.service.stream.StreamOp
    public Stopwatch stopwatch() {
        return this.stopwatch;
    }

    @Override // org.apache.distributedlog.service.stream.StreamOp
    public void preExecute() throws DLException {
        Long computeChecksum;
        if (!this.checksumDisabledFeature.isAvailable() && null != this.checksum && null != (computeChecksum = computeChecksum()) && !this.checksum.equals(computeChecksum)) {
            throw new ChecksumFailedException();
        }
    }

    @Override // org.apache.distributedlog.service.stream.StreamOp
    public Long computeChecksum() {
        return null;
    }

    @Override // org.apache.distributedlog.service.stream.StreamOp
    public Future<Void> execute(AsyncLogWriter asyncLogWriter, Sequencer sequencer, Object obj) {
        this.stopwatch.reset().start();
        return executeOp(asyncLogWriter, sequencer, obj).addEventListener(new FutureEventListener<Response>() { // from class: org.apache.distributedlog.service.stream.AbstractStreamOp.1
            public void onSuccess(Response response) {
                AbstractStreamOp.this.opStatsLogger.registerSuccessfulEvent(AbstractStreamOp.this.stopwatch.elapsed(TimeUnit.MICROSECONDS));
                AbstractStreamOp.this.setResponse(response);
            }

            public void onFailure(Throwable th) {
            }
        }).voided();
    }

    @Override // org.apache.distributedlog.service.stream.StreamOp
    public void fail(Throwable th) {
        if (th instanceof OwnershipAcquireFailedException) {
            fail(ResponseUtils.ownerToHeader(((OwnershipAcquireFailedException) th).getCurrentOwner()));
        } else {
            this.opStatsLogger.registerFailedEvent(this.stopwatch.elapsed(TimeUnit.MICROSECONDS));
            fail(ResponseUtils.exceptionToHeader(th));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setResponse(Response response) {
        Return r0 = new Return(response);
        if (this.result.updateIfEmpty(r0)) {
            return;
        }
        logger.error("Result set multiple times. Value='{}', New='{}'", this.result.poll(), r0);
    }

    public Future<Response> result() {
        return this.result;
    }

    protected abstract Future<Response> executeOp(AsyncLogWriter asyncLogWriter, Sequencer sequencer, Object obj);

    protected abstract void fail(ResponseHeader responseHeader);

    public static OpStatsLogger requestStat(StatsLogger statsLogger, String str) {
        return requestLogger(statsLogger).getOpStatsLogger(str);
    }

    public static StatsLogger requestLogger(StatsLogger statsLogger) {
        return statsLogger.scope("request");
    }

    public static StatsLogger requestScope(StatsLogger statsLogger, String str) {
        return requestLogger(statsLogger).scope(str);
    }
}
