package org.apache.pekko.stream.connectors.jms.impl;

import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import org.apache.pekko.Done$;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Graph;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.connectors.jms.Destination;
import org.apache.pekko.stream.connectors.jms.JmsConsumerSettings;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.stream.scaladsl.SourceQueueWithComplete;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.apache.pekko.stream.stage.TimerGraphStageLogic;
import scala.Function1;
import scala.Predef$;
import scala.collection.immutable.Seq;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.util.Failure$;
import scala.util.Success$;

/* compiled from: SourceStageLogic.scala */
/**
 * Stage logic shared by JMS consumer sources.
 *
 * <p>Messages arrive through the {@link #handleMessage()} async callback. They are pushed
 * downstream immediately via the abstract {@link #pushMessage(Object)} when the outlet is
 * available, and buffered in an in-memory queue otherwise. The outlet's pull handler drains
 * that queue one element per pull.
 *
 * <p>This file is decompiler output for a Scala class. Most members in the middle of the class
 * are compiler-generated accessors, setters and forwarders for state declared by the
 * {@code JmsConnector}, {@code JmsConsumerConnector} and {@code StageLogging} types.
 *
 * <p>NOTE(review): several constructs below are not valid or not meaningful Java as written
 * (see the individual notes). They look like decompiler artifacts - confirm against the Scala
 * source or the bytecode before relying on this text.
 *
 * @param <T> element type emitted on the outlet
 */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/jms/impl/SourceStageLogic.class */
// NOTE(review): StageLogging appears twice in the implements clause; javac rejects a repeated
// superinterface, so this is presumably a decompiler artifact.
public abstract class SourceStageLogic<T> extends TimerGraphStageLogic implements JmsConsumerConnector, GraphStageCompanion, StageLogging, StageLogging {
    // --- Backing fields for state declared by JmsConnector / JmsConsumerConnector / StageLogging.
    // They are read and written only through the accessor and setter methods further down.
    private ExecutionContext ec;
    private Seq org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$jmsSessions;
    private AsyncCallback fail;
    private AsyncCallback org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionFailedCB;
    private SourceQueueWithComplete org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionStateQueue;
    private Promise org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionStateSourcePromise;
    private Future connectionStateSource;
    private InternalConnectionState org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionState;
    private AsyncCallback org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$onSession;
    private Function1 sessionOpened;
    private AsyncCallback sessionOpenedCB;
    private boolean startConnection;
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;

    // --- State owned by this class.
    // Outlet the stage emits on (constructor argument).
    private final Outlet<T> out;
    // Consumer settings, exposed through jmsSettings().
    private final JmsConsumerSettings settings;
    // JMS destination, exposed through destination().
    private final Destination destination;
    // Attributes passed to executionContext(...) in preStart().
    private final Attributes inheritedAttributes;
    // Buffer for messages that arrived while the outlet could not accept a push.
    public final Queue<T> org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue;
    // Flipped once by whichever of stopSessions / abortSessions runs first; later calls are no-ops.
    private final AtomicBoolean stopping;
    // Set by markStopped / markAborted once the asynchronous shutdown has finished.
    public boolean org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$stopped;
    // Invoked after a graceful shutdown finished closing sessions and the connection.
    private final AsyncCallback<Done$> markStopped;
    // Invoked after an abort finished closing sessions and the connection.
    private final AsyncCallback<Throwable> markAborted;
    // Exposed through handleError(); fails the stage with the given throwable.
    private final AsyncCallback handleError;
    // Exposed through handleMessage(); delivers one message into the stage.
    private final AsyncCallback handleMessage;

    /**
     * Stores the constructor arguments, runs the initialisers of the mixed-in types, creates the
     * message buffer and the async callbacks, and installs the outlet handler.
     *
     * @param sourceShape shape passed to the {@code TimerGraphStageLogic} constructor
     * @param outlet outlet that messages are pushed to
     * @param jmsConsumerSettings settings returned by {@link #jmsSettings()}
     * @param destination destination returned by {@link #destination()}
     * @param attributes attributes later used to obtain the execution context in {@link #preStart()}
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public SourceStageLogic(SourceShape<T> sourceShape, Outlet<T> outlet, JmsConsumerSettings jmsConsumerSettings, Destination destination, Attributes attributes) {
        super(sourceShape);
        this.out = outlet;
        this.settings = jmsConsumerSettings;
        this.destination = destination;
        this.inheritedAttributes = attributes;
        // Initialisers of the mixed-in types; presumably these populate the backing fields
        // above through the *_$eq setters - confirm against the trait sources.
        JmsConnector.$init$(this);
        JmsConsumerConnector.$init$((JmsConsumerConnector) this);
        StageLogging.$init$(this);
        // Start with an empty buffer.
        this.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue = (Queue) Queue$.MODULE$.apply(ScalaRunTime$.MODULE$.genericWrapArray(new Object[0]));
        this.stopping = new AtomicBoolean(false);
        this.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$stopped = false;
        // Graceful shutdown finished: record it, run finishStop(), and complete right away only
        // if nothing is buffered. Otherwise onPull completes the stage once the buffer is drained.
        this.markStopped = getAsyncCallback(done$ -> {
            this.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$stopped = true;
            finishStop();
            if (this.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue.isEmpty()) {
                completeStage();
            }
        });
        // Abort finished: record it, run finishStop(), and fail the stage with the abort cause
        // regardless of what is still buffered.
        this.markAborted = getAsyncCallback(th -> {
            this.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$stopped = true;
            finishStop();
            failStage(th);
        });
        // Error path: move the connector state to "stopping with failure", then fail the stage.
        this.handleError = getAsyncCallback(th2 -> {
            updateState(InternalConnectionState$JmsConnectorStopping$.MODULE$.apply(Failure$.MODULE$.apply(th2)));
            failStage(th2);
        });
        // Message path. Three cases:
        //  - outlet cannot take a push: buffer the message;
        //  - outlet available and buffer empty: push the message directly;
        //  - outlet available and buffer non-empty: push the head of the buffer and append the
        //    new message behind the remaining ones, so buffered elements go out first.
        this.handleMessage = getAsyncCallback(obj -> {
            if (!isAvailable(outlet)) {
                this.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue.enqueue(obj);
            } else if (this.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue.isEmpty()) {
                pushMessage(obj);
            } else {
                pushMessage(this.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue.dequeue());
                this.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue.enqueue(obj);
            }
        });
        // NOTE(review): the anonymous class below receives `this` as a constructor argument,
        // null-checks its own `this`, and assigns its own `this` to $outer. That looks like the
        // decompiler's rendering of an outer-instance capture; $outer is presumably meant to be
        // the enclosing SourceStageLogic - confirm.
        setHandler(outlet, new OutHandler(this) { // from class: org.apache.pekko.stream.connectors.jms.impl.SourceStageLogic$$anon$1
            private final /* synthetic */ SourceStageLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            // No-argument variant: forwards to the implementation provided by OutHandler.
            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            // Emits at most one buffered element per pull, then completes the stage if shutdown
            // has already finished and the buffer is now empty.
            public void onPull() {
                if (this.$outer.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue.nonEmpty()) {
                    this.$outer.pushMessage(this.$outer.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue.dequeue());
                }
                if (this.$outer.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$stopped && this.$outer.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue.isEmpty()) {
                    this.$outer.completeStage();
                }
            }

            // Downstream cancelled: drop buffered messages and start the graceful shutdown.
            // The cause `th3` is not used. setKeepGoing(true) is presumably there so the stage
            // stays alive until the asynchronous shutdown invokes markStopped - confirm.
            public void onDownstreamFinish(Throwable th3) {
                this.$outer.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$queue.clear();
                this.$outer.protected$setKeepGoing(true);
                this.$outer.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$stopSessions();
            }
        });
        // NOTE(review): presumably emitted by the Scala compiler at the end of the constructor - confirm.
        Statics.releaseFence();
    }

    // ---------------------------------------------------------------------------------------
    // Accessors and setters for the backing fields declared at the top of the class. Each one
    // only reads or writes the field of the same name. The long mangled names appear to be the
    // Scala compiler's encoding of members private to JmsConnector - confirm.
    // ---------------------------------------------------------------------------------------

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public ExecutionContext ec() {
        return this.ec;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public Seq<JmsConsumerSession> org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$jmsSessions() {
        return this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$jmsSessions;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public AsyncCallback fail() {
        return this.fail;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public AsyncCallback org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionFailedCB() {
        return this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionFailedCB;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public SourceQueueWithComplete org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionStateQueue() {
        return this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionStateQueue;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public Promise org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionStateSourcePromise() {
        return this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionStateSourcePromise;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public Future connectionStateSource() {
        return this.connectionStateSource;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public InternalConnectionState org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionState() {
        return this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionState;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public AsyncCallback<JmsConsumerSession> org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$onSession() {
        return this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$onSession;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public Function1 sessionOpened() {
        return this.sessionOpened;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public AsyncCallback sessionOpenedCB() {
        return this.sessionOpenedCB;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void ec_$eq(ExecutionContext executionContext) {
        this.ec = executionContext;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$jmsSessions_$eq(Seq<JmsConsumerSession> seq) {
        this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$jmsSessions = seq;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionStateQueue_$eq(SourceQueueWithComplete sourceQueueWithComplete) {
        this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionStateQueue = sourceQueueWithComplete;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionState_$eq(InternalConnectionState internalConnectionState) {
        this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionState = internalConnectionState;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConnector$_setter_$fail_$eq(AsyncCallback asyncCallback) {
        this.fail = asyncCallback;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConnector$_setter_$org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionFailedCB_$eq(AsyncCallback asyncCallback) {
        this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionFailedCB = asyncCallback;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConnector$_setter_$org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionStateSourcePromise_$eq(Promise promise) {
        this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$connectionStateSourcePromise = promise;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConnector$_setter_$connectionStateSource_$eq(Future future) {
        this.connectionStateSource = future;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConnector$_setter_$org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$onSession_$eq(AsyncCallback asyncCallback) {
        this.org$apache$pekko$stream$connectors$jms$impl$JmsConnector$$onSession = asyncCallback;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConnector$_setter_$sessionOpened_$eq(Function1 function1) {
        this.sessionOpened = function1;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConnector$_setter_$sessionOpenedCB_$eq(AsyncCallback asyncCallback) {
        this.sessionOpenedCB = asyncCallback;
    }

    // ---------------------------------------------------------------------------------------
    // Synthetic forwarders (from here down to abortSessionsAsync()).
    //
    // NOTE(review): as written, every method in this group calls itself with the same
    // arguments, which in Java is unbounded recursion. The bridge/synthetic markers suggest
    // that the compiled class forwards each call to an implementation inherited from
    // JmsConnector / JmsConsumerConnector and that the decompiler dropped the call target
    // (compare logSource() and log() further down, which do show an explicit
    // StageLogging.xxx$(this) target). Confirm against the bytecode or the Scala source
    // before treating this text as compilable or runnable Java.
    // ---------------------------------------------------------------------------------------

    public /* bridge */ /* synthetic */ void onSessionOpened(JmsSession jmsSession) {
        onSessionOpened(jmsSession);
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ void finishStop() {
        finishStop();
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ void publishAndFailStage(Throwable th) {
        publishAndFailStage(th);
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ InternalConnectionState updateState(InternalConnectionState internalConnectionState) {
        InternalConnectionState updateState;
        updateState = updateState(internalConnectionState);
        return updateState;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ void connectionFailed(Throwable th) {
        connectionFailed(th);
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ void onTimer(Object obj) {
        onTimer(obj);
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ ExecutionContext executionContext(Attributes attributes) {
        ExecutionContext executionContext;
        executionContext = executionContext(attributes);
        return executionContext;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ void initSessionAsync(int i, boolean z) {
        initSessionAsync(i, z);
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ int initSessionAsync$default$1() {
        int initSessionAsync$default$1;
        initSessionAsync$default$1 = initSessionAsync$default$1();
        return initSessionAsync$default$1;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ boolean initSessionAsync$default$2() {
        boolean initSessionAsync$default$2;
        initSessionAsync$default$2 = initSessionAsync$default$2();
        return initSessionAsync$default$2;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ void closeConnection(Connection connection) {
        closeConnection(connection);
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ Future closeConnectionAsync(Future future) {
        Future closeConnectionAsync;
        closeConnectionAsync = closeConnectionAsync(future);
        return closeConnectionAsync;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ void closeSessions() {
        closeSessions();
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ Future closeSessionsAsync() {
        Future closeSessionsAsync;
        closeSessionsAsync = closeSessionsAsync();
        return closeSessionsAsync;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ Future abortSessionsAsync() {
        Future abortSessionsAsync;
        abortSessionsAsync = abortSessionsAsync();
        return abortSessionsAsync;
    }

    /** Returns the {@code startConnection} flag stored through the setter below. */
    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConsumerConnector, org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public boolean startConnection() {
        return this.startConnection;
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConsumerConnector
    public void org$apache$pekko$stream$connectors$jms$impl$JmsConsumerConnector$_setter_$startConnection_$eq(boolean z) {
        this.startConnection = z;
    }

    // Getter and setter for the logger field used by StageLogging.
    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    // Forwards to the StageLogging implementation.
    public /* bridge */ /* synthetic */ Class logSource() {
        return StageLogging.logSource$(this);
    }

    // Forwards to the StageLogging implementation.
    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    /** Returns the destination supplied to the constructor. */
    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public Destination destination() {
        return this.destination;
    }

    // --- GraphStageCompanion: each method delegates to the same-purpose method available on
    // this stage logic (materializer, destination, and the timer operations).

    @Override // org.apache.pekko.stream.connectors.jms.impl.GraphStageCompanion
    public final Materializer graphStageMaterializer() {
        return materializer();
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.GraphStageCompanion
    public final Destination graphStageDestination() {
        return destination();
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.GraphStageCompanion
    public final void scheduleOnceOnGraphStage(Object obj, FiniteDuration finiteDuration) {
        scheduleOnce(obj, finiteDuration);
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.GraphStageCompanion
    public final boolean isTimerActiveOnGraphStage(Object obj) {
        return isTimerActive(obj);
    }

    @Override // org.apache.pekko.stream.connectors.jms.impl.GraphStageCompanion
    public final void cancelTimerOnGraphStage(Object obj) {
        cancelTimer(obj);
    }

    /** Returns the consumer settings supplied to the constructor. */
    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public JmsConsumerSettings jmsSettings() {
        return this.settings;
    }

    /**
     * Returns the callback that records a failed "stopping" state and then fails the stage with
     * the supplied throwable.
     */
    public AsyncCallback<Throwable> handleError() {
        return this.handleError;
    }

    /**
     * Stores the execution context derived from the inherited attributes, then starts session
     * initialisation using the default arguments of {@code initSessionAsync}.
     */
    @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConnector
    public void preStart() {
        ec_$eq(executionContext(this.inheritedAttributes));
        // NOTE(review): as written this is a self-call (unbounded recursion). It was presumably
        // a call to the inherited preStart() in the original source - confirm.
        preStart();
        initSessionAsync(initSessionAsync$default$1(), initSessionAsync$default$2());
    }

    /**
     * Returns the callback through which a received message enters the stage; see the
     * constructor for the buffering rules it applies.
     */
    public AsyncCallback<T> handleMessage() {
        return this.handleMessage;
    }

    /**
     * Emits one element downstream. Called from the message callback and from the pull handler,
     * in both cases only on a path where the outlet has been pulled.
     *
     * @param t the element to emit
     */
    public abstract void pushMessage(T t);

    /**
     * Starts a graceful shutdown. Only the first call to this method or to the abort method has
     * any effect, because both are guarded by the same {@code stopping} flag.
     *
     * <p>The sequence is: switch the connector state to "stopping" with a successful result,
     * close the sessions asynchronously, then close the connection asynchronously, and finally
     * invoke {@code markStopped}. The continuations run on {@link #ec()} and are chained with
     * {@code onComplete}, so each step runs whatever the outcome of the previous one.
     */
    public void org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$stopSessions() {
        if (this.stopping.compareAndSet(false, true)) {
            // Applies JmsConnector.connection() to the state returned by updateState; presumably
            // this yields the future of the current JMS connection - confirm.
            Future future = (Future) JmsConnector$.MODULE$.connection().apply(updateState(InternalConnectionState$JmsConnectorStopping$.MODULE$.apply(Success$.MODULE$.apply(Done$.MODULE$))));
            closeSessionsAsync().onComplete(r6 -> {
                closeConnectionAsync(future).onComplete(r4 -> {
                    this.markStopped.invoke(Done$.MODULE$);
                }, ec());
            }, ec());
        }
    }

    /**
     * Starts an abort with the given cause. Only the first call to this method or to the
     * graceful-stop method has any effect, because both are guarded by the same
     * {@code stopping} flag.
     *
     * <p>The sequence mirrors the graceful stop, except that the "stopping" state carries the
     * failure, sessions are aborted rather than closed, and {@code markAborted} is invoked at
     * the end, which fails the stage with {@code th}.
     *
     * @param th the cause recorded in the connector state and used to fail the stage
     */
    public void org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$abortSessions(Throwable th) {
        if (this.stopping.compareAndSet(false, true)) {
            if (log().isDebugEnabled()) {
                log().debug("aborting sessions ({})", th.toString());
            }
            Future future = (Future) JmsConnector$.MODULE$.connection().apply(updateState(InternalConnectionState$JmsConnectorStopping$.MODULE$.apply(Failure$.MODULE$.apply(th))));
            abortSessionsAsync().onComplete(r7 -> {
                closeConnectionAsync(future).onComplete(r5 -> {
                    this.markAborted.invoke(th);
                }, ec());
            }, ec());
        }
    }

    /**
     * Creates the control object handed out for this consumer: {@code shutdown()} triggers the
     * graceful stop, {@code abort(Throwable)} triggers the abort, and {@code connected()}
     * exposes the connection-state source.
     */
    public JmsConsumerMatValue consumerControl() {
        // NOTE(review): same outer-instance capture artifact as in the constructor; $outer is
        // presumably the enclosing SourceStageLogic - confirm.
        return new JmsConsumerMatValue(this) { // from class: org.apache.pekko.stream.connectors.jms.impl.SourceStageLogic$$anon$2
            private final /* synthetic */ SourceStageLogic $outer;

            {
                if (this == null) {
                    throw new NullPointerException();
                }
                this.$outer = this;
            }

            public void shutdown() {
                this.$outer.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$stopSessions();
            }

            public void abort(Throwable th) {
                this.$outer.org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$abortSessions(th);
            }

            // Wraps the connectionStateSource() future in a Source and flattens it with
            // flatMapConcat using the identity helper at the bottom of the class.
            @Override // org.apache.pekko.stream.connectors.jms.impl.JmsProducerMatValue
            public Source connected() {
                return Source$.MODULE$.future(this.$outer.connectionStateSource()).flatMapConcat(SourceStageLogic::org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$anon$2$$_$connected$$anonfun$1);
            }
        };
    }

    /** Runs {@code finishStop()} when the stage stops. */
    public void postStop() {
        finishStop();
    }

    /**
     * Public accessor that lets the anonymous outlet handler call {@code setKeepGoing}.
     *
     * @param z value forwarded to {@code setKeepGoing}
     */
    public void protected$setKeepGoing(boolean z) {
        setKeepGoing(z);
    }

    /** Identity function used by {@code connected()} as the {@code flatMapConcat} argument. */
    public static final /* synthetic */ Graph org$apache$pekko$stream$connectors$jms$impl$SourceStageLogic$$anon$2$$_$connected$$anonfun$1(Source source) {
        return (Graph) Predef$.MODULE$.identity(source);
    }
}
