package org.apache.distributedlog.service.stream;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.twitter.util.Duration;
import com.twitter.util.Function0;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import com.twitter.util.TimeoutException;
import com.twitter.util.Timer;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.feature.FeatureProvider;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.Gauge;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.distributedlog.AsyncLogWriter;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogManager;
import org.apache.distributedlog.config.DynamicDistributedLogConfiguration;
import org.apache.distributedlog.exceptions.AlreadyClosedException;
import org.apache.distributedlog.exceptions.DLException;
import org.apache.distributedlog.exceptions.OverCapacityException;
import org.apache.distributedlog.exceptions.OwnershipAcquireFailedException;
import org.apache.distributedlog.exceptions.StreamNotReadyException;
import org.apache.distributedlog.exceptions.StreamUnavailableException;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.io.Abortables;
import org.apache.distributedlog.io.AsyncAbortable;
import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.service.FatalErrorHandler;
import org.apache.distributedlog.service.ServerFeatureKeys;
import org.apache.distributedlog.service.config.ServerConfiguration;
import org.apache.distributedlog.service.config.StreamConfigProvider;
import org.apache.distributedlog.service.stream.limiter.StreamRequestLimiter;
import org.apache.distributedlog.service.streamset.Partition;
import org.apache.distributedlog.stats.BroadCastStatsLogger;
import org.apache.distributedlog.thrift.service.StatusCode;
import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.OrderedScheduler;
import org.apache.distributedlog.util.TimeSequencer;
import org.apache.distributedlog.util.Utils;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

/* loaded from: input_file:org/apache/distributedlog/service/stream/StreamImpl.class */
public class StreamImpl implements Stream {
    private static final Logger logger = LoggerFactory.getLogger(StreamImpl.class);
    private final String name;
    private final Partition partition;
    private DistributedLogManager manager;
    private volatile AsyncLogWriter writer;
    private volatile String owner;
    private volatile Throwable lastException;
    private final StreamRequestLimiter limiter;
    private final DynamicDistributedLogConfiguration dynConf;
    private final DistributedLogConfiguration dlConfig;
    private final DistributedLogNamespace dlNamespace;
    private final String clientId;
    private final OrderedScheduler scheduler;
    private final Feature featureRateLimitDisabled;
    private final StreamManager streamManager;
    private final StreamConfigProvider streamConfigProvider;
    private final FatalErrorHandler fatalErrorHandler;
    private final long streamProbationTimeoutMs;
    private final long serviceTimeoutMs;
    private final long writerCloseTimeoutMs;
    private final boolean failFastOnStreamNotReady;
    private final HashedWheelTimer requestTimer;
    private final Timer futureTimer;
    private final StatsLogger streamLogger;
    private final StatsLogger streamExceptionStatLogger;
    private final StatsLogger limiterStatLogger;
    private final Counter serviceTimeout;
    private final OpStatsLogger streamAcquireStat;
    private final OpStatsLogger writerCloseStatLogger;
    private final Counter pendingOpsCounter;
    private final Counter unexpectedExceptions;
    private final Counter writerCloseTimeoutCounter;
    private final StatsLogger exceptionStatLogger;
    private final Gauge<Number> streamStatusGauge;
    private volatile Queue<StreamOp> pendingOps = new ArrayDeque();
    private final Promise<Void> closePromise = new Promise<>();
    private final Object txnLock = new Object();
    private final TimeSequencer sequencer = new TimeSequencer();
    private final ReentrantReadWriteLock closeLock = new ReentrantReadWriteLock();
    private final ConcurrentHashMap<String, Counter> exceptionCounters = new ConcurrentHashMap<>();
    private volatile StreamStatus status = StreamStatus.UNINITIALIZED;

    /* renamed from: org.apache.distributedlog.service.stream.StreamImpl$9, reason: invalid class name */
    /* loaded from: input_file:org/apache/distributedlog/service/stream/StreamImpl$9.class */
    static /* synthetic */ class AnonymousClass9 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode = new int[StatusCode.values().length];

        static {
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.FOUND.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.ALREADY_CLOSED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.NOT_IMPLEMENTED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.METADATA_EXCEPTION.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.LOG_EMPTY.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.LOG_NOT_FOUND.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.TRUNCATED_TRANSACTION.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.END_OF_STREAM.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.TRANSACTION_OUT_OF_ORDER.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.INVALID_STREAM_NAME.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.TOO_LARGE_RECORD.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.STREAM_NOT_READY.ordinal()] = 12;
            } catch (NoSuchFieldError e12) {
            }
            try {
                $SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[StatusCode.OVER_CAPACITY.ordinal()] = 13;
            } catch (NoSuchFieldError e13) {
            }
        }
    }

    /* loaded from: input_file:org/apache/distributedlog/service/stream/StreamImpl$StreamStatus.class */
    public enum StreamStatus {
        UNINITIALIZED(-1),
        INITIALIZING(0),
        INITIALIZED(1),
        CLOSING(-4),
        CLOSED(-5),
        ERROR(-6);

        final int code;

        StreamStatus(int i) {
            this.code = i;
        }

        int getCode() {
            return this.code;
        }

        public static boolean isUnavailable(StreamStatus streamStatus) {
            return ERROR == streamStatus || CLOSING == streamStatus || CLOSED == streamStatus;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public StreamImpl(String str, Partition partition, String str2, StreamManager streamManager, StreamOpStats streamOpStats, ServerConfiguration serverConfiguration, DistributedLogConfiguration distributedLogConfiguration, DynamicDistributedLogConfiguration dynamicDistributedLogConfiguration, FeatureProvider featureProvider, StreamConfigProvider streamConfigProvider, DistributedLogNamespace distributedLogNamespace, OrderedScheduler orderedScheduler, FatalErrorHandler fatalErrorHandler, HashedWheelTimer hashedWheelTimer, Timer timer) {
        this.clientId = str2;
        this.dlConfig = distributedLogConfiguration;
        this.streamManager = streamManager;
        this.name = str;
        this.partition = partition;
        this.lastException = new IOException("Fail to write record to stream " + str);
        this.streamConfigProvider = streamConfigProvider;
        this.dlNamespace = distributedLogNamespace;
        this.featureRateLimitDisabled = featureProvider.getFeature(ServerFeatureKeys.SERVICE_RATE_LIMIT_DISABLED.name().toLowerCase());
        this.scheduler = orderedScheduler;
        this.serviceTimeoutMs = serverConfiguration.getServiceTimeoutMs();
        this.streamProbationTimeoutMs = serverConfiguration.getStreamProbationTimeoutMs();
        this.writerCloseTimeoutMs = serverConfiguration.getWriterCloseTimeoutMs();
        this.failFastOnStreamNotReady = distributedLogConfiguration.getFailFastOnStreamNotReady();
        this.fatalErrorHandler = fatalErrorHandler;
        this.dynConf = dynamicDistributedLogConfiguration;
        this.limiter = new StreamRequestLimiter(str, this.dynConf, BroadCastStatsLogger.two(streamOpStats.baseScope("stream_limiter"), streamOpStats.streamRequestScope(partition, "limiter")), this.featureRateLimitDisabled);
        this.requestTimer = hashedWheelTimer;
        this.futureTimer = timer;
        this.streamLogger = streamOpStats.streamRequestStatsLogger(partition);
        this.limiterStatLogger = streamOpStats.baseScope("request_limiter");
        this.streamExceptionStatLogger = this.streamLogger.scope("exceptions");
        this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout");
        StatsLogger baseScope = streamOpStats.baseScope("streams");
        this.streamAcquireStat = baseScope.getOpStatsLogger("acquire");
        this.pendingOpsCounter = streamOpStats.baseCounter("pending_ops");
        this.unexpectedExceptions = streamOpStats.baseCounter("unexpected_exceptions");
        this.exceptionStatLogger = streamOpStats.requestScope("exceptions");
        this.writerCloseStatLogger = baseScope.getOpStatsLogger("writer_close");
        this.writerCloseTimeoutCounter = baseScope.getCounter("writer_close_timeouts");
        this.streamStatusGauge = new Gauge<Number>() { // from class: org.apache.distributedlog.service.stream.StreamImpl.1
            public Number getDefaultValue() {
                return Integer.valueOf(StreamStatus.UNINITIALIZED.getCode());
            }

            public Number getSample() {
                return Integer.valueOf(StreamImpl.this.status.getCode());
            }
        };
    }

    @Override // org.apache.distributedlog.service.stream.Stream
    public String getOwner() {
        return this.owner;
    }

    @Override // org.apache.distributedlog.service.stream.Stream
    public String getStreamName() {
        return this.name;
    }

    @Override // org.apache.distributedlog.service.stream.Stream
    public DynamicDistributedLogConfiguration getStreamConfiguration() {
        return this.dynConf;
    }

    @Override // org.apache.distributedlog.service.stream.Stream
    public Partition getPartition() {
        return this.partition;
    }

    private DistributedLogManager openLog(String str) throws IOException {
        return this.dlNamespace.openLog(str, Optional.absent(), Optional.of(this.dynConf), Optional.of(this.streamLogger));
    }

    @Override // org.apache.distributedlog.service.stream.Stream
    public void initialize() throws IOException {
        this.manager = openLog(this.name);
        this.streamLogger.registerGauge("stream_status", this.streamStatusGauge);
        this.status = StreamStatus.INITIALIZING;
    }

    public String toString() {
        return String.format("Stream:%s, %s, %s Status:%s", this.name, this.manager, this.writer, this.status);
    }

    @Override // org.apache.distributedlog.service.stream.Stream
    public void start() {
        acquireStream().addEventListener(new FutureEventListener<Boolean>() { // from class: org.apache.distributedlog.service.stream.StreamImpl.2
            public void onSuccess(Boolean bool) {
                if (bool.booleanValue()) {
                    return;
                }
                StreamImpl.this.setStreamInErrorStatus();
                StreamImpl.this.requestClose("Failed to acquire the ownership");
            }

            public void onFailure(Throwable th) {
                StreamImpl.logger.error("Stream {} threw unhandled exception : ", StreamImpl.this.name, th);
                StreamImpl.this.setStreamInErrorStatus();
                StreamImpl.this.requestClose("Unhandled exception");
            }
        });
    }

    void countException(Throwable th, StatsLogger statsLogger) {
        String name = null == th ? "null" : th.getClass().getName();
        Counter counter = this.exceptionCounters.get(name);
        if (null == counter) {
            counter = this.exceptionStatLogger.getCounter(name);
            Counter putIfAbsent = this.exceptionCounters.putIfAbsent(name, counter);
            if (null != putIfAbsent) {
                counter = putIfAbsent;
            }
        }
        counter.inc();
        statsLogger.getCounter(name).inc();
    }

    boolean isCriticalException(Throwable th) {
        return !(th instanceof OwnershipAcquireFailedException);
    }

    void scheduleTimeout(final StreamOp streamOp) {
        final Timeout newTimeout = this.requestTimer.newTimeout(new TimerTask() { // from class: org.apache.distributedlog.service.stream.StreamImpl.3
            public void run(Timeout timeout) throws Exception {
                if (timeout.isCancelled()) {
                    return;
                }
                StreamImpl.this.serviceTimeout.inc();
                StreamImpl.this.handleServiceTimeout("Operation " + streamOp.getClass().getName() + " timeout");
            }
        }, this.serviceTimeoutMs, TimeUnit.MILLISECONDS);
        streamOp.responseHeader().ensure(new Function0<BoxedUnit>() { // from class: org.apache.distributedlog.service.stream.StreamImpl.4
            /* renamed from: apply, reason: merged with bridge method [inline-methods] */
            public BoxedUnit m36apply() {
                newTimeout.cancel();
                return null;
            }
        });
    }

    void handleServiceTimeout(String str) {
        synchronized (this) {
            if (StreamStatus.isUnavailable(this.status)) {
                return;
            }
            setStreamInErrorStatus();
            requestClose(str, false).onSuccess(new AbstractFunction1<Void, BoxedUnit>() { // from class: org.apache.distributedlog.service.stream.StreamImpl.5
                public BoxedUnit apply(Void r6) {
                    StreamImpl.this.streamManager.scheduleRemoval(StreamImpl.this, StreamImpl.this.streamProbationTimeoutMs);
                    return BoxedUnit.UNIT;
                }
            });
        }
    }

    @Override // org.apache.distributedlog.service.stream.Stream
    public void submit(StreamOp streamOp) {
        try {
            this.limiter.apply(streamOp);
            if (this.serviceTimeoutMs > 0) {
                scheduleTimeout(streamOp);
            }
            boolean z = false;
            boolean z2 = true;
            if (StreamStatus.isUnavailable(this.status)) {
                streamOp.fail(new StreamUnavailableException("Stream " + this.name + " is closed."));
                return;
            }
            if (StreamStatus.INITIALIZED != this.status || this.writer == null) {
                synchronized (this) {
                    if (StreamStatus.isUnavailable(this.status)) {
                        streamOp.fail(new StreamUnavailableException("Stream " + this.name + " is closed."));
                        return;
                    }
                    if (StreamStatus.INITIALIZED == this.status) {
                        z = true;
                        z2 = true;
                    } else {
                        if (this.failFastOnStreamNotReady) {
                            streamOp.fail(new StreamNotReadyException("Stream " + this.name + " is not ready; status = " + this.status));
                            return;
                        }
                        this.pendingOps.add(streamOp);
                        this.pendingOpsCounter.inc();
                        if (1 == this.pendingOps.size() && (streamOp instanceof HeartbeatOp)) {
                            ((HeartbeatOp) streamOp).setWriteControlRecord(true);
                        }
                    }
                }
            } else {
                z = true;
                z2 = true;
            }
            if (z) {
                executeOp(streamOp, z2);
            }
        } catch (OverCapacityException e) {
            streamOp.fail(e);
        }
    }

    void executeOp(final StreamOp streamOp, boolean z) {
        AsyncLogWriter asyncLogWriter;
        Throwable th;
        synchronized (this) {
            asyncLogWriter = this.writer;
            th = this.lastException;
        }
        if (null != asyncLogWriter && z) {
            streamOp.execute(asyncLogWriter, this.sequencer, this.txnLock).addEventListener(new FutureEventListener<Void>() { // from class: org.apache.distributedlog.service.stream.StreamImpl.6
                static final /* synthetic */ boolean $assertionsDisabled;

                public void onSuccess(Void r2) {
                }

                public void onFailure(Throwable th2) {
                    boolean z2 = true;
                    if (th2 instanceof DLException) {
                        switch (AnonymousClass9.$SwitchMap$org$apache$distributedlog$thrift$service$StatusCode[((DLException) th2).getCode().ordinal()]) {
                            case 1:
                                if (!$assertionsDisabled && !(th2 instanceof OwnershipAcquireFailedException)) {
                                    throw new AssertionError();
                                }
                                z2 = false;
                                StreamImpl.this.handleExceptionOnStreamOp(streamOp, th2);
                                break;
                            case 2:
                                if (!$assertionsDisabled && !(th2 instanceof AlreadyClosedException)) {
                                    throw new AssertionError();
                                }
                                streamOp.fail(th2);
                                StreamImpl.this.handleAlreadyClosedException((AlreadyClosedException) th2);
                                break;
                                break;
                            case 3:
                            case 4:
                            case 5:
                            case 6:
                            case 7:
                            case 8:
                            case 9:
                            case 10:
                            case 11:
                            case 12:
                            case 13:
                                streamOp.fail(th2);
                                break;
                            default:
                                StreamImpl.this.handleExceptionOnStreamOp(streamOp, th2);
                                break;
                        }
                    } else {
                        StreamImpl.this.handleExceptionOnStreamOp(streamOp, th2);
                    }
                    if (z2) {
                        StreamImpl.this.countException(th2, StreamImpl.this.streamExceptionStatLogger);
                    }
                }

                static {
                    $assertionsDisabled = !StreamImpl.class.desiredAssertionStatus();
                }
            });
        } else if (null != th) {
            streamOp.fail(th);
        } else {
            streamOp.fail(new StreamUnavailableException("Stream " + this.name + " is closed."));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleExceptionOnStreamOp(StreamOp streamOp, Throwable th) {
        AsyncLogWriter asyncLogWriter = null;
        boolean z = false;
        synchronized (this) {
            if (StreamStatus.INITIALIZED == this.status) {
                asyncLogWriter = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, th);
                z = true;
            }
        }
        if (z) {
            Abortables.asyncAbort(asyncLogWriter, false);
            if (isCriticalException(th)) {
                logger.error("Failed to write data into stream {} : ", this.name, th);
            } else {
                logger.warn("Failed to write data into stream {} : {}", this.name, th.getMessage());
            }
            requestClose("Failed to write data into stream " + this.name + " : " + th.getMessage());
        }
        streamOp.fail(th);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleAlreadyClosedException(AlreadyClosedException alreadyClosedException) {
        this.unexpectedExceptions.inc();
        logger.error("Encountered unexpected exception when writing data into stream {} : ", this.name, alreadyClosedException);
        this.fatalErrorHandler.notifyFatalError();
    }

    Future<Boolean> acquireStream() {
        final Stopwatch createStarted = Stopwatch.createStarted();
        final Promise promise = new Promise();
        this.manager.openAsyncLogWriter().addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<AsyncLogWriter>() { // from class: org.apache.distributedlog.service.stream.StreamImpl.7
            public void onSuccess(AsyncLogWriter asyncLogWriter) {
                StreamImpl.this.onAcquireStreamSuccess(asyncLogWriter, createStarted, promise);
            }

            public void onFailure(Throwable th) {
                StreamImpl.this.onAcquireStreamFailure(th, createStarted, promise);
            }
        }, this.scheduler, getStreamName()));
        return promise;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onAcquireStreamSuccess(AsyncLogWriter asyncLogWriter, Stopwatch stopwatch, Promise<Boolean> promise) {
        AsyncAbortable streamStatus;
        Queue<StreamOp> queue;
        boolean z;
        synchronized (this.txnLock) {
            this.sequencer.setLastId(asyncLogWriter.getLastTxId());
        }
        synchronized (this) {
            streamStatus = setStreamStatus(StreamStatus.INITIALIZED, StreamStatus.INITIALIZING, asyncLogWriter, null);
            queue = this.pendingOps;
            this.pendingOps = new ArrayDeque();
            z = true;
        }
        if (!this.streamManager.allowAcquire(this)) {
            if (null != streamStatus) {
                Abortables.asyncAbort(streamStatus, true);
            }
            StreamUnavailableException streamUnavailableException = new StreamUnavailableException("Stream " + this.partition.getStream() + " is not allowed to acquire more than " + this.dynConf.getMaxAcquiredPartitionsPerProxy() + " partitions");
            countException(streamUnavailableException, this.exceptionStatLogger);
            logger.error("Failed to acquire stream {} because it is unavailable : {}", this.name, streamUnavailableException.getMessage());
            synchronized (this) {
                streamStatus = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZED, null, streamUnavailableException);
                z = false;
            }
        }
        processPendingRequestsAfterAcquire(z, streamStatus, queue, stopwatch, promise);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onAcquireStreamFailure(Throwable th, Stopwatch stopwatch, Promise<Boolean> promise) {
        AsyncLogWriter streamStatus;
        Queue<StreamOp> queue;
        if (th instanceof AlreadyClosedException) {
            countException(th, this.streamExceptionStatLogger);
            handleAlreadyClosedException((AlreadyClosedException) th);
            return;
        }
        if (isCriticalException(th)) {
            countException(th, this.streamExceptionStatLogger);
            logger.error("Failed to acquire stream {} : ", this.name, th);
        } else {
            logger.warn("Failed to acquire stream {} : {}", this.name, th.getMessage());
        }
        synchronized (this) {
            streamStatus = setStreamStatus(StreamStatus.ERROR, StreamStatus.INITIALIZING, null, th);
            queue = this.pendingOps;
            this.pendingOps = new ArrayDeque();
        }
        processPendingRequestsAfterAcquire(false, streamStatus, queue, stopwatch, promise);
    }

    void processPendingRequestsAfterAcquire(boolean z, AsyncLogWriter asyncLogWriter, Queue<StreamOp> queue, Stopwatch stopwatch, Promise<Boolean> promise) {
        if (z) {
            this.streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
        } else {
            this.streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
        }
        Iterator<StreamOp> it = queue.iterator();
        while (it.hasNext()) {
            executeOp(it.next(), z);
            this.pendingOpsCounter.dec();
        }
        Abortables.asyncAbort(asyncLogWriter, true);
        FutureUtils.setValue(promise, Boolean.valueOf(z));
    }

    synchronized void setStreamInErrorStatus() {
        if (StreamStatus.CLOSING == this.status || StreamStatus.CLOSED == this.status) {
            return;
        }
        this.status = StreamStatus.ERROR;
    }

    synchronized AsyncLogWriter setStreamStatus(StreamStatus streamStatus, StreamStatus streamStatus2, AsyncLogWriter asyncLogWriter, Throwable th) {
        if (streamStatus2 != this.status) {
            logger.info("Stream {} status already changed from {} -> {} when trying to change it to {}", new Object[]{this.name, streamStatus2, this.status, streamStatus});
            return null;
        }
        String str = null;
        if (th instanceof OwnershipAcquireFailedException) {
            str = ((OwnershipAcquireFailedException) th).getCurrentOwner();
        }
        AsyncLogWriter asyncLogWriter2 = this.writer;
        this.writer = asyncLogWriter;
        if (null == str || !str.equals(this.clientId)) {
            this.owner = str;
        } else {
            this.unexpectedExceptions.inc();
            logger.error("I am waiting myself {} to release lock on stream {}, so have to shut myself down :", new Object[]{str, this.name, th});
            this.fatalErrorHandler.notifyFatalError();
            this.owner = null;
        }
        this.lastException = th;
        this.status = streamStatus;
        if (StreamStatus.INITIALIZED == streamStatus) {
            this.streamManager.notifyAcquired(this);
            logger.info("Inserted acquired stream {} -> writer {}", this.name, this);
        } else {
            this.streamManager.notifyReleased(this);
            logger.info("Removed acquired stream {} -> writer {}", this.name, this);
        }
        return asyncLogWriter2;
    }

    void close(DistributedLogManager distributedLogManager) {
        if (null != distributedLogManager) {
            try {
                distributedLogManager.close();
            } catch (IOException e) {
                logger.warn("Failed to close dlm for {} : ", this.name, e);
            }
        }
    }

    @Override // org.apache.distributedlog.service.stream.Stream
    public Future<Void> requestClose(String str) {
        return requestClose(str, true);
    }

    Future<Void> requestClose(String str, boolean z) {
        this.closeLock.writeLock().lock();
        try {
            if (StreamStatus.CLOSING == this.status || StreamStatus.CLOSED == this.status) {
                Promise<Void> promise = this.closePromise;
                this.closeLock.writeLock().unlock();
                return promise;
            }
            logger.info("Request to close stream {} : {}", getStreamName(), str);
            boolean z2 = StreamStatus.INITIALIZED != this.status;
            this.status = StreamStatus.CLOSING;
            this.streamManager.notifyReleased(this);
            this.closeLock.writeLock().unlock();
            close(z2, z);
            return this.closePromise;
        } catch (Throwable th) {
            this.closeLock.writeLock().unlock();
            throw th;
        }
    }

    @Override // org.apache.distributedlog.service.stream.Stream
    public void delete() throws IOException {
        if (null != this.writer) {
            Utils.close(this.writer);
            synchronized (this) {
                this.writer = null;
                this.lastException = new StreamUnavailableException("Stream was deleted");
            }
        }
        if (null == this.manager) {
            throw new UnexpectedException("No stream " + this.name + " to delete");
        }
        this.manager.delete();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void postClose(boolean z) {
        closeManagerAndErrorOutPendingRequests();
        unregisterGauge();
        if (z) {
            if (null != this.owner) {
                this.streamManager.scheduleRemoval(this, (2 * this.dlConfig.getZKSessionTimeoutMilliseconds()) / 3);
            } else {
                this.streamManager.notifyRemoved(this);
                logger.info("Removed cached stream {}.", getStreamName());
            }
        }
        FutureUtils.setValue(this.closePromise, (Object) null);
    }

    private Future<Void> close(boolean z, final boolean z2) {
        this.closeLock.writeLock().lock();
        try {
            if (StreamStatus.CLOSED == this.status) {
                Promise<Void> promise = this.closePromise;
                this.closeLock.writeLock().unlock();
                return promise;
            }
            boolean z3 = z || !(StreamStatus.INITIALIZED == this.status || StreamStatus.CLOSING == this.status);
            this.status = StreamStatus.CLOSED;
            this.streamManager.notifyReleased(this);
            this.closeLock.writeLock().unlock();
            logger.info("Closing stream {} ...", this.name);
            FutureUtils.stats(z3 ? Abortables.asyncAbort(this.writer, true) : Utils.asyncClose(this.writer, true), this.writerCloseStatLogger, Stopwatch.createStarted()).masked().within(this.futureTimer, this.writerCloseTimeoutMs <= 0 ? Duration.Top() : Duration.fromMilliseconds(this.writerCloseTimeoutMs)).addEventListener(FutureUtils.OrderedFutureEventListener.of(new FutureEventListener<Void>() { // from class: org.apache.distributedlog.service.stream.StreamImpl.8
                public void onSuccess(Void r4) {
                    StreamImpl.this.postClose(z2);
                }

                public void onFailure(Throwable th) {
                    if (th instanceof TimeoutException) {
                        StreamImpl.this.writerCloseTimeoutCounter.inc();
                    }
                    StreamImpl.this.postClose(z2);
                }
            }, this.scheduler, this.name));
            return this.closePromise;
        } catch (Throwable th) {
            this.closeLock.writeLock().unlock();
            throw th;
        }
    }

    private void closeManagerAndErrorOutPendingRequests() {
        Queue<StreamOp> queue;
        close(this.manager);
        synchronized (this) {
            queue = this.pendingOps;
            this.pendingOps = new ArrayDeque();
        }
        Throwable streamUnavailableException = new StreamUnavailableException("Stream " + this.name + " is closed.");
        Iterator<StreamOp> it = queue.iterator();
        while (it.hasNext()) {
            it.next().fail(streamUnavailableException);
            this.pendingOpsCounter.dec();
        }
        this.limiter.close();
        logger.info("Closed stream {}.", this.name);
    }

    private void unregisterGauge() {
        this.streamLogger.unregisterGauge("stream_status", this.streamStatusGauge);
    }

    @VisibleForTesting
    public int numPendingOps() {
        Queue<StreamOp> queue = this.pendingOps;
        if (null == queue) {
            return 0;
        }
        return queue.size();
    }

    @VisibleForTesting
    public StreamStatus getStatus() {
        return this.status;
    }

    @VisibleForTesting
    public void setStatus(StreamStatus streamStatus) {
        this.status = streamStatus;
    }

    @VisibleForTesting
    public AsyncLogWriter getWriter() {
        return this.writer;
    }

    @VisibleForTesting
    public DistributedLogManager getManager() {
        return this.manager;
    }

    @VisibleForTesting
    public Throwable getLastException() {
        return this.lastException;
    }

    @VisibleForTesting
    public Future<Void> getCloseFuture() {
        return this.closePromise;
    }
}
