package org.apache.pekko.stream.connectors.mqtt.streaming.impl;

import java.io.Serializable;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorRef$;
import org.apache.pekko.actor.typed.ActorRef$ActorRefOps$;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.scaladsl.ActorContext;
import org.apache.pekko.actor.typed.scaladsl.Behaviors$;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.OverflowStrategy$;
import org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacketFlags$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttSessionSettings;
import org.apache.pekko.stream.connectors.mqtt.streaming.PacketId;
import org.apache.pekko.stream.connectors.mqtt.streaming.Publish;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.LocalPacketRouter;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Producer;
import org.apache.pekko.stream.scaladsl.BroadcastHub$;
import org.apache.pekko.stream.scaladsl.Keep$;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.stream.scaladsl.SourceQueueWithComplete;
import scala.MatchError;
import scala.Option;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.package$;
import scala.runtime.ModuleSerializationProxy;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: RequestState.scala */
/**
 * Singleton holder ({@link #MODULE$}) for the factory methods that build the typed-actor
 * behaviors of an MQTT publish "producer".
 *
 * <p>The visible behaviors are:
 * <ul>
 *   <li>{@link #preparePublish} - requests a packet id and materializes the outgoing command queue;</li>
 *   <li>{@link #publishUnacknowledged} - waits for a PUBACK / PUBREC event, re-offering the publish
 *       (with the DUP flag set) on timeout or reconnect;</li>
 *   <li>{@link #publishAcknowledged} - offers a PUBREL command and then hands over to handlers
 *       defined in {@code Producer$$anon$4} / {@code Producer$$anon$5} (not visible here).</li>
 * </ul>
 *
 * <p>This file is decompiler output of Scala-generated bytecode; names ending in {@code $} and the
 * long {@code org$apache$...} method names are compiler-generated.
 */
@InternalApi
/* loaded from: input_file:org/apache/pekko/stream/connectors/mqtt/streaming/impl/Producer$.class */
public final class Producer$ implements Serializable {
    // Static slots for the nested companion objects. The decompiler rendered them as null;
    // every use in this class goes through the respective `X$.MODULE$` singleton instead.
    public static final Producer$Start$ Start = null;
    public static final Producer$Publishing$ Publishing = null;
    public static final Producer$AcquiredPacketId$ AcquiredPacketId = null;
    public static final Producer$UnacquiredPacketId$ UnacquiredPacketId = null;
    public static final Producer$ReceivePubAckRecTimeout$ ReceivePubAckRecTimeout = null;
    public static final Producer$PubAckReceivedFromRemote$ PubAckReceivedFromRemote = null;
    public static final Producer$PubRecReceivedFromRemote$ PubRecReceivedFromRemote = null;
    public static final Producer$ReceivePubCompTimeout$ ReceivePubCompTimeout = null;
    public static final Producer$PubCompReceivedFromRemote$ PubCompReceivedFromRemote = null;
    public static final Producer$ReceiveConnect$ ReceiveConnect = null;
    public static final Producer$QueueOfferCompleted$ QueueOfferCompleted = null;
    public static final Producer$ForwardPublish$ ForwardPublish = null;
    public static final Producer$ForwardPubAck$ ForwardPubAck = null;
    public static final Producer$ForwardPubRec$ ForwardPubRec = null;
    public static final Producer$ForwardPubRel$ ForwardPubRel = null;
    public static final Producer$ForwardPubComp$ ForwardPubComp = null;

    /** The single instance of this class. */
    public static final Producer$ MODULE$ = new Producer$();

    /** Private: instances are only reachable through {@link #MODULE$}. */
    private Producer$() {
    }

    /**
     * Java-serialization hook: a {@link ModuleSerializationProxy} for this class is written
     * instead of the instance itself.
     */
    private Object writeReplace() {
        return new ModuleSerializationProxy(Producer$.class);
    }

    /**
     * Builds the initial producer behavior.
     *
     * <p>Bundles the arguments into a {@code Producer.Start} and delegates to
     * {@link #preparePublish}.
     *
     * @param publish             the publish packet to send
     * @param option              optional value forwarded unchanged into {@code Producer.Start}
     * @param promise             completed by {@link #preparePublish} with the source of forwarded commands
     * @param actorRef            packet router that is asked to register this producer
     * @param mqttSessionSettings session settings (buffer size, timeouts)
     * @param materializer        used to run the internal queue stream
     * @return the behavior returned by {@link #preparePublish}
     */
    public Behavior<Producer.Event> apply(Publish publish, Option<?> option, Promise<Source<Producer.ForwardPublishingCommand, NotUsed>> promise, ActorRef<LocalPacketRouter.Request<Producer.Event>> actorRef, MqttSessionSettings mqttSessionSettings, Materializer materializer) {
        Producer.Start initialState = Producer$Start$.MODULE$.apply(publish, option, promise, actorRef, mqttSessionSettings);
        return preparePublish(initialState, materializer);
    }

    /**
     * Behavior that requests a packet id and sets up the outgoing command queue.
     *
     * <p>On setup it (1) asks the packet router for a packet id, (2) runs a queue source, sized by
     * {@code settings.clientSendBufferSize} with the backpressure overflow strategy, into a
     * {@code BroadcastHub} sink, and (3) completes {@code start.remote()} with the hub's source.
     * Message and signal handling are delegated to {@code Producer$$anon$1} and
     * {@code Producer$$anon$2}, which are not visible in this file.
     *
     * @param start        initial producer state
     * @param materializer used to run the queue stream
     * @return the set-up behavior
     * @throws MatchError if running the stream yields {@code null} instead of a pair
     */
    public Behavior<Producer.Event> preparePublish(Producer.Start start, Materializer materializer) {
        return Behaviors$.MODULE$.setup(actorContext -> {
            // Fire the registration request first; the reply arrives later as an actor message.
            org$apache$pekko$stream$connectors$mqtt$streaming$impl$Producer$$$_$requestPacketId$1(start, actorContext);

            Tuple2 materialized = (Tuple2) Source$.MODULE$.queue(start.settings().clientSendBufferSize(), OverflowStrategy$.MODULE$.backpressure()).toMat(BroadcastHub$.MODULE$.sink(), Keep$.MODULE$.both()).run(materializer);
            if (materialized == null) {
                throw new MatchError(materialized);
            }
            // Destructure directly; re-wrapping the two halves in a second Tuple2 adds nothing.
            SourceQueueWithComplete commandQueue = (SourceQueueWithComplete) materialized._1();
            Source commandSource = (Source) materialized._2();

            start.remote().success(commandSource);
            return Behaviors$.MODULE$.receiveMessagePartial(new Producer$$anon$1(commandQueue, start, materializer, actorContext)).receiveSignal(new Producer$$anon$2(commandQueue));
        });
    }

    /**
     * Behavior for a publish that has been sent but not yet acknowledged.
     *
     * <p>If {@code settings.producerPubAckRecTimeout} is positive, a single timer delivering
     * {@code ReceivePubAckRecTimeout} is started. Then:
     * <ul>
     *   <li>{@code PubAckReceivedFromRemote} with the at-least-once flag set on the publish: its
     *       promise is completed with {@code ForwardPubAck} and the actor stops;</li>
     *   <li>{@code PubRecReceivedFromRemote} passing the flag guard: its promise is completed with
     *       {@code ForwardPubRec}, the timer is cancelled and the behavior becomes
     *       {@link #publishAcknowledged};</li>
     *   <li>{@code ReceivePubAckRecTimeout} or {@code ReceiveConnect}: the publish is offered to the
     *       remote queue again with the DUP flag added, then this behavior is re-entered;</li>
     *   <li>anything else: the behavior is unchanged.</li>
     * </ul>
     * Signals are handled by {@code Producer$$anon$3} (not visible here).
     *
     * @param publishing   state of the in-flight publish
     * @param materializer passed through to follow-up behaviors
     * @return the waiting-for-acknowledgement behavior
     */
    public Behavior<Producer.Event> publishUnacknowledged(Producer.Publishing publishing, Materializer materializer) {
        final String timerKey = "producer-receive-pubackrec";
        return Behaviors$.MODULE$.withTimers(timerScheduler -> {
            if (publishing.settings().producerPubAckRecTimeout().toNanos() > 0) {
                timerScheduler.startSingleTimer(timerKey, Producer$ReceivePubAckRecTimeout$.MODULE$, publishing.settings().producerPubAckRecTimeout());
            }
            // Explicit type witness: `receive` is the receiver of `.receiveSignal`, so Java has no
            // target type to infer the message type from.
            return Behaviors$.MODULE$.<Producer.Event>receive((actorContext, message) -> {
                if (message instanceof Producer.PubAckReceivedFromRemote) {
                    Promise<Producer.ForwardPubAck> ackPromise = Producer$PubAckReceivedFromRemote$.MODULE$.unapply((Producer.PubAckReceivedFromRemote) message)._1();
                    if (ControlPacketFlags$.MODULE$.contains$extension(publishing.publish().flags(), ControlPacketFlags$.MODULE$.QoSAtLeastOnceDelivery())) {
                        ackPromise.success(Producer$ForwardPubAck$.MODULE$.apply(publishing.publishData()));
                        return Behaviors$.MODULE$.stopped();
                    }
                }
                if (message instanceof Producer.PubRecReceivedFromRemote) {
                    Promise<Producer.ForwardPubRec> recPromise = Producer$PubRecReceivedFromRemote$.MODULE$.unapply((Producer.PubRecReceivedFromRemote) message)._1();
                    // NOTE(review): PUBREC belongs to the QoS 2 handshake, yet this guard tests
                    // QoSAtMostOnceDelivery. If that constant is the zero bit pattern the check is
                    // always true - confirm whether QoSExactlyOnceDelivery was intended. Left as found.
                    if (ControlPacketFlags$.MODULE$.contains$extension(publishing.publish().flags(), ControlPacketFlags$.MODULE$.QoSAtMostOnceDelivery())) {
                        recPromise.success(Producer$ForwardPubRec$.MODULE$.apply(publishing.publishData()));
                        timerScheduler.cancel(timerKey);
                        return publishAcknowledged(publishing, materializer);
                    }
                }
                boolean resendRequested = Producer$ReceivePubAckRecTimeout$.MODULE$.equals(message) || Producer$ReceiveConnect$.MODULE$.equals(message);
                if (resendRequested) {
                    // Re-offer the same publish with DUP added to its flags; all other fields keep
                    // their current values via the generated copy$default$N accessors.
                    return QueueOfferState$.MODULE$.waitForQueueOfferCompleted(
                            publishing.remote().offer(Producer$ForwardPublish$.MODULE$.apply(
                                    publishing.publish().copy(
                                            ControlPacketFlags$.MODULE$.$bar$extension(publishing.publish().flags(), ControlPacketFlags$.MODULE$.DUP()),
                                            publishing.publish().copy$default$2(),
                                            publishing.publish().copy$default$3(),
                                            publishing.publish().copy$default$4()),
                                    Some$.MODULE$.apply(new PacketId(publishing.packetId())))),
                            offerResult -> Producer$QueueOfferCompleted$.MODULE$.apply(offerResult.toEither()),
                            publishUnacknowledged(publishing, materializer),
                            package$.MODULE$.Vector().empty());
                }
                return Behaviors$.MODULE$.same();
            }).receiveSignal(new Producer$$anon$3(publishing));
        });
    }

    /**
     * Behavior entered after a PUBREC has been accepted.
     *
     * <p>If {@code settings.producerPubCompTimeout} is positive, a single timer delivering
     * {@code ReceivePubCompTimeout} is started. A {@code ForwardPubRel} command is then offered to
     * the remote queue; once that offer completes, processing continues with the handlers in
     * {@code Producer$$anon$4} / {@code Producer$$anon$5} (not visible here).
     *
     * @param publishing   state of the in-flight publish
     * @param materializer passed through to the follow-up handlers
     * @return the behavior described above
     */
    public Behavior<Producer.Event> publishAcknowledged(Producer.Publishing publishing, Materializer materializer) {
        final String timerKey = "producer-receive-pubrel";
        return Behaviors$.MODULE$.withTimers(timerScheduler -> {
            if (publishing.settings().producerPubCompTimeout().toNanos() > 0) {
                timerScheduler.startSingleTimer(timerKey, Producer$ReceivePubCompTimeout$.MODULE$, publishing.settings().producerPubCompTimeout());
            }
            return Behaviors$.MODULE$.setup(actorContext ->
                    QueueOfferState$.MODULE$.waitForQueueOfferCompleted(
                            publishing.remote().offer(Producer$ForwardPubRel$.MODULE$.apply(publishing.publish(), publishing.packetId())),
                            offerResult -> Producer$QueueOfferCompleted$.MODULE$.apply(offerResult.toEither()),
                            Behaviors$.MODULE$.receiveMessagePartial(new Producer$$anon$4(publishing, materializer)).receiveSignal(new Producer$$anon$5(publishing)),
                            package$.MODULE$.Vector().empty()));
        });
    }

    /**
     * Asks the packet router to register this actor and relays the outcome back to the actor.
     *
     * <p>Sends a {@code LocalPacketRouter.Register} carrying a fresh promise to
     * {@code start.packetRouter()}. When the promise completes, the actor is sent
     * {@code AcquiredPacketId(packetId)} on success with a non-null registration, or
     * {@code UnacquiredPacketId} on failure. The callback runs on the actor context's execution
     * context.
     *
     * @param start        state providing the packet router reference
     * @param actorContext context of the requesting actor
     * @throws MatchError (inside the callback) if the result is neither a non-null success nor a failure
     */
    public final void org$apache$pekko$stream$connectors$mqtt$streaming$impl$Producer$$$_$requestPacketId$1(Producer.Start start, ActorContext actorContext) {
        Promise<LocalPacketRouter.Registered> registration = Promise$.MODULE$.apply();
        ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(start.packetRouter()), LocalPacketRouter$Register$.MODULE$.apply(actorContext.self().unsafeUpcast(), registration));
        registration.future().onComplete(result -> {
            if (result instanceof Success) {
                LocalPacketRouter.Registered registered = (LocalPacketRouter.Registered) ((Success) result).value();
                if (registered != null) {
                    ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(actorContext.self()), Producer$AcquiredPacketId$.MODULE$.apply(registered.packetId()));
                    // The callback is a Scala Function1 and must return a value; Unit is BoxedUnit.UNIT.
                    return scala.runtime.BoxedUnit.UNIT;
                }
            }
            if (!(result instanceof Failure)) {
                // Mirrors the exhaustive Scala match: a Success holding null matches no case.
                throw new MatchError(result);
            }
            ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(actorContext.self()), Producer$UnacquiredPacketId$.MODULE$);
            return scala.runtime.BoxedUnit.UNIT;
        }, actorContext.executionContext());
    }

    /**
     * Compiler-generated helper for {@code Producer$$anon$1}: wraps the outcome of a queue offer
     * in a {@code QueueOfferCompleted} event.
     *
     * @param r3 result of the offer
     * @return a {@code QueueOfferCompleted} built from {@code r3.toEither()}
     */
    public static final /* synthetic */ Producer.Event org$apache$pekko$stream$connectors$mqtt$streaming$impl$Producer$$anon$1$$_$applyOrElse$$anonfun$1(Try r3) {
        return Producer$QueueOfferCompleted$.MODULE$.apply(r3.toEither());
    }
}
