package org.apache.pekko.stream.connectors.mqtt.impl;

import java.io.Serializable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.Inlet;
import org.apache.pekko.stream.Outlet;
import org.apache.pekko.stream.Shape;
import org.apache.pekko.stream.connectors.mqtt.MqttConnectionSettings;
import org.apache.pekko.stream.connectors.mqtt.MqttMessage;
import org.apache.pekko.stream.connectors.mqtt.MqttOfflinePersistenceSettings;
import org.apache.pekko.stream.connectors.mqtt.MqttQoS;
import org.apache.pekko.stream.connectors.mqtt.scaladsl.MqttMessageWithAck;
import org.apache.pekko.stream.stage.AsyncCallback;
import org.apache.pekko.stream.stage.GraphStageLogic;
import org.apache.pekko.stream.stage.InHandler;
import org.apache.pekko.stream.stage.OutHandler;
import org.apache.pekko.stream.stage.StageLogging;
import org.eclipse.paho.client.mqttv3.DisconnectedBufferOptions;
import org.eclipse.paho.client.mqttv3.IMqttActionListener;
import org.eclipse.paho.client.mqttv3.IMqttAsyncClient;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.IMqttToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import scala.Function1;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef$;
import scala.Product;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.IterableOnceOps;
import scala.collection.Iterator;
import scala.collection.immutable.Iterable;
import scala.collection.immutable.Map;
import scala.collection.mutable.Queue;
import scala.collection.mutable.Queue$;
import scala.concurrent.Promise;
import scala.reflect.ClassTag$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;
import scala.runtime.Statics;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;
import scala.util.control.NonFatal$;

/* compiled from: MqttFlowStage.scala */
/* loaded from: input_file:org/apache/pekko/stream/connectors/mqtt/impl/MqttFlowStageLogic.class */
public abstract class MqttFlowStageLogic<I> extends GraphStageLogic implements StageLogging, InHandler, OutHandler {
    private LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log;
    public final Inlet<I> org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$in;
    private final Outlet<MqttMessageWithAck> out;
    private final Promise<Done> subscriptionPromise;
    public final MqttConnectionSettings org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$connectionSettings;
    private final Map<String, MqttQoS> subscriptions;
    private final int bufferSize;
    private final MqttQoS defaultQoS;
    private final boolean manualAcks;
    public final Semaphore org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$backpressurePahoClient;
    public Option<I> org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$pendingMsg;
    private final Queue<MqttMessageWithAck> queue;
    private final AtomicInteger unackedMessages;
    private final AsyncCallback<Try<IMqttToken>> onSubscribe;
    public final AsyncCallback<IMqttAsyncClient> org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$onConnect;
    public final AsyncCallback<Throwable> org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$onConnectionLost;
    public final AsyncCallback<MqttMessageWithAck> org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$onMessageAsyncCallback;
    private final AsyncCallback<Try<IMqttToken>> onPublished;
    private final MqttAsyncClient client;
    public final AsyncCallback<CommitCallbackArguments> org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$commitCallback;

    /* compiled from: MqttFlowStage.scala */
    /* loaded from: input_file:org/apache/pekko/stream/connectors/mqtt/impl/MqttFlowStageLogic$CommitCallbackArguments.class */
    public static final class CommitCallbackArguments implements Product, Serializable {
        private final int messageId;
        private final MqttQoS qos;
        private final Promise promise;

        public static CommitCallbackArguments apply(int i, MqttQoS mqttQoS, Promise<Done> promise) {
            return MqttFlowStageLogic$CommitCallbackArguments$.MODULE$.apply(i, mqttQoS, promise);
        }

        public static CommitCallbackArguments fromProduct(Product product) {
            return MqttFlowStageLogic$CommitCallbackArguments$.MODULE$.m13fromProduct(product);
        }

        public static CommitCallbackArguments unapply(CommitCallbackArguments commitCallbackArguments) {
            return MqttFlowStageLogic$CommitCallbackArguments$.MODULE$.unapply(commitCallbackArguments);
        }

        public CommitCallbackArguments(int i, MqttQoS mqttQoS, Promise<Done> promise) {
            this.messageId = i;
            this.qos = mqttQoS;
            this.promise = promise;
        }

        public /* bridge */ /* synthetic */ Iterator productIterator() {
            return Product.productIterator$(this);
        }

        public /* bridge */ /* synthetic */ Iterator productElementNames() {
            return Product.productElementNames$(this);
        }

        public int hashCode() {
            return Statics.finalizeHash(Statics.mix(Statics.mix(Statics.mix(Statics.mix(-889275714, productPrefix().hashCode()), messageId()), Statics.anyHash(qos())), Statics.anyHash(promise())), 3);
        }

        public boolean equals(Object obj) {
            boolean z;
            if (this != obj) {
                if (obj instanceof CommitCallbackArguments) {
                    CommitCallbackArguments commitCallbackArguments = (CommitCallbackArguments) obj;
                    if (messageId() == commitCallbackArguments.messageId()) {
                        MqttQoS qos = qos();
                        MqttQoS qos2 = commitCallbackArguments.qos();
                        if (qos != null ? qos.equals(qos2) : qos2 == null) {
                            Promise<Done> promise = promise();
                            Promise<Done> promise2 = commitCallbackArguments.promise();
                            if (promise != null ? promise.equals(promise2) : promise2 == null) {
                                z = true;
                            }
                        }
                    }
                    z = false;
                } else {
                    z = false;
                }
                if (!z) {
                    return false;
                }
            }
            return true;
        }

        public String toString() {
            return ScalaRunTime$.MODULE$._toString(this);
        }

        public boolean canEqual(Object obj) {
            return obj instanceof CommitCallbackArguments;
        }

        public int productArity() {
            return 3;
        }

        public String productPrefix() {
            return "CommitCallbackArguments";
        }

        public Object productElement(int i) {
            switch (i) {
                case 0:
                    return BoxesRunTime.boxToInteger(_1());
                case 1:
                    return _2();
                case 2:
                    return _3();
                default:
                    throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
            }
        }

        public String productElementName(int i) {
            switch (i) {
                case 0:
                    return "messageId";
                case 1:
                    return "qos";
                case 2:
                    return "promise";
                default:
                    throw new IndexOutOfBoundsException(BoxesRunTime.boxToInteger(i).toString());
            }
        }

        public int messageId() {
            return this.messageId;
        }

        public MqttQoS qos() {
            return this.qos;
        }

        public Promise<Done> promise() {
            return this.promise;
        }

        public CommitCallbackArguments copy(int i, MqttQoS mqttQoS, Promise<Done> promise) {
            return new CommitCallbackArguments(i, mqttQoS, promise);
        }

        public int copy$default$1() {
            return messageId();
        }

        public MqttQoS copy$default$2() {
            return qos();
        }

        public Promise<Done> copy$default$3() {
            return promise();
        }

        public int _1() {
            return messageId();
        }

        public MqttQoS _2() {
            return qos();
        }

        public Promise<Done> _3() {
            return promise();
        }
    }

    public static IMqttActionListener asActionListener(Function1<Try<IMqttToken>, BoxedUnit> function1) {
        return MqttFlowStageLogic$.MODULE$.asActionListener(function1);
    }

    public static MqttConnectOptions asConnectOptions(MqttConnectionSettings mqttConnectionSettings) {
        return MqttFlowStageLogic$.MODULE$.asConnectOptions(mqttConnectionSettings);
    }

    /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
    public MqttFlowStageLogic(Inlet<I> inlet, Outlet<MqttMessageWithAck> outlet, Shape shape, Promise<Done> promise, MqttConnectionSettings mqttConnectionSettings, Map<String, MqttQoS> map, int i, MqttQoS mqttQoS, boolean z) {
        super(shape);
        this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$in = inlet;
        this.out = outlet;
        this.subscriptionPromise = promise;
        this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$connectionSettings = mqttConnectionSettings;
        this.subscriptions = map;
        this.bufferSize = i;
        this.defaultQoS = mqttQoS;
        this.manualAcks = z;
        StageLogging.$init$(this);
        this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$backpressurePahoClient = new Semaphore(i);
        this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$pendingMsg = Option$.MODULE$.empty();
        this.queue = (Queue) Queue$.MODULE$.apply(ScalaRunTime$.MODULE$.wrapRefArray(new MqttMessageWithAck[0]));
        this.unackedMessages = new AtomicInteger();
        this.onSubscribe = getAsyncCallback(r7 -> {
            if (promise.isCompleted()) {
                log().debug("subscription re-established");
            } else {
                promise.complete(r7.map(iMqttToken -> {
                    log().debug("subscription established");
                    return Done$.MODULE$;
                }));
                pull(inlet);
            }
        });
        this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$onConnect = getAsyncCallback(iMqttAsyncClient -> {
            log().debug("connected");
            if (!map.nonEmpty()) {
                promise.complete(MqttFlowStageLogic$.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$$SuccessfullyDone);
                pull(inlet);
                return;
            }
            if (z) {
                iMqttAsyncClient.setManualAcks(true);
            }
            Tuple2 unzip = map.unzip(Predef$.MODULE$.$conforms());
            if (unzip == null) {
                throw new MatchError(unzip);
            }
            Tuple2 apply = Tuple2$.MODULE$.apply((Iterable) unzip._1(), (Iterable) unzip._2());
            iMqttAsyncClient.subscribe((String[]) ((Iterable) apply._1()).toArray(ClassTag$.MODULE$.apply(String.class)), (int[]) ((IterableOnceOps) ((Iterable) apply._2()).map(mqttQoS2 -> {
                return mqttQoS2.value();
            })).toArray(ClassTag$.MODULE$.apply(Integer.TYPE)), BoxedUnit.UNIT, MqttFlowStageLogic$.MODULE$.asActionListener(r4 -> {
                this.onSubscribe.invoke(r4);
            }));
        });
        this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$onConnectionLost = getAsyncCallback(th -> {
            failStageWith(th);
        });
        this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$onMessageAsyncCallback = getAsyncCallback(mqttMessageWithAck -> {
            if (isAvailable(outlet)) {
                pushDownstream(mqttMessageWithAck);
            } else if (this.queue.size() + 1 > i) {
                failStageWith(new RuntimeException(new StringBuilder(28).append("Reached maximum buffer size ").append(i).toString()));
            } else {
                this.queue.enqueue(mqttMessageWithAck);
            }
        });
        this.onPublished = getAsyncCallback(r6 -> {
            if (r6 instanceof Success) {
                if (hasBeenPulled(inlet)) {
                    return;
                }
                pull(inlet);
            } else {
                if (!(r6 instanceof Failure)) {
                    throw new MatchError(r6);
                }
                failStageWith(((Failure) r6).exception());
            }
        });
        this.client = new MqttAsyncClient(mqttConnectionSettings.broker(), mqttConnectionSettings.clientId(), mqttConnectionSettings.persistence());
        this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$commitCallback = getAsyncCallback(commitCallbackArguments -> {
            try {
                org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$mqttClient().messageArrivedComplete(commitCallbackArguments.messageId(), commitCallbackArguments.qos().value());
                if (this.unackedMessages.decrementAndGet() == 0 && (isClosed(outlet) || (isClosed(inlet) && this.queue.isEmpty()))) {
                    completeStage();
                }
                commitCallbackArguments.promise().complete(MqttFlowStageLogic$.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$$SuccessfullyDone);
            } catch (Throwable th2) {
                commitCallbackArguments.promise().failure(th2);
            }
        });
        org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$mqttClient().setCallback(new MqttFlowStageLogic$$anon$2(this));
        setHandlers(inlet, outlet, this);
    }

    public LoggingAdapter org$apache$pekko$stream$stage$StageLogging$$_log() {
        return this.org$apache$pekko$stream$stage$StageLogging$$_log;
    }

    public void org$apache$pekko$stream$stage$StageLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.org$apache$pekko$stream$stage$StageLogging$$_log = loggingAdapter;
    }

    public /* bridge */ /* synthetic */ Class logSource() {
        return StageLogging.logSource$(this);
    }

    public /* bridge */ /* synthetic */ LoggingAdapter log() {
        return StageLogging.log$(this);
    }

    public /* bridge */ /* synthetic */ void onDownstreamFinish() throws Exception {
        OutHandler.onDownstreamFinish$(this);
    }

    public void handleDeliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
    }

    private DisconnectedBufferOptions createPahoBufferOptions(MqttOfflinePersistenceSettings mqttOfflinePersistenceSettings) {
        DisconnectedBufferOptions disconnectedBufferOptions = new DisconnectedBufferOptions();
        disconnectedBufferOptions.setBufferEnabled(true);
        disconnectedBufferOptions.setBufferSize(mqttOfflinePersistenceSettings.bufferSize());
        disconnectedBufferOptions.setDeleteOldestMessages(mqttOfflinePersistenceSettings.deleteOldestMessage());
        disconnectedBufferOptions.setPersistBuffer(mqttOfflinePersistenceSettings.persistBuffer());
        return disconnectedBufferOptions;
    }

    public MqttAsyncClient org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$mqttClient() {
        Some offlinePersistenceSettings = this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$connectionSettings.offlinePersistenceSettings();
        if (!(offlinePersistenceSettings instanceof Some)) {
            return this.client;
        }
        this.client.setBufferOpts(createPahoBufferOptions((MqttOfflinePersistenceSettings) offlinePersistenceSettings.value()));
        return this.client;
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void onPush() {
        Object grab = grab(this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$in);
        try {
            publishPending(grab);
        } catch (Throwable th) {
            if ((th instanceof MqttException) && this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$connectionSettings.automaticReconnect()) {
                this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$pendingMsg = Some$.MODULE$.apply(grab);
                return;
            }
            if (th != null) {
                Option unapply = NonFatal$.MODULE$.unapply(th);
                if (!unapply.isEmpty()) {
                    throw ((Throwable) unapply.get());
                }
            }
            throw th;
        }
    }

    public void onUpstreamFinish() {
        setKeepGoing(true);
        if (this.queue.isEmpty() && this.unackedMessages.get() == 0) {
            InHandler.onUpstreamFinish$(this);
        }
    }

    public void onUpstreamFailure(Throwable th) {
        setKeepGoing(true);
        if (this.queue.isEmpty() && this.unackedMessages.get() == 0) {
            InHandler.onUpstreamFailure$(this, th);
        }
    }

    public void onPull() {
        if (this.queue.nonEmpty()) {
            pushDownstream((MqttMessageWithAck) this.queue.dequeue());
            if (this.unackedMessages.get() == 0 && isClosed(this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$in)) {
                completeStage();
            }
        }
    }

    public void onDownstreamFinish(Throwable th) {
        setKeepGoing(true);
        if (this.unackedMessages.get() == 0) {
            OutHandler.onDownstreamFinish$(this, th);
        }
    }

    public IMqttDeliveryToken publishToMqtt(MqttMessage mqttMessage) {
        org.eclipse.paho.client.mqttv3.MqttMessage mqttMessage2 = new org.eclipse.paho.client.mqttv3.MqttMessage((byte[]) mqttMessage.payload().toArray(ClassTag$.MODULE$.apply(Byte.TYPE)));
        mqttMessage2.setQos(((MqttQoS) mqttMessage.qos().getOrElse(this::publishToMqtt$$anonfun$1)).value());
        mqttMessage2.setRetained(mqttMessage.retained());
        return org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$mqttClient().publish(mqttMessage.topic(), mqttMessage2, mqttMessage, MqttFlowStageLogic$.MODULE$.asActionListener(r4 -> {
            this.onPublished.invoke(r4);
        }));
    }

    public void publishPending(I i) {
    }

    private void pushDownstream(MqttMessageWithAck mqttMessageWithAck) {
        push(this.out, mqttMessageWithAck);
        this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$backpressurePahoClient.release();
        if (this.manualAcks) {
            this.unackedMessages.incrementAndGet();
        }
    }

    private void failStageWith(Throwable th) {
        this.subscriptionPromise.tryFailure(th);
        failStage(th);
    }

    public void preStart() {
        try {
            org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$mqttClient().connect(MqttFlowStageLogic$.MODULE$.asConnectOptions(this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$connectionSettings), BoxedUnit.UNIT, new IMqttActionListener(this) { // from class: org.apache.pekko.stream.connectors.mqtt.impl.MqttFlowStageLogic$$anon$4
                private final /* synthetic */ MqttFlowStageLogic $outer;

                {
                    if (this == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = this;
                }

                public void onSuccess(IMqttToken iMqttToken) {
                    this.$outer.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$onConnect.invoke(iMqttToken.getClient());
                }

                public void onFailure(IMqttToken iMqttToken, Throwable th) {
                    this.$outer.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$onConnectionLost.invoke(th);
                }
            });
        } catch (Throwable th) {
            failStageWith(th);
        }
    }

    public void postStop() {
        if (!this.subscriptionPromise.isCompleted()) {
            this.subscriptionPromise.tryFailure(new IllegalStateException("Cannot complete subscription because the stage is about to stop or fail"));
        }
        try {
            log().debug("stage stopped, disconnecting");
            org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$mqttClient().disconnect(this.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$connectionSettings.disconnectQuiesceTimeout().toMillis(), (Object) null, new IMqttActionListener(this) { // from class: org.apache.pekko.stream.connectors.mqtt.impl.MqttFlowStageLogic$$anon$5
                private final /* synthetic */ MqttFlowStageLogic $outer;

                {
                    if (this == null) {
                        throw new NullPointerException();
                    }
                    this.$outer = this;
                }

                public void onSuccess(IMqttToken iMqttToken) {
                    this.$outer.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$mqttClient().close();
                }

                public void onFailure(IMqttToken iMqttToken, Throwable th) {
                    this.$outer.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$mqttClient().disconnectForcibly(0L, this.$outer.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$connectionSettings.disconnectTimeout().toMillis());
                    this.$outer.org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$mqttClient().close();
                }
            });
        } catch (MqttException unused) {
            try {
                org$apache$pekko$stream$connectors$mqtt$impl$MqttFlowStageLogic$$mqttClient().close();
            } catch (MqttException unused2) {
            }
        }
    }

    public <T> boolean protected$hasBeenPulled(Inlet<T> inlet) {
        return hasBeenPulled(inlet);
    }

    public <T> void protected$pull(Inlet<T> inlet) {
        pull(inlet);
    }

    private final MqttQoS publishToMqtt$$anonfun$1() {
        return this.defaultQoS;
    }
}
