package org.apache.pekko.stream.connectors.amqp.impl;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ShutdownListener;
import java.util.Collections;
import java.util.Map;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.connectors.amqp.AmqpWriteSettings;
import org.apache.pekko.stream.connectors.amqp.WriteMessage;
import org.apache.pekko.stream.connectors.amqp.scaladsl.CommittableReadResult;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.InHandler;
import org.apache.pekko.stream.stage.OutHandler;
import scala.MatchError;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Promise;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;
import scala.runtime.LazyVals;
import scala.runtime.LazyVals$;
import scala.runtime.LazyVals$Evaluating$;
import scala.runtime.LazyVals$NullValue$;
import scala.runtime.ScalaRunTime$;
import scala.util.Success$;

/* compiled from: AmqpRpcFlowStage.scala */
/* loaded from: input_file:org/apache/pekko/stream/connectors/amqp/impl/AmqpRpcFlowStage$$anon$1.class */
public final class AmqpRpcFlowStage$$anon$1 extends GraphStageLogic implements AmqpConnectorLogic {
    // Promise handed in by the constructor; completed with the reply queue name in whenConnected()
    // and failed (tryFailure) from postStop() and onFailure().
    private final Promise streamCompletion$1;
    // Field offset passed to LazyVals$.objCAS when updating the shutdown-listener lazy slot.
    public static final long OFFSET$1 = LazyVals$.MODULE$.getOffsetStatic(AmqpRpcFlowStage$$anon$1.class.getDeclaredField("org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzy1"));
    // Field offset passed to LazyVals$.objCAS when updating the shutdown-callback lazy slot.
    public static final long OFFSET$0 = LazyVals$.MODULE$.getOffsetStatic(AmqpRpcFlowStage$$anon$1.class.getDeclaredField("shutdownCallback$lzy1"));
    // Backing storage for the AmqpConnectorLogic connection accessor/mutator pair.
    private Connection org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$connection;
    // Backing storage for channel() / channel_$eq(Channel).
    private Channel channel;
    // Lazy slot for shutdownCallback(): null until first use, then either the value, the
    // LazyVals$NullValue$ marker, or a transient LazyVals control state during initialization.
    private volatile Object shutdownCallback$lzy1;
    // Lazy slot for the shutdown listener; same encoding as shutdownCallback$lzy1.
    private volatile Object org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzy1;
    // Write settings copied from the enclosing stage in the constructor.
    private final AmqpWriteSettings settings;
    // Exchange used for every basicPublish; taken from the settings with a fallback supplier.
    public final String org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$exchange;
    // Default routing key, used when a WriteMessage does not carry its own.
    public final String org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$routingKey;
    // Replies received while the outlet was not available; drained by the outlet's onPull.
    public final Queue org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queue;
    // Name of the queue declared in whenConnected(); set as replyTo on every published message.
    public String org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queueName;
    // Incremented for each reply pushed downstream, decremented by the ack/nack callbacks.
    public int org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$unackedMessages;
    // Increased by the expected reply count on each publish, decremented for each reply pushed.
    public int org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$outstandingMessages;
    // The enclosing AmqpRpcFlowStage (source of in(), out(), bufferSize and responsesPerMessage).
    private final /* synthetic */ AmqpRpcFlowStage $outer;

    /**
     * Creates the stage logic for the enclosing RPC flow stage and installs the handlers for its
     * outlet and inlet.
     *
     * <p>The outlet handler pushes a buffered reply, if any, whenever downstream pulls. The inlet
     * handler publishes each grabbed {@link WriteMessage} with {@code replyTo} set to the reply
     * queue name and adds the number of expected replies to the outstanding counter.
     *
     * @param promise promise stored for later completion with the reply queue name (see
     *     {@code whenConnected}) or failure (see {@code postStop} / {@code onFailure})
     * @param amqpRpcFlowStage the enclosing stage; must not be null
     * @throws NullPointerException if {@code amqpRpcFlowStage} is null
     */
    public AmqpRpcFlowStage$$anon$1(Promise promise, AmqpRpcFlowStage amqpRpcFlowStage) {
        // Dereferencing the stage for the super call already raises NullPointerException on a null
        // argument, so a separate null check after this line could never fire.
        super(amqpRpcFlowStage.m36shape());
        this.streamCompletion$1 = promise;
        this.$outer = amqpRpcFlowStage;
        AmqpConnectorLogic.$init$(this);
        this.settings = amqpRpcFlowStage.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$writeSettings;
        this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$exchange = (String) settings().exchange().getOrElse(AmqpRpcFlowStage::org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$_$$lessinit$greater$$anonfun$1);
        this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$routingKey = (String) settings().routingKey().getOrElse(AmqpRpcFlowStage::org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$_$$lessinit$greater$$anonfun$2);
        this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queue = (Queue) Queue$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new CommittableReadResult[0]));
        this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$unackedMessages = 0;
        this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$outstandingMessages = 0;
        setHandler(amqpRpcFlowStage.out(), new OutHandler() { // from class: org.apache.pekko.stream.connectors.amqp.impl.AmqpRpcFlowStage$$anon$2
            // Enclosing logic instance. Inside this anonymous class a bare `this` is the handler,
            // so the qualified form is required to reach the logic.
            private final AmqpRpcFlowStage$$anon$1 $outer = AmqpRpcFlowStage$$anon$1.this;

            public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            /** Pushes the oldest buffered reply, if there is one; otherwise does nothing. */
            public void onPull() {
                if (this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queue.nonEmpty()) {
                    this.$outer.pushMessage((CommittableReadResult) this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queue.dequeue());
                }
            }

            /** Applies the default downstream-finish handling only when no pushed reply is still unacked. */
            public void onDownstreamFinish(Throwable th) {
                this.$outer.protected$setKeepGoing(true);
                if (this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$unackedMessages == 0) {
                    OutHandler.onDownstreamFinish$(this, th);
                }
            }
        });
        setHandler(amqpRpcFlowStage.in(), new InHandler() { // from class: org.apache.pekko.stream.connectors.amqp.impl.AmqpRpcFlowStage$$anon$3
            // Enclosing logic instance (see the note on the outlet handler's field).
            private final AmqpRpcFlowStage$$anon$1 $outer = AmqpRpcFlowStage$$anon$1.this;

            /** True when no reply is buffered, none is still expected, and none awaits ack/nack. */
            private boolean nothingPending() {
                return this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queue.isEmpty()
                        && this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$outstandingMessages == 0
                        && this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$unackedMessages == 0;
            }

            /** Applies the default upstream-finish handling only when nothing is pending. */
            public void onUpstreamFinish() {
                this.$outer.protected$setKeepGoing(true);
                if (nothingPending()) {
                    InHandler.onUpstreamFinish$(this);
                }
            }

            /** Applies the default upstream-failure handling only when nothing is pending. */
            public void onUpstreamFailure(Throwable th) {
                this.$outer.protected$setKeepGoing(true);
                if (nothingPending()) {
                    InHandler.onUpstreamFailure$(this, th);
                }
            }

            /**
             * Publishes the grabbed message with {@code replyTo} set to the reply queue, records how
             * many replies it is expected to produce, and pulls the inlet again.
             */
            public void onPush() {
                AmqpRpcFlowStage stage = this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$_$$anon$$$outer();
                WriteMessage writeMessage = (WriteMessage) this.$outer.protected$grab(stage.in());
                AMQP.BasicProperties build = ((AMQP.BasicProperties) writeMessage.properties().getOrElse(AmqpRpcFlowStage::org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$3$$_$_$$anonfun$1)).builder().replyTo(this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queueName).build();
                this.$outer.channel().basicPublish(this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$exchange, (String) writeMessage.routingKey().getOrElse(this::onPush$$anonfun$1), writeMessage.mandatory(), writeMessage.immediate(), build, (byte[]) writeMessage.bytes().toArray(ClassTag$.MODULE$.apply(Byte.TYPE)));
                // An "expectedReplies" header on the message overrides the stage-wide default.
                Map headers = build.getHeaders();
                Object expectedReplies = headers == null ? null : headers.get("expectedReplies");
                int replies = expectedReplies != null
                        ? BoxesRunTime.unboxToInt(expectedReplies)
                        : stage.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$responsesPerMessage;
                this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$outstandingMessages += replies;
                this.$outer.protected$pull(stage.in());
            }

            /** Fallback routing key for messages that do not specify one. */
            private final String onPush$$anonfun$1() {
                return this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$routingKey;
            }
        });
    }

    /**
     * Returns the connection currently stored for the {@code AmqpConnectorLogic} trait.
     *
     * @return the stored connection, or {@code null} if none has been assigned yet
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public Connection org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$connection() {
        return this.org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$connection;
    }

    /**
     * Returns the channel currently stored by {@code channel_$eq}.
     *
     * @return the stored channel, or {@code null} if none has been assigned yet
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public Channel channel() {
        return this.channel;
    }

    /**
     * Lazily resolved shutdown callback.
     *
     * <p>Serves the value straight from the lazy slot when it already holds an
     * {@link AsyncCallback}, maps the {@code NullValue} marker back to {@code null}, and otherwise
     * falls through to the initialization routine.
     *
     * @return the callback, or {@code null} if the initializer produced {@code null}
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public AsyncCallback shutdownCallback() {
        final Object cached = this.shutdownCallback$lzy1;
        if (!(cached instanceof AsyncCallback)) {
            return cached == LazyVals$NullValue$.MODULE$ ? null : (AsyncCallback) shutdownCallback$lzyINIT1();
        }
        return (AsyncCallback) cached;
    }

    /**
     * Initializes the shutdown-callback lazy slot, or waits for the caller that is doing so.
     *
     * <p>Slot states handled by the loop:
     * <ul>
     *   <li>{@code null}: try to claim initialization by swapping in {@code Evaluating}; the winner
     *       computes the value and publishes it (or the {@code NullValue} marker) in {@code finally}.</li>
     *   <li>{@code Evaluating}: another caller is computing; swap in a {@code Waiting} latch and retry.</li>
     *   <li>{@code Waiting}: block on the latch, then retry.</li>
     *   <li>any other control state: report {@code null}.</li>
     *   <li>anything else: the finished value; return it.</li>
     * </ul>
     *
     * @return the initialized callback, or {@code null} when the initializer produced {@code null}
     */
    private Object shutdownCallback$lzyINIT1() {
        while (true) {
            Object obj = this.shutdownCallback$lzy1;
            if (obj == null) {
                if (LazyVals$.MODULE$.objCAS(this, OFFSET$0, (Object) null, LazyVals$Evaluating$.MODULE$)) {
                    // Value to publish into the slot; stays null if the initializer throws, which
                    // resets the slot so that a later call can retry.
                    Object published = null;
                    try {
                        // Qualified super call: the unqualified shutdownCallback() is this class's
                        // accessor, which would see the Evaluating marker, re-enter this method and
                        // then wait on a latch that only this same thread could release.
                        AsyncCallback computed = AmqpConnectorLogic.super.shutdownCallback();
                        published = computed == null ? LazyVals$NullValue$.MODULE$ : computed;
                        return computed;
                    } finally {
                        if (!LazyVals$.MODULE$.objCAS(this, OFFSET$0, LazyVals$Evaluating$.MODULE$, published)) {
                            // The slot was changed to a Waiting latch meanwhile: publish, then release waiters.
                            LazyVals.Waiting waiting = (LazyVals.Waiting) this.shutdownCallback$lzy1;
                            LazyVals$.MODULE$.objCAS(this, OFFSET$0, waiting, published);
                            waiting.countDown();
                        }
                    }
                }
            } else {
                if (!(obj instanceof LazyVals.LazyValControlState)) {
                    return obj;
                }
                if (obj == LazyVals$Evaluating$.MODULE$) {
                    LazyVals$.MODULE$.objCAS(this, OFFSET$0, obj, new LazyVals.Waiting());
                } else {
                    if (!(obj instanceof LazyVals.Waiting)) {
                        return null;
                    }
                    ((LazyVals.Waiting) obj).await();
                }
            }
        }
    }

    /**
     * Lazily resolved shutdown listener.
     *
     * <p>Serves the value straight from the lazy slot when it already holds a
     * {@link ShutdownListener}, maps the {@code NullValue} marker back to {@code null}, and
     * otherwise falls through to the initialization routine.
     *
     * @return the listener, or {@code null} if the initializer produced {@code null}
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public ShutdownListener org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$shutdownListener() {
        final Object cached = this.org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzy1;
        if (!(cached instanceof ShutdownListener)) {
            return cached == LazyVals$NullValue$.MODULE$
                    ? null
                    : (ShutdownListener) org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzyINIT1();
        }
        return (ShutdownListener) cached;
    }

    /**
     * Initializes the shutdown-listener lazy slot, or waits for the caller that is doing so.
     *
     * <p>Slot states handled by the loop:
     * <ul>
     *   <li>{@code null}: try to claim initialization by swapping in {@code Evaluating}; the winner
     *       computes the value and publishes it (or the {@code NullValue} marker) in {@code finally}.</li>
     *   <li>{@code Evaluating}: another caller is computing; swap in a {@code Waiting} latch and retry.</li>
     *   <li>{@code Waiting}: block on the latch, then retry.</li>
     *   <li>any other control state: report {@code null}.</li>
     *   <li>anything else: the finished value; return it.</li>
     * </ul>
     *
     * @return the initialized listener, or {@code null} when the initializer produced {@code null}
     */
    private Object org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzyINIT1() {
        while (true) {
            Object obj = this.org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzy1;
            if (obj == null) {
                if (LazyVals$.MODULE$.objCAS(this, OFFSET$1, (Object) null, LazyVals$Evaluating$.MODULE$)) {
                    // Value to publish into the slot; stays null if the initializer throws, which
                    // resets the slot so that a later call can retry.
                    Object published = null;
                    try {
                        // Qualified super call: the unqualified accessor is this class's own
                        // override, which would see the Evaluating marker, re-enter this method and
                        // then wait on a latch that only this same thread could release.
                        ShutdownListener computed = AmqpConnectorLogic.super.org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$shutdownListener();
                        published = computed == null ? LazyVals$NullValue$.MODULE$ : computed;
                        return computed;
                    } finally {
                        if (!LazyVals$.MODULE$.objCAS(this, OFFSET$1, LazyVals$Evaluating$.MODULE$, published)) {
                            // The slot was changed to a Waiting latch meanwhile: publish, then release waiters.
                            LazyVals.Waiting waiting = (LazyVals.Waiting) this.org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$shutdownListener$lzy1;
                            LazyVals$.MODULE$.objCAS(this, OFFSET$1, waiting, published);
                            waiting.countDown();
                        }
                    }
                }
            } else {
                if (!(obj instanceof LazyVals.LazyValControlState)) {
                    return obj;
                }
                if (obj == LazyVals$Evaluating$.MODULE$) {
                    LazyVals$.MODULE$.objCAS(this, OFFSET$1, obj, new LazyVals.Waiting());
                } else {
                    if (!(obj instanceof LazyVals.Waiting)) {
                        return null;
                    }
                    ((LazyVals.Waiting) obj).await();
                }
            }
        }
    }

    /**
     * Stores the connection for the {@code AmqpConnectorLogic} trait.
     *
     * @param connection the connection to store
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public void org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$connection_$eq(Connection connection) {
        this.org$apache$pekko$stream$connectors$amqp$impl$AmqpConnectorLogic$$connection = connection;
    }

    /**
     * Stores the channel returned by {@code channel()}.
     *
     * @param channel the channel to store
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public void channel_$eq(Channel channel) {
        this.channel = channel;
    }

    /**
     * Forwards stage start-up to the {@code preStart} implementation provided by
     * {@code AmqpConnectorLogic}.
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public /* bridge */ /* synthetic */ void preStart() {
        // Must be a qualified super call: an unqualified preStart() resolves to this very
        // override and would recurse until StackOverflowError.
        AmqpConnectorLogic.super.preStart();
    }

    /**
     * Returns the write settings captured from the enclosing stage at construction time.
     *
     * @return the write settings for this logic
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public AmqpWriteSettings settings() {
        return this.settings;
    }

    /**
     * Connection hook: requests the first element from the inlet, applies the stage's buffer size
     * as the channel QoS, declares a server-named reply queue, starts consuming from it and
     * completes the stream-completion promise with the queue name.
     *
     * <p>The consumer is given three async callbacks: one routing deliveries to
     * {@link #handleDelivery}, and one each for ack and nack. The ack/nack callbacks perform the
     * channel operation, decrement the unacked counter, complete the stage when nothing remains to
     * wait for, and then complete the caller's promise. Any throwable raised on that path fails the
     * promise instead.
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public void whenConnected() {
        pull(this.$outer.in());
        channel().basicQos(this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$bufferSize);
        AmqpRpcFlowStage$$anon$4 consumer = new AmqpRpcFlowStage$$anon$4(getAsyncCallback(delivery -> {
            handleDelivery(delivery);
        }), getAsyncCallback(ackArguments -> {
            if (ackArguments == null) {
                throw new MatchError(ackArguments);
            }
            AckArguments ackFields = AckArguments$.MODULE$.unapply(ackArguments);
            long deliveryTag = ackFields._1();
            boolean multiple = ackFields._2();
            Promise<Done> ackPromise = ackFields._3();
            try {
                channel().basicAck(deliveryTag, multiple);
                this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$unackedMessages -= 1;
                if (this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$unackedMessages == 0) {
                    // Finish once downstream is gone, or upstream is done and no reply is buffered or expected.
                    if (isClosed(this.$outer.out())
                            || (isClosed(this.$outer.in())
                                    && this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queue.isEmpty()
                                    && this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$outstandingMessages == 0)) {
                        completeStage();
                    }
                }
                ackPromise.complete(Success$.MODULE$.apply(Done$.MODULE$));
            } catch (Throwable failure) {
                ackPromise.failure(failure);
            }
        }), getAsyncCallback(nackArguments -> {
            if (nackArguments == null) {
                throw new MatchError(nackArguments);
            }
            NackArguments nackFields = NackArguments$.MODULE$.unapply(nackArguments);
            long deliveryTag = nackFields._1();
            boolean multiple = nackFields._2();
            boolean requeue = nackFields._3();
            Promise<Done> nackPromise = nackFields._4();
            try {
                channel().basicNack(deliveryTag, multiple, requeue);
                this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$unackedMessages -= 1;
                if (this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$unackedMessages == 0) {
                    // Same completion rule as the ack path.
                    if (isClosed(this.$outer.out())
                            || (isClosed(this.$outer.in())
                                    && this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queue.isEmpty()
                                    && this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$outstandingMessages == 0)) {
                        completeStage();
                    }
                }
                nackPromise.complete(Success$.MODULE$.apply(Done$.MODULE$));
            } catch (Throwable failure) {
                nackPromise.failure(failure);
            }
        }), this);
        // An empty name asks the broker to generate the queue name.
        String replyQueue = channel().queueDeclare("", false, true, true, Collections.emptyMap()).getQueue();
        this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queueName = replyQueue;
        channel().basicConsume(replyQueue, consumer);
        this.streamCompletion$1.success(replyQueue);
    }

    /**
     * Routes a reply received from the broker.
     *
     * <p>The reply is pushed immediately when the outlet is available. Otherwise it is buffered,
     * unless buffering it would exceed the stage's buffer size, in which case the stage is failed.
     *
     * @param committableReadResult the received reply
     */
    public void handleDelivery(CommittableReadResult committableReadResult) {
        if (isAvailable(this.$outer.out())) {
            pushMessage(committableReadResult);
            return;
        }
        final int capacity = this.$outer.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$bufferSize;
        final boolean fits = !(this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queue.size() + 1 > capacity);
        if (fits) {
            this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$queue.enqueue(committableReadResult);
            return;
        }
        onFailure(new RuntimeException("Reached maximum buffer size " + capacity));
    }

    /**
     * Emits a reply downstream and updates the bookkeeping: one more reply awaiting ack/nack, one
     * fewer reply outstanding.
     *
     * @param committableReadResult the reply to push to the outlet
     */
    public void pushMessage(CommittableReadResult committableReadResult) {
        push(this.$outer.out(), committableReadResult);
        // Counters are only touched after the push has gone through.
        this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$unackedMessages += 1;
        this.org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$$anon$1$$outstandingMessages -= 1;
    }

    /**
     * Fails the stream-completion promise if it has not been completed yet, then runs the
     * {@code postStop} implementation provided by {@code AmqpConnectorLogic}.
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public void postStop() {
        // tryFailure is a no-op when the promise was already completed (e.g. in whenConnected).
        this.streamCompletion$1.tryFailure(new RuntimeException("stage stopped unexpectedly"));
        // Qualified super call: an unqualified postStop() would re-enter this override and
        // recurse until StackOverflowError.
        AmqpConnectorLogic.super.postStop();
    }

    /**
     * Fails the stream-completion promise with the given cause if it has not been completed yet,
     * then runs the {@code onFailure} implementation provided by {@code AmqpConnectorLogic}.
     *
     * @param th the failure cause
     */
    @Override // org.apache.pekko.stream.connectors.amqp.impl.AmqpConnectorLogic
    public void onFailure(Throwable th) {
        this.streamCompletion$1.tryFailure(th);
        // Qualified super call: an unqualified onFailure(th) would re-enter this override and
        // recurse until StackOverflowError.
        AmqpConnectorLogic.super.onFailure(th);
    }

    /**
     * Public bridge to {@code setKeepGoing}, used by the port handlers created in the constructor.
     *
     * @param z value forwarded unchanged to {@code setKeepGoing}
     */
    public void protected$setKeepGoing(boolean z) {
        setKeepGoing(z);
    }

    /**
     * Public bridge to {@code grab}, used by the inlet handler created in the constructor.
     *
     * @param inlet the inlet to grab the current element from
     * @return whatever {@code grab(inlet)} returns
     */
    public Object protected$grab(Inlet inlet) {
        return grab(inlet);
    }

    /**
     * Public bridge to {@code pull}, used by the inlet handler created in the constructor.
     *
     * @param inlet the inlet to pull
     */
    public void protected$pull(Inlet inlet) {
        pull(inlet);
    }

    /**
     * Exposes the enclosing stage to the port handlers, which read its inlet and its
     * {@code responsesPerMessage} value through this accessor.
     *
     * @return the enclosing {@code AmqpRpcFlowStage}
     */
    public final /* synthetic */ AmqpRpcFlowStage org$apache$pekko$stream$connectors$amqp$impl$AmqpRpcFlowStage$_$$anon$$$outer() {
        return this.$outer;
    }
}
