package org.apache.pekko.stream.connectors.jms.impl;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.stream.Attributes;
import org.apache.pekko.stream.Attributes$;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.Outlet$;
import org.apache.pekko.stream.SourceShape;
import org.apache.pekko.stream.SourceShape$;
import org.apache.pekko.stream.connectors.jms.AckEnvelope;
import org.apache.pekko.stream.connectors.jms.AckEnvelope$;
import org.apache.pekko.stream.connectors.jms.AcknowledgeMode;
import org.apache.pekko.stream.connectors.jms.AcknowledgeMode$;
import org.apache.pekko.stream.connectors.jms.Destination;
import org.apache.pekko.stream.connectors.jms.JmsConsumerSettings;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.GraphStageWithMaterializedValue;
import scala.Function1;
import scala.Option;
import scala.Tuple2;
import scala.Tuple2$;
import scala.concurrent.duration.FiniteDuration;

/* compiled from: JmsAckSourceStage.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/jms/impl/JmsAckSourceStage.class */
public final class JmsAckSourceStage extends GraphStageWithMaterializedValue<SourceShape<AckEnvelope>, JmsConsumerMatValue> {
    /** Consumer settings passed to the constructor; the nested stage logic reads the ack mode, selector and ack limits from it. */
    public final JmsConsumerSettings org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$settings;
    /** Destination passed to the constructor; forwarded to the stage logic's superclass constructor. */
    public final Destination org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$destination;
    /** Outlet created with the name "JmsSource.out"; wrapped by the stage's shape and used by the logic to push envelopes. */
    public final Outlet<AckEnvelope> org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$out = Outlet$.MODULE$.apply("JmsSource.out");

    /* compiled from: JmsAckSourceStage.scala */
    /* loaded from: input_file:org/apache/pekko/stream/connectors/jms/impl/JmsAckSourceStage$JmsAckSourceStageLogic.class */
    /**
     * Stage logic of the acknowledging JMS source.
     *
     * <p>It creates {@link JmsAckSession}s, registers a message listener on each opened session and
     * forwards every received message downstream wrapped in an {@link AckEnvelope}.
     *
     * <p>Only one {@code createSession} method is declared. The raw-typed bridge overloads that the
     * decompiler had emitted next to it share its erasure and cannot coexist with it in Java
     * source; the compiler generates the required bridges itself.
     */
    public final class JmsAckSourceStageLogic extends SourceStageLogic<AckEnvelope> {
        /** Value of {@code settings.maxPendingAcks()}, captured at construction and handed to every new session. */
        private final int maxPendingAcks;
        /** Value of {@code settings.maxAckInterval()}; when defined, a repeating flush timer is scheduled per session. */
        private final Option<FiniteDuration> maxAckInterval;
        /** Enclosing stage, kept explicitly as emitted by the Scala compiler. */
        private final /* synthetic */ JmsAckSourceStage $outer;

        /**
         * Builds the logic for the given stage.
         *
         * @param jmsAckSourceStage the enclosing stage supplying shape, outlet, settings and destination
         * @param attributes attributes forwarded to the superclass constructor
         * @throws NullPointerException if {@code jmsAckSourceStage} is {@code null}
         */
        public JmsAckSourceStageLogic(JmsAckSourceStage jmsAckSourceStage, Attributes attributes) {
            // The null check has to live inside the first argument: Java requires super(...) to be
            // the first statement, and a check placed after it can never be reached because the
            // arguments already dereference the stage.
            super(java.util.Objects.requireNonNull(jmsAckSourceStage).m82shape(), jmsAckSourceStage.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$out, jmsAckSourceStage.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$settings, jmsAckSourceStage.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$destination, attributes);
            this.$outer = jmsAckSourceStage;
            this.maxPendingAcks = jmsAckSourceStage.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$settings.maxPendingAcks();
            this.maxAckInterval = jmsAckSourceStage.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$settings.maxAckInterval();
        }

        /**
         * Opens a JMS session on {@code connection} and wraps it in a {@link JmsAckSession}.
         *
         * <p>The session is created with {@code transacted = false} and the acknowledge mode from the
         * settings; when the settings define none, client acknowledge is used.
         *
         * @param connection the JMS connection to create the session on
         * @param function1 resolves the JMS destination for the freshly created session
         * @return the new acknowledging session
         */
        @Override // org.apache.pekko.stream.connectors.jms.impl.JmsConsumerConnector, org.apache.pekko.stream.connectors.jms.impl.JmsConnector
        public JmsConsumerSession createSession(Connection connection, Function1<Session, javax.jms.Destination> function1) {
            JmsConsumerSettings settings = this.$outer.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$settings;
            AcknowledgeMode ackMode = (AcknowledgeMode) settings.acknowledgeMode().getOrElse(JmsAckSourceStage::org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$JmsAckSourceStageLogic$$_$_$$anonfun$1);
            Session session = connection.createSession(false, ackMode.mode());
            javax.jms.Destination jmsDestination = (javax.jms.Destination) function1.apply(session);
            return new JmsAckSession(connection, session, jmsDestination, graphStageDestination(), this.maxPendingAcks);
        }

        /**
         * Pushes one envelope to the stage's outlet.
         *
         * @param ackEnvelope the envelope to emit downstream
         */
        @Override // org.apache.pekko.stream.connectors.jms.impl.SourceStageLogic
        public void pushMessage(AckEnvelope ackEnvelope) {
            push(this.$outer.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$out, ackEnvelope);
        }

        /**
         * Wires up a newly opened session: schedules the optional acknowledgement flush timer,
         * creates the consumer and installs the message listener, then reports the outcome through
         * {@code sessionOpenedCB()}.
         *
         * @param jmsConsumerSession the opened session; must be a {@link JmsAckSession}
         * @throws IllegalArgumentException if the session is of any other type
         */
        @Override // org.apache.pekko.stream.connectors.jms.impl.SourceStageLogic, org.apache.pekko.stream.connectors.jms.impl.JmsConnector
        public void onSessionOpened(JmsConsumerSession jmsConsumerSession) {
            if (!(jmsConsumerSession instanceof JmsAckSession)) {
                throw new IllegalArgumentException("Session must be of type JMSAckSession, it is a " + jmsConsumerSession.getClass().getName());
            }
            JmsAckSession jmsAckSession = (JmsAckSession) jmsConsumerSession;
            // NOTE(review): the lambdas below are void blocks, exactly as decompiled; scala.Function1
            // expects a value, so confirm the return values against the original Scala source.
            // Repeating timer keyed per session; initial delay and period are both maxAckInterval.
            this.maxAckInterval.foreach(interval -> {
                scheduleWithFixedDelay(JmsConnector$FlushAcknowledgementsTimerKey$.MODULE$.apply(jmsAckSession), interval, interval);
            });
            jmsAckSession.createConsumer(this.$outer.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$settings.selector(), ec()).map(consumer -> {
                consumer.setMessageListener(message -> {
                    // Messages arriving while the listener is not running are ignored.
                    if (jmsAckSession.isListenerRunning()) {
                        try {
                            handleMessage().invoke(AckEnvelope$.MODULE$.apply(message, jmsAckSession));
                            // Count the handed-over message as awaiting acknowledgement.
                            jmsAckSession.pendingAck_$eq(jmsAckSession.pendingAck() + 1);
                            if (jmsAckSession.maxPendingAcksReached()) {
                                jmsAckSession.ackBackpressure();
                            }
                            jmsAckSession.drainAcks();
                        } catch (JMSException e) {
                            // JMS failures are routed to the stage's error callback.
                            handleError().invoke(e);
                        }
                    }
                });
            }, ec()).onComplete(result -> {
                sessionOpenedCB().invoke(result);
            }, ec());
        }

        /** Returns the enclosing stage (synthetic accessor emitted by the Scala compiler). */
        public final /* synthetic */ JmsAckSourceStage org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$JmsAckSourceStageLogic$$$outer() {
            return this.$outer;
        }
    }

    /**
     * Creates the stage and stores both arguments in its final fields.
     *
     * @param jmsConsumerSettings consumer settings used by the stage logic
     * @param destination destination handed on to the stage logic
     */
    public JmsAckSourceStage(JmsConsumerSettings jmsConsumerSettings, Destination destination) {
        // The two assignments are independent of each other.
        this.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$destination = destination;
        this.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$settings = jmsConsumerSettings;
    }

    /* renamed from: shape, reason: merged with bridge method [inline-methods] */
    /**
     * Builds the source shape around this stage's outlet.
     *
     * @return a {@link SourceShape} wrapping the outlet field
     */
    public SourceShape<AckEnvelope> m82shape() {
        final Outlet<AckEnvelope> outlet = this.org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$$out;
        return SourceShape$.MODULE$.apply(outlet);
    }

    /**
     * Instantiates the stage logic and pairs it with the value of its {@code consumerControl()}.
     *
     * @param attributes attributes passed to the new logic instance
     * @return a tuple of the logic and its consumer control
     */
    public Tuple2<GraphStageLogic, JmsConsumerMatValue> createLogicAndMaterializedValue(Attributes attributes) {
        final JmsAckSourceStageLogic logic = new JmsAckSourceStageLogic(this, attributes);
        return Tuple2$.MODULE$.apply(logic, logic.consumerControl());
    }

    /**
     * Supplies the stage's initial attributes.
     *
     * @return attributes carrying the name "JmsAckConsumer"
     */
    public Attributes initialAttributes() {
        final String stageName = "JmsAckConsumer";
        return Attributes$.MODULE$.name(stageName);
    }

    /**
     * Fallback acknowledge mode, referenced from the logic's {@code createSession} as the
     * {@code getOrElse} default.
     *
     * @return the client-acknowledge mode
     */
    public static final AcknowledgeMode org$apache$pekko$stream$connectors$jms$impl$JmsAckSourceStage$JmsAckSourceStageLogic$$_$_$$anonfun$1() {
        final AcknowledgeMode$ modes = AcknowledgeMode$.MODULE$;
        return modes.ClientAcknowledge();
    }
}
