package org.apache.nemo.runtime.master;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.exception.IllegalMessageException;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageContext;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.MessageListener;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
@DriverSide
/* loaded from: input_file:org/apache/nemo/runtime/master/PipeManagerMaster.class */
public final class PipeManagerMaster {
    private static final Logger LOG = LoggerFactory.getLogger(PipeManagerMaster.class.getName());
    private final Map<Pair<String, Long>, String> runtimeEdgeSrcIndexToExecutor;
    private final Map<Pair<String, Long>, Lock> runtimeEdgeSrcIndexToLock;
    private final Map<Pair<String, Long>, Condition> runtimeEdgeSrcIndexToCondition;
    private final ExecutorService waitForPipe;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.nemo.runtime.master.PipeManagerMaster$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/nemo/runtime/master/PipeManagerMaster$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType = new int[ControlMessage.MessageType.values().length];

        static {
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.PipeInit.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.RequestPipeLoc.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /* loaded from: input_file:org/apache/nemo/runtime/master/PipeManagerMaster$PipeManagerMasterControlMessageReceiver.class */
    public final class PipeManagerMasterControlMessageReceiver implements MessageListener<ControlMessage.Message> {
        public PipeManagerMasterControlMessageReceiver() {
        }

        public void onMessage(ControlMessage.Message message) {
            switch (AnonymousClass1.$SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[message.getType().ordinal()]) {
                case 1:
                    ControlMessage.PipeInitMessage pipeInitMsg = message.getPipeInitMsg();
                    Pair of = Pair.of(pipeInitMsg.getRuntimeEdgeId(), Long.valueOf(pipeInitMsg.getSrcTaskIndex()));
                    Lock lock = (Lock) PipeManagerMaster.this.runtimeEdgeSrcIndexToLock.get(of);
                    lock.lock();
                    try {
                        if (null != PipeManagerMaster.this.runtimeEdgeSrcIndexToExecutor.put(of, pipeInitMsg.getExecutorId())) {
                            throw new RuntimeException(of.toString());
                        }
                        ((Condition) PipeManagerMaster.this.runtimeEdgeSrcIndexToCondition.get(of)).signalAll();
                        lock.unlock();
                        return;
                    } catch (Throwable th) {
                        lock.unlock();
                        throw th;
                    }
                default:
                    throw new IllegalMessageException(new Exception(message.toString()));
            }
        }

        public void onMessageWithContext(ControlMessage.Message message, MessageContext messageContext) {
            switch (AnonymousClass1.$SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[message.getType().ordinal()]) {
                case 2:
                    ControlMessage.RequestPipeLocationMessage requestPipeLocMsg = message.getRequestPipeLocMsg();
                    PipeManagerMaster.this.waitForPipe.submit(() -> {
                        Pair of = Pair.of(requestPipeLocMsg.getRuntimeEdgeId(), Long.valueOf(requestPipeLocMsg.getSrcTaskIndex()));
                        Lock lock = (Lock) PipeManagerMaster.this.runtimeEdgeSrcIndexToLock.get(of);
                        lock.lock();
                        try {
                            try {
                                if (!PipeManagerMaster.this.runtimeEdgeSrcIndexToExecutor.containsKey(of)) {
                                    ((Condition) PipeManagerMaster.this.runtimeEdgeSrcIndexToCondition.get(of)).await();
                                }
                                String str = (String) PipeManagerMaster.this.runtimeEdgeSrcIndexToExecutor.get(of);
                                if (str == null) {
                                    throw new IllegalStateException(of.toString());
                                }
                                messageContext.reply(ControlMessage.Message.newBuilder().setId(RuntimeIdManager.generateMessageId()).setListenerId("EXECUTOR_MESSAGE_LISTENER_ID").setType(ControlMessage.MessageType.PipeLocInfo).setPipeLocInfoMsg(ControlMessage.PipeLocationInfoMessage.newBuilder().setRequestId(message.getId()).setExecutorId(str).build()).build());
                                lock.unlock();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        } catch (Throwable th) {
                            lock.unlock();
                            throw th;
                        }
                    });
                    return;
                default:
                    throw new IllegalMessageException(new Exception(message.toString()));
            }
        }
    }

    @Inject
    private PipeManagerMaster(MessageEnvironment messageEnvironment) {
        messageEnvironment.setupListener("PIPE_MANAGER_MASTER_MESSAGE_LISTENER_ID", new PipeManagerMasterControlMessageReceiver());
        this.runtimeEdgeSrcIndexToExecutor = new ConcurrentHashMap();
        this.runtimeEdgeSrcIndexToLock = new ConcurrentHashMap();
        this.runtimeEdgeSrcIndexToCondition = new ConcurrentHashMap();
        this.waitForPipe = Executors.newCachedThreadPool();
    }

    public void onTaskScheduled(String str, long j) {
        Pair<String, Long> of = Pair.of(str, Long.valueOf(j));
        if (null != this.runtimeEdgeSrcIndexToLock.put(of, new ReentrantLock())) {
            throw new IllegalStateException(of.toString());
        }
        if (null != this.runtimeEdgeSrcIndexToCondition.put(of, this.runtimeEdgeSrcIndexToLock.get(of).newCondition())) {
            throw new IllegalStateException(of.toString());
        }
    }
}
