package org.apache.nemo.runtime.common.message.local;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.inject.Inject;
import org.apache.nemo.runtime.common.message.MessageListener;
import org.apache.nemo.runtime.common.message.MessageSender;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Injector;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.exceptions.InjectionException;

/* loaded from: input_file:org/apache/nemo/runtime/common/message/local/LocalMessageDispatcher.class */
/**
 * Dispatches messages to {@link MessageListener}s registered in this object.
 *
 * <p>Listeners are kept in a two-level registry: the outer map is keyed by node id, the inner map
 * by listener id. Both levels are {@link ConcurrentMap}s.
 */
public final class LocalMessageDispatcher {
    private static final Tang TANG = Tang.Factory.getTang();

    /** Registry of listeners: node id -> (listener id -> listener). */
    private final ConcurrentMap<String, ConcurrentMap<String, MessageListener>> nodeIdToMessageListenersMap =
            new ConcurrentHashMap<>();

    /**
     * Thrown when a listener id is registered twice for a node, or when a message is dispatched
     * to a node/listener id pair that has no registered listener.
     */
    public final class LocalDispatcherException extends RuntimeException {
        LocalDispatcherException(String message) {
            super(message);
        }
    }

    /** Private constructor; instances are created through injection (see {@link #forkInjector}). */
    @Inject
    private LocalMessageDispatcher() {
    }

    /**
     * Registers a listener under the given node id and listener id.
     *
     * @param nodeId     id of the node the listener belongs to
     * @param listenerId id under which the listener is registered within that node
     * @param listener   the listener to register
     * @param <T>        message type handled by the listener
     * @return a new {@code LocalMessageSender} built from the node id (passed twice), the
     *         listener id and this dispatcher
     * @throws LocalDispatcherException if {@code listenerId} is already registered for {@code nodeId}
     */
    public <T> MessageSender<T> setupListener(String nodeId, String listenerId, MessageListener<T> listener) {
        // Atomically create the per-node registry on first use.
        ConcurrentMap<String, MessageListener> listeners =
                this.nodeIdToMessageListenersMap.computeIfAbsent(nodeId, key -> new ConcurrentHashMap<>());
        if (listeners.putIfAbsent(listenerId, listener) != null) {
            throw new LocalDispatcherException(listenerId + " was already used in " + nodeId);
        }
        return new LocalMessageSender(nodeId, nodeId, listenerId, this);
    }

    /**
     * Removes the listener registered under the given node id and listener id.
     *
     * <p>Does nothing when no such listener is registered, including when the node id itself is
     * unknown.
     *
     * @param nodeId     id of the node the listener belongs to
     * @param listenerId id of the listener to remove
     */
    public void removeListener(String nodeId, String listenerId) {
        ConcurrentMap<String, MessageListener> listeners = this.nodeIdToMessageListenersMap.get(nodeId);
        if (listeners != null) {
            listeners.remove(listenerId);
        }
    }

    /**
     * Delivers a one-way message by calling {@code onMessage} on the matching listener.
     *
     * @param nodeId     id of the node whose listener should receive the message
     * @param listenerId id of the listener within that node
     * @param message    the message to deliver
     * @param <T>        message type
     * @throws LocalDispatcherException if no listener is registered for the given ids
     */
    public <T> void dispatchSendMessage(String nodeId, String listenerId, T message) {
        findListener(nodeId, listenerId).onMessage(message);
    }

    /**
     * Delivers a request by calling {@code onMessageWithContext} on the matching listener and
     * returns the reply recorded in the message context.
     *
     * <p>The listener is invoked on the calling thread, so the returned future is already
     * completed. It holds {@code null} when the context has no reply message.
     *
     * @param sourceId   id handed to the {@code LocalMessageContext} created for this request
     *                   (presumably the id of the requesting node; confirm against
     *                   {@code LocalMessageContext})
     * @param nodeId     id of the node whose listener should receive the request
     * @param listenerId id of the listener within that node
     * @param message    the request message
     * @param <T>        request message type
     * @param <U>        reply message type
     * @return a completed future holding the reply, or {@code null} if there is none
     * @throws LocalDispatcherException if no listener is registered for the given ids
     */
    public <T, U> CompletableFuture<U> dispatchRequestMessage(
            String sourceId, String nodeId, String listenerId, T message) {
        // Resolve the listener first so a missing registration fails before any context is created.
        MessageListener listener = findListener(nodeId, listenerId);
        LocalMessageContext context = new LocalMessageContext(sourceId);
        listener.onMessageWithContext(message, context);
        // The reply's static type is not tied to U; the caller chooses U, hence the unchecked cast.
        @SuppressWarnings("unchecked")
        U reply = (U) context.getReplyMessage().orElse(null);
        return CompletableFuture.completedFuture(reply);
    }

    /**
     * Creates a fresh injector configured for local messaging.
     *
     * @return the result of {@link #forkInjector(Injector)} applied to a new Tang injector
     * @throws InjectionException if the dispatcher cannot be instantiated
     */
    public static Injector getInjector() throws InjectionException {
        return forkInjector(TANG.newInjector());
    }

    /**
     * Forks the given injector with the local message environment configuration and eagerly
     * instantiates a {@code LocalMessageDispatcher} in the fork.
     *
     * @param injector the injector to fork
     * @return the forked injector
     * @throws InjectionException if the dispatcher cannot be instantiated
     */
    public static Injector forkInjector(Injector injector) throws InjectionException {
        Injector forked = injector.forkInjector(
                new Configuration[]{LocalMessageEnvironment.LOCAL_MESSAGE_ENVIRONMENT_CONFIGURATION});
        // Instantiate the dispatcher up front; presumably so later forks share this instance -- confirm.
        forked.getInstance(LocalMessageDispatcher.class);
        return forked;
    }

    /**
     * Looks up the listener registered under the given node id and listener id.
     *
     * @throws LocalDispatcherException if the node id is unknown or has no such listener
     */
    private MessageListener findListener(String nodeId, String listenerId) {
        ConcurrentMap<String, MessageListener> listeners = this.nodeIdToMessageListenersMap.get(nodeId);
        MessageListener listener = listeners == null ? null : listeners.get(listenerId);
        if (listener == null) {
            throw new LocalDispatcherException("There was no set up listener for " + listenerId + " in " + nodeId);
        }
        return listener;
    }
}
