/*
 * Decompiled with CFR 0.152.
 */
package net.jokubasdargis.rxbus;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.jokubasdargis.rxbus.Bus;
import net.jokubasdargis.rxbus.Dispatcher;
import net.jokubasdargis.rxbus.ErrorListener;
import net.jokubasdargis.rxbus.Flusher;
import net.jokubasdargis.rxbus.Queue;
import net.jokubasdargis.rxbus.Station;
import rx.Scheduler;
import rx.Subscriber;
import rx.Subscription;

final class DefaultDispatcher
implements Dispatcher {
    private final Map<Station<?>, Subscription> subscriptions = new ConcurrentHashMap(8);
    private final Bus bus;
    private final Scheduler busScheduler;
    private final Flusher flusher;
    private final ErrorListener errorListener;

    public static Dispatcher create(Bus bus, Scheduler scheduler, Flusher flusher, ErrorListener errorListener) {
        return new DefaultDispatcher(bus, scheduler, flusher, errorListener);
    }

    @Override
    public <T> void publish(Queue<T> queue, T event) {
        this.bus.publish(queue, event);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public <T> void register(Queue<T> queue, Station<T> station) {
        Map<Station<?>, Subscription> map = this.subscriptions;
        synchronized (map) {
            if (!this.subscriptions.containsKey(station)) {
                this.subscriptions.put(station, this.bus.subscribe(queue, new StationSubscriber<T>(station, this.flusher, this.errorListener), this.busScheduler));
            }
        }
    }

    @Override
    public <T> void unregister(Station<T> station) {
        Subscription subscription = this.subscriptions.remove(station);
        if (subscription != null) {
            subscription.unsubscribe();
        }
    }

    private DefaultDispatcher(Bus bus, Scheduler busScheduler, Flusher flusher, ErrorListener errorListener) {
        this.bus = bus;
        this.busScheduler = busScheduler;
        this.flusher = flusher;
        this.errorListener = errorListener;
    }

    private static class StationSubscriber<T>
    extends Subscriber<T> {
        private final Station<T> station;
        private final Flusher flusher;
        private final ErrorListener errorListener;

        StationSubscriber(Station<T> station, Flusher flusher, ErrorListener errorListener) {
            this.station = station;
            this.flusher = flusher;
            this.errorListener = errorListener;
        }

        public void onCompleted() {
        }

        public void onError(Throwable e) {
        }

        public void onNext(T t) {
            try {
                this.station.receive(t);
            }
            catch (Throwable throwable) {
                this.errorListener.onError(throwable);
            }
            finally {
                this.flusher.schedule(this.station);
            }
        }

        public String toString() {
            return "StationSubscriber{station=" + this.station + '}';
        }
    }
}

