package akka.stream.alpakka.amqp;

import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import scala.Predef$;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;

/* compiled from: AmqpSinkStage.scala */
/* loaded from: input_file:akka/stream/alpakka/amqp/AmqpSinkStage$$anon$1.class */
public final class AmqpSinkStage$$anon$1 extends GraphStageLogic implements AmqpConnectorLogic {
    private final AmqpSinkSettings settings;
    private final String akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$exchange;
    private final String akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$routingKey;
    private Connection akka$stream$alpakka$amqp$AmqpConnectorLogic$$connection;
    private Channel channel;
    private final /* synthetic */ AmqpSinkStage $outer;

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public final void preStart() {
        preStart();
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public void postStop() {
        postStop();
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public Connection akka$stream$alpakka$amqp$AmqpConnectorLogic$$connection() {
        return this.akka$stream$alpakka$amqp$AmqpConnectorLogic$$connection;
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public void akka$stream$alpakka$amqp$AmqpConnectorLogic$$connection_$eq(Connection connection) {
        this.akka$stream$alpakka$amqp$AmqpConnectorLogic$$connection = connection;
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public Channel channel() {
        return this.channel;
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public void channel_$eq(Channel channel) {
        this.channel = channel;
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public AmqpSinkSettings settings() {
        return this.settings;
    }

    public String akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$exchange() {
        return this.akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$exchange;
    }

    public String akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$routingKey() {
        return this.akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$routingKey;
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public ConnectionFactory connectionFactoryFrom(AmqpConnectionSettings amqpConnectionSettings) {
        return this.$outer.connectionFactoryFrom(amqpConnectionSettings);
    }

    @Override // akka.stream.alpakka.amqp.AmqpConnectorLogic
    public void whenConnected() {
        final AsyncCallback asyncCallback = getAsyncCallback(shutdownSignalException -> {
            this.failStage(shutdownSignalException);
            return BoxedUnit.UNIT;
        });
        final AmqpSinkStage$$anon$1 amqpSinkStage$$anon$1 = null;
        channel().addShutdownListener(new ShutdownListener(amqpSinkStage$$anon$1, asyncCallback) { // from class: akka.stream.alpakka.amqp.AmqpSinkStage$$anon$1$$anon$2
            private final AsyncCallback shutdownCallback$1;

            public void shutdownCompleted(ShutdownSignalException shutdownSignalException2) {
                this.shutdownCallback$1.invoke(shutdownSignalException2);
            }

            {
                this.shutdownCallback$1 = asyncCallback;
            }
        });
        pull(this.$outer.in());
    }

    public /* synthetic */ AmqpSinkStage akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$$outer() {
        return this.$outer;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public AmqpSinkStage$$anon$1(AmqpSinkStage amqpSinkStage) {
        super(amqpSinkStage.m4shape());
        if (amqpSinkStage == null) {
            throw null;
        }
        this.$outer = amqpSinkStage;
        AmqpConnectorLogic.$init$(this);
        this.settings = amqpSinkStage.akka$stream$alpakka$amqp$AmqpSinkStage$$settings;
        this.akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$exchange = (String) settings().exchange().getOrElse(() -> {
            return "";
        });
        this.akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$routingKey = (String) settings().routingKey().getOrElse(() -> {
            return "";
        });
        setHandler(amqpSinkStage.in(), new InHandler(this) { // from class: akka.stream.alpakka.amqp.AmqpSinkStage$$anon$1$$anon$3
            private final /* synthetic */ AmqpSinkStage$$anon$1 $outer;

            public void onUpstreamFinish() throws Exception {
                InHandler.onUpstreamFinish$(this);
            }

            public void onUpstreamFailure(Throwable th) throws Exception {
                InHandler.onUpstreamFailure$(this, th);
            }

            public void onPush() {
                OutgoingMessage outgoingMessage = (OutgoingMessage) this.$outer.grab(this.$outer.akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$$outer().in());
                this.$outer.channel().basicPublish(this.$outer.akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$exchange(), this.$outer.akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$routingKey(), outgoingMessage.mandatory(), outgoingMessage.immediate(), (AMQP.BasicProperties) outgoingMessage.props().orNull(Predef$.MODULE$.$conforms()), (byte[]) outgoingMessage.bytes().toArray(ClassTag$.MODULE$.Byte()));
                this.$outer.pull(this.$outer.akka$stream$alpakka$amqp$AmqpSinkStage$$anon$$$outer().in());
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                InHandler.$init$(this);
            }
        });
    }
}
