package akka.stream.alpakka.mqtt.streaming.impl;

import akka.NotUsed;
import akka.actor.typed.ActorRef;
import akka.actor.typed.ActorRef$;
import akka.actor.typed.ActorRef$ActorRefOps$;
import akka.actor.typed.Behavior;
import akka.actor.typed.scaladsl.ActorContext;
import akka.actor.typed.scaladsl.Behaviors$;
import akka.annotation.InternalApi;
import akka.stream.Materializer;
import akka.stream.OverflowStrategy$;
import akka.stream.alpakka.mqtt.streaming.ControlPacketFlags$;
import akka.stream.alpakka.mqtt.streaming.MqttSessionSettings;
import akka.stream.alpakka.mqtt.streaming.PacketId;
import akka.stream.alpakka.mqtt.streaming.Publish;
import akka.stream.alpakka.mqtt.streaming.impl.LocalPacketRouter;
import akka.stream.alpakka.mqtt.streaming.impl.Producer;
import akka.stream.scaladsl.BroadcastHub$;
import akka.stream.scaladsl.Keep$;
import akka.stream.scaladsl.Source;
import akka.stream.scaladsl.Source$;
import akka.stream.scaladsl.SourceQueueWithComplete;
import scala.MatchError;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.package$;
import scala.runtime.BoxedUnit;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: RequestState.scala */
@InternalApi
/* loaded from: input_file:akka/stream/alpakka/mqtt/streaming/impl/Producer$.class */
public final class Producer$ {
    public static final Producer$ MODULE$ = new Producer$();

    public Behavior<Producer.Event> apply(Publish publish, Option<?> option, Promise<Source<Producer.ForwardPublishingCommand, NotUsed>> promise, ActorRef<LocalPacketRouter.Request<Producer.Event>> actorRef, MqttSessionSettings mqttSessionSettings, Materializer materializer) {
        return preparePublish(new Producer.Start(publish, option, promise, actorRef, mqttSessionSettings), materializer);
    }

    public Behavior<Producer.Event> preparePublish(Producer.Start start, Materializer materializer) {
        return Behaviors$.MODULE$.setup(actorContext -> {
            akka$stream$alpakka$mqtt$streaming$impl$Producer$$requestPacketId$1(start, actorContext);
            Tuple2 tuple2 = (Tuple2) Source$.MODULE$.queue(start.settings().clientSendBufferSize(), OverflowStrategy$.MODULE$.backpressure()).toMat(BroadcastHub$.MODULE$.sink(), Keep$.MODULE$.both()).run(materializer);
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            Tuple2 tuple22 = new Tuple2((SourceQueueWithComplete) tuple2._1(), (Source) tuple2._2());
            SourceQueueWithComplete sourceQueueWithComplete = (SourceQueueWithComplete) tuple22._1();
            start.remote().success((Source) tuple22._2());
            return Behaviors$.MODULE$.receiveMessagePartial(new Producer$$anonfun$$nestedInanonfun$preparePublish$1$1(sourceQueueWithComplete, start, materializer, actorContext)).receiveSignal(new Producer$$anonfun$$nestedInanonfun$preparePublish$1$2(sourceQueueWithComplete));
        });
    }

    public Behavior<Producer.Event> publishUnacknowledged(Producer.Publishing publishing, Materializer materializer) {
        String str = "producer-receive-pubackrec";
        return Behaviors$.MODULE$.withTimers(timerScheduler -> {
            if (publishing.settings().producerPubAckRecTimeout().toNanos() > 0) {
                timerScheduler.startSingleTimer(str, Producer$ReceivePubAckRecTimeout$.MODULE$, publishing.settings().producerPubAckRecTimeout());
            }
            return Behaviors$.MODULE$.receive((actorContext, event) -> {
                Behavior<Producer.Event> same;
                Tuple2 tuple2 = new Tuple2(actorContext, event);
                if (tuple2 != null) {
                    Producer.Event event = (Producer.Event) tuple2._2();
                    if (event instanceof Producer.PubAckReceivedFromRemote) {
                        Promise<Producer.ForwardPubAck> local = ((Producer.PubAckReceivedFromRemote) event).local();
                        if (ControlPacketFlags$.MODULE$.contains$extension(publishing.publish().flags(), ControlPacketFlags$.MODULE$.QoSAtLeastOnceDelivery())) {
                            local.success(new Producer.ForwardPubAck(publishing.publishData()));
                            same = Behaviors$.MODULE$.stopped();
                            return same;
                        }
                    }
                }
                if (tuple2 != null) {
                    Producer.Event event2 = (Producer.Event) tuple2._2();
                    if (event2 instanceof Producer.PubRecReceivedFromRemote) {
                        Promise<Producer.ForwardPubRec> local2 = ((Producer.PubRecReceivedFromRemote) event2).local();
                        if (ControlPacketFlags$.MODULE$.contains$extension(publishing.publish().flags(), ControlPacketFlags$.MODULE$.QoSAtMostOnceDelivery())) {
                            local2.success(new Producer.ForwardPubRec(publishing.publishData()));
                            timerScheduler.cancel(str);
                            same = MODULE$.publishAcknowledged(publishing, materializer);
                            return same;
                        }
                    }
                }
                if (tuple2 != null) {
                    Producer.Event event3 = (Producer.Event) tuple2._2();
                    if (Producer$ReceivePubAckRecTimeout$.MODULE$.equals(event3) ? true : Producer$ReceiveConnect$.MODULE$.equals(event3)) {
                        same = QueueOfferState$.MODULE$.waitForQueueOfferCompleted(publishing.remote().offer(new Producer.ForwardPublish(publishing.publish().copy(ControlPacketFlags$.MODULE$.$bar$extension(publishing.publish().flags(), ControlPacketFlags$.MODULE$.DUP()), publishing.publish().copy$default$2(), publishing.publish().copy$default$3(), publishing.publish().copy$default$4()), new Some(new PacketId(publishing.packetId())))), r4 -> {
                            return new Producer.QueueOfferCompleted(r4.toEither());
                        }, MODULE$.publishUnacknowledged(publishing, materializer), package$.MODULE$.Vector().empty());
                        return same;
                    }
                }
                same = Behaviors$.MODULE$.same();
                return same;
            }).receiveSignal(new Producer$$anonfun$$nestedInanonfun$publishUnacknowledged$1$1(publishing));
        });
    }

    public Behavior<Producer.Event> publishAcknowledged(Producer.Publishing publishing, Materializer materializer) {
        String str = "producer-receive-pubrel";
        return Behaviors$.MODULE$.withTimers(timerScheduler -> {
            if (publishing.settings().producerPubCompTimeout().toNanos() > 0) {
                timerScheduler.startSingleTimer(str, Producer$ReceivePubCompTimeout$.MODULE$, publishing.settings().producerPubCompTimeout());
            }
            return Behaviors$.MODULE$.setup(actorContext -> {
                return QueueOfferState$.MODULE$.waitForQueueOfferCompleted(publishing.remote().offer(new Producer.ForwardPubRel(publishing.publish(), publishing.packetId())), r4 -> {
                    return new Producer.QueueOfferCompleted(r4.toEither());
                }, Behaviors$.MODULE$.receiveMessagePartial(new Producer$$anonfun$$nestedInanonfun$publishAcknowledged$2$1(publishing, materializer)).receiveSignal(new Producer$$anonfun$$nestedInanonfun$publishAcknowledged$2$2(publishing)), package$.MODULE$.Vector().empty());
            });
        });
    }

    public static final /* synthetic */ void $anonfun$preparePublish$2(ActorContext actorContext, Try r7) {
        LocalPacketRouter.Registered registered;
        if ((r7 instanceof Success) && (registered = (LocalPacketRouter.Registered) ((Success) r7).value()) != null) {
            ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(actorContext.self()), new Producer.AcquiredPacketId(registered.packetId()));
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(r7 instanceof Failure)) {
                throw new MatchError(r7);
            }
            ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(actorContext.self()), Producer$UnacquiredPacketId$.MODULE$);
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public static final void akka$stream$alpakka$mqtt$streaming$impl$Producer$$requestPacketId$1(Producer.Start start, ActorContext actorContext) {
        Promise apply = Promise$.MODULE$.apply();
        ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(start.packetRouter()), new LocalPacketRouter.Register(actorContext.self().unsafeUpcast(), apply));
        apply.future().onComplete(r4 -> {
            $anonfun$preparePublish$2(actorContext, r4);
            return BoxedUnit.UNIT;
        }, actorContext.executionContext());
    }

    private Producer$() {
    }
}
