package akka.stream.alpakka.amqp;

import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.util.ByteString$;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.util.Map;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Some;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Promise;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: AmqpRpcFlowStage.scala */
/* loaded from: input_file:akka/stream/alpakka/amqp/AmqpRpcFlowStage$$anon$1.class */
public final class AmqpRpcFlowStage$$anon$1 extends GraphStageLogic implements AmqpConnectorLogic {
    private final AmqpSinkSettings settings;
    private final String akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$exchange;
    private final String akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$routingKey;
    private final Queue<IncomingMessage> akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queue;
    private String akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName;
    private int akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages;
    private Connection akka$stream$alpakka$amqp$AmqpConnectorLogic$$connection;
    private Channel channel;
    private final /* synthetic */ AmqpRpcFlowStage $outer;
    private final Promise promise$1;

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public final void preStart() {
        preStart();
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public Connection akka$stream$alpakka$amqp$AmqpConnectorLogic$$connection() {
        return this.akka$stream$alpakka$amqp$AmqpConnectorLogic$$connection;
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public void akka$stream$alpakka$amqp$AmqpConnectorLogic$$connection_$eq(Connection connection) {
        this.akka$stream$alpakka$amqp$AmqpConnectorLogic$$connection = connection;
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public Channel channel() {
        return this.channel;
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public void channel_$eq(Channel channel) {
        this.channel = channel;
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public AmqpSinkSettings settings() {
        return this.settings;
    }

    public String akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$exchange() {
        return this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$exchange;
    }

    public String akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$routingKey() {
        return this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$routingKey;
    }

    public Queue<IncomingMessage> akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queue() {
        return this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queue;
    }

    public String akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName() {
        return this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName;
    }

    private void akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName_$eq(String str) {
        this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName = str;
    }

    public int akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages() {
        return this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages;
    }

    public void akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages_$eq(int i) {
        this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages = i;
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public ConnectionFactory connectionFactoryFrom(AmqpConnectionSettings amqpConnectionSettings) {
        return this.$outer.connectionFactoryFrom(amqpConnectionSettings);
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public Connection newConnection(ConnectionFactory connectionFactory, AmqpConnectionSettings amqpConnectionSettings) {
        return this.$outer.newConnection(connectionFactory, amqpConnectionSettings);
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public void whenConnected() {
        final AsyncCallback asyncCallback = getAsyncCallback(tuple2 -> {
            $anonfun$whenConnected$1(this, tuple2);
            return BoxedUnit.UNIT;
        });
        pull(this.$outer.in());
        channel().basicQos(this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$bufferSize, true);
        final AsyncCallback asyncCallback2 = getAsyncCallback(incomingMessage -> {
            this.handleDelivery(incomingMessage);
            return BoxedUnit.UNIT;
        });
        DefaultConsumer defaultConsumer = new DefaultConsumer(this, asyncCallback, asyncCallback2) { // from class: akka.stream.alpakka.amqp.AmqpRpcFlowStage$$anon$1$$anon$2
            private final AsyncCallback shutdownCallback$1;
            private final AsyncCallback consumerCallback$1;

            public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
                this.consumerCallback$1.invoke(new IncomingMessage(ByteString$.MODULE$.apply(bArr), envelope, basicProperties));
            }

            public void handleCancel(String str) {
                this.shutdownCallback$1.invoke(new Tuple2(str, None$.MODULE$));
            }

            public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
                this.shutdownCallback$1.invoke(new Tuple2(str, Option$.MODULE$.apply(shutdownSignalException)));
            }

            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(this.channel());
                this.shutdownCallback$1 = asyncCallback;
                this.consumerCallback$1 = asyncCallback2;
            }
        };
        akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName_$eq(channel().queueDeclare("", false, true, true, (Map) JavaConverters$.MODULE$.mapAsJavaMapConverter(Predef$.MODULE$.Map().empty()).asJava()).getQueue());
        channel().basicConsume(akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName(), defaultConsumer);
        this.promise$1.success(akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleDelivery(IncomingMessage incomingMessage) {
        if (isAvailable(this.$outer.out())) {
            akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$pushAndAckMessage(incomingMessage);
        } else if (akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queue().size() + 1 > this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$bufferSize) {
            failStage(new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Reached maximum buffer size ", ""})).s(Predef$.MODULE$.genericWrapArray(new Object[]{BoxesRunTime.boxToInteger(this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$bufferSize)}))));
        } else {
            akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queue().enqueue(Predef$.MODULE$.wrapRefArray(new IncomingMessage[]{incomingMessage}));
        }
    }

    public void akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$pushAndAckMessage(IncomingMessage incomingMessage) {
        push(this.$outer.out(), incomingMessage);
        channel().basicAck(incomingMessage.envelope().getDeliveryTag(), false);
        akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages_$eq(akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages() - 1);
        if (akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages() == 0 && isClosed(this.$outer.in())) {
            completeStage();
        }
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public void postStop() {
        this.promise$1.tryFailure(new RuntimeException("stage stopped unexpectedly"));
        postStop();
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public void onFailure(Throwable th) {
        this.promise$1.tryFailure(th);
    }

    public /* synthetic */ AmqpRpcFlowStage akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$$outer() {
        return this.$outer;
    }

    public static final /* synthetic */ void $anonfun$whenConnected$1(AmqpRpcFlowStage$$anon$1 amqpRpcFlowStage$$anon$1, Tuple2 tuple2) {
        if (tuple2 != null) {
            String str = (String) tuple2._1();
            Some some = (Option) tuple2._2();
            if (some instanceof Some) {
                RuntimeException runtimeException = new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Consumer ", " with consumerTag ", " shut down unexpectedly"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{amqpRpcFlowStage$$anon$1.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName(), str})), (ShutdownSignalException) some.value());
                amqpRpcFlowStage$$anon$1.promise$1.tryFailure(runtimeException);
                amqpRpcFlowStage$$anon$1.failStage(runtimeException);
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
                return;
            }
        }
        if (tuple2 != null) {
            String str2 = (String) tuple2._1();
            if (None$.MODULE$.equals((Option) tuple2._2())) {
                RuntimeException runtimeException2 = new RuntimeException(new StringContext(Predef$.MODULE$.wrapRefArray(new String[]{"Consumer ", " with consumerTag ", " shut down unexpectedly"})).s(Predef$.MODULE$.genericWrapArray(new Object[]{amqpRpcFlowStage$$anon$1.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName(), str2})));
                amqpRpcFlowStage$$anon$1.promise$1.tryFailure(runtimeException2);
                amqpRpcFlowStage$$anon$1.failStage(runtimeException2);
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                return;
            }
        }
        throw new MatchError(tuple2);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public AmqpRpcFlowStage$$anon$1(AmqpRpcFlowStage amqpRpcFlowStage, Promise promise) {
        super(amqpRpcFlowStage.m7shape());
        if (amqpRpcFlowStage == null) {
            throw null;
        }
        this.$outer = amqpRpcFlowStage;
        this.promise$1 = promise;
        AmqpConnectorLogic.$init$(this);
        this.settings = amqpRpcFlowStage.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$settings;
        this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$exchange = (String) settings().exchange().getOrElse(() -> {
            return "";
        });
        this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$routingKey = (String) settings().routingKey().getOrElse(() -> {
            return "";
        });
        this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queue = Queue$.MODULE$.apply(Nil$.MODULE$);
        this.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages = 0;
        setHandler(amqpRpcFlowStage.out(), new OutHandler(this) { // from class: akka.stream.alpakka.amqp.AmqpRpcFlowStage$$anon$1$$anon$3
            private final /* synthetic */ AmqpRpcFlowStage$$anon$1 $outer;

            public void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public void onPull() {
                if (this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queue().nonEmpty()) {
                    this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$pushAndAckMessage((IncomingMessage) this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queue().dequeue());
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
        setHandler(amqpRpcFlowStage.in(), new InHandler(this) { // from class: akka.stream.alpakka.amqp.AmqpRpcFlowStage$$anon$1$$anon$4
            private final /* synthetic */ AmqpRpcFlowStage$$anon$1 $outer;

            public void onUpstreamFailure(Throwable th) throws Exception {
                InHandler.onUpstreamFailure$(this, th);
            }

            public void onUpstreamFinish() {
                if (this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queue().isEmpty() && this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages() == 0) {
                    InHandler.onUpstreamFinish$(this);
                }
            }

            public void onPush() {
                int unboxToInt;
                OutgoingMessage outgoingMessage = (OutgoingMessage) this.$outer.grab(this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$$outer().in());
                AMQP.BasicProperties build = ((AMQP.BasicProperties) outgoingMessage.props().getOrElse(() -> {
                    return new AMQP.BasicProperties();
                })).builder().replyTo(this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$queueName()).build();
                this.$outer.channel().basicPublish(this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$exchange(), this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$routingKey(), outgoingMessage.mandatory(), outgoingMessage.immediate(), build, (byte[]) outgoingMessage.bytes().toArray(ClassTag$.MODULE$.Byte()));
                Map headers = build.getHeaders();
                if (headers == null) {
                    unboxToInt = this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$$outer().akka$stream$alpakka$amqp$AmqpRpcFlowStage$$responsesPerMessage;
                } else {
                    Object obj = headers.get("expectedReplies");
                    unboxToInt = obj != null ? BoxesRunTime.unboxToInt(obj) : this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$$outer().akka$stream$alpakka$amqp$AmqpRpcFlowStage$$responsesPerMessage;
                }
                this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages_$eq(this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$outstandingMessages() + unboxToInt);
                this.$outer.pull(this.$outer.akka$stream$alpakka$amqp$AmqpRpcFlowStage$$anon$$$outer().in());
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                InHandler.$init$(this);
            }
        });
    }
}
