/*
 * Decompiled with CFR 0.152.
 */
package org.apache.reef.runtime.common.utils;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.runtime.common.utils.BroadCastEventHandler;
import org.apache.reef.tang.util.MonotonicHashMap;
import org.apache.reef.util.ExceptionHandlingEventHandler;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.ThreadPoolStage;

@Private
@DriverSide
public final class DispatchingEStage
implements AutoCloseable {
    private final Map<Class<?>, EventHandler<?>> handlers = Collections.synchronizedMap(new MonotonicHashMap());
    private final EventHandler<Throwable> errorHandler;
    private final ThreadPoolStage<DelayedOnNext> stage;

    public DispatchingEStage(EventHandler<Throwable> errorHandler, int numThreads, String stageName) {
        this.errorHandler = errorHandler;
        this.stage = new ThreadPoolStage(stageName, (EventHandler)new EventHandler<DelayedOnNext>(){

            public void onNext(DelayedOnNext promise) {
                promise.handler.onNext(promise.message);
            }
        }, numThreads);
    }

    public DispatchingEStage(DispatchingEStage other) {
        this.errorHandler = other.errorHandler;
        this.stage = other.stage;
    }

    public <T, U extends T> void register(Class<T> type, Set<EventHandler<U>> handlers) {
        this.handlers.put(type, new ExceptionHandlingEventHandler(new BroadCastEventHandler(handlers), this.errorHandler));
    }

    public <T, U extends T> void onNext(Class<T> type, U message) {
        EventHandler<?> handler = this.handlers.get(type);
        this.stage.onNext((Object)new DelayedOnNext(handler, message));
    }

    public boolean isEmpty() {
        return this.stage.getQueueLength() == 0;
    }

    @Override
    public void close() throws Exception {
        this.stage.close();
    }

    private static final class DelayedOnNext {
        public final EventHandler<Object> handler;
        public final Object message;

        public <T, U extends T> DelayedOnNext(EventHandler<T> handler, U message) {
            this.handler = handler;
            this.message = message;
        }
    }
}

