package org.apache.reef.wake.remote.impl;

import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.remote.Codec;
import org.apache.reef.wake.remote.RemoteIdentifier;
import org.apache.reef.wake.remote.RemoteMessage;
import org.apache.reef.wake.remote.exception.RemoteRuntimeException;
import org.apache.reef.wake.remote.transport.Transport;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/reef/wake/remote/impl/HandlerContainer.class */
public final class HandlerContainer<T> implements EventHandler<RemoteEvent<byte[]>> {
    private static final Logger LOG = Logger.getLogger(HandlerContainer.class.getName());
    private final ConcurrentMap<Class<? extends T>, EventHandler<RemoteMessage<? extends T>>> msgTypeToHandlerMap = new ConcurrentHashMap();
    private final ConcurrentMap<Tuple2<RemoteIdentifier, Class<? extends T>>, EventHandler<? extends T>> tupleToHandlerMap = new ConcurrentHashMap();
    private final Codec<T> codec;
    private final String name;
    private Transport transport;

    /* JADX INFO: Access modifiers changed from: package-private */
    public HandlerContainer(String str, Codec<T> codec) {
        this.name = str;
        this.codec = codec;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setTransport(Transport transport) {
        this.transport = transport;
    }

    public AutoCloseable registerHandler(RemoteIdentifier remoteIdentifier, Class<? extends T> cls, EventHandler<? extends T> eventHandler) {
        Tuple2<RemoteIdentifier, Class<? extends T>> tuple2 = new Tuple2<>(remoteIdentifier, cls);
        if (this.tupleToHandlerMap.putIfAbsent(tuple2, eventHandler) != null) {
            this.tupleToHandlerMap.replace(tuple2, eventHandler);
        }
        LOG.log(Level.FINER, "{0}", tuple2);
        return new Subscription(tuple2, this);
    }

    public AutoCloseable registerHandler(Class<? extends T> cls, EventHandler<RemoteMessage<? extends T>> eventHandler) {
        if (this.msgTypeToHandlerMap.putIfAbsent(cls, eventHandler) != null) {
            this.msgTypeToHandlerMap.replace(cls, eventHandler);
        }
        LOG.log(Level.FINER, "{0}", cls);
        return new Subscription(cls, this);
    }

    public AutoCloseable registerErrorHandler(EventHandler<Exception> eventHandler) {
        this.transport.registerErrorHandler(eventHandler);
        return new Subscription(new Exception(), this);
    }

    public void unsubscribe(Subscription<T> subscription) {
        T token = subscription.getToken();
        LOG.log(Level.FINER, "RemoteManager: {0} token {1}", new Object[]{this.name, token});
        if (token instanceof Exception) {
            this.transport.registerErrorHandler(null);
        } else if (token instanceof Tuple2) {
            this.tupleToHandlerMap.remove(token);
        } else {
            if (!(token instanceof Class)) {
                throw new RemoteRuntimeException("Unknown subscription type: " + subscription.getClass().getName());
            }
            this.msgTypeToHandlerMap.remove(token);
        }
    }

    @Override // org.apache.reef.wake.EventHandler
    public synchronized void onNext(RemoteEvent<byte[]> remoteEvent) {
        LOG.log(Level.FINER, "RemoteManager: {0} value: {1}", new Object[]{this.name, remoteEvent});
        T decode = this.codec.decode(remoteEvent.getEvent());
        Class<?> cls = decode.getClass();
        SocketRemoteIdentifier socketRemoteIdentifier = new SocketRemoteIdentifier((InetSocketAddress) remoteEvent.remoteAddress());
        Tuple2 tuple2 = new Tuple2(socketRemoteIdentifier, cls);
        EventHandler<? extends T> eventHandler = this.tupleToHandlerMap.get(tuple2);
        if (eventHandler != null) {
            LOG.log(Level.FINER, "Tuple handler: {0}", tuple2);
            eventHandler.onNext(decode);
            return;
        }
        EventHandler<RemoteMessage<? extends T>> eventHandler2 = this.msgTypeToHandlerMap.get(cls);
        if (eventHandler2 != null) {
            LOG.log(Level.FINER, "Message handler: {0}", cls);
            eventHandler2.onNext(new DefaultRemoteMessage(socketRemoteIdentifier, decode));
        } else {
            RemoteRuntimeException remoteRuntimeException = new RemoteRuntimeException("Unknown message type in dispatch: " + cls.getName() + " from " + socketRemoteIdentifier);
            LOG.log(Level.WARNING, "Unknown message type in dispatch.", (Throwable) remoteRuntimeException);
            throw remoteRuntimeException;
        }
    }
}
