package akka.stream.alpakka.mqtt.impl;

import akka.event.LoggingAdapter;
import akka.stream.alpakka.mqtt.MqttMessage;
import akka.stream.alpakka.mqtt.MqttOfflinePersistenceSettings;
import akka.stream.alpakka.mqtt.MqttQoS;
import akka.stream.alpakka.mqtt.impl.MqttFlowStage;
import akka.stream.alpakka.mqtt.scaladsl.MqttMessageWithAck;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.InHandler;
import akka.stream.stage.OutHandler;
import akka.stream.stage.StageLogging;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import scala.Option;
import scala.Option$;
import scala.Some;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Promise;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: MqttFlowStage.scala */
/* loaded from: input_file:akka/stream/alpakka/mqtt/impl/MqttFlowStage$$anon$1.class */
public final class MqttFlowStage$$anon$1 extends GraphStageLogic implements StageLogging, InHandler, OutHandler {
    private final Semaphore akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$backpressurePahoClient;
    private Option<MqttMessage> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$pendingMsg;
    private final Queue<MqttMessageWithAck> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$queue;
    private final AtomicInteger akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$unackedMessages;
    private final AsyncCallback<Try<IMqttToken>> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onSubscribe;
    private final AsyncCallback<IMqttAsyncClient> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onConnect;
    private final AsyncCallback<Throwable> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onConnectionLost;
    private final AsyncCallback<MqttMessageWithAck> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onMessageAsyncCallback;
    private final AsyncCallback<Try<IMqttToken>> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onPublished;
    private final MqttAsyncClient client;
    private final AsyncCallback<MqttFlowStage.CommitCallbackArguments> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$commitCallback;
    private final /* synthetic */ MqttFlowStage $outer;
    public final Promise subscriptionPromise$1;
    private LoggingAdapter akka$stream$stage$StageLogging$$_log;

    public LoggingAdapter akka$stream$stage$StageLogging$$_log() {
        return this.akka$stream$stage$StageLogging$$_log;
    }

    public void akka$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public Class<?> logSource() {
        return StageLogging.class.logSource(this);
    }

    public LoggingAdapter log() {
        return StageLogging.class.log(this);
    }

    public Semaphore akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$backpressurePahoClient() {
        return this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$backpressurePahoClient;
    }

    public Option<MqttMessage> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$pendingMsg() {
        return this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$pendingMsg;
    }

    public void akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$pendingMsg_$eq(Option<MqttMessage> option) {
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$pendingMsg = option;
    }

    public Queue<MqttMessageWithAck> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$queue() {
        return this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$queue;
    }

    public AtomicInteger akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$unackedMessages() {
        return this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$unackedMessages;
    }

    public AsyncCallback<Try<IMqttToken>> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onSubscribe() {
        return this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onSubscribe;
    }

    public AsyncCallback<IMqttAsyncClient> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onConnect() {
        return this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onConnect;
    }

    public AsyncCallback<Throwable> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onConnectionLost() {
        return this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onConnectionLost;
    }

    public AsyncCallback<MqttMessageWithAck> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onMessageAsyncCallback() {
        return this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onMessageAsyncCallback;
    }

    public AsyncCallback<Try<IMqttToken>> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onPublished() {
        return this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onPublished;
    }

    private DisconnectedBufferOptions createPahoBufferOptions(MqttOfflinePersistenceSettings mqttOfflinePersistenceSettings) {
        DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
        disconnectedBufferOptions.setBufferEnabled(true);
        disconnectedBufferOptions.setBufferSize(mqttOfflinePersistenceSettings.bufferSize());
        disconnectedBufferOptions.setDeleteOldestMessages(mqttOfflinePersistenceSettings.deleteOldestMessage());
        disconnectedBufferOptions.setPersistBuffer(mqttOfflinePersistenceSettings.persistBuffer());
        return disconnectedBufferOptions;
    }

    private MqttAsyncClient client() {
        return this.client;
    }

    public MqttAsyncClient akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$mqttClient() {
        MqttAsyncClient client;
        Some offlinePersistenceSettings = this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$connectionSettings.offlinePersistenceSettings();
        if (offlinePersistenceSettings instanceof Some) {
            client().setBufferOpts(createPahoBufferOptions((MqttOfflinePersistenceSettings) offlinePersistenceSettings.x()));
            client = client();
        } else {
            client = client();
        }
        return client;
    }

    public AsyncCallback<MqttFlowStage.CommitCallbackArguments> akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$commitCallback() {
        return this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$commitCallback;
    }

    public void onPush() {
        MqttMessage mqttMessage = (MqttMessage) grab(this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$in());
        try {
            akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$publishToMqtt(mqttMessage);
        } catch (Throwable th) {
            if ((th instanceof MqttException) && this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$connectionSettings.automaticReconnect()) {
                akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$pendingMsg_$eq(new Some(mqttMessage));
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    throw ((Throwable) unapply.get());
                }
                throw th;
            }
        }
    }

    public void onUpstreamFinish() {
        setKeepGoing(true);
        if (akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$queue().isEmpty() && akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$unackedMessages().get() == 0) {
            InHandler.class.onUpstreamFinish(this);
        }
    }

    public void onUpstreamFailure(Throwable th) {
        setKeepGoing(true);
        if (akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$queue().isEmpty() && akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$unackedMessages().get() == 0) {
            InHandler.class.onUpstreamFailure(this, th);
        }
    }

    public void onPull() {
        if (akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$queue().nonEmpty()) {
            akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$pushDownstream((MqttMessageWithAck) akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$queue().dequeue());
            if (akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$unackedMessages().get() == 0 && isClosed(this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$in())) {
                completeStage();
            }
        }
    }

    public void onDownstreamFinish() {
        setKeepGoing(true);
        if (akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$unackedMessages().get() == 0) {
            OutHandler.class.onDownstreamFinish(this);
        }
    }

    public void akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$publishToMqtt(MqttMessage mqttMessage) {
        org.eclipse.paho.client.mqttv3.MqttMessage mqttMessage2 = new org.eclipse.paho.client.mqttv3.MqttMessage((byte[]) mqttMessage.payload().toArray(ClassTag$.MODULE$.Byte()));
        mqttMessage2.setQos(((MqttQoS) mqttMessage.qos().getOrElse(new MqttFlowStage$$anon$1$$anonfun$akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$publishToMqtt$1(this))).value());
        mqttMessage2.setRetained(mqttMessage.retained());
        akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$mqttClient().publish(mqttMessage.topic(), mqttMessage2, mqttMessage, MqttFlowStage$.MODULE$.asActionListener(new MqttFlowStage$$anon$1$$anonfun$akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$publishToMqtt$2(this)));
    }

    public void akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$pushDownstream(MqttMessageWithAck mqttMessageWithAck) {
        push(this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$out(), mqttMessageWithAck);
        akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$backpressurePahoClient().release();
        if (this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$manualAcks) {
            akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$unackedMessages().incrementAndGet();
        }
    }

    public void akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$failStageWith(Throwable th) {
        this.subscriptionPromise$1.tryFailure(th);
        failStage(th);
    }

    public void preStart() {
        try {
            akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$mqttClient().connect(MqttFlowStage$.MODULE$.asConnectOptions(this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$connectionSettings), BoxedUnit.UNIT, new IMqttActionListener(this) { // from class: akka.stream.alpakka.mqtt.impl.MqttFlowStage$$anon$1$$anon$4
                private final /* synthetic */ MqttFlowStage$$anon$1 $outer;

                public void onSuccess(IMqttToken iMqttToken) {
                    this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onConnect().invoke(iMqttToken.getClient());
                }

                public void onFailure(IMqttToken iMqttToken, Throwable th) {
                    this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onConnectionLost().invoke(th);
                }

                {
                    if (this == null) {
                        throw null;
                    }
                    this.$outer = this;
                }
            });
        } catch (Throwable th) {
            akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$failStageWith(th);
        }
    }

    public void postStop() {
        if (this.subscriptionPromise$1.isCompleted()) {
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            BoxesRunTime.boxToBoolean(this.subscriptionPromise$1.tryFailure(new IllegalStateException("Cannot complete subscription because the stage is about to stop or fail")));
        }
        try {
            log().debug("stage stopped, disconnecting");
            akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$mqttClient().disconnect(this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$connectionSettings.disconnectQuiesceTimeout().toMillis(), (Object) null, new IMqttActionListener(this) { // from class: akka.stream.alpakka.mqtt.impl.MqttFlowStage$$anon$1$$anon$5
                private final /* synthetic */ MqttFlowStage$$anon$1 $outer;

                public void onSuccess(IMqttToken iMqttToken) {
                    this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$mqttClient().close();
                }

                public void onFailure(IMqttToken iMqttToken, Throwable th) {
                    this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$mqttClient().disconnectForcibly(0L, this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$$outer().akka$stream$alpakka$mqtt$impl$MqttFlowStage$$connectionSettings.disconnectTimeout().toMillis());
                    this.$outer.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$mqttClient().close();
                }

                {
                    if (this == null) {
                        throw null;
                    }
                    this.$outer = this;
                }
            });
        } catch (MqttException unused) {
            try {
                akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$mqttClient().close();
            } catch (MqttException unused2) {
            }
        }
    }

    public /* synthetic */ MqttFlowStage akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$$outer() {
        return this.$outer;
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public MqttFlowStage$$anon$1(MqttFlowStage mqttFlowStage, Promise promise) {
        super(mqttFlowStage.m9shape());
        if (mqttFlowStage == null) {
            throw null;
        }
        this.$outer = mqttFlowStage;
        this.subscriptionPromise$1 = promise;
        StageLogging.class.$init$(this);
        InHandler.class.$init$(this);
        OutHandler.class.$init$(this);
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$backpressurePahoClient = new Semaphore(mqttFlowStage.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$bufferSize);
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$pendingMsg = Option$.MODULE$.empty();
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$queue = Queue$.MODULE$.apply(Nil$.MODULE$);
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$unackedMessages = new AtomicInteger();
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onSubscribe = getAsyncCallback(new MqttFlowStage$$anon$1$$anonfun$1(this));
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onConnect = getAsyncCallback(new MqttFlowStage$$anon$1$$anonfun$2(this));
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onConnectionLost = getAsyncCallback(new MqttFlowStage$$anon$1$$anonfun$3(this));
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onMessageAsyncCallback = getAsyncCallback(new MqttFlowStage$$anon$1$$anonfun$4(this));
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$onPublished = getAsyncCallback(new MqttFlowStage$$anon$1$$anonfun$5(this));
        this.client = new MqttAsyncClient(mqttFlowStage.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$connectionSettings.broker(), mqttFlowStage.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$connectionSettings.clientId(), mqttFlowStage.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$connectionSettings.persistence());
        this.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$commitCallback = getAsyncCallback(new MqttFlowStage$$anon$1$$anonfun$6(this));
        akka$stream$alpakka$mqtt$impl$MqttFlowStage$$anon$$mqttClient().setCallback(new MqttFlowStage$$anon$1$$anon$2(this));
        setHandlers(mqttFlowStage.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$in(), mqttFlowStage.akka$stream$alpakka$mqtt$impl$MqttFlowStage$$out(), this);
    }
}
