/*
 * Decompiled with CFR 0.152.
 */
package com.github.davidmoten.fsm.runtime.rx;

import com.github.davidmoten.fsm.runtime.Action3;
import com.github.davidmoten.fsm.runtime.CancelTimedSignal;
import com.github.davidmoten.fsm.runtime.Clock;
import com.github.davidmoten.fsm.runtime.EntityBehaviour;
import com.github.davidmoten.fsm.runtime.EntityState;
import com.github.davidmoten.fsm.runtime.EntityStateMachine;
import com.github.davidmoten.fsm.runtime.Event;
import com.github.davidmoten.fsm.runtime.ObjectState;
import com.github.davidmoten.fsm.runtime.Search;
import com.github.davidmoten.fsm.runtime.Signal;
import com.github.davidmoten.fsm.runtime.rx.ClassId;
import com.github.davidmoten.fsm.runtime.rx.ClassIdPair;
import com.github.davidmoten.guavamini.Preconditions;
import io.reactivex.rxjava3.core.BackpressureStrategy;
import io.reactivex.rxjava3.core.Emitter;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.FlowableTransformer;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.flowables.GroupedFlowable;
import io.reactivex.rxjava3.functions.BiConsumer;
import io.reactivex.rxjava3.functions.Consumer;
import io.reactivex.rxjava3.functions.Function;
import io.reactivex.rxjava3.functions.Supplier;
import io.reactivex.rxjava3.internal.functions.Functions;
import io.reactivex.rxjava3.schedulers.Schedulers;
import io.reactivex.rxjava3.subjects.PublishSubject;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;

public final class Processor<Id> {
    private final Function<Class<?>, EntityBehaviour<?, Id>> behaviourFactory;
    private final PublishSubject<Signal<?, Id>> subject;
    private final Scheduler signalScheduler;
    private final Scheduler processingScheduler;
    private final Map<ClassId<?, Id>, EntityStateMachine<?, Id>> stateMachines = new ConcurrentHashMap();
    private final Map<ClassIdPair<Id>, Disposable> subscriptions = new ConcurrentHashMap<ClassIdPair<Id>, Disposable>();
    private final Flowable<Signal<?, Id>> signals;
    private final Function<GroupedFlowable<ClassId<?, Id>, EntityStateMachine<?, Id>>, Flowable<EntityStateMachine<?, Id>>> entityTransform;
    private final FlowableTransformer<Signal<?, Id>, Signal<?, Id>> preGroupBy;
    private final Function<Consumer<Object>, Map<ClassId<?, Id>, Object>> mapFactory;
    private final Clock signallerClock;
    private final Action3<? super EntityStateMachine<?, Id>, ? super Event<?>, ? super EntityState<?>> preTransitionAction;
    private final Consumer<? super EntityStateMachine<?, Id>> postTransitionAction;
    private final Search<Id> search = new Search<Id>(){

        @Override
        public <T> Optional<T> search(Class<T> cls, Id id) {
            return Processor.this.getStateMachine(cls, id).get();
        }
    };

    private Processor(Function<Class<?>, EntityBehaviour<?, Id>> behaviourFactory, Scheduler processingScheduler, Scheduler signalScheduler, Flowable<Signal<?, Id>> signals, Function<GroupedFlowable<ClassId<?, Id>, EntityStateMachine<?, Id>>, Flowable<EntityStateMachine<?, Id>>> entityTransform, FlowableTransformer<Signal<?, Id>, Signal<?, Id>> preGroupBy, Function<Consumer<Object>, Map<ClassId<?, Id>, Object>> mapFactory, Action3<? super EntityStateMachine<?, Id>, ? super Event<?>, ? super EntityState<?>> preTransitionAction, Consumer<? super EntityStateMachine<?, Id>> postTransitionAction) {
        Preconditions.checkNotNull(behaviourFactory);
        Preconditions.checkNotNull((Object)signalScheduler);
        Preconditions.checkNotNull(signals);
        Preconditions.checkNotNull(entityTransform);
        Preconditions.checkNotNull(preGroupBy);
        Preconditions.checkNotNull(preTransitionAction);
        Preconditions.checkNotNull(postTransitionAction);
        this.behaviourFactory = behaviourFactory;
        this.signalScheduler = signalScheduler;
        this.processingScheduler = processingScheduler;
        this.subject = PublishSubject.create();
        this.signals = signals;
        this.entityTransform = entityTransform;
        this.preGroupBy = preGroupBy;
        this.mapFactory = mapFactory;
        this.signallerClock = Clock.from(signalScheduler);
        this.preTransitionAction = preTransitionAction;
        this.postTransitionAction = postTransitionAction;
    }

    public static <Id> Builder<Id> behaviourFactory(Function<Class<?>, EntityBehaviour<?, Id>> behaviourFactory) {
        return new Builder<Id>().behaviourFactory(behaviourFactory);
    }

    public static <T, Id> Builder<Id> behaviour(Class<T> cls, EntityBehaviour<T, Id> behaviour) {
        return new Builder<Id>().behaviour(cls, behaviour);
    }

    public static <Id> Builder<Id> signalScheduler(Scheduler signalScheduler) {
        return new Builder().signalScheduler(signalScheduler);
    }

    public static <Id> Builder<Id> processingScheduler(Scheduler processingScheduler) {
        return new Builder().processingScheduler(processingScheduler);
    }

    public Flowable<EntityStateMachine<?, Id>> flowable() {
        return Flowable.defer(() -> {
            Scheduler.Worker worker = this.signalScheduler.createWorker();
            Flowable a = this.subject.toSerialized().toFlowable(BackpressureStrategy.BUFFER).mergeWith(this.signals).doOnCancel(() -> worker.dispose()).compose(this.preGroupBy);
            Flowable b = this.mapFactory != null ? a.groupBy(signal -> new ClassId(signal.cls(), signal.id()), x -> x, true, Integer.MAX_VALUE, this.mapFactory) : a.groupBy(signal -> new ClassId(signal.cls(), signal.id()), Functions.identity());
            return b.flatMap(g -> {
                Flowable obs = g.observeOn(this.processingScheduler).flatMap(this.processSignalsToSelfAndSendSignalsToOthers(worker, (ClassId)g.getKey())).doOnNext(m -> this.stateMachines.put((ClassId<?, Id>)g.getKey(), (EntityStateMachine<?, Id>)m));
                return (Publisher)this.entityTransform.apply(Processor.grouped(g.getKey(), obs));
            });
        });
    }

    private static <K, T> GroupedFlowable<K, T> grouped(K key, final Flowable<T> o) {
        return new GroupedFlowable<K, T>(key){

            protected void subscribeActual(Subscriber<? super T> s) {
                o.subscribe(s);
            }
        };
    }

    private Function<? super Signal<?, Id>, Flowable<EntityStateMachine<?, Id>>> processSignalsToSelfAndSendSignalsToOthers(Scheduler.Worker worker, ClassId<?, Id> classId) {
        return signal -> this.process(classId, signal.event(), worker).toList().toFlowable().flatMapIterable(Functions.identity());
    }

    private Flowable<EntityStateMachine<?, Id>> process(ClassId<?, Id> classId, Event<?> event, Scheduler.Worker worker) {
        EntityStateMachine<?, Id> machine = this.getStateMachine(classId.cls(), classId.id());
        TransitionHandler handler = new TransitionHandler(classId, event, worker, machine);
        return Flowable.generate((Supplier)handler, (BiConsumer)handler);
    }

    private <T> EntityStateMachine<T, Id> getStateMachine(Class<T> cls, Id id) {
        return this.stateMachines.computeIfAbsent(new ClassId<T, Id>(cls, id), clsId -> {
            try {
                return ((EntityBehaviour)this.behaviourFactory.apply((Object)cls)).create(id).withSearch(this.search).withClock(this.signallerClock).withPreTransition(this.preTransitionAction);
            }
            catch (Throwable e) {
                return (EntityStateMachine)Processor.rethrow(e);
            }
        });
    }

    public <T> Optional<T> getObject(Class<T> cls, Id id) {
        try {
            return this.getStateMachine(cls, id).get();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void signal(Signal<?, Id> signal) {
        this.subject.onNext(signal);
    }

    public <T> void signal(Class<T> cls, Id id, Event<? super T> event) {
        this.subject.onNext(Signal.create(cls, id, event));
    }

    public <T> void signal(ClassId<T, Id> cid, Event<? super T> event) {
        this.signal(cid.cls(), cid.id(), event);
    }

    public <T> ObjectState<T> get(Class<T> cls, Id id) {
        return this.stateMachines.get(new ClassId<T, Id>(cls, id));
    }

    public void onCompleted() {
        this.subject.onComplete();
    }

    public void cancelSignal(Class<?> fromClass, Id fromId, Class<?> toClass, Id toId) {
        Disposable subscription = this.subscriptions.remove(new ClassIdPair<Id>(new ClassId(fromClass, fromId), new ClassId(toClass, toId)));
        if (subscription != null) {
            subscription.dispose();
        }
    }

    public void cancelSignalToSelf(Class<?> cls, Id id) {
        this.cancelSignal(cls, id, cls, id);
    }

    public void cancelSignalToSelf(ClassId<?, Id> cid) {
        this.cancelSignalToSelf(cid.cls(), cid.id());
    }

    private static <T> T rethrow(Throwable t) {
        if (t instanceof RuntimeException) {
            throw (RuntimeException)t;
        }
        if (t instanceof Error) {
            throw (Error)t;
        }
        if (t instanceof IOException) {
            throw new UncheckedIOException((IOException)t);
        }
        throw new RuntimeException(t);
    }

    private final class TransitionHandler
    implements Supplier<Signals<Id>>,
    BiConsumer<Signals<Id>, Emitter<EntityStateMachine<?, Id>>> {
        private final Event<?> event;
        private final ClassId<?, Id> classId;
        private final Scheduler.Worker worker;
        EntityStateMachine<?, Id> machine;

        TransitionHandler(ClassId<?, Id> classId, Event<?> event, Scheduler.Worker worker, EntityStateMachine<?, Id> machine) {
            this.classId = classId;
            this.event = event;
            this.worker = worker;
            this.machine = machine;
        }

        public Signals<Id> get() throws Exception {
            Signals signals = new Signals();
            signals.signalsToSelf.offerFirst(this.event);
            return signals;
        }

        public void accept(Signals<Id> signals, Emitter<EntityStateMachine<?, Id>> observer) throws Throwable {
            Event<?> event = signals.signalsToSelf.pollLast();
            if (event != null) {
                this.applySignalToSelf(signals, observer, event);
            } else {
                this.applySignalsToOthers(this.classId, this.worker, signals);
                observer.onComplete();
            }
        }

        private <T> void applySignalToSelf(Signals<Id> signals, Emitter<? super EntityStateMachine<?, Id>> observer, Event<T> event) throws Throwable {
            this.machine = this.machine.signal(event);
            Processor.this.postTransitionAction.accept(this.machine);
            observer.onNext(this.machine);
            List<Event<?>> list = this.machine.signalsToSelf();
            for (int i = list.size() - 1; i >= 0; --i) {
                signals.signalsToSelf.offerLast(list.get(i));
            }
            for (Signal<?, ?> signal : this.machine.signalsToOther()) {
                signals.signalsToOther.offerLast(signal);
            }
        }

        private void applySignalsToOthers(ClassId<?, Id> cid, Scheduler.Worker worker, Signals<Id> signals) {
            Signal signal;
            while ((signal = signals.signalsToOther.pollFirst()) != null) {
                Signal s = signal;
                if (signal.isImmediate()) {
                    Processor.this.subject.onNext(signal);
                    continue;
                }
                if (signal.event() instanceof CancelTimedSignal) {
                    this.cancel(signal);
                    continue;
                }
                long delayMs = signal.time().get() - worker.now(TimeUnit.MILLISECONDS);
                if (delayMs <= 0L) {
                    Processor.this.subject.onNext(signal);
                    continue;
                }
                this.scheduleSignal(cid, worker, signal, s, delayMs);
            }
        }

        private void cancel(Signal<?, Id> signal) {
            CancelTimedSignal s = (CancelTimedSignal)signal.event();
            Disposable sub = (Disposable)Processor.this.subscriptions.remove(new ClassIdPair(new ClassId(s.fromClass(), s.fromId()), new ClassId(signal.cls(), signal.id())));
            if (sub != null) {
                sub.dispose();
            }
        }

        private void scheduleSignal(ClassId<?, Id> from, Scheduler.Worker worker, Signal<?, Id> signal, Signal<?, Id> s, long delayMs) {
            ClassIdPair idPair = new ClassIdPair(from, new ClassId(signal.cls(), signal.id()));
            long t1 = Processor.this.signalScheduler.now(TimeUnit.MILLISECONDS);
            Disposable subscription = worker.schedule(() -> Processor.this.subject.onNext(s.now()), delayMs, TimeUnit.MILLISECONDS);
            long t2 = Processor.this.signalScheduler.now(TimeUnit.MILLISECONDS);
            worker.schedule(() -> Processor.this.subscriptions.remove(idPair), delayMs - (t2 - t1), TimeUnit.MILLISECONDS);
            Disposable previous = Processor.this.subscriptions.put(idPair, subscription);
            if (previous != null) {
                previous.dispose();
            }
        }
    }

    private static final class Signals<Id> {
        final Deque<Event<?>> signalsToSelf = new ArrayDeque();
        final Deque<Signal<?, Id>> signalsToOther = new ArrayDeque();

        private Signals() {
        }
    }

    public static class Builder<Id> {
        private Function<Class<?>, EntityBehaviour<?, Id>> behaviourFactory;
        private Scheduler signalScheduler = Schedulers.computation();
        private Scheduler processingScheduler = Schedulers.trampoline();
        private Flowable<Signal<?, Id>> signals = Flowable.empty();
        private Function<GroupedFlowable<ClassId<?, Id>, EntityStateMachine<?, Id>>, Flowable<EntityStateMachine<?, Id>>> entityTransform = g -> g;
        private FlowableTransformer<Signal<?, Id>, Signal<?, Id>> preGroupBy = x -> x;
        private Function<Consumer<Object>, Map<ClassId<?, Id>, Object>> mapFactory;
        private Action3<? super EntityStateMachine<?, Id>, ? super Event<?>, ? super EntityState<?>> preTransitionAction = (x, y, z) -> {};
        private Consumer<? super EntityStateMachine<?, Id>> postTransitionAction = x -> {};
        private final Map<Class<?>, EntityBehaviour<?, Id>> behaviours = new HashMap();

        private Builder() {
        }

        public <T> Builder<Id> behaviour(Class<T> cls, EntityBehaviour<T, Id> behaviour) {
            this.behaviours.put(cls, behaviour);
            return this;
        }

        public Builder<Id> behaviourFactory(Function<Class<?>, EntityBehaviour<?, Id>> behaviourFactory) {
            this.behaviourFactory = behaviourFactory;
            return this;
        }

        public Builder<Id> signalScheduler(Scheduler signalScheduler) {
            this.signalScheduler = signalScheduler;
            return this;
        }

        public Builder<Id> processingScheduler(Scheduler processingScheduler) {
            this.processingScheduler = processingScheduler;
            return this;
        }

        public Builder<Id> signals(Flowable<Signal<?, Id>> signals) {
            this.signals = signals;
            return this;
        }

        public Builder<Id> entityTransform(Function<GroupedFlowable<ClassId<?, Id>, EntityStateMachine<?, Id>>, Flowable<EntityStateMachine<?, Id>>> entityTransform) {
            this.entityTransform = entityTransform;
            return this;
        }

        public Builder<Id> preGroupBy(FlowableTransformer<Signal<?, Id>, Signal<?, Id>> preGroupBy) {
            this.preGroupBy = preGroupBy;
            return this;
        }

        public Builder<Id> mapFactory(Function<Consumer<Object>, Map<ClassId<?, Id>, Object>> mapFactory) {
            this.mapFactory = mapFactory;
            return this;
        }

        public Builder<Id> preTransition(Action3<? super EntityStateMachine<?, Id>, ? super Event<?>, ? super EntityState<?>> action) {
            this.preTransitionAction = action;
            return this;
        }

        public Builder<Id> postTransition(Consumer<? super EntityStateMachine<?, Id>> action) {
            this.postTransitionAction = action;
            return this;
        }

        public Processor<Id> build() {
            Preconditions.checkArgument((this.behaviourFactory != null || !this.behaviours.isEmpty() ? 1 : 0) != 0, (String)"one of behaviourFactory or multiple calls to behaviour must be made (behaviour must be specified)");
            Preconditions.checkArgument((this.behaviourFactory == null || this.behaviours.isEmpty() ? 1 : 0) != 0, (String)"cannot specify both behaviourFactory and behaviour");
            if (!this.behaviours.isEmpty()) {
                this.behaviourFactory = cls -> this.behaviours.get(cls);
            }
            return new Processor(this.behaviourFactory, this.processingScheduler, this.signalScheduler, this.signals, this.entityTransform, this.preGroupBy, this.mapFactory, this.preTransitionAction, this.postTransitionAction);
        }
    }
}

