package org.apache.pekko.stream.impl.streamref;

import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.Terminated;
import org.apache.pekko.actor.Terminated$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.InvalidPartnerActorException$;
import org.apache.pekko.stream.InvalidSequenceNumberException$;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.RemoteStreamRefActorTerminatedException$;
import org.apache.pekko.stream.StreamRefAttributes;
import org.apache.pekko.stream.StreamRefAttributes$BufferCapacity$;
import org.apache.pekko.stream.StreamRefAttributes$DemandRedeliveryInterval$;
import org.apache.pekko.stream.StreamRefAttributes$FinalTerminationSignalDeadline$;
import org.apache.pekko.stream.StreamRefAttributes$SubscriptionTimeout$;
import org.apache.pekko.stream.StreamRefSettings;
import org.apache.pekko.stream.StreamRefSubscriptionTimeoutException$;
import org.apache.pekko.stream.SubscriptionWithCancelException;
import org.apache.pekko.stream.SubscriptionWithCancelException$NoMoreElementsNeeded$;
import org.apache.pekko.stream.impl.FixedSizeBuffer;
import org.apache.pekko.stream.impl.FixedSizeBuffer$;
import org.apache.pekko.stream.impl.streamref.SourceRefStageImpl;
import org.apache.pekko.stream.impl.streamref.StreamRefsProtocol;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.stream.stage.TimerGraphStageLogic;
import org.apache.pekko.util.OptionVal$;
import org.apache.pekko.util.OptionVal$Some$;
import org.apache.pekko.util.PrettyDuration$;
import scala.Function1;
import scala.MatchError;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

/* compiled from: SourceRefImpl.scala */
/* loaded from: input_file:org/apache/pekko/stream/impl/streamref/SourceRefStageImpl$$anon$1.class */
public final class SourceRefStageImpl$$anon$1 extends TimerGraphStageLogic implements StageLogging, SourceRefStageImpl.ActorRefStage, OutHandler {
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    private final StreamRefAttributes.SubscriptionTimeout subscriptionTimeout;
    private final FiniteDuration demandRedeliveryInterval;
    private final FiniteDuration finalTerminationSignalDeadline;
    private final String stageActorName;
    private final GraphStageLogic.StageActor self;
    private final ActorRef ref;
    private SourceRefStageImpl.State state;
    private long expectingSeqNr;
    private long localCumulativeDemand;
    private int localRemainingRequested;
    private final FixedSizeBuffer.AbstractC0000FixedSizeBuffer receiveBuffer;
    private final SourceRefStageImpl.WatermarkRequestStrategy requestStrategy;
    private final /* synthetic */ SourceRefStageImpl $outer;

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SourceRefStageImpl$$anon$1(Attributes attributes, Materializer materializer, SourceRefStageImpl sourceRefStageImpl) {
        super(sourceRefStageImpl.shape());
        SourceRefStageImpl.State state;
        if (sourceRefStageImpl == null) {
            throw new NullPointerException();
        }
        this.$outer = sourceRefStageImpl;
        StageLogging.$init$(this);
        StreamRefsMaster streamRefsMaster = (StreamRefsMaster) StreamRefsMaster$.MODULE$.apply(materializer.system());
        StreamRefSettings streamRefSettings = materializer.settings().streamRefSettings();
        this.subscriptionTimeout = (StreamRefAttributes.SubscriptionTimeout) attributes.get(StreamRefAttributes$SubscriptionTimeout$.MODULE$.apply(streamRefSettings.subscriptionTimeout()), ClassTag$.MODULE$.apply(StreamRefAttributes.SubscriptionTimeout.class));
        int capacity = ((StreamRefAttributes.BufferCapacity) attributes.get(StreamRefAttributes$BufferCapacity$.MODULE$.apply(streamRefSettings.bufferCapacity()), ClassTag$.MODULE$.apply(StreamRefAttributes.BufferCapacity.class))).capacity();
        this.demandRedeliveryInterval = ((StreamRefAttributes.DemandRedeliveryInterval) attributes.get(StreamRefAttributes$DemandRedeliveryInterval$.MODULE$.apply(streamRefSettings.demandRedeliveryInterval()), ClassTag$.MODULE$.apply(StreamRefAttributes.DemandRedeliveryInterval.class))).timeout();
        this.finalTerminationSignalDeadline = ((StreamRefAttributes.FinalTerminationSignalDeadline) attributes.get(StreamRefAttributes$FinalTerminationSignalDeadline$.MODULE$.apply(streamRefSettings.finalTerminationSignalDeadline()), ClassTag$.MODULE$.apply(StreamRefAttributes.FinalTerminationSignalDeadline.class))).timeout();
        this.stageActorName = streamRefsMaster.nextSourceRefStageName();
        this.self = getEagerStageActor(materializer, receiveRemoteMessage());
        this.ref = this.self.ref();
        ActorRef actorRef = (ActorRef) OptionVal$Some$.MODULE$.unapply(sourceRefStageImpl.initialPartnerRef());
        if (OptionVal$.MODULE$.isEmpty$extension(actorRef)) {
            state = SourceRefStageImpl$AwaitingPartner$.MODULE$;
        } else {
            ActorRef actorRef2 = (ActorRef) OptionVal$.MODULE$.get$extension(actorRef);
            this.self.watch(actorRef2);
            state = SourceRefStageImpl$AwaitingSubscription$.MODULE$.apply(actorRef2);
        }
        this.state = state;
        this.expectingSeqNr = 0L;
        this.localCumulativeDemand = 0L;
        this.localRemainingRequested = 0;
        this.receiveBuffer = FixedSizeBuffer$.MODULE$.apply(capacity);
        this.requestStrategy = SourceRefStageImpl$WatermarkRequestStrategy$.MODULE$.apply(this.receiveBuffer.capacity());
        setHandler(sourceRefStageImpl.out(), this);
    }

    @Override // org.apache.pekko.stream.stage.StageLogging
    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    @Override // org.apache.pekko.stream.stage.StageLogging
    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    @Override // org.apache.pekko.stream.stage.StageLogging
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        LoggingAdapter log;
        log = log();
        return log;
    }

    @Override // org.apache.pekko.stream.stage.OutHandler
    public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
        onDownstreamFinish();
    }

    @Override // org.apache.pekko.stream.stage.StageLogging
    public Class logSource() {
        return SourceRefStageImpl.class;
    }

    @Override // org.apache.pekko.stream.stage.GraphStageLogic
    public String stageActorName() {
        return this.stageActorName;
    }

    @Override // org.apache.pekko.stream.impl.streamref.SourceRefStageImpl.ActorRefStage
    public ActorRef ref() {
        return this.ref;
    }

    private ActorRef selfSender() {
        return ref();
    }

    @Override // org.apache.pekko.stream.stage.GraphStageLogic
    public void preStart() {
        log().debug("[{}] Starting up with, self ref: {}, state: {}, subscription timeout: {}", stageActorName(), this.self.ref(), this.state, PrettyDuration$.MODULE$.format(this.subscriptionTimeout.timeout()));
        scheduleOnce(SourceRefStageImpl$.MODULE$.SubscriptionTimeoutTimerKey(), this.subscriptionTimeout.timeout());
    }

    @Override // org.apache.pekko.stream.stage.OutHandler
    public void onPull() {
        tryPush();
        triggerCumulativeDemand();
    }

    public Function1 receiveRemoteMessage() {
        return tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            ActorRef actorRef = (ActorRef) tuple2._1();
            Object _2 = tuple2._2();
            if (_2 instanceof StreamRefsProtocol.OnSubscribeHandshake) {
                StreamRefsProtocol.OnSubscribeHandshake onSubscribeHandshake = (StreamRefsProtocol.OnSubscribeHandshake) _2;
                ActorRef _1 = StreamRefsProtocol$OnSubscribeHandshake$.MODULE$.unapply(onSubscribeHandshake)._1();
                SourceRefStageImpl.State state = this.state;
                if (SourceRefStageImpl$AwaitingPartner$.MODULE$.equals(state)) {
                    cancelTimer(SourceRefStageImpl$.MODULE$.SubscriptionTimeoutTimerKey());
                    log().debug("[{}] Received on subscribe handshake {} while awaiting partner from {}", stageActorName(), onSubscribeHandshake, _1);
                    this.state = SourceRefStageImpl$Running$.MODULE$.apply(_1);
                    this.self.watch(_1);
                    triggerCumulativeDemand();
                    return;
                }
                if (!(state instanceof SourceRefStageImpl.AwaitingSubscription)) {
                    throw new IllegalStateException(new StringBuilder(28).append("[").append(stageActorName()).append("] Got unexpected ").append(onSubscribeHandshake).append(" in state ").append(state).toString());
                }
                verifyPartner(actorRef, SourceRefStageImpl$AwaitingSubscription$.MODULE$.unapply((SourceRefStageImpl.AwaitingSubscription) state)._1());
                cancelTimer(SourceRefStageImpl$.MODULE$.SubscriptionTimeoutTimerKey());
                log().debug("[{}] Received on subscribe handshake {} while awaiting subscription from {}", stageActorName(), onSubscribeHandshake, _1);
                this.state = SourceRefStageImpl$Running$.MODULE$.apply(_1);
                triggerCumulativeDemand();
                return;
            }
            if (_2 instanceof StreamRefsProtocol.SequencedOnNext) {
                StreamRefsProtocol.SequencedOnNext sequencedOnNext = (StreamRefsProtocol.SequencedOnNext) _2;
                StreamRefsProtocol.SequencedOnNext unapply = StreamRefsProtocol$SequencedOnNext$.MODULE$.unapply(sequencedOnNext);
                long _12 = unapply._1();
                Object _22 = unapply._2();
                if (_22 instanceof Object) {
                    observeAndValidateSequenceNr(_12, "Illegal sequence nr in SequencedOnNext");
                    SourceRefStageImpl.State state2 = this.state;
                    if (state2 instanceof SourceRefStageImpl.AwaitingSubscription) {
                        ActorRef _13 = SourceRefStageImpl$AwaitingSubscription$.MODULE$.unapply((SourceRefStageImpl.AwaitingSubscription) state2)._1();
                        verifyPartner(actorRef, _13);
                        log().debug("[{}] Received seq {} from {}", stageActorName(), sequencedOnNext, actorRef);
                        this.state = SourceRefStageImpl$Running$.MODULE$.apply(_13);
                        onReceiveElement(_22);
                        triggerCumulativeDemand();
                        return;
                    }
                    if (state2 instanceof SourceRefStageImpl.Running) {
                        verifyPartner(actorRef, SourceRefStageImpl$Running$.MODULE$.unapply((SourceRefStageImpl.Running) state2)._1());
                        onReceiveElement(_22);
                        triggerCumulativeDemand();
                        return;
                    }
                    if (SourceRefStageImpl$AwaitingPartner$.MODULE$.equals(state2)) {
                        throw new IllegalStateException(new StringBuilder(35).append("[").append(stageActorName()).append("] Got ").append(sequencedOnNext).append(" from ").append(actorRef).append(" while AwaitingPartner").toString());
                    }
                    if (state2 instanceof SourceRefStageImpl.WaitingForCancelAck) {
                        SourceRefStageImpl.WaitingForCancelAck unapply2 = SourceRefStageImpl$WaitingForCancelAck$.MODULE$.unapply((SourceRefStageImpl.WaitingForCancelAck) state2);
                        ActorRef _14 = unapply2._1();
                        unapply2._2();
                        verifyPartner(actorRef, _14);
                        log().warning("[{}] Got element from remote but downstream cancelled, dropping element of type {}", stageActorName(), _22.getClass());
                        return;
                    }
                    if (state2 instanceof SourceRefStageImpl.UpstreamCompleted) {
                        verifyPartner(actorRef, SourceRefStageImpl$UpstreamCompleted$.MODULE$.unapply((SourceRefStageImpl.UpstreamCompleted) state2)._1());
                        throw new IllegalStateException(new StringBuilder(88).append("[").append(stageActorName()).append("] Got completion and then received more elements from ").append(actorRef).append(", this is not supposed to happen.").toString());
                    }
                    if (!(state2 instanceof SourceRefStageImpl.UpstreamTerminated)) {
                        throw new MatchError(state2);
                    }
                    verifyPartner(actorRef, SourceRefStageImpl$UpstreamTerminated$.MODULE$.unapply((SourceRefStageImpl.UpstreamTerminated) state2)._1());
                    log().debug("[{}] Received element after partner terminated");
                    onReceiveElement(_22);
                    return;
                }
            }
            if (_2 instanceof StreamRefsProtocol.RemoteStreamCompleted) {
                long _15 = StreamRefsProtocol$RemoteStreamCompleted$.MODULE$.unapply((StreamRefsProtocol.RemoteStreamCompleted) _2)._1();
                observeAndValidateSequenceNr(_15, "Illegal sequence nr in RemoteSinkCompleted");
                SourceRefStageImpl.State state3 = this.state;
                if (state3 instanceof SourceRefStageImpl.Running) {
                    ActorRef _16 = SourceRefStageImpl$Running$.MODULE$.unapply((SourceRefStageImpl.Running) state3)._1();
                    verifyPartner(actorRef, _16);
                    log().debug("[{}] The remote stream has completed, emitting {} elements left in buffer before completing", stageActorName(), BoxesRunTime.boxToInteger(this.receiveBuffer.used()));
                    this.self.unwatch(actorRef);
                    this.state = SourceRefStageImpl$UpstreamCompleted$.MODULE$.apply(_16);
                    tryPush();
                    return;
                }
                if (!(state3 instanceof SourceRefStageImpl.WaitingForCancelAck)) {
                    throw new IllegalStateException(new StringBuilder(67).append("[").append(stageActorName()).append("] Saw RemoteStreamCompleted(").append(_15).append(") while in state ").append(state3).append(", should never happen").toString());
                }
                SourceRefStageImpl.WaitingForCancelAck unapply3 = SourceRefStageImpl$WaitingForCancelAck$.MODULE$.unapply((SourceRefStageImpl.WaitingForCancelAck) state3);
                unapply3._1();
                unapply3._2();
                log().debug("[{}] Upstream completed while waiting for cancel ack", stageActorName());
                return;
            }
            if (_2 instanceof StreamRefsProtocol.RemoteStreamFailure) {
                String _17 = StreamRefsProtocol$RemoteStreamFailure$.MODULE$.unapply((StreamRefsProtocol.RemoteStreamFailure) _2)._1();
                SourceRefStageImpl.State state4 = this.state;
                if (!(state4 instanceof SourceRefStageImpl.WeKnowPartner)) {
                    throw new IllegalStateException(new StringBuilder(64).append("[").append(stageActorName()).append("] got RemoteStreamFailure(").append(_17).append(") when in state ").append(state4).append(", should never happen").toString());
                }
                verifyPartner(actorRef, ((SourceRefStageImpl.WeKnowPartner) state4).partner());
                log().debug("[{}] The remote stream has failed, failing (reason: {})", stageActorName(), _17);
                failStage(RemoteStreamRefActorTerminatedException$.MODULE$.apply(new StringBuilder(36).append("[").append(stageActorName()).append("] Remote stream (").append(actorRef.path()).append(") failed, reason: ").append(_17).toString()));
                return;
            }
            if (StreamRefsProtocol$Ack$.MODULE$.equals(_2)) {
                SourceRefStageImpl.State state5 = this.state;
                if (!(state5 instanceof SourceRefStageImpl.WaitingForCancelAck)) {
                    throw new IllegalStateException(new StringBuilder(28).append("[").append(stageActorName()).append("] Got an Ack when in state ").append(state5).toString());
                }
                SourceRefStageImpl.WaitingForCancelAck unapply4 = SourceRefStageImpl$WaitingForCancelAck$.MODULE$.unapply((SourceRefStageImpl.WaitingForCancelAck) state5);
                ActorRef _18 = unapply4._1();
                Throwable _23 = unapply4._2();
                verifyPartner(actorRef, _18);
                log().debug("[{}] Got cancellation ack from remote, canceling", stageActorName());
                cancelStage(_23);
                return;
            }
            if (!(_2 instanceof Terminated)) {
                throw new IllegalStateException(new StringBuilder(39).append("[").append(stageActorName()).append("] Unexpected message in state ").append(this.state).append(": ").append(_2).append(" from ").append(actorRef).toString());
            }
            ActorRef _19 = Terminated$.MODULE$.unapply((Terminated) _2)._1();
            SourceRefStageImpl.State state6 = this.state;
            if (!(state6 instanceof SourceRefStageImpl.WeKnowPartner)) {
                throw new IllegalStateException(new StringBuilder(72).append("[").append(stageActorName()).append("] Unexpected deathwatch message for ").append(_19).append(" before we knew partner ref, state ").append(state6).toString());
            }
            SourceRefStageImpl.WeKnowPartner weKnowPartner = (SourceRefStageImpl.WeKnowPartner) state6;
            ActorRef partner = weKnowPartner.partner();
            if (partner != null ? !partner.equals(_19) : _19 != null) {
                throw RemoteStreamRefActorTerminatedException$.MODULE$.apply(new StringBuilder(45).append("[").append(stageActorName()).append("] Received UNEXPECTED Terminated(").append(_19).append(") message! ").append(new StringBuilder(73).append("This actor was NOT our trusted remote partner, which was: ").append(weKnowPartner.partner()).append(". Tearing down.").toString()).toString());
            }
            scheduleOnce(SourceRefStageImpl$.MODULE$.TerminationDeadlineTimerKey(), this.finalTerminationSignalDeadline);
            log().debug("[{}] Partner terminated, starting delayed shutdown, deadline: [{}]", stageActorName(), this.finalTerminationSignalDeadline);
            this.state = SourceRefStageImpl$UpstreamTerminated$.MODULE$.apply(weKnowPartner.partner());
        };
    }

    @Override // org.apache.pekko.stream.stage.TimerGraphStageLogic
    public void onTimer(Object obj) {
        String SubscriptionTimeoutTimerKey = SourceRefStageImpl$.MODULE$.SubscriptionTimeoutTimerKey();
        if (SubscriptionTimeoutTimerKey != null ? SubscriptionTimeoutTimerKey.equals(obj) : obj == null) {
            SourceRefStageImpl.State state = this.state;
            if (!SourceRefStageImpl$AwaitingPartner$.MODULE$.equals(state)) {
                if (!(state instanceof SourceRefStageImpl.AwaitingSubscription)) {
                    log().debug("[{}] Ignoring subscription timeout in state [{}]", stageActorName(), state);
                    return;
                }
                SourceRefStageImpl$AwaitingSubscription$.MODULE$.unapply((SourceRefStageImpl.AwaitingSubscription) state)._1();
            }
            throw StreamRefSubscriptionTimeoutException$.MODULE$.apply(new StringBuilder(76).append("[").append(stageActorName()).append("] Remote side did not subscribe (materialize) handed out Sink reference [").append(ref()).append("],").append(new StringBuilder(30).append("within subscription timeout: ").append(PrettyDuration$.MODULE$.format(this.subscriptionTimeout.timeout())).append("!").toString()).toString());
        }
        String DemandRedeliveryTimerKey = SourceRefStageImpl$.MODULE$.DemandRedeliveryTimerKey();
        if (DemandRedeliveryTimerKey != null ? DemandRedeliveryTimerKey.equals(obj) : obj == null) {
            SourceRefStageImpl.State state2 = this.state;
            if (!(state2 instanceof SourceRefStageImpl.Running)) {
                log().debug("[{}] Ignoring demand redelivery timeout in state [{}]", stageActorName(), state2);
                return;
            }
            ActorRef _1 = SourceRefStageImpl$Running$.MODULE$.unapply((SourceRefStageImpl.Running) state2)._1();
            log().debug("[{}] Scheduled re-delivery of demand until [{}]", stageActorName(), BoxesRunTime.boxToLong(this.localCumulativeDemand));
            _1.$bang(StreamRefsProtocol$CumulativeDemand$.MODULE$.apply(this.localCumulativeDemand), selfSender());
            scheduleDemandRedelivery();
            return;
        }
        String TerminationDeadlineTimerKey = SourceRefStageImpl$.MODULE$.TerminationDeadlineTimerKey();
        if (TerminationDeadlineTimerKey != null ? TerminationDeadlineTimerKey.equals(obj) : obj == null) {
            SourceRefStageImpl.State state3 = this.state;
            if (state3 instanceof SourceRefStageImpl.UpstreamTerminated) {
                ActorRef _12 = SourceRefStageImpl$UpstreamTerminated$.MODULE$.unapply((SourceRefStageImpl.UpstreamTerminated) state3)._1();
                log().debug("[{}] Remote partner [{}] has terminated unexpectedly and no clean completion/failure message was received", stageActorName(), _12);
                failStage(RemoteStreamRefActorTerminatedException$.MODULE$.apply(new StringBuilder(211).append("[").append(stageActorName()).append("] Remote partner [").append(_12).append("] has terminated unexpectedly and no clean completion/failure message was received ").append("(possible reasons: network partition or subscription timeout triggered termination of partner). Tearing down.").toString()));
                return;
            } else {
                if (!SourceRefStageImpl$AwaitingPartner$.MODULE$.equals(state3)) {
                    throw new IllegalStateException(new StringBuilder(50).append("TerminationDeadlineTimerKey can't happen in state ").append(state3).toString());
                }
                log().debug("[{}] Downstream cancelled, but timeout hit before we saw a partner", stageActorName());
                cancelStage(SubscriptionWithCancelException$NoMoreElementsNeeded$.MODULE$);
                return;
            }
        }
        String CancellationDeadlineTimerKey = SourceRefStageImpl$.MODULE$.CancellationDeadlineTimerKey();
        if (CancellationDeadlineTimerKey != null ? !CancellationDeadlineTimerKey.equals(obj) : obj != null) {
            throw new IllegalArgumentException(new StringBuilder(19).append("Unknown timer key: ").append(obj).toString());
        }
        SourceRefStageImpl.State state4 = this.state;
        if (!(state4 instanceof SourceRefStageImpl.WaitingForCancelAck)) {
            throw new IllegalStateException(new StringBuilder(54).append("[").append(stageActorName()).append("] CancellationDeadlineTimerKey can't happen in state ").append(state4).toString());
        }
        SourceRefStageImpl.WaitingForCancelAck unapply = SourceRefStageImpl$WaitingForCancelAck$.MODULE$.unapply((SourceRefStageImpl.WaitingForCancelAck) state4);
        ActorRef _13 = unapply._1();
        Throwable _2 = unapply._2();
        log().debug("[{}] Waiting for remote ack from [{}] for downstream failure timed out, failing stage with original downstream failure", stageActorName(), _13);
        cancelStage(_2);
    }

    @Override // org.apache.pekko.stream.stage.OutHandler
    public void onDownstreamFinish(Throwable th) {
        SourceRefStageImpl.State state = this.state;
        if (state instanceof SourceRefStageImpl.Running) {
            triggerCancellationExchange(SourceRefStageImpl$Running$.MODULE$.unapply((SourceRefStageImpl.Running) state)._1(), th);
            return;
        }
        if (SourceRefStageImpl$AwaitingPartner$.MODULE$.equals(state)) {
            scheduleOnce(SourceRefStageImpl$.MODULE$.TerminationDeadlineTimerKey(), this.finalTerminationSignalDeadline);
            return;
        }
        if (state instanceof SourceRefStageImpl.AwaitingSubscription) {
            triggerCancellationExchange(SourceRefStageImpl$AwaitingSubscription$.MODULE$.unapply((SourceRefStageImpl.AwaitingSubscription) state)._1(), th);
            return;
        }
        if (state instanceof SourceRefStageImpl.UpstreamCompleted) {
            SourceRefStageImpl$UpstreamCompleted$.MODULE$.unapply((SourceRefStageImpl.UpstreamCompleted) state)._1();
            if (this.receiveBuffer.nonEmpty()) {
                log().debug("[{}] Downstream cancelled with elements [{}] in buffer, dropping elements", stageActorName(), BoxesRunTime.boxToInteger(this.receiveBuffer.used()));
            }
            if (th instanceof SubscriptionWithCancelException.NonFailureCancellation) {
                completeStage();
                return;
            } else {
                failStage(th);
                return;
            }
        }
        if (state instanceof SourceRefStageImpl.WaitingForCancelAck) {
            SourceRefStageImpl.WaitingForCancelAck unapply = SourceRefStageImpl$WaitingForCancelAck$.MODULE$.unapply((SourceRefStageImpl.WaitingForCancelAck) state);
            unapply._1();
            unapply._2();
            throw new UnsupportedOperationException(new StringBuilder(54).append("[").append(stageActorName()).append("] Didn't expect state ").append(this.state).append(" when downstream finished with ").append(th).toString());
        }
        if (!(state instanceof SourceRefStageImpl.UpstreamTerminated)) {
            throw new MatchError(state);
        }
        SourceRefStageImpl$UpstreamTerminated$.MODULE$.unapply((SourceRefStageImpl.UpstreamTerminated) state)._1();
        log().debug("[{}] Downstream cancelled with elements [{}] in buffer", stageActorName(), BoxesRunTime.boxToInteger(this.receiveBuffer.used()));
        if (this.receiveBuffer.isEmpty()) {
            failStage(RemoteStreamRefActorTerminatedException$.MODULE$.apply(new StringBuilder(26).append("[").append(stageActorName()).append("] unexpectedly terminated").toString()));
        } else {
            tryPush();
        }
    }

    private void triggerCancellationExchange(ActorRef actorRef, Throwable th) {
        StreamRefsProtocol apply;
        if (this.receiveBuffer.nonEmpty()) {
            log().debug("Downstream cancelled with elements [{}] in buffer, dropping elements", BoxesRunTime.boxToInteger(this.receiveBuffer.used()));
        }
        if (th instanceof SubscriptionWithCancelException.NonFailureCancellation) {
            log().debug("[{}] Deferred stop on downstream cancel", stageActorName());
            apply = StreamRefsProtocol$RemoteStreamCompleted$.MODULE$.apply(this.expectingSeqNr);
        } else {
            log().debug("[{}] Deferred stop on downstream failure: {}", stageActorName(), th);
            apply = StreamRefsProtocol$RemoteStreamFailure$.MODULE$.apply("Downstream failed");
        }
        this.self.unwatch(actorRef);
        actorRef.$bang(apply, selfSender());
        this.state = SourceRefStageImpl$WaitingForCancelAck$.MODULE$.apply(actorRef, th);
        scheduleOnce(SourceRefStageImpl$.MODULE$.CancellationDeadlineTimerKey(), this.subscriptionTimeout.timeout());
        setKeepGoing(true);
    }

    public void triggerCumulativeDemand() {
        int requestDemand;
        if (this.receiveBuffer.remainingCapacity() - this.localRemainingRequested <= 0 || (requestDemand = this.requestStrategy.requestDemand(this.receiveBuffer.used() + this.localRemainingRequested)) <= 0) {
            return;
        }
        SourceRefStageImpl.State state = this.state;
        if (state instanceof SourceRefStageImpl.Running) {
            ActorRef _1 = SourceRefStageImpl$Running$.MODULE$.unapply((SourceRefStageImpl.Running) state)._1();
            log().debug("[{}] Demanding until [{}] (+{})", stageActorName(), BoxesRunTime.boxToLong(this.localCumulativeDemand), BoxesRunTime.boxToInteger(requestDemand));
            sendDemand$1(requestDemand, _1);
        } else {
            if (!(state instanceof SourceRefStageImpl.AwaitingSubscription)) {
                log().debug("[{}] Partner ref not set up in state {}, demanding elements deferred", stageActorName(), state);
                return;
            }
            ActorRef _12 = SourceRefStageImpl$AwaitingSubscription$.MODULE$.unapply((SourceRefStageImpl.AwaitingSubscription) state)._1();
            log().debug("[{}] Demanding, before subscription seen, until [{}] (+{})", stageActorName(), BoxesRunTime.boxToLong(this.localCumulativeDemand), BoxesRunTime.boxToInteger(requestDemand));
            sendDemand$1(requestDemand, _12);
        }
    }

    private void tryPush() {
        if (this.receiveBuffer.nonEmpty() && isAvailable(this.$outer.out())) {
            push(this.$outer.out(), this.receiveBuffer.dequeue());
        } else if (this.receiveBuffer.isEmpty()) {
            SourceRefStageImpl.State state = this.state;
            if (state instanceof SourceRefStageImpl.UpstreamCompleted) {
                SourceRefStageImpl$UpstreamCompleted$.MODULE$.unapply((SourceRefStageImpl.UpstreamCompleted) state)._1();
                completeStage();
            }
        }
    }

    private void onReceiveElement(Object obj) {
        this.localRemainingRequested--;
        if (this.receiveBuffer.isEmpty() && isAvailable(this.$outer.out())) {
            push(this.$outer.out(), obj);
        } else {
            if (this.receiveBuffer.isFull()) {
                throw new IllegalStateException(new StringBuilder(30).append("Attempted to overflow buffer! ").append(new StringBuilder(32).append("Capacity: ").append(this.receiveBuffer.capacity()).append(", incoming element: ").append(obj).append(", ").toString()).append(new StringBuilder(50).append("localRemainingRequested: ").append(this.localRemainingRequested).append(", localCumulativeDemand: ").append(this.localCumulativeDemand).toString()).toString());
            }
            this.receiveBuffer.enqueue(obj);
        }
    }

    private void verifyPartner(ActorRef actorRef, ActorRef actorRef2) {
        if (actorRef == null) {
            if (actorRef2 == null) {
                return;
            }
        } else if (actorRef.equals(actorRef2)) {
            return;
        }
        throw InvalidPartnerActorException$.MODULE$.apply(actorRef2, actorRef, new StringBuilder(47).append("[").append(stageActorName()).append("] Received message from UNEXPECTED sender [").append(actorRef).append("]! ").append(new StringBuilder(72).append("This actor is NOT our trusted remote partner, which is [").append(actorRef2).append("]. Tearing down.").toString()).toString());
    }

    private void observeAndValidateSequenceNr(long j, String str) {
        if (isInvalidSequenceNr(j)) {
            log().warning("[{}] {}, expected {} but was {}", stageActorName(), str, BoxesRunTime.boxToLong(this.expectingSeqNr), BoxesRunTime.boxToLong(j));
            throw InvalidSequenceNumberException$.MODULE$.apply(this.expectingSeqNr, j, str);
        }
        this.expectingSeqNr++;
    }

    private boolean isInvalidSequenceNr(long j) {
        return j != this.expectingSeqNr;
    }

    private void scheduleDemandRedelivery() {
        scheduleOnce(SourceRefStageImpl$.MODULE$.DemandRedeliveryTimerKey(), this.demandRedeliveryInterval);
    }

    private final void sendDemand$1(int i, ActorRef actorRef) {
        this.localCumulativeDemand += i;
        this.localRemainingRequested += i;
        actorRef.$bang(StreamRefsProtocol$CumulativeDemand$.MODULE$.apply(this.localCumulativeDemand), selfSender());
        scheduleDemandRedelivery();
    }
}
