package org.apache.nemo.runtime.master;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.protobuf.ByteString;
import java.io.Serializable;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.inject.Inject;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.Util;
import org.apache.nemo.common.exception.ContainerException;
import org.apache.nemo.common.exception.IllegalMessageException;
import org.apache.nemo.common.exception.MetricException;
import org.apache.nemo.common.ir.IRDAG;
import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.common.ir.vertex.IRVertex;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.ClientRPC;
import org.apache.nemo.runtime.common.message.MessageContext;
import org.apache.nemo.runtime.common.message.MessageEnvironment;
import org.apache.nemo.runtime.common.message.MessageListener;
import org.apache.nemo.runtime.common.message.MessageUtils;
import org.apache.nemo.runtime.common.metric.JobMetric;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.master.metric.MetricManagerMaster;
import org.apache.nemo.runtime.master.metric.MetricMessageHandler;
import org.apache.nemo.runtime.master.metric.MetricStore;
import org.apache.nemo.runtime.master.resource.ContainerManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.nemo.runtime.master.scheduler.BatchScheduler;
import org.apache.nemo.runtime.master.scheduler.Scheduler;
import org.apache.nemo.runtime.master.servlet.AllMetricServlet;
import org.apache.nemo.runtime.master.servlet.JobMetricServlet;
import org.apache.nemo.runtime.master.servlet.StageMetricServlet;
import org.apache.nemo.runtime.master.servlet.TaskMetricServlet;
import org.apache.nemo.runtime.master.servlet.WebSocketMetricServlet;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Parameter;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DriverSide
/* loaded from: input_file:org/apache/nemo/runtime/master/RuntimeMaster.class */
public final class RuntimeMaster {
    private static final Logger LOG = LoggerFactory.getLogger(RuntimeMaster.class.getName());

    // Periods, timeout and port used by this class.
    // NOTE(review): the period/timeout values are presumably milliseconds — confirm at the use sites.
    private static final int DAG_LOGGING_PERIOD = 3000;
    private static final int METRIC_ARRIVE_TIMEOUT = 10000;
    private static final int REST_SERVER_PORT = 10101;
    private static final int SPECULATION_CHECKING_PERIOD_MS = 100;

    // Single worker thread named "RuntimeMaster thread"; the methods of this class hand their work to it.
    private final ExecutorService runtimeMasterThread =
        Executors.newSingleThreadExecutor(task -> new Thread(task, "RuntimeMaster thread"));

    // Single scheduled worker thread named "SpeculativeTaskCloning thread".
    private final ScheduledExecutorService speculativeTaskCloningThread =
        Executors.newSingleThreadScheduledExecutor(task -> new Thread(task, "SpeculativeTaskCloning thread"));

    // Collaborators handed to the constructor.
    private final Scheduler scheduler;
    private final ContainerManager containerManager;
    private final MetricMessageHandler metricMessageHandler;
    private final MessageEnvironment masterMessageEnvironment;
    private final ClientRPC clientRPC;
    private final MetricManagerMaster metricManagerMaster;
    private final PlanStateManager planStateManager;

    private final ObjectMapper objectMapper;

    // Job configuration values handed to the constructor.
    private final String jobId;
    private final String dagDirectory;
    private final Boolean dbEnabled;
    private final String dbAddress;
    private final String dbId;
    private final String dbPassword;

    // Vertices collected from the physical plans passed to execute().
    private final Set<IRVertex> irVertices;

    // Incremented per requested container, decremented per launched executor.
    private final AtomicInteger resourceRequestCount;

    // Created by requestContainer(); stays null until that method has run.
    private CountDownLatch metricCountDownLatch;

    private final Server metricServer;
    private final MetricStore metricStore;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.nemo.runtime.master.RuntimeMaster$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/nemo/runtime/master/RuntimeMaster$2.class */
    /*
     * Switch lookup table: an int array indexed by ControlMessage.MessageType ordinal whose
     * entries are the dense case numbers (1..7) used by the switch statements in RuntimeMaster.
     * Ordinals that are never assigned below keep the array default of 0, which matches no case
     * label and therefore selects the default branch.
     *
     * Case numbers assigned here:
     *   1 = RequestBroadcastVariable, 2 = TaskStateChanged, 3 = ExecutorFailed,
     *   4 = RunTimePassMessage, 5 = MetricMessageReceived, 6 = ExecutorDataCollected,
     *   7 = MetricFlushed.
     */
    public static /* synthetic */ class AnonymousClass2 {
        // One slot per enum constant known at class-initialization time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType = new int[ControlMessage.MessageType.values().length];

        // Each assignment is guarded separately so that a NoSuchFieldError on one constant
        // is ignored and leaves only that entry at 0 (presumably to tolerate an enum constant
        // that is absent from the ControlMessage class loaded at run time).
        static {
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.RequestBroadcastVariable.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.TaskStateChanged.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.ExecutorFailed.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.RunTimePassMessage.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.MetricMessageReceived.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.ExecutorDataCollected.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[ControlMessage.MessageType.MetricFlushed.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /* loaded from: input_file:org/apache/nemo/runtime/master/RuntimeMaster$MasterControlMessageReceiver.class */
    public final class MasterControlMessageReceiver implements MessageListener<ControlMessage.Message> {
        public MasterControlMessageReceiver() {
        }

        public void onMessage(ControlMessage.Message message) {
            RuntimeMaster.this.runtimeMasterThread.execute(() -> {
                RuntimeMaster.this.handleControlMessage(message);
            });
        }

        public void onMessageWithContext(ControlMessage.Message message, MessageContext messageContext) {
            switch (AnonymousClass2.$SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[message.getType().ordinal()]) {
                case 1:
                    Serializable serializable = (Serializable) SerializationUtils.deserialize(message.getRequestbroadcastVariableMsg().getBroadcastId().toByteArray());
                    Object broadcastVariable = BroadcastManagerMaster.getBroadcastVariable(serializable);
                    if (broadcastVariable == null) {
                        throw new IllegalStateException(serializable.toString());
                    }
                    messageContext.reply(ControlMessage.Message.newBuilder().setId(RuntimeIdManager.generateMessageId()).setListenerId("RUNTIME_MASTER_MESSAGE_LISTENER_ID").setType(ControlMessage.MessageType.InMasterBroadcastVariable).setBroadcastVariableMsg(ControlMessage.InMasterBroadcastVariableMessage.newBuilder().setRequestId(message.getId()).setVariable(ByteString.copyFrom(SerializationUtils.serialize((Serializable) broadcastVariable))).build()).build());
                    return;
                default:
                    throw new IllegalMessageException(new Exception("This message should not be requested to Master :" + message.getType()));
            }
        }
    }

    /**
     * Wires up the master, starts the REST metric server, registers the control-message listener
     * and starts the periodic speculative-execution check.
     *
     * <p>The listener registration and the periodic task are deliberately performed last: both
     * make this instance reachable from other threads, so they must not run before every final
     * field has been assigned. Doing them last also means that a failure to start the REST server
     * does not leave a registered listener or a running periodic task behind.
     *
     * @param scheduler            scheduler that receives plans, executor events and speculation checks
     * @param containerManager     manager used to request, launch and terminate containers
     * @param metricMessageHandler receiver of metric messages forwarded by this master
     * @param messageEnvironment   environment on which the master's message listener is registered
     * @param metricManagerMaster  used to send metric flush requests
     * @param clientRPC            channel used to send collected data to the client
     * @param planStateManager     plan state holder returned by {@code execute} and dumped periodically
     * @param str                  job id ({@code JobConf.JobId})
     * @param bool                 whether metrics are also saved to a database ({@code JobConf.DBEnabled})
     * @param str2                 database address ({@code JobConf.DBAddress})
     * @param str3                 database id ({@code JobConf.DBId})
     * @param str4                 database password ({@code JobConf.DBPasswd})
     * @param str5                 directory into which metric dumps are written ({@code JobConf.DAGDirectory})
     * @throws MetricException if the REST metric server cannot be started
     */
    @Inject
    private RuntimeMaster(Scheduler scheduler, ContainerManager containerManager, MetricMessageHandler metricMessageHandler, MessageEnvironment messageEnvironment, MetricManagerMaster metricManagerMaster, ClientRPC clientRPC, PlanStateManager planStateManager, @Parameter(JobConf.JobId.class) String str, @Parameter(JobConf.DBEnabled.class) Boolean bool, @Parameter(JobConf.DBAddress.class) String str2, @Parameter(JobConf.DBId.class) String str3, @Parameter(JobConf.DBPasswd.class) String str4, @Parameter(JobConf.DAGDirectory.class) String str5) {
        this.scheduler = scheduler;
        this.containerManager = containerManager;
        this.metricMessageHandler = metricMessageHandler;
        this.masterMessageEnvironment = messageEnvironment;
        this.clientRPC = clientRPC;
        this.metricManagerMaster = metricManagerMaster;
        this.planStateManager = planStateManager;
        this.jobId = str;
        this.dagDirectory = str5;
        this.dbEnabled = bool;
        this.dbAddress = str2;
        this.dbId = str3;
        this.dbPassword = str4;
        this.irVertices = new HashSet<>();
        this.resourceRequestCount = new AtomicInteger(0);
        this.objectMapper = new ObjectMapper();
        this.metricServer = startRestMetricServer();
        this.metricStore = MetricStore.getStore();

        // From here on 'this' becomes visible to other threads; all fields are assigned above.
        this.masterMessageEnvironment.setupListener("RUNTIME_MASTER_MESSAGE_LISTENER_ID", new MasterControlMessageReceiver());

        // The check itself runs on the master thread; the scheduled thread only enqueues it.
        this.speculativeTaskCloningThread.scheduleAtFixedRate(
            () -> this.runtimeMasterThread.submit(scheduler::onSpeculativeExecutionCheck),
            SPECULATION_CHECKING_PERIOD_MS,
            SPECULATION_CHECKING_PERIOD_MS,
            TimeUnit.MILLISECONDS);
    }

    /**
     * Creates the REST metric server on {@code REST_SERVER_PORT}, maps the metric servlets under
     * {@code /api} and starts the server.
     *
     * @return the started server
     * @throws MetricException if the server fails to start; the original failure is attached as
     *                         the cause so its stack trace is not lost
     */
    private Server startRestMetricServer() {
        final Server server = new Server(REST_SERVER_PORT);
        final ServletHandler servletHandler = new ServletHandler();
        server.setHandler(servletHandler);

        servletHandler.addServletWithMapping(JobMetricServlet.class, "/api/job");
        servletHandler.addServletWithMapping(TaskMetricServlet.class, "/api/task");
        servletHandler.addServletWithMapping(StageMetricServlet.class, "/api/stage");
        servletHandler.addServletWithMapping(AllMetricServlet.class, "/api");
        servletHandler.addServletWithMapping(WebSocketMetricServlet.class, "/api/websocket");

        try {
            server.start();
        } catch (Exception e) {
            final MetricException failure = new MetricException("Failed to start REST API server: " + e);
            // The message-only constructor leaves the cause unset, so attach it explicitly.
            failure.initCause(e);
            throw failure;
        }
        return server;
    }

    /**
     * Stores the given IR DAG on the job metric identified by {@code str}, creating that
     * metric in the metric store if it does not exist yet.
     *
     * @param irdag the IR DAG to record
     * @param str   id of the job metric to look up or create
     */
    public void recordIRDAGMetrics(IRDAG irdag, String str) {
        final JobMetric jobMetric = this.metricStore.getOrCreateMetric(JobMetric.class, str);
        jobMetric.setIRDAG(irdag);
    }

    /**
     * Sends a metric flush request, dumps all metrics to a JSON file in the DAG directory and,
     * when the database option is enabled, also saves the optimization metrics to the database.
     *
     * <p>The dump file is named {@code Metric_<jobId>_<currentTimeMillis>.json}.
     */
    public void flushMetrics() {
        this.metricManagerMaster.sendMetricFlushRequest();

        final String fileName = "Metric_" + this.jobId + "_" + System.currentTimeMillis() + ".json";
        final String dumpPath = Paths.get(this.dagDirectory, fileName).toString();
        this.metricStore.dumpAllMetricToFile(dumpPath);

        if (!this.dbEnabled) {
            return;
        }
        this.metricStore.saveOptimizationMetricsToDB(this.dbAddress, this.jobId, this.dbId, this.dbPassword);
    }

    /**
     * Schedules the given physical plan on the master thread and blocks until scheduling has
     * been handed over.
     *
     * <p>The plan's IR vertices are recorded, the plan is passed to the scheduler and periodic
     * plan-state logging is started.
     *
     * @param physicalPlan the plan to schedule
     * @param i            forwarded unchanged as the second argument of {@code Scheduler.schedulePlan}
     *                     (presumably the maximum number of schedule attempts — confirm against the scheduler)
     * @return the plan state manager paired with the executor that performs the periodic logging
     * @throws RuntimeException if scheduling fails or the calling thread is interrupted while
     *                          waiting; in the latter case the interrupt status is restored first
     */
    public Pair<PlanStateManager, ScheduledExecutorService> execute(PhysicalPlan physicalPlan, int i) {
        try {
            return this.runtimeMasterThread.<Pair<PlanStateManager, ScheduledExecutorService>>submit(() -> {
                this.irVertices.addAll(physicalPlan.getIdToIRVertex().values());
                try {
                    this.scheduler.schedulePlan(physicalPlan, i);
                    return Pair.of(this.planStateManager, scheduleDagLogging());
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }).get();
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Shuts the master down.
     *
     * <p>Stops the speculative-execution checks, waits up to {@code METRIC_ARRIVE_TIMEOUT}
     * milliseconds on the metric latch, and then queues the actual cleanup (scheduler, message
     * environment, metric handler, container manager, REST server) on the master thread.
     *
     * <p>The latch only exists once {@code requestContainer} has run; when it has not, the wait
     * is skipped instead of failing with a {@link NullPointerException}, so that the cleanup is
     * still performed.
     */
    public void terminate() {
        this.speculativeTaskCloningThread.shutdown();

        final CountDownLatch latch = this.metricCountDownLatch;
        if (latch != null) {
            try {
                if (!latch.await(METRIC_ARRIVE_TIMEOUT, TimeUnit.MILLISECONDS)) {
                    LOG.warn("Terminating master before all executor terminated messages arrived.");
                }
            } catch (InterruptedException e) {
                LOG.warn("Waiting executor terminating process interrupted: ", e);
                Thread.currentThread().interrupt();
            }
        }

        this.runtimeMasterThread.execute(() -> {
            this.scheduler.terminate();
            try {
                this.masterMessageEnvironment.close();
                this.metricMessageHandler.terminate();
                this.containerManager.terminate();
                try {
                    this.metricServer.stop();
                } catch (Exception e2) {
                    final MetricException stopFailure = new MetricException("Failed to stop rest api server: " + e2);
                    // The message-only constructor leaves the cause unset, so attach it explicitly.
                    stopFailure.initCause(e2);
                    throw stopFailure;
                }
            } catch (Exception e3) {
                throw new RuntimeException(e3);
            }
        });
    }

    /**
     * Requests containers according to a resource specification string and blocks until the
     * requests have been issued on the master thread.
     *
     * <p>For every (count, specification) pair parsed from {@code str}, the pending request
     * counter is increased by the count and that many containers are requested. Afterwards the
     * metric latch is replaced by a new one sized to the current pending request count.
     *
     * @param str the resource specification string, parsed by {@code Util.parseResourceSpecificationString}
     * @throws ContainerException if parsing or requesting fails, or if the calling thread is
     *                            interrupted while waiting; in the latter case the interrupt
     *                            status is restored first
     */
    public void requestContainer(String str) {
        try {
            this.runtimeMasterThread.submit(() -> {
                try {
                    for (Pair<Integer, ResourceSpecification> request : Util.parseResourceSpecificationString(str)) {
                        final int containerCount = request.left();
                        this.resourceRequestCount.getAndAdd(containerCount);
                        this.containerManager.requestContainer(containerCount, request.right());
                    }
                    this.metricCountDownLatch = new CountDownLatch(this.resourceRequestCount.get());
                } catch (Exception e) {
                    throw new ContainerException(e);
                }
            }).get();
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            LOG.error("Exception while requesting for a container: ", e);
            throw new ContainerException(e);
        } catch (Exception e) {
            LOG.error("Exception while requesting for a container: ", e);
            throw new ContainerException(e);
        }
    }

    /**
     * Forwards a container allocation to the container manager on the master thread.
     * The call returns as soon as the work has been queued.
     *
     * @param str                first argument passed through to {@code ContainerManager.onContainerAllocated}
     * @param allocatedEvaluator the evaluator that was allocated
     * @param configuration      the configuration passed through to the container manager
     */
    public void onContainerAllocated(String str, AllocatedEvaluator allocatedEvaluator, Configuration configuration) {
        final Runnable allocationTask =
            () -> this.containerManager.onContainerAllocated(str, allocatedEvaluator, configuration);
        this.runtimeMasterThread.execute(allocationTask);
    }

    /**
     * Handles a launched executor on the master thread and blocks for the outcome.
     *
     * <p>When the container manager yields an executor for the context, the executor is handed
     * to the scheduler and the pending request counter is decremented.
     *
     * @param activeContext the context of the launched executor
     * @return {@code true} if the pending request counter reached zero with this launch;
     *         {@code false} if it did not, or if the container manager yielded no executor
     * @throws ContainerException if handling fails or the calling thread is interrupted while
     *                            waiting; in the latter case the interrupt status is restored first
     */
    public boolean onExecutorLaunched(ActiveContext activeContext) {
        try {
            return this.runtimeMasterThread.submit(() -> {
                final Optional<ExecutorRepresenter> launched = this.containerManager.onContainerLaunched(activeContext);
                if (!launched.isPresent()) {
                    return false;
                }
                this.scheduler.onExecutorAdded(launched.get());
                return this.resourceRequestCount.decrementAndGet() == 0;
            }).get();
        } catch (InterruptedException e) {
            // Keep the interruption visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw new ContainerException(e);
        } catch (Exception e) {
            throw new ContainerException(e);
        }
    }

    /**
     * Handles a failed evaluator on the master thread. The call returns as soon as the work has
     * been queued.
     *
     * <p>The queued work counts the metric latch down once, reports every failed context of the
     * evaluator to the scheduler as a removed executor, and then notifies the container manager.
     *
     * @param failedEvaluator the evaluator that failed
     */
    public void onExecutorFailed(FailedEvaluator failedEvaluator) {
        final Runnable failureTask = () -> {
            this.metricCountDownLatch.countDown();
            failedEvaluator.getFailedContextList()
                .forEach(context -> this.scheduler.onExecutorRemoved(context.getId()));
            this.containerManager.onContainerFailed(failedEvaluator.getId());
        };
        this.runtimeMasterThread.execute(failureTask);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Dispatches a one-way control message by its type.
     *
     * <p>The case numbers come from the {@code AnonymousClass2} lookup table; any type without a
     * case below is rejected.
     *
     * @param message the control message to handle
     * @throws IllegalMessageException if the message type is not one the master accepts here
     * @throws RuntimeException        wrapping the reported exception when an executor failure
     *                                 message is received
     */
    public void handleControlMessage(ControlMessage.Message message) {
        final int caseNumber = AnonymousClass2.$SwitchMap$org$apache$nemo$runtime$common$comm$ControlMessage$MessageType[message.getType().ordinal()];
        switch (caseNumber) {
            case 2: {
                // TaskStateChanged: relay the reported task state to the scheduler.
                final ControlMessage.TaskStateChangedMsg stateChange = message.getTaskStateChangedMsg();
                this.scheduler.onTaskStateReportFromExecutor(
                    stateChange.getExecutorId(),
                    stateChange.getTaskId(),
                    stateChange.getAttemptIdx(),
                    MessageUtils.convertTaskState(stateChange.getState()),
                    stateChange.getVertexPutOnHoldId(),
                    MessageUtils.convertFailureCause(stateChange.getFailureCause()));
                return;
            }
            case 3: {
                // ExecutorFailed: log the executor's exception and rethrow it wrapped.
                final ControlMessage.ExecutorFailedMsg failure = message.getExecutorFailedMsg();
                final String failedExecutorId = failure.getExecutorId();
                final Exception reported = (Exception) SerializationUtils.deserialize(failure.getException().toByteArray());
                LOG.error(failedExecutorId + " failed, Stack Trace: ", reported);
                throw new RuntimeException(reported);
            }
            case 4:
                // RunTimePassMessage: the scheduler is cast to BatchScheduler to receive it.
                ((BatchScheduler) this.scheduler).onRunTimePassMessage(
                    message.getRunTimePassMessageMsg().getTaskId(),
                    message.getRunTimePassMessageMsg().getEntryList());
                return;
            case 5:
                // MetricMessageReceived: hand every contained metric to the metric handler.
                message.getMetricMsg().getMetricList().forEach(entry ->
                    this.metricMessageHandler.onMetricMessageReceived(
                        entry.getMetricType(),
                        entry.getMetricId(),
                        entry.getMetricField(),
                        entry.getMetricValue().toByteArray()));
                return;
            case 6: {
                // ExecutorDataCollected: forward the collected data to the client.
                final ControlMessage.DataCollectMessage collected = ControlMessage.DataCollectMessage.newBuilder()
                    .setData(message.getDataCollected().getData())
                    .build();
                final ControlMessage.DriverToClientMessage toClient = ControlMessage.DriverToClientMessage.newBuilder()
                    .setType(ControlMessage.DriverToClientMessageType.DataCollected)
                    .setDataCollected(collected)
                    .build();
                this.clientRPC.send(toClient);
                return;
            }
            case 7:
                // MetricFlushed: one more count towards releasing the metric latch.
                this.metricCountDownLatch.countDown();
                return;
            default:
                throw new IllegalMessageException(new Exception("This message should not be received by Master :" + message.getType()));
        }
    }

    /**
     * Starts a dedicated single-thread scheduler that stores the plan state as JSON under the
     * suffix {@code "periodic"} every {@code DAG_LOGGING_PERIOD} milliseconds, beginning after
     * one such period.
     *
     * @return the executor running the periodic logging, so that the caller can shut it down
     */
    private ScheduledExecutorService scheduleDagLogging() {
        final ScheduledExecutorService dagLogger = Executors.newSingleThreadScheduledExecutor();
        dagLogger.scheduleAtFixedRate(
            () -> this.planStateManager.storeJSON("periodic"),
            DAG_LOGGING_PERIOD,
            DAG_LOGGING_PERIOD,
            TimeUnit.MILLISECONDS);
        return dagLogger;
    }
}
