package org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl;

import java.util.concurrent.atomic.AtomicLong;
import org.apache.pekko.Done;
import org.apache.pekko.Done$;
import org.apache.pekko.NotUsed;
import org.apache.pekko.NotUsed$;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.ClassicActorSystemProvider;
import org.apache.pekko.actor.typed.ActorRef;
import org.apache.pekko.actor.typed.ActorRef$;
import org.apache.pekko.actor.typed.ActorRef$ActorRefOps$;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.scaladsl.adapter.PropsAdapter$;
import org.apache.pekko.actor.typed.scaladsl.adapter.package$;
import org.apache.pekko.actor.typed.scaladsl.adapter.package$ClassicActorRefOps$;
import org.apache.pekko.actor.typed.scaladsl.adapter.package$TypedActorRefOps$;
import org.apache.pekko.event.LogSource$;
import org.apache.pekko.event.Logging$;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.stream.ActorAttributes$;
import org.apache.pekko.stream.KillSwitches$;
import org.apache.pekko.stream.Materializer$;
import org.apache.pekko.stream.SharedKillSwitch;
import org.apache.pekko.stream.Supervision;
import org.apache.pekko.stream.Supervision$Resume$;
import org.apache.pekko.stream.Supervision$Stop$;
import org.apache.pekko.stream.WatchedActorTerminatedException;
import org.apache.pekko.stream.connectors.mqtt.streaming.Command;
import org.apache.pekko.stream.connectors.mqtt.streaming.Command$;
import org.apache.pekko.stream.connectors.mqtt.streaming.ConnAck;
import org.apache.pekko.stream.connectors.mqtt.streaming.Connect;
import org.apache.pekko.stream.connectors.mqtt.streaming.ControlPacket;
import org.apache.pekko.stream.connectors.mqtt.streaming.Disconnect$;
import org.apache.pekko.stream.connectors.mqtt.streaming.Event;
import org.apache.pekko.stream.connectors.mqtt.streaming.Event$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttByteIterator$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttConnect$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttDisconnect$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttPingReq$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttPubAck$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttPubComp$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttPubRec$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttPubRel$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttPublish$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttSubscribe$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttCodec$MqttUnsubscribe$;
import org.apache.pekko.stream.connectors.mqtt.streaming.MqttSessionSettings;
import org.apache.pekko.stream.connectors.mqtt.streaming.PingReq$;
import org.apache.pekko.stream.connectors.mqtt.streaming.PingResp$;
import org.apache.pekko.stream.connectors.mqtt.streaming.PubAck;
import org.apache.pekko.stream.connectors.mqtt.streaming.PubComp;
import org.apache.pekko.stream.connectors.mqtt.streaming.PubRec;
import org.apache.pekko.stream.connectors.mqtt.streaming.PubRel;
import org.apache.pekko.stream.connectors.mqtt.streaming.PubRel$;
import org.apache.pekko.stream.connectors.mqtt.streaming.Publish;
import org.apache.pekko.stream.connectors.mqtt.streaming.SubAck;
import org.apache.pekko.stream.connectors.mqtt.streaming.Subscribe;
import org.apache.pekko.stream.connectors.mqtt.streaming.UnsubAck;
import org.apache.pekko.stream.connectors.mqtt.streaming.Unsubscribe;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$ConnAckReceivedFromRemote$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$ConnectReceivedLocally$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$ConnectionLost$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$DisconnectReceivedLocally$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$ForwardConnAck$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$ForwardConnect$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$ForwardDisconnect$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$ForwardPingReq$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$ForwardPingResp$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$ForwardPubRel$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$ForwardPublish$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$PingRespReceivedFromRemote$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$PublishReceivedFromRemote$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$PublishReceivedLocally$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$SubscribeReceivedLocally$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.ClientConnector$UnsubscribeReceivedLocally$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Consumer$ForwardPubAck$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Consumer$ForwardPubComp$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Consumer$ForwardPubRec$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Consumer$ForwardPubRel$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Consumer$ForwardPublish$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Consumer$PubAckReceivedLocally$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Consumer$PubCompReceivedLocally$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Consumer$PubRecReceivedLocally$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Consumer$PubRelReceivedFromRemote$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.LocalPacketRouter;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.LocalPacketRouter$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.LocalPacketRouter$Route$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.MqttFrameStage;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Producer;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Producer$ForwardPubAck$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Producer$ForwardPubComp$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Producer$ForwardPubRec$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Producer$PubAckReceivedFromRemote$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Producer$PubCompReceivedFromRemote$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Producer$PubRecReceivedFromRemote$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.RemotePacketRouter;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.RemotePacketRouter$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.RemotePacketRouter$Route$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Subscriber;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Subscriber$ForwardSubAck$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Subscriber$SubAckReceivedFromRemote$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Unsubscriber;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Unsubscriber$ForwardUnsubAck$;
import org.apache.pekko.stream.connectors.mqtt.streaming.impl.Unsubscriber$UnsubAckReceivedFromRemote$;
import org.apache.pekko.stream.scaladsl.Flow;
import org.apache.pekko.stream.scaladsl.Flow$;
import org.apache.pekko.stream.scaladsl.Source;
import org.apache.pekko.stream.scaladsl.Source$;
import org.apache.pekko.util.ByteString;
import org.apache.pekko.util.ByteString$;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Tuple2;
import scala.Tuple2$;
import scala.concurrent.Future;
import scala.concurrent.Future$;
import scala.concurrent.Promise;
import scala.concurrent.Promise$;
import scala.util.Either;
import scala.util.Failure;
import scala.util.Left;
import scala.util.Right;
import scala.util.Success;

/* compiled from: MqttSession.scala */
/* loaded from: input_file:org/apache/pekko/stream/connectors/mqtt/streaming/scaladsl/ActorMqttClientSession.class */
public final class ActorMqttClientSession extends MqttClientSession {
    private final MqttSessionSettings settings;
    private final ClassicActorSystemProvider systemProvider;
    private final ActorSystem system;
    private final ActorRef<Object> consumerPacketRouter;
    private final ActorRef<Object> producerPacketRouter;
    private final ActorRef<Object> subscriberPacketRouter;
    private final ActorRef<Object> unsubscriberPacketRouter;
    private final ActorRef<Object> clientConnector;
    private final LoggingAdapter loggingAdapter;
    private final long clientSessionId = ActorMqttClientSession$.MODULE$.clientSessionCounter().getAndIncrement();
    private final ByteString pingReqBytes = MqttCodec$MqttPingReq$.MODULE$.encode$extension(MqttCodec$.MODULE$.MqttPingReq(PingReq$.MODULE$), ByteString$.MODULE$.newBuilder()).result();

    public static ActorMqttClientSession apply(MqttSessionSettings mqttSessionSettings, ClassicActorSystemProvider classicActorSystemProvider) {
        return ActorMqttClientSession$.MODULE$.apply(mqttSessionSettings, classicActorSystemProvider);
    }

    public static AtomicLong clientSessionCounter() {
        return ActorMqttClientSession$.MODULE$.clientSessionCounter();
    }

    public ActorMqttClientSession(MqttSessionSettings mqttSessionSettings, ClassicActorSystemProvider classicActorSystemProvider) {
        this.settings = mqttSessionSettings;
        this.systemProvider = classicActorSystemProvider;
        this.system = classicActorSystemProvider.classicSystem();
        this.consumerPacketRouter = package$ClassicActorRefOps$.MODULE$.toTyped$extension(package$.MODULE$.ClassicActorRefOps(this.system.systemActorOf(PropsAdapter$.MODULE$.apply(ActorMqttClientSession::$init$$$anonfun$1, PropsAdapter$.MODULE$.apply$default$2()), new StringBuilder(36).append("client-consumer-packet-id-allocator-").append(this.clientSessionId).toString())));
        this.producerPacketRouter = package$ClassicActorRefOps$.MODULE$.toTyped$extension(package$.MODULE$.ClassicActorRefOps(this.system.systemActorOf(PropsAdapter$.MODULE$.apply(ActorMqttClientSession::$init$$$anonfun$2, PropsAdapter$.MODULE$.apply$default$2()), new StringBuilder(36).append("client-producer-packet-id-allocator-").append(this.clientSessionId).toString())));
        this.subscriberPacketRouter = package$ClassicActorRefOps$.MODULE$.toTyped$extension(package$.MODULE$.ClassicActorRefOps(this.system.systemActorOf(PropsAdapter$.MODULE$.apply(ActorMqttClientSession::$init$$$anonfun$3, PropsAdapter$.MODULE$.apply$default$2()), new StringBuilder(38).append("client-subscriber-packet-id-allocator-").append(this.clientSessionId).toString())));
        this.unsubscriberPacketRouter = package$ClassicActorRefOps$.MODULE$.toTyped$extension(package$.MODULE$.ClassicActorRefOps(this.system.systemActorOf(PropsAdapter$.MODULE$.apply(ActorMqttClientSession::$init$$$anonfun$4, PropsAdapter$.MODULE$.apply$default$2()), new StringBuilder(40).append("client-unsubscriber-packet-id-allocator-").append(this.clientSessionId).toString())));
        this.clientConnector = package$ClassicActorRefOps$.MODULE$.toTyped$extension(package$.MODULE$.ClassicActorRefOps(this.system.systemActorOf(PropsAdapter$.MODULE$.apply(() -> {
            return r5.$init$$$anonfun$5(r6, r7);
        }, PropsAdapter$.MODULE$.apply$default$2()), new StringBuilder(17).append("client-connector-").append(this.clientSessionId).toString())));
        this.loggingAdapter = Logging$.MODULE$.apply(this.system, ActorMqttClientSession.class, LogSource$.MODULE$.fromAnyClass());
    }

    @Override // org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.MqttSession
    public <A> void $bang(Command<A> command) {
        if (command != null) {
            Command<A> unapply = Command$.MODULE$.unapply(command);
            ControlPacket _1 = unapply._1();
            unapply._2();
            Option<A> _3 = unapply._3();
            if (_1 instanceof Publish) {
                Publish publish = (Publish) _1;
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.clientConnector), ClientConnector$PublishReceivedLocally$.MODULE$.apply(publish, _3));
                return;
            }
        }
        if (!(command instanceof Command)) {
            throw new MatchError(command);
        }
        throw new IllegalStateException(new StringBuilder(50).append(command).append(" is not a client command that can be sent directly").toString());
    }

    @Override // org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.MqttSession
    public void shutdown() {
        this.system.stop(package$TypedActorRefOps$.MODULE$.toClassic$extension(package$.MODULE$.TypedActorRefOps(this.clientConnector)));
        this.system.stop(package$TypedActorRefOps$.MODULE$.toClassic$extension(package$.MODULE$.TypedActorRefOps(this.consumerPacketRouter)));
        this.system.stop(package$TypedActorRefOps$.MODULE$.toClassic$extension(package$.MODULE$.TypedActorRefOps(this.producerPacketRouter)));
        this.system.stop(package$TypedActorRefOps$.MODULE$.toClassic$extension(package$.MODULE$.TypedActorRefOps(this.subscriberPacketRouter)));
        this.system.stop(package$TypedActorRefOps$.MODULE$.toClassic$extension(package$.MODULE$.TypedActorRefOps(this.unsubscriberPacketRouter)));
    }

    @Override // org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.MqttClientSession
    public <A> Flow<Command<A>, ByteString, NotUsed> commandFlow(ByteString byteString) {
        return Flow$.MODULE$.lazyFutureFlow(() -> {
            SharedKillSwitch shared = KillSwitches$.MODULE$.shared(new StringBuilder(20).append("command-kill-switch-").append(this.clientSessionId).toString());
            Future$ future$ = Future$.MODULE$;
            Flow log = Flow$.MODULE$.apply().watch(package$TypedActorRefOps$.MODULE$.toClassic$extension(package$.MODULE$.TypedActorRefOps(this.clientConnector))).watchTermination((notUsed, future) -> {
                Tuple2 apply = Tuple2$.MODULE$.apply(notUsed, future);
                if (apply == null) {
                    throw new MatchError(apply);
                }
                ((Future) apply._2()).onComplete(r7 -> {
                    if ((r7 instanceof Failure) && (((Failure) r7).exception() instanceof WatchedActorTerminatedException)) {
                        return;
                    }
                    ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.clientConnector), ClientConnector$ConnectionLost$.MODULE$.apply(byteString));
                }, this.system.dispatcher());
                return NotUsed$.MODULE$;
            }).via(shared.flow()).flatMapMerge(this.settings.commandParallelism(), command -> {
                if (command != null) {
                    Command unapply = Command$.MODULE$.unapply(command);
                    ControlPacket _1 = unapply._1();
                    Option<Promise<Done>> _2 = unapply._2();
                    Option<?> _3 = unapply._3();
                    if (_1 instanceof Connect) {
                        Connect connect = (Connect) _1;
                        Promise<Source<ClientConnector.ForwardConnectCommand, NotUsed>> apply = Promise$.MODULE$.apply();
                        ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.clientConnector), ClientConnector$ConnectReceivedLocally$.MODULE$.apply(byteString, connect, _3, apply));
                        return Source$.MODULE$.futureSource(apply.future().map(source -> {
                            return source.map(forwardConnectCommand -> {
                                if (ClientConnector$ForwardConnect$.MODULE$.equals(forwardConnectCommand)) {
                                    return MqttCodec$MqttConnect$.MODULE$.encode$extension(MqttCodec$.MODULE$.MqttConnect(connect), ByteString$.MODULE$.newBuilder()).result();
                                }
                                if (ClientConnector$ForwardPingReq$.MODULE$.equals(forwardConnectCommand)) {
                                    return this.pingReqBytes;
                                }
                                if (forwardConnectCommand instanceof ClientConnector.ForwardPublish) {
                                    ClientConnector.ForwardPublish unapply2 = ClientConnector$ForwardPublish$.MODULE$.unapply((ClientConnector.ForwardPublish) forwardConnectCommand);
                                    Publish _12 = unapply2._1();
                                    return MqttCodec$MqttPublish$.MODULE$.encode$extension(MqttCodec$.MODULE$.MqttPublish(_12), ByteString$.MODULE$.newBuilder(), unapply2._2()).result();
                                }
                                if (!(forwardConnectCommand instanceof ClientConnector.ForwardPubRel)) {
                                    throw new MatchError(forwardConnectCommand);
                                }
                                return MqttCodec$MqttPubRel$.MODULE$.encode$extension(MqttCodec$.MODULE$.MqttPubRel(PubRel$.MODULE$.apply(ClientConnector$ForwardPubRel$.MODULE$.unapply((ClientConnector.ForwardPubRel) forwardConnectCommand)._1())), ByteString$.MODULE$.newBuilder()).result();
                            }).mapError(new ActorMqttClientSession$$anon$1()).watchTermination((notUsed2, future2) -> {
                                future2.onComplete(r5 -> {
                                    if (r5 instanceof Success) {
                                        shared.shutdown();
                                    } else {
                                        if (!(r5 instanceof Failure)) {
                                            throw new MatchError(r5);
                                        }
                                        shared.abort(((Failure) r5).exception());
                                    }
                                }, this.system.dispatcher());
                            });
                        }, this.system.dispatcher()));
                    }
                    if (_1 instanceof PubAck) {
                        PubAck pubAck = (PubAck) _1;
                        Promise<Consumer$ForwardPubAck$> apply2 = Promise$.MODULE$.apply();
                        ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.consumerPacketRouter), RemotePacketRouter$Route$.MODULE$.apply(None$.MODULE$, pubAck.packetId(), Consumer$PubAckReceivedLocally$.MODULE$.apply(apply2), apply2));
                        apply2.future().onComplete(r4 -> {
                            _2.foreach(promise -> {
                                return promise.complete(r4.map(consumer$ForwardPubAck$ -> {
                                    return Done$.MODULE$;
                                }));
                            });
                        }, this.system.dispatcher());
                        return Source$.MODULE$.future(apply2.future().map(consumer$ForwardPubAck$ -> {
                            return MqttCodec$MqttPubAck$.MODULE$.encode$extension(MqttCodec$.MODULE$.MqttPubAck(pubAck), ByteString$.MODULE$.newBuilder()).result();
                        }, this.system.dispatcher())).recover(new ActorMqttClientSession$$anon$2());
                    }
                    if (_1 instanceof PubRec) {
                        PubRec pubRec = (PubRec) _1;
                        Promise<Consumer$ForwardPubRec$> apply3 = Promise$.MODULE$.apply();
                        ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.consumerPacketRouter), RemotePacketRouter$Route$.MODULE$.apply(None$.MODULE$, pubRec.packetId(), Consumer$PubRecReceivedLocally$.MODULE$.apply(apply3), apply3));
                        apply3.future().onComplete(r42 -> {
                            _2.foreach(promise -> {
                                return promise.complete(r42.map(consumer$ForwardPubRec$ -> {
                                    return Done$.MODULE$;
                                }));
                            });
                        }, this.system.dispatcher());
                        return Source$.MODULE$.future(apply3.future().map(consumer$ForwardPubRec$ -> {
                            return MqttCodec$MqttPubRec$.MODULE$.encode$extension(MqttCodec$.MODULE$.MqttPubRec(pubRec), ByteString$.MODULE$.newBuilder()).result();
                        }, this.system.dispatcher())).recover(new ActorMqttClientSession$$anon$3());
                    }
                    if (_1 instanceof PubComp) {
                        PubComp pubComp = (PubComp) _1;
                        Promise<Consumer$ForwardPubComp$> apply4 = Promise$.MODULE$.apply();
                        ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.consumerPacketRouter), RemotePacketRouter$Route$.MODULE$.apply(None$.MODULE$, pubComp.packetId(), Consumer$PubCompReceivedLocally$.MODULE$.apply(apply4), apply4));
                        apply4.future().onComplete(r43 -> {
                            _2.foreach(promise -> {
                                return promise.complete(r43.map(consumer$ForwardPubComp$ -> {
                                    return Done$.MODULE$;
                                }));
                            });
                        }, this.system.dispatcher());
                        return Source$.MODULE$.future(apply4.future().map(consumer$ForwardPubComp$ -> {
                            return MqttCodec$MqttPubComp$.MODULE$.encode$extension(MqttCodec$.MODULE$.MqttPubComp(pubComp), ByteString$.MODULE$.newBuilder()).result();
                        }, this.system.dispatcher())).recover(new ActorMqttClientSession$$anon$4());
                    }
                    if (_1 instanceof Subscribe) {
                        Subscribe subscribe = (Subscribe) _1;
                        Promise<Subscriber.ForwardSubscribe> apply5 = Promise$.MODULE$.apply();
                        ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.clientConnector), ClientConnector$SubscribeReceivedLocally$.MODULE$.apply(byteString, subscribe, _3, apply5));
                        return Source$.MODULE$.future(apply5.future().map(forwardSubscribe -> {
                            return MqttCodec$MqttSubscribe$.MODULE$.encode$extension(MqttCodec$.MODULE$.MqttSubscribe(subscribe), ByteString$.MODULE$.newBuilder(), forwardSubscribe.packetId()).result();
                        }, this.system.dispatcher()));
                    }
                    if (_1 instanceof Unsubscribe) {
                        Unsubscribe unsubscribe = (Unsubscribe) _1;
                        Promise<Unsubscriber.ForwardUnsubscribe> apply6 = Promise$.MODULE$.apply();
                        ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.clientConnector), ClientConnector$UnsubscribeReceivedLocally$.MODULE$.apply(byteString, unsubscribe, _3, apply6));
                        return Source$.MODULE$.future(apply6.future().map(forwardUnsubscribe -> {
                            return MqttCodec$MqttUnsubscribe$.MODULE$.encode$extension(MqttCodec$.MODULE$.MqttUnsubscribe(unsubscribe), ByteString$.MODULE$.newBuilder(), forwardUnsubscribe.packetId()).result();
                        }, this.system.dispatcher()));
                    }
                    if (_1 == Disconnect$.MODULE$) {
                        Disconnect$ disconnect$ = (Disconnect$) _1;
                        Promise<ClientConnector$ForwardDisconnect$> apply7 = Promise$.MODULE$.apply();
                        ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.clientConnector), ClientConnector$DisconnectReceivedLocally$.MODULE$.apply(byteString, apply7));
                        return Source$.MODULE$.future(apply7.future().map(clientConnector$ForwardDisconnect$ -> {
                            return MqttCodec$MqttDisconnect$.MODULE$.encode$extension(MqttCodec$.MODULE$.MqttDisconnect(disconnect$), ByteString$.MODULE$.newBuilder()).result();
                        }, this.system.dispatcher()));
                    }
                }
                if (command instanceof Command) {
                    throw new IllegalStateException(new StringBuilder(24).append(command).append(" is not a client command").toString());
                }
                throw new MatchError(command);
            }).recover(new ActorMqttClientSession$$anon$5()).filter(byteString2 -> {
                return byteString2.nonEmpty();
            }).log("client-commandFlow", byteString3 -> {
                return MqttCodec$MqttByteIterator$.MODULE$.decodeControlPacket$extension(MqttCodec$.MODULE$.MqttByteIterator(byteString3.iterator()), this.settings.maxPacketSize());
            }, this.loggingAdapter);
            int DebugLevel = Logging$.MODULE$.DebugLevel();
            return future$.successful(log.withAttributes(ActorAttributes$.MODULE$.logLevels(ActorAttributes$.MODULE$.logLevels$default$1(), ActorAttributes$.MODULE$.logLevels$default$2(), DebugLevel)));
        }).mapMaterializedValue(future -> {
            return NotUsed$.MODULE$;
        });
    }

    @Override // org.apache.pekko.stream.connectors.mqtt.streaming.scaladsl.MqttClientSession
    public <A> Flow<ByteString, Either<MqttCodec.DecodeError, Event<A>>, NotUsed> eventFlow(ByteString byteString) {
        Flow map = Flow$.MODULE$.apply().watch(package$TypedActorRefOps$.MODULE$.toClassic$extension(package$.MODULE$.TypedActorRefOps(this.clientConnector))).watchTermination((notUsed, future) -> {
            Tuple2 apply = Tuple2$.MODULE$.apply(notUsed, future);
            if (apply == null) {
                throw new MatchError(apply);
            }
            ((Future) apply._2()).onComplete(r7 -> {
                if ((r7 instanceof Failure) && (((Failure) r7).exception() instanceof WatchedActorTerminatedException)) {
                    return;
                }
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.clientConnector), ClientConnector$ConnectionLost$.MODULE$.apply(byteString));
            }, this.system.dispatcher());
            return NotUsed$.MODULE$;
        }).via(new MqttFrameStage(this.settings.maxPacketSize())).map(byteString2 -> {
            return MqttCodec$MqttByteIterator$.MODULE$.decodeControlPacket$extension(MqttCodec$.MODULE$.MqttByteIterator(byteString2.iterator()), this.settings.maxPacketSize());
        });
        Flow recoverWithRetries = map.log("client-events", map.log$default$2(), this.loggingAdapter).mapAsync(this.settings.eventParallelism(), either -> {
            if (!(either instanceof Right)) {
                if (!(either instanceof Left)) {
                    throw new MatchError(either);
                }
                return Future$.MODULE$.successful(scala.package$.MODULE$.Left().apply((MqttCodec.DecodeError) ((Left) either).value()));
            }
            ControlPacket controlPacket = (ControlPacket) ((Right) either).value();
            if (controlPacket instanceof ConnAck) {
                ConnAck connAck = (ConnAck) controlPacket;
                Promise<ClientConnector.ForwardConnAck> apply = Promise$.MODULE$.apply();
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.clientConnector), ClientConnector$ConnAckReceivedFromRemote$.MODULE$.apply(byteString, connAck, apply));
                return apply.future().map(forwardConnAck -> {
                    if (forwardConnAck != null) {
                        Option<?> _1 = ClientConnector$ForwardConnAck$.MODULE$.unapply(forwardConnAck)._1();
                        if (_1 instanceof Option) {
                            return scala.package$.MODULE$.Right().apply(Event$.MODULE$.apply((ControlPacket) connAck, (Option) _1));
                        }
                    }
                    throw new MatchError(forwardConnAck);
                }, this.system.dispatcher());
            }
            if (controlPacket instanceof SubAck) {
                SubAck subAck = (SubAck) controlPacket;
                Promise<Subscriber.ForwardSubAck> apply2 = Promise$.MODULE$.apply();
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.subscriberPacketRouter), LocalPacketRouter$Route$.MODULE$.apply(subAck.packetId(), Subscriber$SubAckReceivedFromRemote$.MODULE$.apply(apply2), apply2));
                return apply2.future().map(forwardSubAck -> {
                    if (forwardSubAck != null) {
                        Option<?> _1 = Subscriber$ForwardSubAck$.MODULE$.unapply(forwardSubAck)._1();
                        if (_1 instanceof Option) {
                            return scala.package$.MODULE$.Right().apply(Event$.MODULE$.apply((ControlPacket) subAck, (Option) _1));
                        }
                    }
                    throw new MatchError(forwardSubAck);
                }, this.system.dispatcher());
            }
            if (controlPacket instanceof UnsubAck) {
                UnsubAck unsubAck = (UnsubAck) controlPacket;
                Promise<Unsubscriber.ForwardUnsubAck> apply3 = Promise$.MODULE$.apply();
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.unsubscriberPacketRouter), LocalPacketRouter$Route$.MODULE$.apply(unsubAck.packetId(), Unsubscriber$UnsubAckReceivedFromRemote$.MODULE$.apply(apply3), apply3));
                return apply3.future().map(forwardUnsubAck -> {
                    if (forwardUnsubAck != null) {
                        Option<?> _1 = Unsubscriber$ForwardUnsubAck$.MODULE$.unapply(forwardUnsubAck)._1();
                        if (_1 instanceof Option) {
                            return scala.package$.MODULE$.Right().apply(Event$.MODULE$.apply((ControlPacket) unsubAck, (Option) _1));
                        }
                    }
                    throw new MatchError(forwardUnsubAck);
                }, this.system.dispatcher());
            }
            if (controlPacket instanceof Publish) {
                Publish publish = (Publish) controlPacket;
                Promise<Consumer$ForwardPublish$> apply4 = Promise$.MODULE$.apply();
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.clientConnector), ClientConnector$PublishReceivedFromRemote$.MODULE$.apply(byteString, publish, apply4));
                return apply4.future().map(consumer$ForwardPublish$ -> {
                    return scala.package$.MODULE$.Right().apply(Event$.MODULE$.apply(publish));
                }, this.system.dispatcher());
            }
            if (controlPacket instanceof PubAck) {
                PubAck pubAck = (PubAck) controlPacket;
                Promise<Producer.ForwardPubAck> apply5 = Promise$.MODULE$.apply();
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.producerPacketRouter), LocalPacketRouter$Route$.MODULE$.apply(pubAck.packetId(), Producer$PubAckReceivedFromRemote$.MODULE$.apply(apply5), apply5));
                return apply5.future().map(forwardPubAck -> {
                    if (forwardPubAck != null) {
                        Option<?> _1 = Producer$ForwardPubAck$.MODULE$.unapply(forwardPubAck)._1();
                        if (_1 instanceof Option) {
                            return scala.package$.MODULE$.Right().apply(Event$.MODULE$.apply((ControlPacket) pubAck, (Option) _1));
                        }
                    }
                    throw new MatchError(forwardPubAck);
                }, this.system.dispatcher());
            }
            if (controlPacket instanceof PubRec) {
                PubRec pubRec = (PubRec) controlPacket;
                Promise<Producer.ForwardPubRec> apply6 = Promise$.MODULE$.apply();
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.producerPacketRouter), LocalPacketRouter$Route$.MODULE$.apply(pubRec.packetId(), Producer$PubRecReceivedFromRemote$.MODULE$.apply(apply6), apply6));
                return apply6.future().map(forwardPubRec -> {
                    if (forwardPubRec != null) {
                        Option<?> _1 = Producer$ForwardPubRec$.MODULE$.unapply(forwardPubRec)._1();
                        if (_1 instanceof Option) {
                            return scala.package$.MODULE$.Right().apply(Event$.MODULE$.apply((ControlPacket) pubRec, (Option) _1));
                        }
                    }
                    throw new MatchError(forwardPubRec);
                }, this.system.dispatcher());
            }
            if (controlPacket instanceof PubRel) {
                PubRel pubRel = (PubRel) controlPacket;
                Promise<Consumer$ForwardPubRel$> apply7 = Promise$.MODULE$.apply();
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.consumerPacketRouter), RemotePacketRouter$Route$.MODULE$.apply(None$.MODULE$, pubRel.packetId(), Consumer$PubRelReceivedFromRemote$.MODULE$.apply(apply7), apply7));
                return apply7.future().map(consumer$ForwardPubRel$ -> {
                    return scala.package$.MODULE$.Right().apply(Event$.MODULE$.apply(pubRel));
                }, this.system.dispatcher());
            }
            if (controlPacket instanceof PubComp) {
                PubComp pubComp = (PubComp) controlPacket;
                Promise<Producer.ForwardPubComp> apply8 = Promise$.MODULE$.apply();
                ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.producerPacketRouter), LocalPacketRouter$Route$.MODULE$.apply(pubComp.packetId(), Producer$PubCompReceivedFromRemote$.MODULE$.apply(apply8), apply8));
                return apply8.future().map(forwardPubComp -> {
                    if (forwardPubComp != null) {
                        Option<?> _1 = Producer$ForwardPubComp$.MODULE$.unapply(forwardPubComp)._1();
                        if (_1 instanceof Option) {
                            return scala.package$.MODULE$.Right().apply(Event$.MODULE$.apply((ControlPacket) pubComp, (Option) _1));
                        }
                    }
                    throw new MatchError(forwardPubComp);
                }, this.system.dispatcher());
            }
            if (!PingResp$.MODULE$.equals(controlPacket)) {
                return Future$.MODULE$.failed(new IllegalStateException(new StringBuilder(22).append(controlPacket).append(" is not a client event").toString()));
            }
            Promise<ClientConnector$ForwardPingResp$> apply9 = Promise$.MODULE$.apply();
            ActorRef$ActorRefOps$.MODULE$.$bang$extension(ActorRef$.MODULE$.ActorRefOps(this.clientConnector), ClientConnector$PingRespReceivedFromRemote$.MODULE$.apply(byteString, apply9));
            return apply9.future().map(clientConnector$ForwardPingResp$ -> {
                return scala.package$.MODULE$.Right().apply(Event$.MODULE$.apply(PingResp$.MODULE$));
            }, this.system.dispatcher());
        }).withAttributes(ActorAttributes$.MODULE$.supervisionStrategy(th -> {
            return (Supervision.Directive) (((th instanceof LocalPacketRouter.CannotRoute) || (th instanceof RemotePacketRouter.CannotRoute)) ? Supervision$Resume$.MODULE$ : Supervision$Stop$.MODULE$);
        })).recoverWithRetries(-1, new ActorMqttClientSession$$anon$6());
        int DebugLevel = Logging$.MODULE$.DebugLevel();
        return recoverWithRetries.withAttributes(ActorAttributes$.MODULE$.logLevels(ActorAttributes$.MODULE$.logLevels$default$1(), ActorAttributes$.MODULE$.logLevels$default$2(), DebugLevel));
    }

    private static final Behavior $init$$$anonfun$1() {
        return RemotePacketRouter$.MODULE$.apply();
    }

    private static final Behavior $init$$$anonfun$2() {
        return LocalPacketRouter$.MODULE$.apply();
    }

    private static final Behavior $init$$$anonfun$3() {
        return LocalPacketRouter$.MODULE$.apply();
    }

    private static final Behavior $init$$$anonfun$4() {
        return LocalPacketRouter$.MODULE$.apply();
    }

    /* JADX WARN: Multi-variable type inference failed */
    private final Behavior $init$$$anonfun$5(MqttSessionSettings mqttSessionSettings, ClassicActorSystemProvider classicActorSystemProvider) {
        return ClientConnector$.MODULE$.apply(this.consumerPacketRouter, this.producerPacketRouter, this.subscriberPacketRouter, this.unsubscriberPacketRouter, mqttSessionSettings, Materializer$.MODULE$.matFromSystem(classicActorSystemProvider));
    }
}
