package org.apache.nemo.runtime.master.scheduler;

import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import javax.annotation.concurrent.NotThreadSafe;
import javax.inject.Inject;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.nemo.common.Pair;
import org.apache.nemo.common.exception.UnknownExecutionStateException;
import org.apache.nemo.common.exception.UnrecoverableFailureException;
import org.apache.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.PlanRewriter;
import org.apache.nemo.runtime.common.plan.Stage;
import org.apache.nemo.runtime.common.state.StageState;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.BlockManagerMaster;
import org.apache.nemo.runtime.master.PlanAppender;
import org.apache.nemo.runtime.master.PlanStateManager;
import org.apache.nemo.runtime.master.resource.ExecutorRepresenter;
import org.apache.nemo.runtime.master.scheduler.ExecutorRegistry;
import org.apache.reef.annotations.audience.DriverSide;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DriverSide
@NotThreadSafe
/* loaded from: input_file:org/apache/nemo/runtime/master/scheduler/BatchScheduler.class */
public final class BatchScheduler implements Scheduler {
    /** Class-wide SLF4J logger, looked up by this class's fully-qualified name. */
    private static final Logger LOG = LoggerFactory.getLogger(BatchScheduler.class.getName());
    /** Handed to BatchSchedulerUtils when handling run-time pass messages and ON_HOLD tasks. */
    private final PlanRewriter planRewriter;
    /** Started on the first plan submission and notified about free slots and new pending tasks. */
    private final TaskDispatcher taskDispatcher;
    /** Receives the list of tasks selected by doSchedule() via setToOverwrite(). */
    private final PendingTaskCollectionPointer pendingTaskCollectionPointer;
    /** Registry of executors; updated when executors are added or removed and terminated with the scheduler. */
    private final ExecutorRegistry executorRegistry;
    /** Source of plan, stage and task state; also owns the currently installed physical plan. */
    private final PlanStateManager planStateManager;
    /** Notified about removed workers and failed producer tasks; also consulted when selecting tasks. */
    private final BlockManagerMaster blockManagerMaster;
    /**
     * Stages of the current plan grouped by schedule group and ordered by ascending group key.
     * Rebuilt on every updatePlan(PhysicalPlan, int); it is not assigned anywhere else in this class.
     */
    private List<List<Stage>> sortedScheduleGroups;

    /*
     * Decompiler rendering of the compiler-generated helper behind a switch over TaskState.State.
     * The table below maps each enum constant's ordinal to a small case label, which
     * onTaskStateReportFromExecutor uses as:
     *   1 = COMPLETE, 2 = SHOULD_RETRY, 3 = ON_HOLD, 4 = FAILED, 5 = READY, 6 = EXECUTING.
     * Any constant without an entry keeps the array default of 0 and therefore hits the
     * switch's default branch.
     */
    /* renamed from: org.apache.nemo.runtime.master.scheduler.BatchScheduler$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/nemo/runtime/master/scheduler/BatchScheduler$1.class */
    static /* synthetic */ class AnonymousClass1 {
        // Indexed by TaskState.State.ordinal(); sized to the enum as it exists at class-load time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State = new int[TaskState.State.values().length];

        static {
            // Each constant is registered in its own try block so that a constant missing at
            // link time (NoSuchFieldError) only skips that one entry instead of failing the
            // whole initializer.
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.COMPLETE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: entry stays 0.
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.SHOULD_RETRY.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: entry stays 0.
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.ON_HOLD.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Deliberately ignored: entry stays 0.
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.FAILED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // Deliberately ignored: entry stays 0.
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.READY.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
                // Deliberately ignored: entry stays 0.
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.EXECUTING.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
                // Deliberately ignored: entry stays 0.
            }
        }
    }

    /**
     * Injection-only constructor; it simply stores the collaborators it is given.
     *
     * @param planRewriter                 rewriter handed to BatchSchedulerUtils on run-time messages.
     * @param taskDispatcher               dispatcher that is started and notified by this scheduler.
     * @param pendingTaskCollectionPointer holder for the tasks chosen in each scheduling round.
     * @param blockManagerMaster           block manager notified about worker and task failures.
     * @param executorRegistry             registry of executors.
     * @param planStateManager             manager of plan/stage/task state.
     */
    @Inject
    private BatchScheduler(PlanRewriter planRewriter, TaskDispatcher taskDispatcher, PendingTaskCollectionPointer pendingTaskCollectionPointer, BlockManagerMaster blockManagerMaster, ExecutorRegistry executorRegistry, PlanStateManager planStateManager) {
        // State and plan handling.
        this.planStateManager = planStateManager;
        this.planRewriter = planRewriter;
        // Dispatching of tasks.
        this.pendingTaskCollectionPointer = pendingTaskCollectionPointer;
        this.taskDispatcher = taskDispatcher;
        // Cluster resources.
        this.executorRegistry = executorRegistry;
        this.blockManagerMaster = blockManagerMaster;
    }

    /**
     * Installs a new physical plan while keeping the maximum schedule attempt that the
     * plan state manager currently reports.
     *
     * @param physicalPlan the plan to install.
     */
    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void updatePlan(PhysicalPlan physicalPlan) {
        final int currentMaxScheduleAttempt = this.planStateManager.getMaxScheduleAttempt();
        updatePlan(physicalPlan, currentMaxScheduleAttempt);
    }

    /**
     * Installs the given plan in the plan state manager and rebuilds
     * {@link #sortedScheduleGroups} from the plan's stage DAG.
     *
     * <p>Stages are bucketed by {@code getScheduleGroup()} and the buckets are ordered by
     * ascending group key, so index 0 holds the stages with the smallest schedule group.
     *
     * @param physicalPlan the plan to install.
     * @param i            forwarded unchanged to {@code PlanStateManager.updatePlan} as its second
     *                     argument (the public overload passes the max schedule attempt here).
     */
    private void updatePlan(PhysicalPlan physicalPlan, int i) {
        this.planStateManager.updatePlan(physicalPlan, i);
        // Kept fully typed (no raw List/Map casts) so that the entry stream retains its
        // key/value types and the comparator and value extraction are checked by the compiler.
        this.sortedScheduleGroups = physicalPlan.getStageDAG().getVertices().stream()
            .collect(Collectors.groupingBy(Stage::getScheduleGroup))
            .entrySet().stream()
            .sorted(Map.Entry.comparingByKey())
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());
    }

    /**
     * Forwards a run-time pass message to {@code BatchSchedulerUtils.onRunTimePassMessage},
     * together with this scheduler's plan state manager and plan rewriter.
     *
     * @param str first message argument, passed through unchanged
     *            (NOTE(review): presumably the id of the task that produced the message - confirm).
     * @param obj second message argument, passed through unchanged
     *            (NOTE(review): presumably the message payload - confirm).
     */
    public void onRunTimePassMessage(String str, Object obj) {
        BatchSchedulerUtils.onRunTimePassMessage(this.planStateManager, this.planRewriter, str, obj);
    }

    /**
     * Schedules a submitted plan.
     *
     * <p>On the first submission (plan state manager not yet initialized) the task dispatcher
     * is started and the plan is installed as-is. On later submissions the new plan is appended
     * to the currently installed one via {@code PlanAppender.appendPlan}. In both cases the plan
     * state is stored as JSON under a matching suffix and a scheduling round is triggered.
     *
     * @param physicalPlan the submitted plan.
     * @param i            forwarded to {@code updatePlan(PhysicalPlan, int)} as its second argument.
     */
    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void schedulePlan(PhysicalPlan physicalPlan, int i) {
        LOG.info("Plan to schedule: {}", physicalPlan.getPlanId());

        final PhysicalPlan planToInstall;
        final String jsonSuffix;
        if (!this.planStateManager.isInitialized()) {
            // First plan: the dispatcher has to be running before anything is installed.
            this.taskDispatcher.run();
            planToInstall = physicalPlan;
            jsonSuffix = "submitted";
        } else {
            // Subsequent plan: extend what is already installed.
            planToInstall = PlanAppender.appendPlan(this.planStateManager.getPhysicalPlan(), physicalPlan);
            jsonSuffix = "appended";
        }

        updatePlan(planToInstall, i);
        this.planStateManager.storeJSON(jsonSuffix);
        doSchedule();
    }

    /**
     * Handles a task state change reported for a task.
     *
     * <p>The new state is first recorded in the plan state manager. COMPLETE, SHOULD_RETRY and
     * ON_HOLD are then delegated to {@code BatchSchedulerUtils}; every other state raises an
     * exception. For the three accepted states a scheduling round is started when needed and
     * the task dispatcher is told that a slot became available.
     *
     * @param str   passed to BatchSchedulerUtils alongside the task id and used in the failure message
     *              (NOTE(review): presumably the reporting executor's id - confirm).
     * @param str2  id of the task whose state changed.
     * @param i     not used by this implementation.
     * @param state the newly reported task state.
     * @param str3  not used by this implementation.
     * @param recoverableTaskFailureCause forwarded to BatchSchedulerUtils on SHOULD_RETRY.
     * @throws UnrecoverableFailureException  if the state is FAILED.
     * @throws RuntimeException               if the state is READY or EXECUTING.
     * @throws UnknownExecutionStateException if the state is none of the above.
     */
    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void onTaskStateReportFromExecutor(String str, String str2, int i, TaskState.State state, @Nullable String str3, TaskState.RecoverableTaskFailureCause recoverableTaskFailureCause) {
        this.planStateManager.onTaskStateChanged(str2, state);

        // Step 1: state-specific handling. Only COMPLETE, SHOULD_RETRY and ON_HOLD fall through.
        switch (state) {
            case COMPLETE:
                BatchSchedulerUtils.onTaskExecutionComplete(this.executorRegistry, str, str2);
                break;
            case SHOULD_RETRY:
                BatchSchedulerUtils.onTaskExecutionFailedRecoverable(this.planStateManager, this.blockManagerMaster, this.executorRegistry, str, str2, recoverableTaskFailureCause);
                break;
            case ON_HOLD:
                BatchSchedulerUtils.onTaskExecutionOnHold(this.planStateManager, this.executorRegistry, this.planRewriter, str, str2).ifPresent(this::updatePlan);
                break;
            case FAILED:
                throw new UnrecoverableFailureException(new Exception(String.format("The plan failed on %s in %s", str2, str)));
            case READY:
            case EXECUTING:
                throw new RuntimeException("The states READY/EXECUTING cannot occur at this point");
            default:
                throw new UnknownExecutionStateException(new Exception("This TaskState is unknown: " + state));
        }

        // Step 2: a retry always triggers a scheduling round; COMPLETE/ON_HOLD only do so when
        // the task's stage has completed while the plan as a whole has not.
        final boolean retryRequested = state == TaskState.State.SHOULD_RETRY;
        if (retryRequested
            || (this.planStateManager.getStageState(RuntimeIdManager.getStageIdFromTaskId(str2)).equals(StageState.State.COMPLETE)
                && !this.planStateManager.isPlanDone())) {
            doSchedule();
        }

        // Step 3: every state that reaches this point notifies the dispatcher of an available slot.
        this.taskDispatcher.onExecutorSlotAvailable();
    }

    /**
     * Checks the stages of the earliest schedulable group for speculative execution and
     * starts a scheduling round if at least one new clone was requested.
     *
     * <p>Only stages that carry a {@link ClonedSchedulingProperty} whose configuration is not
     * up-front cloning are considered. Every such stage is checked, even after an earlier
     * stage has already produced a clone.
     */
    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void onSpeculativeExecutionCheck() {
        final MutableBoolean isNewCloneCreated = new MutableBoolean(false);

        BatchSchedulerUtils.selectEarliestSchedulableGroup(this.sortedScheduleGroups, this.planStateManager).ifPresent(scheduleGroup -> {
            for (final Stage groupMember : scheduleGroup) {
                // Re-resolve the stage by id against the plan currently held by the state manager.
                final Stage stage = this.planStateManager.getPhysicalPlan().getStageDAG().getVertexById(groupMember.getId());
                stage.getPropertyValue(ClonedSchedulingProperty.class).ifPresent(cloneConf -> {
                    if (cloneConf.isUpFrontCloning()) {
                        return;
                    }
                    // The flag is sticky: it is only ever raised, never assigned the raw result.
                    // Otherwise a later stage that creates no clone would erase the fact that an
                    // earlier stage did, and the new clone would not be scheduled.
                    if (doSpeculativeExecution(stage, cloneConf)) {
                        isNewCloneCreated.setValue(true);
                    }
                });
            }
        });

        if (isNewCloneCreated.booleanValue()) {
            doSchedule();
        }
    }

    /**
     * Registers a newly added executor and tells the task dispatcher that a slot is available.
     *
     * @param executorRepresenter the executor that was added.
     */
    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void onExecutorAdded(ExecutorRepresenter executorRepresenter) {
        LOG.info("{} added (node: {})", executorRepresenter.getExecutorId(), executorRepresenter.getNodeName());
        this.executorRegistry.registerExecutor(executorRepresenter);
        // Wake the dispatcher after registration so the new executor can be considered.
        this.taskDispatcher.onExecutorSlotAvailable();
    }

    /**
     * Handles the removal of an executor.
     *
     * <p>The worker is removed from the block manager, the executor is marked FAILED in the
     * registry, and the tasks it reports via {@code onExecutorFailed()} are collected. Each of
     * those tasks is reported to the block manager as a failed producer, the tasks (and whatever
     * {@code BatchSchedulerUtils.retryTasksAndRequiredParents} adds to them) are set up for retry,
     * and a scheduling round is started.
     *
     * @param str id of the removed executor.
     */
    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void onExecutorRemoved(String str) {
        LOG.info("{} removed", str);
        this.blockManagerMaster.removeWorker(str);

        // Typed (not raw) so that addAll/forEach below are checked by the compiler.
        // NOTE(review): element type String assumes task ids are strings, as elsewhere in
        // this class - confirm against ExecutorRepresenter.onExecutorFailed().
        final HashSet<String> tasksOnFailedExecutor = new HashSet<>();
        this.executorRegistry.updateExecutor(str, (executor, previousState) -> {
            tasksOnFailedExecutor.addAll(executor.onExecutorFailed());
            return Pair.of(executor, ExecutorRegistry.ExecutorState.FAILED);
        });

        tasksOnFailedExecutor.forEach(this.blockManagerMaster::onProducerTaskFailed);
        BatchSchedulerUtils.retryTasksAndRequiredParents(this.planStateManager, this.blockManagerMaster, tasksOnFailedExecutor);
        doSchedule();
    }

    /**
     * Shuts this scheduler down by terminating the task dispatcher and then the executor registry.
     */
    @Override // org.apache.nemo.runtime.master.scheduler.Scheduler
    public void terminate() {
        this.taskDispatcher.terminate();
        this.executorRegistry.terminate();
    }

    /**
     * Runs one scheduling round.
     *
     * <p>Picks the earliest schedulable group, gathers the schedulable tasks of all its stages,
     * and, if there are any, publishes them through the pending task collection pointer and
     * notifies the task dispatcher. When no group is schedulable the round is skipped with a log
     * message; when the group yields no tasks the round ends silently.
     */
    private void doSchedule() {
        final Optional<List<Stage>> earliestGroup = BatchSchedulerUtils.selectEarliestSchedulableGroup(this.sortedScheduleGroups, this.planStateManager);
        if (!earliestGroup.isPresent()) {
            LOG.info("Skipping this round as no ScheduleGroup is schedulable.");
            return;
        }

        // The task list stays inside the Optional pipeline so its element type is inferred
        // from selectSchedulableTasks instead of being erased by a raw List.
        earliestGroup
            .map(stages -> stages.stream()
                .flatMap(stage -> BatchSchedulerUtils.selectSchedulableTasks(this.planStateManager, this.blockManagerMaster, stage).stream())
                .collect(Collectors.toList()))
            .filter(tasksToSchedule -> !tasksToSchedule.isEmpty())
            .ifPresent(tasksToSchedule -> {
                // Log the distinct stage ids the selected tasks belong to.
                LOG.info("Scheduling some tasks in {}, which are in the same ScheduleGroup", tasksToSchedule.stream()
                    .map(task -> task.getTaskId())
                    .map(RuntimeIdManager::getStageIdFromTaskId)
                    .collect(Collectors.toSet()));
                // Publish the tasks before notifying, so the dispatcher sees the new collection.
                this.pendingTaskCollectionPointer.setToOverwrite(tasksToSchedule);
                this.taskDispatcher.onNewPendingTaskCollectionAvailable();
            });
    }

    /**
     * Decides whether a stage has progressed far enough for speculative cloning and, if so,
     * tries to clone one of its long-running tasks.
     *
     * <p>Nothing happens until at least one task has completed and the number of completed
     * tasks reaches {@code round(taskCount * fractionToWaitFor)}. After that, the element at
     * index {@code length / 2} of the sorted completion times is used as the median.
     *
     * @param stage     the stage to inspect.
     * @param cloneConf the stage's cloning configuration.
     * @return the result of {@link #modifyStageNumCloneUsingMedianTime}, or {@code false} if the
     *         stage has not progressed far enough.
     */
    private boolean doSpeculativeExecution(Stage stage, ClonedSchedulingProperty.CloneConf cloneConf) {
        final double requiredFraction = cloneConf.getFractionToWaitFor();
        final Object[] completedTimesMs = this.planStateManager.getCompletedTaskTimeListMs(stage.getId()).toArray();
        final int numCompleted = completedTimesMs.length;

        if (numCompleted <= 0) {
            return false;
        }
        final long numToWaitFor = Math.round(stage.getTaskIndices().size() * requiredFraction);
        if (numCompleted < numToWaitFor) {
            return false;
        }

        // Elements are sorted by their natural ordering; the middle one serves as the median.
        Arrays.sort(completedTimesMs);
        final long medianTimeMs = ((Long) completedTimesMs[numCompleted / 2]).longValue();

        return modifyStageNumCloneUsingMedianTime(
            stage.getId(),
            numCompleted,
            medianTimeMs,
            cloneConf.getMedianTimeMultiplier(),
            this.planStateManager.getExecutingTaskToRunningTimeMs(stage.getId()));
    }

    /**
     * Requests a second clone for the first executing task whose running time exceeds
     * {@code round(j2 * d)} and for which the plan state manager accepts the change.
     *
     * <p>At most one task is cloned per call: the method returns as soon as
     * {@code setNumOfClones} reports success.
     *
     * @param str id of the stage the tasks belong to.
     * @param j   number of completed tasks; used only in the log message.
     * @param j2  median completion time in milliseconds.
     * @param d   multiplier applied to the median to obtain the threshold.
     * @param map running time in milliseconds of each executing task, keyed by task id.
     * @return {@code true} if a clone was requested, {@code false} otherwise.
     */
    private boolean modifyStageNumCloneUsingMedianTime(String str, long j, long j2, double d, Map<String, Long> map) {
        // The threshold does not depend on the task, so compute it once.
        final long thresholdMs = Math.round(j2 * d);

        for (final Map.Entry<String, Long> runningTask : map.entrySet()) {
            final long runningTimeMs = runningTask.getValue().longValue();
            if (runningTimeMs <= thresholdMs) {
                continue;
            }
            final String taskId = runningTask.getKey();
            final boolean cloneCountChanged = this.planStateManager.setNumOfClones(str, RuntimeIdManager.getIndexFromTaskId(taskId), 2);
            if (!cloneCountChanged) {
                continue;
            }
            LOG.info("Cloned {}, because its running time {} (ms) is bigger than {} tasks' (median) {} (ms) * (multiplier) {}", taskId, runningTimeMs, j, j2, d);
            return true;
        }
        return false;
    }
}
