package org.apache.nemo.runtime.common.message;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import javax.inject.Inject;
import org.apache.nemo.common.exception.NodeConnectionException;
import org.apache.nemo.runtime.common.comm.ControlMessage;

/* loaded from: input_file:org/apache/nemo/runtime/common/message/PersistentConnectionToMasterMap.class */
/**
 * Keeps one {@link MessageSender} per listener id for connections opened towards
 * {@link MessageEnvironment#MASTER_COMMUNICATION_ID}, so that each connection is established
 * at most once and then reused.
 *
 * <p>Senders for {@link MessageEnvironment#RUNTIME_MASTER_MESSAGE_LISTENER_ID} and
 * {@link MessageEnvironment#BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID} are created in the
 * constructor; senders for any other listener id are created on first request.
 */
public final class PersistentConnectionToMasterMap {
    /** Already established senders, keyed by listener id. */
    private final Map<String, MessageSender<ControlMessage.Message>> messageSenders = new HashMap<>();
    private final MessageEnvironment messageEnvironment;

    /**
     * Opens the two connections that are always registered up front and stores their senders.
     *
     * @param messageEnvironment the environment used to open connections
     * @throws NodeConnectionException if either connection cannot be established, or the
     *         calling thread is interrupted while waiting for it
     */
    @Inject
    private PersistentConnectionToMasterMap(MessageEnvironment messageEnvironment) {
        this.messageEnvironment = messageEnvironment;
        this.messageSenders.put(
            MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID,
            connectToMaster(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID));
        this.messageSenders.put(
            MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID,
            connectToMaster(MessageEnvironment.BLOCK_MANAGER_MASTER_MESSAGE_LISTENER_ID));
    }

    /**
     * Returns the sender registered for the given listener id, opening and caching a new
     * connection if none has been stored yet.
     *
     * @param str the listener id to look up
     * @return the cached sender, or a newly connected one if there was no cached entry
     * @throws NodeConnectionException if a new connection is needed and cannot be established,
     *         or the calling thread is interrupted while waiting for it
     */
    public synchronized MessageSender<ControlMessage.Message> getMessageSender(String str) {
        MessageSender<ControlMessage.Message> cached = this.messageSenders.get(str);
        if (cached != null) {
            return cached;
        }
        // Only cache after a successful connect, so a failed attempt can be retried later.
        MessageSender<ControlMessage.Message> created = connectToMaster(str);
        this.messageSenders.put(str, created);
        return created;
    }

    /**
     * Opens a connection to {@link MessageEnvironment#MASTER_COMMUNICATION_ID} for the given
     * listener id and blocks until the sender is available.
     *
     * @param listenerId the listener id to connect to
     * @return the connected sender
     * @throws NodeConnectionException wrapping the {@link InterruptedException} or
     *         {@link ExecutionException} raised while waiting for the connection
     */
    // The raw cast mirrors the original call sites; it is confined to this single helper.
    @SuppressWarnings({"unchecked", "rawtypes"})
    private MessageSender<ControlMessage.Message> connectToMaster(String listenerId) {
        try {
            return (MessageSender) this.messageEnvironment
                .asyncConnect(MessageEnvironment.MASTER_COMMUNICATION_ID, listenerId)
                .get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            throw new NodeConnectionException(e);
        } catch (ExecutionException e) {
            // A failed connection is not an interruption: leave the interrupt flag untouched.
            throw new NodeConnectionException(e);
        }
    }
}
