package org.apache.reef.io.network.group.impl.task;

import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler;
import org.apache.reef.io.network.group.impl.GroupCommunicationMessage;
import org.apache.reef.io.network.group.impl.utils.Utils;
import org.apache.reef.io.network.proto.ReefNetworkGroupCommProtos;
import org.apache.reef.tang.annotations.Name;
import org.apache.reef.wake.EventHandler;

/* loaded from: input_file:org/apache/reef/io/network/group/impl/task/CommGroupNetworkHandlerImpl.class */
public class CommGroupNetworkHandlerImpl implements CommGroupNetworkHandler {
    private static final Logger LOG = Logger.getLogger(CommGroupNetworkHandlerImpl.class.getName());
    private final Map<Class<? extends Name<String>>, EventHandler<GroupCommunicationMessage>> operHandlers = new ConcurrentHashMap();
    private final Map<Class<? extends Name<String>>, BlockingQueue<GroupCommunicationMessage>> topologyNotifications = new ConcurrentHashMap();

    @Inject
    public CommGroupNetworkHandlerImpl() {
    }

    @Override // org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler
    public void register(Class<? extends Name<String>> cls, EventHandler<GroupCommunicationMessage> eventHandler) {
        LOG.entering("CommGroupNetworkHandlerImpl", "register", new Object[]{Utils.simpleName(cls), eventHandler});
        this.operHandlers.put(cls, eventHandler);
        LOG.exiting("CommGroupNetworkHandlerImpl", "register", Arrays.toString(new Object[]{Utils.simpleName(cls), eventHandler}));
    }

    @Override // org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler
    public void addTopologyElement(Class<? extends Name<String>> cls) {
        LOG.entering("CommGroupNetworkHandlerImpl", "addTopologyElement", Utils.simpleName(cls));
        LOG.finest("Creating LBQ for " + cls);
        this.topologyNotifications.put(cls, new LinkedBlockingQueue());
        LOG.exiting("CommGroupNetworkHandlerImpl", "addTopologyElement", Utils.simpleName(cls));
    }

    public void onNext(GroupCommunicationMessage groupCommunicationMessage) {
        LOG.entering("CommGroupNetworkHandlerImpl", "onNext", groupCommunicationMessage);
        Class<? extends Name<String>> cls = Utils.getClass(groupCommunicationMessage.getOperatorname());
        if (groupCommunicationMessage.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyUpdated || groupCommunicationMessage.getType() == ReefNetworkGroupCommProtos.GroupCommMessage.Type.TopologyChanges) {
            this.topologyNotifications.get(cls).add(groupCommunicationMessage);
        } else {
            this.operHandlers.get(cls).onNext(groupCommunicationMessage);
        }
        LOG.exiting("CommGroupNetworkHandlerImpl", "onNext", groupCommunicationMessage);
    }

    @Override // org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler
    public byte[] waitForTopologyChanges(Class<? extends Name<String>> cls) {
        LOG.entering("CommGroupNetworkHandlerImpl", "waitForTopologyChanges", Utils.simpleName(cls));
        try {
            byte[] data = Utils.getData(this.topologyNotifications.get(cls).take());
            LOG.exiting("CommGroupNetworkHandlerImpl", "waitForTopologyChanges", data);
            return data;
        } catch (InterruptedException e) {
            throw new RuntimeException("InterruptedException while waiting for topology update of " + cls.getSimpleName(), e);
        }
    }

    @Override // org.apache.reef.io.network.group.api.task.CommGroupNetworkHandler
    public GroupCommunicationMessage waitForTopologyUpdate(Class<? extends Name<String>> cls) {
        LOG.entering("CommGroupNetworkHandlerImpl", "waitForTopologyUpdate", Utils.simpleName(cls));
        try {
            GroupCommunicationMessage take = this.topologyNotifications.get(cls).take();
            LOG.exiting("CommGroupNetworkHandlerImpl", "waitForTopologyUpdate", take);
            return take;
        } catch (InterruptedException e) {
            throw new RuntimeException("InterruptedException while waiting for topology update of " + cls.getSimpleName(), e);
        }
    }
}
