package akka.stream.alpakka.jms.impl;

import akka.stream.Attributes;
import akka.stream.alpakka.jms.AckEnvelope;
import akka.stream.alpakka.jms.AckEnvelope$;
import akka.stream.alpakka.jms.AcknowledgeMode;
import akka.stream.alpakka.jms.AcknowledgeMode$;
import akka.stream.alpakka.jms.StopMessageListenerException;
import akka.util.OptionVal;
import akka.util.OptionVal$;
import akka.util.OptionVal$Some$;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.runtime.BoxedUnit;
import scala.util.Try;

/* compiled from: JmsAckSourceStage.scala */
/* loaded from: input_file:akka/stream/alpakka/jms/impl/JmsAckSourceStage$$anon$1.class */
/**
 * Stage logic created by {@link JmsAckSourceStage} (decompiled from an anonymous Scala class).
 *
 * <p>It opens a JMS session, registers a {@link MessageListener} on the session's consumer,
 * hands every received message to {@code handleMessage()} wrapped in an {@link AckEnvelope},
 * and runs acknowledgement actions taken from the session's {@code ackQueue()} from within
 * {@code onMessage}.
 */
public final class JmsAckSourceStage$$anon$1 extends SourceStageLogic<AckEnvelope> {
    /** Upper bound on un-acknowledged messages; set in the constructor from {@code settings.bufferSize()}. */
    private final int akka$stream$alpakka$jms$impl$JmsAckSourceStage$$anon$$maxPendingAck;
    /** Reference to the enclosing stage (Scala outer pointer). */
    private final /* synthetic */ JmsAckSourceStage $outer;

    /**
     * Accessor for the pending-acknowledgement limit.
     *
     * @return the value captured from {@code settings.bufferSize()} at construction time
     */
    public int akka$stream$alpakka$jms$impl$JmsAckSourceStage$$anon$$maxPendingAck() {
        return this.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$anon$$maxPendingAck;
    }

    /**
     * Creates the JMS session for this stage and wraps it in a {@link JmsAckSession}.
     *
     * <p>The session is created with {@code transacted = false} and the acknowledge mode from the
     * settings, falling back to {@code ClientAcknowledge} when the settings do not define one.
     *
     * @param connection the JMS connection to create the session on
     * @param function1 produces the JMS destination from the freshly created session
     * @return a {@link JmsAckSession} built from the connection, the session, the created
     *     destination, this logic's {@code destination()} and {@code settings.bufferSize()}
     */
    @Override // akka.stream.alpakka.jms.impl.JmsConsumerConnector, akka.stream.alpakka.jms.impl.JmsConnector
    public JmsConsumerSession createSession(Connection connection, Function1<Session, Destination> function1) {
        Session createSession = connection.createSession(false, ((AcknowledgeMode) this.$outer.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$settings.acknowledgeMode().getOrElse(() -> {
            return AcknowledgeMode$.MODULE$.ClientAcknowledge();
        })).mode());
        return new JmsAckSession(connection, createSession, (Destination) function1.apply(createSession), destination(), this.$outer.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$settings.bufferSize());
    }

    /**
     * Pushes one envelope to the stage's outlet.
     *
     * @param ackEnvelope the element to emit downstream
     */
    @Override // akka.stream.alpakka.jms.impl.SourceStageLogic
    public void pushMessage(AckEnvelope ackEnvelope) {
        push(this.$outer.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$out(), ackEnvelope);
    }

    /**
     * Starts consuming on a newly opened session.
     *
     * <p>Creates a consumer using the selector from the settings; when the consumer becomes
     * available a message listener is installed on it, and the overall outcome is passed to
     * {@code sessionOpenedCB()}.
     *
     * @param jmsConsumerSession the opened session; must be a {@link JmsAckSession}
     * @throws IllegalArgumentException if the session is not a {@link JmsAckSession}
     */
    @Override // akka.stream.alpakka.jms.impl.SourceStageLogic, akka.stream.alpakka.jms.impl.JmsConnector
    public void onSessionOpened(JmsConsumerSession jmsConsumerSession) {
        if (!(jmsConsumerSession instanceof JmsAckSession)) {
            throw new IllegalArgumentException("Session must be of type JMSAckSession, it is a " + jmsConsumerSession.getClass().getName());
        }
        JmsAckSession jmsAckSession = (JmsAckSession) jmsConsumerSession;
        jmsAckSession.createConsumer(this.$outer.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$settings.selector(), ec()).map(messageConsumer -> {
            $anonfun$onSessionOpened$1(this, jmsAckSession, messageConsumer);
            return BoxedUnit.UNIT;
        }, ec()).onComplete(result -> {
            $anonfun$onSessionOpened$2(this, result);
            return BoxedUnit.UNIT;
        }, ec());
    }

    /**
     * Decompiler rendering of a compiler-generated bridge; delegates to the typed
     * {@link #createSession(Connection, Function1)}.
     *
     * NOTE(review): the name {@code createSession2} was introduced by the decompiler (see the
     * "renamed from" marker below) — confirm whether any decompiled call site depends on it.
     */
    @Override // akka.stream.alpakka.jms.impl.JmsConnector
    /* renamed from: createSession, reason: avoid collision after fix types in other method */
    public /* bridge */ /* synthetic */ JmsConsumerSession createSession2(Connection connection, Function1 function1) {
        return createSession(connection, (Function1<Session, Destination>) function1);
    }

    /**
     * Decompiler rendering of a compiler-generated bridge; delegates to the typed
     * {@link #createSession(Connection, Function1)}.
     *
     * NOTE(review): this raw-typed overload takes the same erased parameter types as the typed
     * method it delegates to — confirm whether this artifact can be kept when recompiling.
     */
    @Override // akka.stream.alpakka.jms.impl.JmsConsumerConnector, akka.stream.alpakka.jms.impl.JmsConnector
    public /* bridge */ /* synthetic */ JmsConsumerSession createSession(Connection connection, Function1 function1) {
        return createSession(connection, (Function1<Session, Destination>) function1);
    }

    /**
     * Installs the message listener on the consumer of an acknowledging session.
     *
     * <p>The listener forwards each message, counts it as pending, and runs acknowledgement
     * actions taken from the session's {@code ackQueue()}.
     *
     * @param jmsAckSourceStage$$anon$1 the stage logic owning the listener; must not be null
     * @param jmsAckSession the session whose queue and pending counter the listener uses
     * @param messageConsumer the consumer the listener is registered on
     */
    public static final /* synthetic */ void $anonfun$onSessionOpened$1(final JmsAckSourceStage$$anon$1 jmsAckSourceStage$$anon$1, final JmsAckSession jmsAckSession, MessageConsumer messageConsumer) {
        // MessageListener is an interface, so the anonymous implementation takes no constructor
        // arguments; the two final parameters are captured and copied into fields below.
        messageConsumer.setMessageListener(new MessageListener() { // from class: akka.stream.alpakka.jms.impl.JmsAckSourceStage$$anon$1$$anon$2
            /** Set once a StopMessageListenerException was seen; later messages are then ignored. */
            private boolean listenerStopped;
            private final /* synthetic */ JmsAckSourceStage$$anon$1 $outer;
            private final JmsAckSession x2$1;

            private boolean listenerStopped() {
                return this.listenerStopped;
            }

            private void listenerStopped_$eq(boolean z) {
                this.listenerStopped = z;
            }

            /**
             * Forwards one message and then processes acknowledgement actions.
             *
             * <p>A {@link JMSException} is reported through {@code handleError()}; a
             * {@link StopMessageListenerException} marks the listener as stopped.
             */
            @Override
            public void onMessage(Message message) {
                if (listenerStopped()) {
                    return;
                }
                try {
                    this.$outer.handleMessage().invoke(AckEnvelope$.MODULE$.apply(message, this.x2$1));
                    this.x2$1.pendingAck_$eq(this.x2$1.pendingAck() + 1);
                    if (this.x2$1.pendingAck() > this.$outer.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$anon$$maxPendingAck()) {
                        // Over the limit: run one acknowledgement action obtained via take()
                        // (presumably waiting until one is queued — confirm against JmsAckSession).
                        this.x2$1.ackQueue().take().apply$mcV$sp();
                        this.x2$1.pendingAck_$eq(this.x2$1.pendingAck() - 1);
                    }
                    ackQueued$1();
                } catch (JMSException e) {
                    this.$outer.handleError().invoke(e);
                } catch (StopMessageListenerException unused) {
                    // Deliberate stop signal, not an error: remember it and drop further messages.
                    listenerStopped_$eq(true);
                }
            }

            /**
             * Runs the acknowledgement actions that are already queued, without waiting.
             *
             * <p>Stops when {@code poll()} yields nothing (null) or when an action signals
             * {@link StopMessageListenerException}. The previous decompiled form threw a
             * {@code MatchError} after every executed action and spun forever on an empty queue.
             */
            private final void ackQueued$1() {
                do {
                    // The original wrapped this result in a null-backed OptionVal; null means "none queued".
                    Function0<?> queuedAck = this.x2$1.ackQueue().poll();
                    if (queuedAck == null) {
                        return;
                    }
                    try {
                        queuedAck.apply$mcV$sp();
                        this.x2$1.pendingAck_$eq(this.x2$1.pendingAck() - 1);
                    } catch (StopMessageListenerException unused) {
                        // Stop signal raised by the action; the loop condition ends the drain.
                        listenerStopped_$eq(true);
                    }
                } while (!listenerStopped());
            }

            {
                if (jmsAckSourceStage$$anon$1 == null) {
                    throw new NullPointerException();
                }
                this.$outer = jmsAckSourceStage$$anon$1;
                this.x2$1 = jmsAckSession;
                this.listenerStopped = false;
            }
        });
    }

    /**
     * Passes the outcome of consumer creation to the logic's {@code sessionOpenedCB()}.
     *
     * @param jmsAckSourceStage$$anon$1 the stage logic whose callback is invoked
     * @param r4 the completion result of the consumer-creation chain
     */
    public static final /* synthetic */ void $anonfun$onSessionOpened$2(JmsAckSourceStage$$anon$1 jmsAckSourceStage$$anon$1, Try r4) {
        jmsAckSourceStage$$anon$1.sessionOpenedCB().invoke(r4);
    }

    /**
     * Builds the logic for the given stage.
     *
     * <p>The super constructor receives the stage's shape, outlet, settings and destination plus
     * the attributes. Because the decompiler moved the {@code super} call first, a null stage
     * already fails while evaluating the super arguments, before the explicit null check runs.
     *
     * @param jmsAckSourceStage the enclosing stage; must not be null
     * @param attributes attributes forwarded to the super constructor
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public JmsAckSourceStage$$anon$1(JmsAckSourceStage jmsAckSourceStage, Attributes attributes) {
        super(jmsAckSourceStage.m46shape(), jmsAckSourceStage.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$out(), jmsAckSourceStage.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$settings, jmsAckSourceStage.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$destination, attributes);
        if (jmsAckSourceStage == null) {
            throw null;
        }
        this.$outer = jmsAckSourceStage;
        this.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$anon$$maxPendingAck = jmsAckSourceStage.akka$stream$alpakka$jms$impl$JmsAckSourceStage$$settings.bufferSize();
    }
}
