package org.alcaudon.core;

import akka.actor.ActorPath;
import akka.actor.ActorRef;
import akka.actor.package$;
import akka.persistence.DeleteMessagesFailure;
import akka.persistence.DeleteMessagesSuccess;
import akka.persistence.SaveSnapshotFailure;
import akka.persistence.SaveSnapshotSuccess;
import org.alcaudon.clustering.DataflowTopologyListener;
import org.alcaudon.core.AlcaudonStream;
import scala.Function1;
import scala.MatchError;
import scala.Option$;
import scala.Serializable;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.collection.mutable.ArrayBuffer;
import scala.concurrent.duration.package;
import scala.runtime.AbstractPartialFunction;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.util.Failure;
import scala.util.Success;
import scala.util.Try;

/* compiled from: AlcaudonStream.scala */
/* loaded from: input_file:org/alcaudon/core/AlcaudonStream$$anonfun$receiveCommandWithControlFlow$1.class */
public final class AlcaudonStream$$anonfun$receiveCommandWithControlFlow$1 extends AbstractPartialFunction<Object, BoxedUnit> implements Serializable {
    public static final long serialVersionUID = 0;
    private final /* synthetic */ AlcaudonStream $outer;
    private final Set overwhelmedSubscribers$2;

    /* JADX WARN: Multi-variable type inference failed */
    public final <A1, B1> B1 applyOrElse(A1 a1, Function1<A1, B1> function1) {
        Object apply;
        if (a1 instanceof RawRecord) {
            ActorRef sender = this.$outer.sender();
            this.$outer.persist((AlcaudonStream) new RawStreamRecord(this.$outer.state().nextRecordSeq(), (RawRecord) a1), (Function1<AlcaudonStream, BoxedUnit>) rawStreamRecord -> {
                $anonfun$applyOrElse$1(this, sender, rawStreamRecord);
                return BoxedUnit.UNIT;
            });
            apply = BoxedUnit.UNIT;
        } else if (a1 instanceof DataflowTopologyListener.DataflowNodeAddress) {
            DataflowTopologyListener.DataflowNodeAddress dataflowNodeAddress = (DataflowTopologyListener.DataflowNodeAddress) a1;
            String id = dataflowNodeAddress.id();
            ActorPath address = dataflowNodeAddress.address();
            this.$outer.org$alcaudon$core$AlcaudonStream$$subscribers.get(id).foreach(keyExtractor -> {
                $anonfun$applyOrElse$2(this, address, keyExtractor);
                return BoxedUnit.UNIT;
            });
            apply = BoxedUnit.UNIT;
        } else if (a1 instanceof AlcaudonStream.ACK) {
            this.$outer.persist((AlcaudonStream) a1, (Function1<AlcaudonStream, BoxedUnit>) ack -> {
                $anonfun$applyOrElse$4(this, ack);
                return BoxedUnit.UNIT;
            });
            apply = BoxedUnit.UNIT;
        } else if (AlcaudonStream$CheckOverwhelmedSubscribers$.MODULE$.equals(a1)) {
            ArrayBuffer arrayBuffer = (ArrayBuffer) this.$outer.state().subscribers().filter(subscriberInfo -> {
                return BoxesRunTime.boxToBoolean($anonfun$applyOrElse$5(this, subscriberInfo));
            });
            this.$outer.log().info("Overwhelmed {}", arrayBuffer);
            this.$outer.context().become(this.$outer.receiveCommandWithControlFlow(arrayBuffer.toSet()));
            apply = BoxedUnit.UNIT;
        } else if (AlcaudonStream$SignalOverwhelmedSubscribers$.MODULE$.equals(a1)) {
            this.$outer.log().info("Signaling overwhelmed subscribers {}", this.overwhelmedSubscribers$2);
            this.overwhelmedSubscribers$2.flatMap(subscriberInfo2 -> {
                return Option$.MODULE$.option2Iterable(this.$outer.state().getRecord(subscriberInfo2.actor()).map(streamRecord -> {
                    $anonfun$applyOrElse$7(this, subscriberInfo2, streamRecord);
                    return BoxedUnit.UNIT;
                }));
            }, Set$.MODULE$.canBuildFrom());
            apply = BoxedUnit.UNIT;
        } else if (AlcaudonStream$GetSize$.MODULE$.equals(a1)) {
            package$.MODULE$.actorRef2Scala(this.$outer.sender()).$bang(new AlcaudonStream.Size(this.$outer.state().pendingRecords().length()), this.$outer.self());
            apply = BoxedUnit.UNIT;
        } else if (a1 instanceof AlcaudonStream.Subscribe) {
            AlcaudonStream.Subscribe subscribe = (AlcaudonStream.Subscribe) a1;
            ActorRef actor = subscribe.actor();
            KeyExtractor extractor = subscribe.extractor();
            this.$outer.log().info("{} is subscribing to {} ", actor, this.$outer.org$alcaudon$core$AlcaudonStream$$name);
            this.$outer.persist((AlcaudonStream) subscribe, (Function1<AlcaudonStream, BoxedUnit>) subscribe2 -> {
                $anonfun$applyOrElse$8(this, actor, extractor, subscribe2);
                return BoxedUnit.UNIT;
            });
            apply = BoxedUnit.UNIT;
        } else if (a1 instanceof SaveSnapshotSuccess) {
            this.$outer.deleteMessages(((SaveSnapshotSuccess) a1).metadata().sequenceNr());
            this.$outer.state().gc();
            apply = BoxedUnit.UNIT;
        } else if (a1 instanceof SaveSnapshotFailure) {
            this.$outer.log().error("Error saving the snapshot {}", (SaveSnapshotFailure) a1);
            apply = BoxedUnit.UNIT;
        } else if (a1 instanceof DeleteMessagesSuccess) {
            this.$outer.log().info("Garbage collection on stream {} worked correctly {}", this.$outer.org$alcaudon$core$AlcaudonStream$$name, this.$outer.state().pendingRecords());
            apply = BoxedUnit.UNIT;
        } else if (a1 instanceof DeleteMessagesFailure) {
            this.$outer.log().error("Garbage collection on stream {} failed", this.$outer.org$alcaudon$core$AlcaudonStream$$name);
            apply = BoxedUnit.UNIT;
        } else {
            if (AlcaudonStream$InjectFailure$.MODULE$.equals(a1)) {
                throw new Exception("injected failure");
            }
            apply = function1.apply(a1);
        }
        return (B1) apply;
    }

    public final boolean isDefinedAt(Object obj) {
        return obj instanceof RawRecord ? true : obj instanceof DataflowTopologyListener.DataflowNodeAddress ? true : obj instanceof AlcaudonStream.ACK ? true : AlcaudonStream$CheckOverwhelmedSubscribers$.MODULE$.equals(obj) ? true : AlcaudonStream$SignalOverwhelmedSubscribers$.MODULE$.equals(obj) ? true : AlcaudonStream$GetSize$.MODULE$.equals(obj) ? true : obj instanceof AlcaudonStream.Subscribe ? true : obj instanceof SaveSnapshotSuccess ? true : obj instanceof SaveSnapshotFailure ? true : obj instanceof DeleteMessagesSuccess ? true : obj instanceof DeleteMessagesFailure ? true : AlcaudonStream$InjectFailure$.MODULE$.equals(obj);
    }

    public static final /* synthetic */ void $anonfun$applyOrElse$1(AlcaudonStream$$anonfun$receiveCommandWithControlFlow$1 alcaudonStream$$anonfun$receiveCommandWithControlFlow$1, ActorRef actorRef, RawStreamRecord rawStreamRecord) {
        alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.state().update(rawStreamRecord);
        package$.MODULE$.actorRef2Scala(actorRef).$bang(new AlcaudonStream.ReceiveACK(rawStreamRecord.rawRecord().id()), alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.self());
        alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.signalSubscribers(alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.overwhelmedSubscribers$2);
        if (alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.shouldTakeSnapshot()) {
            alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.saveSnapshot(alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.state());
        }
    }

    public static final /* synthetic */ void $anonfun$applyOrElse$3(AlcaudonStream$$anonfun$receiveCommandWithControlFlow$1 alcaudonStream$$anonfun$receiveCommandWithControlFlow$1, KeyExtractor keyExtractor, Try r6) {
        if (r6 instanceof Success) {
            alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.state().addSubscriber((ActorRef) ((Success) r6).value(), keyExtractor);
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
        } else {
            if (!(r6 instanceof Failure)) {
                throw new MatchError(r6);
            }
            alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.log().error("Error getting subscriber {}", ((Failure) r6).exception());
            BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
        }
    }

    public static final /* synthetic */ void $anonfun$applyOrElse$2(AlcaudonStream$$anonfun$receiveCommandWithControlFlow$1 alcaudonStream$$anonfun$receiveCommandWithControlFlow$1, ActorPath actorPath, KeyExtractor keyExtractor) {
        alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.context().actorSelection(actorPath).resolveOne(new package.DurationInt(scala.concurrent.duration.package$.MODULE$.DurationInt(2)).seconds()).onComplete(r6 -> {
            $anonfun$applyOrElse$3(alcaudonStream$$anonfun$receiveCommandWithControlFlow$1, keyExtractor, r6);
            return BoxedUnit.UNIT;
        }, alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.context().dispatcher());
    }

    public static final /* synthetic */ void $anonfun$applyOrElse$4(AlcaudonStream$$anonfun$receiveCommandWithControlFlow$1 alcaudonStream$$anonfun$receiveCommandWithControlFlow$1, AlcaudonStream.ACK ack) {
        alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.state().ack(ack.actor(), ack.offset());
    }

    public static final /* synthetic */ boolean $anonfun$applyOrElse$5(AlcaudonStream$$anonfun$receiveCommandWithControlFlow$1 alcaudonStream$$anonfun$receiveCommandWithControlFlow$1, SubscriberInfo subscriberInfo) {
        return subscriberInfo.isOverwhelmed(alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.state().latestRecordSeq(), alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.overwhelmDelayedMessages());
    }

    public static final /* synthetic */ void $anonfun$applyOrElse$7(AlcaudonStream$$anonfun$receiveCommandWithControlFlow$1 alcaudonStream$$anonfun$receiveCommandWithControlFlow$1, SubscriberInfo subscriberInfo, StreamRecord streamRecord) {
        package$.MODULE$.actorRef2Scala(subscriberInfo.actor()).$bang(streamRecord, alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.self());
    }

    public static final /* synthetic */ void $anonfun$applyOrElse$8(AlcaudonStream$$anonfun$receiveCommandWithControlFlow$1 alcaudonStream$$anonfun$receiveCommandWithControlFlow$1, ActorRef actorRef, KeyExtractor keyExtractor, AlcaudonStream.Subscribe subscribe) {
        alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.state().addSubscriber(subscribe.actor(), keyExtractor);
        alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.log().info("{} is subscribed to {} ", actorRef, alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.org$alcaudon$core$AlcaudonStream$$name);
        package$.MODULE$.actorRef2Scala(subscribe.actor()).$bang(new AlcaudonStream.SuccessfulSubscription(alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.org$alcaudon$core$AlcaudonStream$$name, alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.state().latestRecordSeq()), alcaudonStream$$anonfun$receiveCommandWithControlFlow$1.$outer.self());
    }

    public AlcaudonStream$$anonfun$receiveCommandWithControlFlow$1(AlcaudonStream alcaudonStream, Set set) {
        if (alcaudonStream == null) {
            throw null;
        }
        this.$outer = alcaudonStream;
        this.overwhelmedSubscribers$2 = set;
    }
}
