package akka.stream.alpakka.mqtt;

import akka.Done$;
import akka.stream.stage.AsyncCallback;
import akka.stream.stage.GraphStageLogic;
import akka.stream.stage.OutHandler;
import java.util.concurrent.Semaphore;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Iterable;
import scala.collection.immutable.Iterable$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Promise;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Try;

/* compiled from: MqttSourceStage.scala */
/* loaded from: input_file:akka/stream/alpakka/mqtt/MqttSourceStage$$anon$1.class */
public final class MqttSourceStage$$anon$1 extends GraphStageLogic implements MqttConnectorLogic {
    private final Queue<MqttMessage> akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$queue;
    private final Function1<Try<IMqttToken>, BoxedUnit> mqttSubscriptionCallback;
    private final Semaphore backpressure;
    private Option<IMqttAsyncClient> mqttClient;
    private final AsyncCallback<MqttMessage> onMessage;
    private final MqttConnectionSettings connectionSettings;
    private final AsyncCallback<IMqttAsyncClient> onConnect;
    private final AsyncCallback<Throwable> onConnectionLost;
    private final Function1<Try<IMqttToken>, BoxedUnit> akka$stream$alpakka$mqtt$MqttConnectorLogic$$connectHandler;
    private final /* synthetic */ MqttSourceStage $outer;
    private final Promise subscriptionPromise$1;

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public final void preStart() {
        preStart();
    }

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public AsyncCallback<IMqttAsyncClient> onConnect() {
        return this.onConnect;
    }

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public AsyncCallback<Throwable> onConnectionLost() {
        return this.onConnectionLost;
    }

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public Function1<Try<IMqttToken>, BoxedUnit> akka$stream$alpakka$mqtt$MqttConnectorLogic$$connectHandler() {
        return this.akka$stream$alpakka$mqtt$MqttConnectorLogic$$connectHandler;
    }

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public void akka$stream$alpakka$mqtt$MqttConnectorLogic$_setter_$onConnect_$eq(AsyncCallback<IMqttAsyncClient> asyncCallback) {
        this.onConnect = asyncCallback;
    }

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public void akka$stream$alpakka$mqtt$MqttConnectorLogic$_setter_$onConnectionLost_$eq(AsyncCallback<Throwable> asyncCallback) {
        this.onConnectionLost = asyncCallback;
    }

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public final void akka$stream$alpakka$mqtt$MqttConnectorLogic$_setter_$akka$stream$alpakka$mqtt$MqttConnectorLogic$$connectHandler_$eq(Function1<Try<IMqttToken>, BoxedUnit> function1) {
        this.akka$stream$alpakka$mqtt$MqttConnectorLogic$$connectHandler = function1;
    }

    public Queue<MqttMessage> akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$queue() {
        return this.akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$queue;
    }

    private Function1<Try<IMqttToken>, BoxedUnit> mqttSubscriptionCallback() {
        return this.mqttSubscriptionCallback;
    }

    private Semaphore backpressure() {
        return this.backpressure;
    }

    private Option<IMqttAsyncClient> mqttClient() {
        return this.mqttClient;
    }

    private void mqttClient_$eq(Option<IMqttAsyncClient> option) {
        this.mqttClient = option;
    }

    private AsyncCallback<MqttMessage> onMessage() {
        return this.onMessage;
    }

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public MqttConnectionSettings connectionSettings() {
        return this.connectionSettings;
    }

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public void handleConnection(IMqttAsyncClient iMqttAsyncClient) {
        Tuple2 unzip = this.$outer.akka$stream$alpakka$mqtt$MqttSourceStage$$settings.subscriptions().unzip(Predef$.MODULE$.$conforms());
        if (unzip == null) {
            throw new MatchError(unzip);
        }
        Tuple2 tuple2 = new Tuple2((Iterable) unzip._1(), (Iterable) unzip._2());
        Iterable iterable = (Iterable) tuple2._1();
        Iterable iterable2 = (Iterable) tuple2._2();
        mqttClient_$eq(new Some(iMqttAsyncClient));
        iMqttAsyncClient.subscribe((String[]) iterable.toArray(ClassTag$.MODULE$.apply(String.class)), (int[]) ((TraversableOnce) iterable2.map(mqttQoS -> {
            return BoxesRunTime.boxToInteger(mqttQoS.byteValue());
        }, Iterable$.MODULE$.canBuildFrom())).toArray(ClassTag$.MODULE$.Int()), BoxedUnit.UNIT, MqttConnectorLogic$.MODULE$.funcToMqttActionListener(mqttSubscriptionCallback()));
    }

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public void onMessage(MqttMessage mqttMessage) {
        backpressure().acquire();
        onMessage().invoke(mqttMessage);
    }

    public void akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$pushMessage(MqttMessage mqttMessage) {
        push(this.$outer.out(), mqttMessage);
        backpressure().release();
    }

    @Override // akka.stream.alpakka.mqtt.MqttConnectorLogic
    public void handleConnectionLost(Throwable th) {
        failStage(th);
        this.subscriptionPromise$1.tryFailure(th);
    }

    public void postStop() {
        mqttClient().foreach(iMqttAsyncClient -> {
            return iMqttAsyncClient.isConnected() ? iMqttAsyncClient.disconnect() : BoxedUnit.UNIT;
        });
    }

    public static final /* synthetic */ void $anonfun$mqttSubscriptionCallback$1(MqttSourceStage$$anon$1 mqttSourceStage$$anon$1, Try r5) {
        mqttSourceStage$$anon$1.subscriptionPromise$1.complete(r5.map(iMqttToken -> {
            return Done$.MODULE$;
        }));
    }

    public static final /* synthetic */ void $anonfun$onMessage$1(MqttSourceStage$$anon$1 mqttSourceStage$$anon$1, MqttMessage mqttMessage) {
        Predef$.MODULE$.require(mqttSourceStage$$anon$1.akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$queue().size() <= mqttSourceStage$$anon$1.$outer.akka$stream$alpakka$mqtt$MqttSourceStage$$bufferSize);
        if (mqttSourceStage$$anon$1.isAvailable(mqttSourceStage$$anon$1.$outer.out())) {
            mqttSourceStage$$anon$1.akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$pushMessage(mqttMessage);
        } else {
            mqttSourceStage$$anon$1.akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$queue().enqueue(Predef$.MODULE$.wrapRefArray(new MqttMessage[]{mqttMessage}));
        }
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public MqttSourceStage$$anon$1(MqttSourceStage mqttSourceStage, Promise promise) {
        super(mqttSourceStage.m11shape());
        if (mqttSourceStage == null) {
            throw null;
        }
        this.$outer = mqttSourceStage;
        this.subscriptionPromise$1 = promise;
        MqttConnectorLogic.$init$(this);
        this.akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$queue = Queue$.MODULE$.apply(Nil$.MODULE$);
        this.mqttSubscriptionCallback = r4 -> {
            $anonfun$mqttSubscriptionCallback$1(this, r4);
            return BoxedUnit.UNIT;
        };
        this.backpressure = new Semaphore(mqttSourceStage.akka$stream$alpakka$mqtt$MqttSourceStage$$bufferSize);
        this.mqttClient = None$.MODULE$;
        this.onMessage = getAsyncCallback(mqttMessage -> {
            $anonfun$onMessage$1(this, mqttMessage);
            return BoxedUnit.UNIT;
        });
        this.connectionSettings = mqttSourceStage.akka$stream$alpakka$mqtt$MqttSourceStage$$settings.connectionSettings();
        setHandler(mqttSourceStage.out(), new OutHandler(this) { // from class: akka.stream.alpakka.mqtt.MqttSourceStage$$anon$1$$anon$2
            private final /* synthetic */ MqttSourceStage$$anon$1 $outer;

            public void onDownstreamFinish() throws Exception {
                OutHandler.onDownstreamFinish$(this);
            }

            public void onPull() {
                if (this.$outer.akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$queue().nonEmpty()) {
                    this.$outer.akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$pushMessage((MqttMessage) this.$outer.akka$stream$alpakka$mqtt$MqttSourceStage$$anon$$queue().dequeue());
                }
            }

            {
                if (this == null) {
                    throw null;
                }
                this.$outer = this;
                OutHandler.$init$(this);
            }
        });
    }
}
