package org.alcaudon.runtime;

import akka.actor.Actor;
import akka.actor.ActorContext;
import akka.actor.ActorLogging;
import akka.actor.ActorRef;
import akka.actor.SupervisorStrategy;
import akka.actor.package$;
import akka.event.LoggingAdapter;
import java.time.Duration;
import org.alcaudon.clustering.DataflowTopologyListener;
import org.alcaudon.clustering.DataflowTopologyListener$;
import org.alcaudon.core.ActorConfig;
import org.alcaudon.core.KeyExtractor;
import org.alcaudon.core.RawRecord;
import org.alcaudon.core.RawStreamRecord;
import org.alcaudon.core.Record;
import org.alcaudon.core.SettingsDefinition;
import org.alcaudon.core.StreamRecord;
import org.alcaudon.core.sources.SourceCtx;
import org.alcaudon.core.sources.SourceFunc;
import scala.MatchError;
import scala.Option;
import scala.PartialFunction;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.immutable.Map;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;

/* compiled from: SourceReifier.scala */
@ScalaSignature(bytes = "\u0006\u0001\u0005-a\u0001B\u0001\u0003\u0001%\u0011QbU8ve\u000e,'+Z5gS\u0016\u0014(BA\u0002\u0005\u0003\u001d\u0011XO\u001c;j[\u0016T!!\u0002\u0004\u0002\u0011\u0005d7-Y;e_:T\u0011aB\u0001\u0004_J<7\u0001A\n\u0007\u0001)\u0001\u0002DH\u0011\u0011\u0005-qQ\"\u0001\u0007\u000b\u00035\tQa]2bY\u0006L!a\u0004\u0007\u0003\r\u0005s\u0017PU3g!\t\tb#D\u0001\u0013\u0015\t\u0019B#A\u0003bGR|'OC\u0001\u0016\u0003\u0011\t7n[1\n\u0005]\u0011\"!B!di>\u0014\bCA\r\u001d\u001b\u0005Q\"BA\u000e\u0005\u0003\u0011\u0019wN]3\n\u0005uQ\"aC!di>\u00148i\u001c8gS\u001e\u0004\"!E\u0010\n\u0005\u0001\u0012\"\u0001D!di>\u0014Hj\\4hS:<\u0007C\u0001\u0012&\u001b\u0005\u0019#B\u0001\u0013\u001b\u0003\u001d\u0019x.\u001e:dKNL!AJ\u0012\u0003\u0013M{WO]2f\u0007RD\b\u0002\u0003\u0015\u0001\u0005\u0003\u0005\u000b\u0011B\u0015\u0002\u0015\u0011\fG/\u00194m_^LE\r\u0005\u0002+c9\u00111f\f\t\u0003Y1i\u0011!\f\u0006\u0003]!\ta\u0001\u0010:p_Rt\u0014B\u0001\u0019\r\u0003\u0019\u0001&/\u001a3fM&\u0011!g\r\u0002\u0007'R\u0014\u0018N\\4\u000b\u0005Ab\u0001\u0002C\u001b\u0001\u0005\u0003\u0005\u000b\u0011B\u0015\u0002\t9\fW.\u001a\u0005\to\u0001\u0011\t\u0011)A\u0005q\u0005A1o\\;sG\u00164e\u000e\u0005\u0002#s%\u0011!h\t\u0002\u000b'>,(oY3Gk:\u001c\u0007\u0002\u0003\u001f\u0001\u0005\u0003\u0005\u000b\u0011B\u001f\u0002\u0017M,(m]2sS\n,'o\u001d\t\u0005UyJ\u0003)\u0003\u0002@g\t\u0019Q*\u00199\u0011\u0005e\t\u0015B\u0001\"\u001b\u00051YU-_#yiJ\f7\r^8s\u0011\u0015!\u0005\u0001\"\u0001F\u0003\u0019a\u0014N\\5u}Q)a\tS%K\u0017B\u0011q\tA\u0007\u0002\u0005!)\u0001f\u0011a\u0001S!)Qg\u0011a\u0001S!)qg\u0011a\u0001q!9Ah\u0011I\u0001\u0002\u0004i\u0004bB'\u0001\u0001\u0004%\tAT\u0001\u000fgV\u00147o\u0019:jE\u0016\u0014(+\u001a4t+\u0005y\u0005\u0003\u0002\u0016?!\u0002\u0003\"!E)\n\u0005I\u0013\"\u0001C!di>\u0014(+\u001a4\t\u000fQ\u0003\u0001\u0019!C\u0001+\u0006\u00112/\u001e2tGJL'-\u001a:SK\u001a\u001cx\fJ3r)\t1\u0016\f\u0005\u0002\f/&\u0011\u0001\f\u0004\u0002\u0005+:LG\u000fC\u0004['\u0006\u
0005\t\u0019A(\u0002\u0007a$\u0013\u0007\u0003\u0004]\u0001\u0001\u0006KaT\u0001\u0010gV\u00147o\u0019:jE\u0016\u0014(+\u001a4tA!)a\f\u0001C\u0001?\u000691m\u001c7mK\u000e$HC\u0001,a\u0011\u0015\tW\f1\u0001c\u0003\u0019\u0011XmY8sIB\u0011\u0011dY\u0005\u0003Ij\u0011\u0011BU1x%\u0016\u001cwN\u001d3\t\u000b\u0019\u0004A\u0011A4\u0002\u000b\rdwn]3\u0016\u0003YCQ!\u001b\u0001\u0005\u0002)\fqA]3dK&4X-F\u0001l!\u0011YAN\u001c,\n\u00055d!a\u0004)beRL\u0017\r\u001c$v]\u000e$\u0018n\u001c8\u0011\u0005-y\u0017B\u00019\r\u0005\r\te._\u0004\be\n\t\t\u0011#\u0001t\u00035\u0019v.\u001e:dKJ+\u0017NZ5feB\u0011q\t\u001e\u0004\b\u0003\t\t\t\u0011#\u0001v'\t!(\u0002C\u0003Ei\u0012\u0005q\u000fF\u0001t\u0011\u001dIH/%A\u0005\u0002i\f1\u0004\n7fgNLg.\u001b;%OJ,\u0017\r^3sI\u0011,g-Y;mi\u0012\"T#A>+\u0005ub8&A?\u0011\u0007y\f9!D\u0001��\u0015\u0011\t\t!a\u0001\u0002\u0013Ut7\r[3dW\u0016$'bAA\u0003\u0019\u0005Q\u0011M\u001c8pi\u0006$\u0018n\u001c8\n\u0007\u0005%qPA\tv]\u000eDWmY6fIZ\u000b'/[1oG\u0016\u0004")
/* loaded from: input_file:org/alcaudon/runtime/SourceReifier.class */
/*
 * Actor that exposes a SourceCtx: every RawRecord handed to collect() is wrapped in a
 * StreamRecord and sent to each ActorRef currently held in subscriberRefs.
 *
 * This source is decompiled Scala, so most members are forwarders to the static
 * "name$" helpers that scalac generates for the mixed-in traits (Actor, ActorLogging,
 * ActorConfig). Names containing '$' are compiler-generated and must be kept verbatim.
 */
public class SourceReifier implements Actor, ActorConfig, ActorLogging, SourceCtx {
    /** Subscriber map received by the constructor; its key set is announced to the topology listener. */
    public final Map<String, KeyExtractor> org$alcaudon$runtime$SourceReifier$$subscribers;

    /** Destinations used by {@link #collect(RawRecord)}; starts empty and is replaced via {@link #subscriberRefs_$eq(Map)}. */
    private Map<ActorRef, KeyExtractor> subscriberRefs;

    /** Backing field for the ActorLogging trait. */
    private LoggingAdapter akka$actor$ActorLogging$$_log;

    // The three fields below are written by trait-initialisation setters rather than directly in
    // the constructor, so they cannot be declared final in Java source.
    private SettingsDefinition config;
    private ActorContext context;
    private ActorRef self;

    /** Returns the logging adapter provided by the ActorLogging trait. */
    public LoggingAdapter log() {
        return ActorLogging.log$(this);
    }

    /**
     * Converts a {@link Duration} to a Scala {@link FiniteDuration} using the default
     * implementation inherited from ActorConfig.
     *
     * @param duration the duration to convert
     * @return the converted duration
     */
    @Override // org.alcaudon.core.ActorConfig
    public FiniteDuration asFiniteDuration(Duration duration) {
        // Explicit interface super-call: invoking asFiniteDuration(duration) unqualified here
        // would re-enter this very method and recurse until the stack overflows.
        return ActorConfig.super.asFiniteDuration(duration);
    }

    /** Forwards to the Actor trait's sender implementation. */
    public final ActorRef sender() {
        return Actor.sender$(this);
    }

    /** Forwards to the Actor trait's aroundReceive implementation. */
    public void aroundReceive(PartialFunction<Object, BoxedUnit> partialFunction, Object obj) {
        Actor.aroundReceive$(this, partialFunction, obj);
    }

    /** Forwards to the Actor trait's aroundPreStart implementation. */
    public void aroundPreStart() {
        Actor.aroundPreStart$(this);
    }

    /** Forwards to the Actor trait's aroundPostStop implementation. */
    public void aroundPostStop() {
        Actor.aroundPostStop$(this);
    }

    /** Forwards to the Actor trait's aroundPreRestart implementation. */
    public void aroundPreRestart(Throwable th, Option<Object> option) {
        Actor.aroundPreRestart$(this, th, option);
    }

    /** Forwards to the Actor trait's aroundPostRestart implementation. */
    public void aroundPostRestart(Throwable th) {
        Actor.aroundPostRestart$(this, th);
    }

    /** Forwards to the Actor trait's supervisorStrategy implementation. */
    public SupervisorStrategy supervisorStrategy() {
        return Actor.supervisorStrategy$(this);
    }

    /** Forwards to the Actor trait's preStart implementation. */
    public void preStart() throws Exception {
        Actor.preStart$(this);
    }

    /** Forwards to the Actor trait's postStop implementation. */
    public void postStop() throws Exception {
        Actor.postStop$(this);
    }

    /** Forwards to the Actor trait's preRestart implementation. */
    public void preRestart(Throwable th, Option<Object> option) throws Exception {
        Actor.preRestart$(this, th, option);
    }

    /** Forwards to the Actor trait's postRestart implementation. */
    public void postRestart(Throwable th) throws Exception {
        Actor.postRestart$(this, th);
    }

    /** Forwards to the Actor trait's unhandled implementation. */
    public void unhandled(Object obj) {
        Actor.unhandled$(this, obj);
    }

    /** Getter for the ActorLogging backing field. */
    public LoggingAdapter akka$actor$ActorLogging$$_log() {
        return this.akka$actor$ActorLogging$$_log;
    }

    /** Setter for the ActorLogging backing field. */
    public void akka$actor$ActorLogging$$_log_$eq(LoggingAdapter loggingAdapter) {
        this.akka$actor$ActorLogging$$_log = loggingAdapter;
    }

    /** Returns the settings stored during construction. */
    @Override // org.alcaudon.core.ActorConfig
    public SettingsDefinition config() {
        return this.config;
    }

    /** Trait setter for {@code config}; called from the constructor. */
    @Override // org.alcaudon.core.ActorConfig
    public void org$alcaudon$core$ActorConfig$_setter_$config_$eq(SettingsDefinition settingsDefinition) {
        this.config = settingsDefinition;
    }

    /** Returns the stored actor context. */
    public ActorContext context() {
        return this.context;
    }

    /** Returns the stored reference to this actor. */
    public final ActorRef self() {
        return this.self;
    }

    /** Trait setter for {@code context}. */
    public void akka$actor$Actor$_setter_$context_$eq(ActorContext actorContext) {
        this.context = actorContext;
    }

    /** Trait setter for {@code self}. */
    public final void akka$actor$Actor$_setter_$self_$eq(ActorRef actorRef) {
        this.self = actorRef;
    }

    /** Returns the current subscriber-to-key-extractor map. */
    public Map<ActorRef, KeyExtractor> subscriberRefs() {
        return this.subscriberRefs;
    }

    /** Replaces the subscriber-to-key-extractor map. */
    public void subscriberRefs_$eq(Map<ActorRef, KeyExtractor> map) {
        this.subscriberRefs = map;
    }

    /**
     * Sends {@code rawRecord} to every entry of {@link #subscriberRefs()}.
     *
     * @param rawRecord the record to distribute
     */
    @Override // org.alcaudon.core.sources.SourceCtx
    public void collect(RawRecord rawRecord) {
        subscriberRefs().foreach(tuple2 -> {
            $anonfun$collect$1(this, rawRecord, tuple2);
            return BoxedUnit.UNIT;
        });
    }

    /** No-op: this context holds nothing that needs closing. */
    @Override // org.alcaudon.core.sources.SourceCtx
    public void close() {
    }

    /** Returns the message handler, implemented by the generated class {@code SourceReifier$$anonfun$receive$1}. */
    public PartialFunction<Object, BoxedUnit> receive() {
        return new SourceReifier$$anonfun$receive$1(this);
    }

    /**
     * Body of the lambda used by {@link #collect(RawRecord)}: builds a StreamRecord for one
     * (subscriber, key extractor) pair and sends it with this actor as the sender.
     *
     * @param sourceReifier the actor on whose behalf the message is sent
     * @param rawRecord     the record being distributed
     * @param tuple2        a (ActorRef, KeyExtractor) pair from the subscriber map
     * @throws MatchError if {@code tuple2} is null
     */
    public static final /* synthetic */ void $anonfun$collect$1(SourceReifier sourceReifier, RawRecord rawRecord, Tuple2 tuple2) {
        if (tuple2 == null) {
            throw new MatchError(tuple2);
        }
        ActorRef subscriber = (ActorRef) tuple2._1();
        KeyExtractor keyExtractor = (KeyExtractor) tuple2._2();
        // The raw stream record is always created with 0L as its first argument.
        RawStreamRecord rawStreamRecord = new RawStreamRecord(0L, rawRecord);
        Record keyedRecord = new Record(keyExtractor.extractKey(rawRecord.value()), rawRecord);
        package$.MODULE$.actorRef2Scala(subscriber).$bang(new StreamRecord(rawStreamRecord, keyedRecord), sourceReifier.self());
    }

    /**
     * Initialises the mixed-in traits, loads settings from the actor system configuration and,
     * when the computation is configured as distributed, spawns a DataflowTopologyListener child
     * and sends it the key set of {@code map} as its downstream dependencies.
     *
     * @param str        first argument passed to the DataflowTopologyListener props
     * @param str2       second argument passed to the DataflowTopologyListener props
     * @param sourceFunc source function; not used inside this constructor
     * @param map        subscriber map whose keys are reported as downstream dependencies
     */
    public SourceReifier(String str, String str2, SourceFunc sourceFunc, Map<String, KeyExtractor> map) {
        this.org$alcaudon$runtime$SourceReifier$$subscribers = map;
        // Trait initialisation order matters: Actor first (provides context/self), then config,
        // then logging.
        Actor.$init$(this);
        org$alcaudon$core$ActorConfig$_setter_$config_$eq(new SettingsDefinition(context().system().settings().config()));
        ActorLogging.$init$(this);
        if (config().computation().distributed()) {
            package$.MODULE$.actorRef2Scala(context().actorOf(DataflowTopologyListener$.MODULE$.props(str, str2))).$bang(new DataflowTopologyListener.DownstreamDependencies(map.keySet()), self());
        }
        this.subscriberRefs = Predef$.MODULE$.Map().empty();
    }
}
