package akka.stream.alpakka.amqp.impl;

import akka.Done;
import akka.Done$;
import akka.stream.alpakka.amqp.AmqpSourceSettings;
import akka.stream.alpakka.amqp.NamedQueueSourceSettings;
import akka.stream.alpakka.amqp.TemporaryQueueSourceSettings;
import akka.stream.alpakka.amqp.scaladsl.CommittableIncomingMessage;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.ShutdownListener;
import java.util.Map;
import scala.MatchError;
import scala.Predef$;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Promise;
import scala.runtime.BoxedUnit;
import scala.util.Success;

/* compiled from: AmqpSourceStage.scala */
/* loaded from: input_file:akka/stream/alpakka/amqp/impl/AmqpSourceStage$$anon$1.class */
public final class AmqpSourceStage$$anon$1 extends GraphStageLogic implements AmqpConnectorLogic {
    // Source settings copied from the enclosing AmqpSourceStage in the constructor.
    private final AmqpSourceSettings settings;
    // Messages delivered while the outlet was not available; drained one per onPull.
    private final Queue<CommittableIncomingMessage> akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$queue;
    // Incremented for every pushed message, decremented by one per processed ack/nack callback.
    private int akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages;
    // Backing storage for the `connection` variable declared by the AmqpConnectorLogic trait.
    private Connection akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$connection;
    // Backing storage for the trait's `channel` variable; used for all broker calls in this class.
    private Channel channel;
    // Cached value of the lazily initialised shutdownCallback (valid once bit 1 of bitmap$0 is set).
    private AsyncCallback<Throwable> shutdownCallback;
    // Cached value of the lazily initialised shutdownListener (valid once bit 2 of bitmap$0 is set).
    private ShutdownListener akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener;
    // Initialisation flags for the lazy values: bit 1 = shutdownCallback, bit 2 = shutdownListener.
    private volatile byte bitmap$0;
    // The enclosing AmqpSourceStage; supplies the outlet, the buffer size and the settings.
    private final /* synthetic */ AmqpSourceStage $outer;

    /**
     * Handles a failure by delegating to the implementation inherited from the
     * {@code AmqpConnectorLogic} trait.
     *
     * <p>The decompiled body invoked {@code onFailure(th)} on itself, which can only end in a
     * {@link StackOverflowError}. The call is redirected to the trait's static forwarder.
     * NOTE(review): the {@code onFailure$} forwarder is assumed from the {@code name$} convention
     * visible elsewhere in this class ({@code AmqpConnectorLogic.$init$}) — confirm against the
     * trait's class file.
     *
     * @param th the failure to handle
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public void onFailure(Throwable th) {
        AmqpConnectorLogic.onFailure$(this, th);
    }

    /**
     * Start-up hook of the stage logic; delegates to the implementation inherited from the
     * {@code AmqpConnectorLogic} trait.
     *
     * <p>The decompiled body invoked {@code preStart()} on itself, which can only end in a
     * {@link StackOverflowError}. The call is redirected to the trait's static forwarder.
     * NOTE(review): the {@code preStart$} forwarder is assumed from the {@code name$} convention
     * visible elsewhere in this class ({@code AmqpConnectorLogic.$init$}) — confirm against the
     * trait's class file.
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public final void preStart() {
        AmqpConnectorLogic.preStart$(this);
    }

    /**
     * Shutdown hook of the stage logic; delegates to the implementation inherited from the
     * {@code AmqpConnectorLogic} trait.
     *
     * <p>The decompiled body invoked {@code postStop()} on itself, which can only end in a
     * {@link StackOverflowError}. The call is redirected to the trait's static forwarder.
     * NOTE(review): the {@code postStop$} forwarder is assumed from the {@code name$} convention
     * visible elsewhere in this class ({@code AmqpConnectorLogic.$init$}) — confirm against the
     * trait's class file.
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public void postStop() {
        AmqpConnectorLogic.postStop$(this);
    }

    /**
     * Accessor for the connection variable declared by the {@code AmqpConnectorLogic} trait.
     *
     * @return the connection currently stored in this logic instance
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public Connection akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$connection() {
        final Connection current = this.akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$connection;
        return current;
    }

    /**
     * Mutator for the connection variable declared by the {@code AmqpConnectorLogic} trait.
     *
     * @param connection the connection to store in this logic instance
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public void akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$connection_$eq(final Connection connection) {
        this.akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$connection = connection;
    }

    /**
     * Accessor for the channel variable declared by the {@code AmqpConnectorLogic} trait.
     *
     * @return the channel currently stored in this logic instance
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public Channel channel() {
        final Channel current = this.channel;
        return current;
    }

    /**
     * Mutator for the channel variable declared by the {@code AmqpConnectorLogic} trait.
     *
     * @param channel the channel to store in this logic instance
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public void channel_$eq(final Channel channel) {
        this.channel = channel;
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [akka.stream.alpakka.amqp.impl.AmqpSourceStage$$anon$1] */
    private AsyncCallback<Throwable> shutdownCallback$lzycompute() {
        AsyncCallback<Throwable> shutdownCallback;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 1)) == 0) {
                shutdownCallback = shutdownCallback();
                this.shutdownCallback = shutdownCallback;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 1);
            }
        }
        return this.shutdownCallback;
    }

    /**
     * Returns the lazily initialised shutdown callback.
     *
     * <p>When bit 1 of {@code bitmap$0} is already set the cached field is returned directly;
     * otherwise the work is handed to {@code shutdownCallback$lzycompute()}.
     *
     * @return the shutdown callback
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public AsyncCallback<Throwable> shutdownCallback() {
        if (((byte) (this.bitmap$0 & 1)) != 0) {
            return this.shutdownCallback;
        }
        return shutdownCallback$lzycompute();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v10, types: [akka.stream.alpakka.amqp.impl.AmqpSourceStage$$anon$1] */
    private ShutdownListener akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzycompute() {
        ShutdownListener akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener;
        ?? r0 = this;
        synchronized (r0) {
            if (((byte) (this.bitmap$0 & 2)) == 0) {
                akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener = akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener();
                this.akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener = akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener;
                r0 = this;
                r0.bitmap$0 = (byte) (this.bitmap$0 | 2);
            }
        }
        return this.akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener;
    }

    /**
     * Returns the lazily initialised shutdown listener.
     *
     * <p>When bit 2 of {@code bitmap$0} is already set the cached field is returned directly;
     * otherwise the work is handed to the matching {@code $lzycompute} method.
     *
     * @return the shutdown listener
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public ShutdownListener akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener() {
        if (((byte) (this.bitmap$0 & 2)) != 0) {
            return this.akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener;
        }
        return akka$stream$alpakka$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzycompute();
    }

    /**
     * Returns the source settings this logic was created with.
     *
     * @return the settings captured from the enclosing stage in the constructor
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public AmqpSourceSettings settings() {
        final AmqpSourceSettings configured = this.settings;
        return configured;
    }

    /**
     * Returns the buffer of messages that were delivered while the outlet was not available.
     *
     * @return the mutable message queue owned by this logic instance
     */
    public Queue<CommittableIncomingMessage> akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$queue() {
        final Queue<CommittableIncomingMessage> buffered = this.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$queue;
        return buffered;
    }

    /**
     * Returns the current value of the unacknowledged-message counter.
     *
     * @return number of pushed messages minus the number of processed ack/nack callbacks
     */
    public int akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages() {
        final int outstanding = this.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages;
        return outstanding;
    }

    /**
     * Overwrites the unacknowledged-message counter.
     *
     * @param i the new counter value
     */
    private void akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages_$eq(final int i) {
        this.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages = i;
    }

    /**
     * Sets up consumption on the channel.
     *
     * <p>Steps, in order: limit the prefetch count to the stage's buffer size, create the three
     * async callbacks (delivery, ack, nack), build the consumer around them, and start consuming
     * from either a named or a temporary queue depending on the concrete settings type.
     *
     * @throws MatchError if the settings are neither named-queue nor temporary-queue settings
     */
    @Override // akka.stream.alpakka.amqp.impl.AmqpConnectorLogic
    public void whenConnected() {
        channel().basicQos(this.$outer.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$bufferSize);

        // Callbacks are created in the same order as the consumer's constructor arguments.
        AsyncCallback<CommittableIncomingMessage> deliveryCallback = getAsyncCallback(message -> {
            this.handleDelivery(message);
            return BoxedUnit.UNIT;
        });
        AsyncCallback<AckArguments> ackCallback = getAsyncCallback(ack -> {
            $anonfun$whenConnected$2(this, ack);
            return BoxedUnit.UNIT;
        });
        AsyncCallback<NackArguments> nackCallback = getAsyncCallback(nack -> {
            $anonfun$whenConnected$3(this, nack);
            return BoxedUnit.UNIT;
        });
        AmqpSourceStage$$anon$1$$anon$2 consumer =
                new AmqpSourceStage$$anon$1$$anon$2(this, deliveryCallback, ackCallback, nackCallback);

        AmqpSourceSettings sourceSettings = settings();
        if (sourceSettings instanceof NamedQueueSourceSettings) {
            setupNamedQueue$1((NamedQueueSourceSettings) sourceSettings, consumer);
            return;
        }
        if (sourceSettings instanceof TemporaryQueueSourceSettings) {
            setupTemporaryQueue$1((TemporaryQueueSourceSettings) sourceSettings, consumer);
            return;
        }
        throw new MatchError(sourceSettings);
    }

    /**
     * Routes a freshly delivered message: pushes it straight downstream when the outlet is
     * available, otherwise buffers it, failing via {@code onFailure} when buffering would
     * exceed the stage's buffer size.
     *
     * @param committableIncomingMessage the message handed over by the consumer
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void handleDelivery(CommittableIncomingMessage committableIncomingMessage) {
        if (isAvailable(this.$outer.out())) {
            akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$pushMessage(committableIncomingMessage);
            return;
        }

        int sizeAfterEnqueue = akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$queue().size() + 1;
        if (sizeAfterEnqueue > this.$outer.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$bufferSize) {
            String reason = "Reached maximum buffer size " + this.$outer.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$bufferSize;
            onFailure(new RuntimeException(reason));
            return;
        }

        CommittableIncomingMessage[] single = new CommittableIncomingMessage[]{committableIncomingMessage};
        akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$queue().enqueue(Predef$.MODULE$.wrapRefArray(single));
    }

    /**
     * Emits a message on the stage's outlet and then counts it as unacknowledged.
     *
     * @param committableIncomingMessage the message to push downstream
     */
    public void akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$pushMessage(CommittableIncomingMessage committableIncomingMessage) {
        push(this.$outer.out(), committableIncomingMessage);
        // The counter is only touched after the push has gone through.
        int outstandingAfterPush = akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages() + 1;
        akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages_$eq(outstandingAfterPush);
    }

    /**
     * Body of the ack async callback: acknowledges the delivery on the channel, decrements the
     * unacknowledged counter, completes the stage once nothing is outstanding and the outlet is
     * closed, and finally resolves the caller's promise.
     *
     * <p>Any throwable raised inside the guarded section fails the promise instead of
     * propagating.
     * NOTE(review): the counter is decremented by exactly one even when {@code multiple} is
     * true — confirm this is intended.
     *
     * @param amqpSourceStage$$anon$1 the stage logic the callback belongs to
     * @param ackArguments delivery tag, multiple flag and promise of the ack request
     * @throws MatchError if {@code ackArguments} is null
     */
    public static final /* synthetic */ void $anonfun$whenConnected$2(AmqpSourceStage$$anon$1 amqpSourceStage$$anon$1, AckArguments ackArguments) {
        if (ackArguments == null) {
            throw new MatchError(ackArguments);
        }
        final long tag = ackArguments.deliveryTag();
        final boolean ackMultiple = ackArguments.multiple();
        final Promise<Done> outcome = ackArguments.promise();
        try {
            amqpSourceStage$$anon$1.channel().basicAck(tag, ackMultiple);
            final int outstanding = amqpSourceStage$$anon$1.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages() - 1;
            amqpSourceStage$$anon$1.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages_$eq(outstanding);
            if (outstanding == 0 && amqpSourceStage$$anon$1.isClosed(amqpSourceStage$$anon$1.$outer.out())) {
                amqpSourceStage$$anon$1.completeStage();
            }
            outcome.complete(new Success(Done$.MODULE$));
        } catch (Throwable failure) {
            outcome.failure(failure);
        }
    }

    /**
     * Body of the nack async callback: rejects the delivery on the channel, decrements the
     * unacknowledged counter, completes the stage once nothing is outstanding and the outlet is
     * closed, and finally resolves the caller's promise.
     *
     * <p>Any throwable raised inside the guarded section fails the promise instead of
     * propagating.
     * NOTE(review): the counter is decremented by exactly one even when {@code multiple} is
     * true — confirm this is intended.
     *
     * @param amqpSourceStage$$anon$1 the stage logic the callback belongs to
     * @param nackArguments delivery tag, multiple/requeue flags and promise of the nack request
     * @throws MatchError if {@code nackArguments} is null
     */
    public static final /* synthetic */ void $anonfun$whenConnected$3(AmqpSourceStage$$anon$1 amqpSourceStage$$anon$1, NackArguments nackArguments) {
        if (nackArguments == null) {
            throw new MatchError(nackArguments);
        }
        final long tag = nackArguments.deliveryTag();
        final boolean nackMultiple = nackArguments.multiple();
        final boolean putBack = nackArguments.requeue();
        final Promise<Done> outcome = nackArguments.promise();
        try {
            amqpSourceStage$$anon$1.channel().basicNack(tag, nackMultiple, putBack);
            final int outstanding = amqpSourceStage$$anon$1.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages() - 1;
            amqpSourceStage$$anon$1.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages_$eq(outstanding);
            if (outstanding == 0 && amqpSourceStage$$anon$1.isClosed(amqpSourceStage$$anon$1.$outer.out())) {
                amqpSourceStage$$anon$1.completeStage();
            }
            outcome.complete(new Success(Done$.MODULE$));
        } catch (Throwable failure) {
            outcome.failure(failure);
        }
    }

    /**
     * Starts consuming from the queue named in the settings.
     *
     * <p>Auto-acknowledge is enabled exactly when the settings do not require acks; the Scala
     * argument map is converted to a Java map before being handed to the client.
     *
     * @param namedQueueSourceSettings settings describing the queue and consumer options
     * @param defaultConsumer consumer that receives the deliveries
     */
    private final void setupNamedQueue$1(NamedQueueSourceSettings namedQueueSourceSettings, DefaultConsumer defaultConsumer) {
        // Values are read in the same order in which the original call evaluated its arguments.
        final Channel amqpChannel = channel();
        final String queueName = namedQueueSourceSettings.queue();
        final boolean autoAck = !namedQueueSourceSettings.ackRequired();
        final String consumerTag = namedQueueSourceSettings.consumerTag();
        final boolean noLocal = namedQueueSourceSettings.noLocal();
        final boolean exclusive = namedQueueSourceSettings.exclusive();
        final Map consumerArguments = (Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(namedQueueSourceSettings.arguments()).asJava();
        amqpChannel.basicConsume(queueName, autoAck, consumerTag, noLocal, exclusive, consumerArguments, defaultConsumer);
    }

    /**
     * Declares a queue with a broker-generated name, binds it to the configured exchange and
     * starts consuming from it.
     *
     * <p>The routing key falls back to the empty string when the settings carry none.
     *
     * @param temporaryQueueSourceSettings settings providing the exchange and optional routing key
     * @param defaultConsumer consumer that receives the deliveries
     */
    private final void setupTemporaryQueue$1(TemporaryQueueSourceSettings temporaryQueueSourceSettings, DefaultConsumer defaultConsumer) {
        final String temporaryQueueName = channel().queueDeclare().getQueue();

        final Channel bindingChannel = channel();
        final String exchangeName = temporaryQueueSourceSettings.exchange();
        final String bindingKey = (String) temporaryQueueSourceSettings.routingKey().getOrElse(() -> "");
        bindingChannel.queueBind(temporaryQueueName, exchangeName, bindingKey);

        channel().basicConsume(temporaryQueueName, defaultConsumer);
    }

    /**
     * Creates the stage logic for the given source stage.
     *
     * <p>Captures the enclosing stage, runs the {@code AmqpConnectorLogic} trait initialiser,
     * copies the settings, starts with an empty buffer and a zero unacknowledged counter, and
     * installs the outlet handler.
     *
     * <p>The decompiled handler was written as {@code new OutHandler(this) {...}} with an
     * initializer doing {@code this.$outer = this}. That passes a constructor argument to an
     * interface and makes the handler point at itself instead of at this logic. The handler now
     * captures the enclosing logic instance explicitly.
     *
     * NOTE(review): {@code m26shape()} looks like a decompiler-renamed accessor for the stage's
     * shape — confirm the real method name before compiling.
     *
     * @param amqpSourceStage the enclosing stage providing shape, outlet, settings and buffer size
     */
    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public AmqpSourceStage$$anon$1(AmqpSourceStage amqpSourceStage) {
        super(amqpSourceStage.m26shape());
        if (amqpSourceStage == null) {
            throw null;
        }
        this.$outer = amqpSourceStage;
        AmqpConnectorLogic.$init$(this);
        this.settings = amqpSourceStage.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$settings;
        this.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$queue = Queue$.MODULE$.apply(Nil$.MODULE$);
        this.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages = 0;

        // The handler must talk to this logic instance, not to itself.
        final AmqpSourceStage$$anon$1 logic = this;
        setHandler(amqpSourceStage.out(), new OutHandler() { // from class: akka.stream.alpakka.amqp.impl.AmqpSourceStage$$anon$1$$anon$4
            {
                OutHandler.$init$(this);
            }

            /** On demand, emits the oldest buffered message if there is one. */
            @Override
            public void onPull() {
                if (logic.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$queue().nonEmpty()) {
                    logic.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$pushMessage((CommittableIncomingMessage) logic.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$queue().dequeue());
                }
            }

            /**
             * Keeps the stage alive after downstream cancels, and runs the default
             * downstream-finish behaviour only when no messages are unacknowledged.
             */
            @Override
            public void onDownstreamFinish() {
                logic.setKeepGoing(true);
                if (logic.akka$stream$alpakka$amqp$impl$AmqpSourceStage$$anon$$unackedMessages() == 0) {
                    OutHandler.onDownstreamFinish$(this);
                }
            }
        });
    }
}
