package org.apache.nemo.runtime.master.scheduler;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import org.apache.nemo.common.exception.UnknownExecutionStateException;
import org.apache.nemo.common.ir.Readable;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.Stage;
import org.apache.nemo.runtime.common.plan.StageEdge;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.PipeManagerMaster;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DriverSide
@NotThreadSafe
/* loaded from: input_file:org/apache/nemo/runtime/master/scheduler/StreamingScheduler.class */
public final class StreamingScheduler implements Scheduler {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingScheduler.class.getName());
    private final TaskDispatcher taskDispatcher;
    private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
    private final ExecutorRegistry executorRegistry;
    private final PlanStateManager planStateManager;
    private final PipeManagerMaster pipeManagerMaster;

    /* renamed from: org.apache.nemo.runtime.master.scheduler.StreamingScheduler$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/nemo/runtime/master/scheduler/StreamingScheduler$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State = new int[TaskState.State.values().length];

        static {
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.COMPLETE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.ON_HOLD.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.FAILED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.SHOULD_RETRY.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.READY.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.EXECUTING.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    @Inject
    StreamingScheduler(TaskDispatcher taskDispatcher, PendingTaskCollectionPointer pendingTaskCollectionPointer, ExecutorRegistry executorRegistry, PlanStateManager planStateManager, PipeManagerMaster pipeManagerMaster) {
        this.taskDispatcher = taskDispatcher;
        this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
        this.executorRegistry = executorRegistry;
        this.planStateManager = planStateManager;
        this.pipeManagerMaster = pipeManagerMaster;
    }

    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void schedulePlan(PhysicalPlan physicalPlan, int i) {
        this.taskDispatcher.run();
        this.planStateManager.updatePlan(physicalPlan, i);
        this.planStateManager.storeJSON("submitted");
        this.pendingTaskCollectionPointer.setToOverwrite((List) physicalPlan.getStageDAG().getTopologicalSort().stream().flatMap(stage -> {
            List incomingEdgesOf = physicalPlan.getStageDAG().getIncomingEdgesOf(stage.getId());
            List outgoingEdgesOf = physicalPlan.getStageDAG().getOutgoingEdgesOf(stage.getId());
            List vertexIdToReadables = stage.getVertexIdToReadables();
            List<String> taskAttemptsToSchedule = this.planStateManager.getTaskAttemptsToSchedule(stage.getId());
            taskAttemptsToSchedule.forEach(str -> {
                int indexFromTaskId = RuntimeIdManager.getIndexFromTaskId(str);
                outgoingEdgesOf.forEach(stageEdge -> {
                    this.pipeManagerMaster.onTaskScheduled(stageEdge.getId(), indexFromTaskId);
                });
            });
            return taskAttemptsToSchedule.stream().map(str2 -> {
                return new Task(physicalPlan.getPlanId(), str2, stage.getExecutionProperties(), stage.getSerializedIRDAG(), incomingEdgesOf, outgoingEdgesOf, (Map) vertexIdToReadables.get(RuntimeIdManager.getIndexFromTaskId(str2)));
            });
        }).collect(Collectors.toList()));
        this.taskDispatcher.onNewPendingTaskCollectionAvailable();
    }

    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void updatePlan(PhysicalPlan physicalPlan) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void onTaskStateReportFromExecutor(String str, String str2, int i, TaskState.State state, @Nullable String str3, TaskState.RecoverableTaskFailureCause recoverableTaskFailureCause) {
        this.planStateManager.onTaskStateChanged(str2, state);
        switch (AnonymousClass1.$SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[state.ordinal()]) {
            case 1:
                return;
            case 2:
            case 3:
            case 4:
                throw new UnsupportedOperationException();
            case 5:
            case 6:
                throw new RuntimeException("The states READY/EXECUTING cannot occur at this point");
            default:
                throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
        }
    }

    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void onSpeculativeExecutionCheck() {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void onExecutorAdded(ExecutorRepresenter executorRepresenter) {
        LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
        this.taskDispatcher.onExecutorSlotAvailable();
        this.executorRegistry.registerExecutor(executorRepresenter);
    }

    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void onExecutorRemoved(String str) {
        throw new UnsupportedOperationException();
    }

    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void terminate() {
        this.taskDispatcher.terminate();
        this.executorRegistry.terminate();
    }
}
