package org.apache.nemo.runtime.master;

import com.google.common.annotations.VisibleForTesting;
import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;
import org.apache.nemo.common.StateMachine;
import org.apache.nemo.common.exception.IllegalStateTransitionException;
import org.apache.nemo.common.exception.UnknownExecutionStateException;
import org.apache.nemo.common.ir.vertex.executionproperty.ClonedSchedulingProperty;
import org.apache.nemo.conf.JobConf;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.metric.JobMetric;
import org.apache.nemo.runtime.common.metric.StageMetric;
import org.apache.nemo.runtime.common.metric.TaskMetric;
import org.apache.nemo.runtime.common.plan.PhysicalPlan;
import org.apache.nemo.runtime.common.plan.Stage;
import org.apache.nemo.runtime.common.state.PlanState;
import org.apache.nemo.runtime.common.state.StageState;
import org.apache.nemo.runtime.common.state.TaskState;
import org.apache.nemo.runtime.master.metric.MetricStore;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.tang.annotations.Parameter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@DriverSide
@ThreadSafe
/* loaded from: input_file:org/apache/nemo/runtime/master/PlanStateManager.class */
public final class PlanStateManager {
    private static final Logger LOG = LoggerFactory.getLogger(PlanStateManager.class.getName());
    private String planId;
    private int maxScheduleAttempt;
    private PhysicalPlan physicalPlan;
    private final String dagDirectory;
    private int dagLogFileIndex = 0;
    private final Map<String, Long> taskIdToStartTimeMs = new HashMap();
    private final Map<String, List<Long>> stageIdToCompletedTaskTimeMsList = new HashMap();
    private final Map<String, Map<Integer, Integer>> stageIdToTaskIndexToNumOfClones = new HashMap();
    private PlanState planState = new PlanState();
    private final Map<String, StageState> stageIdToState = new HashMap();
    private final Map<String, Map<Integer, List<TaskState>>> stageIdToTaskIdxToAttemptStates = new HashMap();
    private final Lock finishLock = new ReentrantLock();
    private final Condition planFinishedCondition = this.finishLock.newCondition();
    private MetricStore metricStore = MetricStore.getStore();
    private boolean initialized = false;

    /* renamed from: org.apache.nemo.runtime.master.PlanStateManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/nemo/runtime/master/PlanStateManager$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State = new int[TaskState.State.values().length];

        static {
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.SHOULD_RETRY.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.COMPLETE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.ON_HOLD.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.READY.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.EXECUTING.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[TaskState.State.FAILED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    @Inject
    private PlanStateManager(@Parameter(JobConf.DAGDirectory.class) String str) {
        this.dagDirectory = str;
    }

    public static PlanStateManager newInstance(String str) {
        return new PlanStateManager(str);
    }

    public void setMetricStore(MetricStore metricStore) {
        this.metricStore = metricStore;
    }

    public synchronized void updatePlan(PhysicalPlan physicalPlan, int i) {
        if (this.initialized) {
            LOG.info("Update Plan from {} to {}", this.physicalPlan.getPlanId(), physicalPlan.getPlanId());
        } else {
            this.initialized = true;
        }
        this.planState = new PlanState();
        this.physicalPlan = physicalPlan;
        this.planId = physicalPlan.getPlanId();
        this.maxScheduleAttempt = i;
        this.metricStore.getOrCreateMetric(JobMetric.class, this.planId).setStageDAG(physicalPlan.getStageDAG());
        this.metricStore.triggerBroadcast(JobMetric.class, this.planId);
        initializeStates();
    }

    private void initializeStates() {
        onPlanStateChanged(PlanState.State.EXECUTING);
        this.physicalPlan.getStageDAG().topologicalDo(stage -> {
            this.stageIdToState.putIfAbsent(stage.getId(), new StageState());
            this.stageIdToTaskIdxToAttemptStates.putIfAbsent(stage.getId(), new HashMap());
            stage.getTaskIndices().forEach(num -> {
                this.stageIdToTaskIdxToAttemptStates.get(stage.getId()).putIfAbsent(num, new ArrayList());
            });
        });
    }

    public synchronized List<String> getTaskAttemptsToSchedule(String str) {
        if (getStageState(str).equals(StageState.State.COMPLETE)) {
            return new ArrayList(0);
        }
        ArrayList arrayList = new ArrayList();
        Stage vertexById = this.physicalPlan.getStageDAG().getVertexById(str);
        Iterator it = vertexById.getTaskIndices().iterator();
        while (it.hasNext()) {
            int intValue = ((Integer) it.next()).intValue();
            List<TaskState> list = this.stageIdToTaskIdxToAttemptStates.get(str).get(Integer.valueOf(intValue));
            if (list.stream().noneMatch(taskState -> {
                return taskState.getStateMachine().getCurrentState().equals(TaskState.State.COMPLETE);
            })) {
                this.stageIdToTaskIndexToNumOfClones.putIfAbsent(str, new HashMap());
                Optional propertyValue = vertexById.getPropertyValue(ClonedSchedulingProperty.class);
                int intValue2 = (propertyValue.isPresent() && ((ClonedSchedulingProperty.CloneConf) propertyValue.get()).isUpFrontCloning()) ? 2 : this.stageIdToTaskIndexToNumOfClones.get(str).getOrDefault(str, 1).intValue();
                long count = list.stream().filter(this::isTaskNotDone).count();
                for (int i = 0; i < intValue2 - count; i++) {
                    list.add(new TaskState());
                }
                if (list.size() > this.maxScheduleAttempt) {
                    throw new RuntimeException(list.size() + " exceeds max attempt " + this.maxScheduleAttempt);
                }
                for (int i2 = 0; i2 < list.size(); i2++) {
                    if (list.get(i2).getStateMachine().getCurrentState().equals(TaskState.State.READY)) {
                        arrayList.add(RuntimeIdManager.generateTaskId(str, intValue, i2));
                    }
                }
            }
        }
        return arrayList;
    }

    public synchronized Set<String> getAllTaskAttemptsOfStage(String str) {
        return getTaskAttemptIdsToItsState(str).keySet();
    }

    public synchronized Map<String, Long> getExecutingTaskToRunningTimeMs(String str) {
        long currentTimeMillis = System.currentTimeMillis();
        HashMap hashMap = new HashMap();
        Map<Integer, List<TaskState>> map = this.stageIdToTaskIdxToAttemptStates.get(str);
        Iterator<Integer> it = map.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            List<TaskState> list = map.get(Integer.valueOf(intValue));
            for (int i = 0; i < list.size(); i++) {
                if (TaskState.State.EXECUTING.equals(list.get(i).getStateMachine().getCurrentState())) {
                    String generateTaskId = RuntimeIdManager.generateTaskId(str, intValue, i);
                    hashMap.put(generateTaskId, Long.valueOf(currentTimeMillis - this.taskIdToStartTimeMs.get(generateTaskId).longValue()));
                }
            }
        }
        return hashMap;
    }

    public synchronized List<Long> getCompletedTaskTimeListMs(String str) {
        return new ArrayList(this.stageIdToCompletedTaskTimeMsList.getOrDefault(str, new ArrayList(0)));
    }

    public synchronized boolean setNumOfClones(String str, int i, int i2) {
        this.stageIdToTaskIndexToNumOfClones.putIfAbsent(str, new HashMap());
        Integer put = this.stageIdToTaskIndexToNumOfClones.get(str).put(Integer.valueOf(i), Integer.valueOf(i2));
        return put == null || put.intValue() != i2;
    }

    public synchronized void onTaskStateChanged(String str, TaskState.State state) {
        StateMachine stateMachine = getTaskStateHelper(str).getStateMachine();
        LOG.debug("Task State Transition: id {}, from {} to {}", new Object[]{str, stateMachine.getCurrentState(), state});
        this.metricStore.getOrCreateMetric(TaskMetric.class, str).addEvent(stateMachine.getCurrentState(), state);
        this.metricStore.triggerBroadcast(TaskMetric.class, str);
        try {
            stateMachine.setState(state);
            String stageIdFromTaskId = RuntimeIdManager.getStageIdFromTaskId(str);
            Map<Integer, List<TaskState>> map = this.stageIdToTaskIdxToAttemptStates.get(stageIdFromTaskId);
            long count = map.values().stream().filter(list -> {
                List list = (List) list.stream().map(taskState -> {
                    return taskState.getStateMachine().getCurrentState();
                }).collect(Collectors.toList());
                return list.stream().anyMatch(state2 -> {
                    return state2.equals(TaskState.State.ON_HOLD);
                }) || list.stream().anyMatch(state3 -> {
                    return state3.equals(TaskState.State.COMPLETE);
                });
            }).count();
            if (state.equals(TaskState.State.COMPLETE)) {
                LOG.info("{} completed: {} Task(s) out of {} are remaining in this stage", new Object[]{str, Long.valueOf(map.size() - count), Integer.valueOf(map.size())});
            }
            if (state.equals(TaskState.State.EXECUTING)) {
                this.taskIdToStartTimeMs.put(str, Long.valueOf(System.currentTimeMillis()));
            } else if (state.equals(TaskState.State.COMPLETE)) {
                this.stageIdToCompletedTaskTimeMsList.putIfAbsent(stageIdFromTaskId, new ArrayList());
                this.stageIdToCompletedTaskTimeMsList.get(stageIdFromTaskId).add(Long.valueOf(System.currentTimeMillis() - this.taskIdToStartTimeMs.get(str).longValue()));
            }
            switch (AnonymousClass1.$SwitchMap$org$apache$nemo$runtime$common$state$TaskState$State[state.ordinal()]) {
                case 1:
                    if (getPeerAttemptsForTheSameTaskIndex(str).stream().anyMatch(state2 -> {
                        return state2.equals(TaskState.State.COMPLETE);
                    })) {
                        return;
                    }
                    onStageStateChanged(stageIdFromTaskId, StageState.State.INCOMPLETE);
                    return;
                case 2:
                case 3:
                    if (count == this.physicalPlan.getStageDAG().getVertexById(stageIdFromTaskId).getTaskIndices().size()) {
                        onStageStateChanged(stageIdFromTaskId, StageState.State.COMPLETE);
                        return;
                    }
                    return;
                case 4:
                case 5:
                case 6:
                    return;
                default:
                    throw new UnknownExecutionStateException(new Throwable("This task state is unknown"));
            }
        } catch (IllegalStateTransitionException e) {
            throw new RuntimeException(str + " - Illegal task state transition ", e);
        }
    }

    private void onStageStateChanged(String str, StageState.State state) {
        StateMachine stateMachine = this.stageIdToState.get(str).getStateMachine();
        this.metricStore.getOrCreateMetric(StageMetric.class, str).addEvent(getStageState(str), state);
        this.metricStore.triggerBroadcast(StageMetric.class, str);
        LOG.debug("Stage State Transition: id {} from {} to {}", new Object[]{str, stateMachine.getCurrentState(), state});
        try {
            stateMachine.setState(state);
            if (!this.stageIdToState.values().stream().allMatch(stageState -> {
                return stageState.getStateMachine().getCurrentState().equals(StageState.State.COMPLETE);
            }) || PlanState.State.COMPLETE.equals(getPlanState())) {
                return;
            }
            onPlanStateChanged(PlanState.State.COMPLETE);
        } catch (IllegalStateTransitionException e) {
            throw new RuntimeException(str + " - Illegal stage state transition ", e);
        }
    }

    private void onPlanStateChanged(PlanState.State state) {
        this.metricStore.getOrCreateMetric(JobMetric.class, this.planId).addEvent(this.planState.getStateMachine().getCurrentState(), state);
        this.metricStore.triggerBroadcast(JobMetric.class, this.planId);
        try {
            this.planState.getStateMachine().setState(state);
            if (state == PlanState.State.EXECUTING) {
                LOG.debug("Executing Plan ID {}...", this.planId);
                return;
            }
            if (state != PlanState.State.COMPLETE && state != PlanState.State.FAILED) {
                throw new RuntimeException("Illegal Plan State Transition");
            }
            LOG.debug("Plan ID {} {}!", this.planId, state);
            this.finishLock.lock();
            try {
                this.planFinishedCondition.signalAll();
            } finally {
                this.finishLock.unlock();
            }
        } catch (IllegalStateTransitionException e) {
            throw new RuntimeException(this.planId + " - Illegal plan state transition ", e);
        }
    }

    public PlanState.State waitUntilFinish() {
        return waitUntilFinish(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
    }

    public PlanState.State waitUntilFinish(long j, TimeUnit timeUnit) {
        this.finishLock.lock();
        try {
            try {
                if (!isPlanDone()) {
                    while (!this.planFinishedCondition.await(j, timeUnit)) {
                        LOG.warn("Timeout during waiting the finish of Plan ID {}", this.planId);
                        Thread.currentThread().interrupt();
                    }
                }
                this.finishLock.unlock();
            } catch (InterruptedException e) {
                LOG.warn("Interrupted while waiting for the finish of Plan ID {}", this.planId);
                Thread.currentThread().interrupt();
                this.finishLock.unlock();
            }
            return getPlanState();
        } catch (Throwable th) {
            this.finishLock.unlock();
            throw th;
        }
    }

    @VisibleForTesting
    public synchronized Map<String, TaskState.State> getAllTaskAttemptIdsToItsState() {
        return (Map) this.physicalPlan.getStageDAG().getVertices().stream().map((v0) -> {
            return v0.getId();
        }).flatMap(str -> {
            return getTaskAttemptIdsToItsState(str).entrySet().stream();
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }

    public synchronized boolean isPlanDone() {
        return getPlanState() == PlanState.State.COMPLETE || getPlanState() == PlanState.State.FAILED;
    }

    public synchronized String getPlanId() {
        return this.planId;
    }

    public synchronized PlanState.State getPlanState() {
        return this.planState.getStateMachine().getCurrentState();
    }

    public synchronized StageState.State getStageState(String str) {
        return this.stageIdToState.get(str).getStateMachine().getCurrentState();
    }

    public synchronized TaskState.State getTaskState(String str) {
        return getTaskStateHelper(str).getStateMachine().getCurrentState();
    }

    private Map<String, TaskState.State> getTaskAttemptIdsToItsState(String str) {
        HashMap hashMap = new HashMap();
        Map<Integer, List<TaskState>> map = this.stageIdToTaskIdxToAttemptStates.get(str);
        Iterator<Integer> it = map.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            List<TaskState> list = map.get(Integer.valueOf(intValue));
            for (int i = 0; i < list.size(); i++) {
                hashMap.put(RuntimeIdManager.generateTaskId(str, intValue, i), list.get(i).getStateMachine().getCurrentState());
            }
        }
        return hashMap;
    }

    private TaskState getTaskStateHelper(String str) {
        return this.stageIdToTaskIdxToAttemptStates.get(RuntimeIdManager.getStageIdFromTaskId(str)).get(Integer.valueOf(RuntimeIdManager.getIndexFromTaskId(str))).get(RuntimeIdManager.getAttemptFromTaskId(str));
    }

    private boolean isTaskNotDone(TaskState taskState) {
        TaskState.State currentState = taskState.getStateMachine().getCurrentState();
        return currentState.equals(TaskState.State.READY) || currentState.equals(TaskState.State.EXECUTING) || currentState.equals(TaskState.State.ON_HOLD);
    }

    private List<TaskState.State> getPeerAttemptsForTheSameTaskIndex(String str) {
        String stageIdFromTaskId = RuntimeIdManager.getStageIdFromTaskId(str);
        int indexFromTaskId = RuntimeIdManager.getIndexFromTaskId(str);
        int attemptFromTaskId = RuntimeIdManager.getAttemptFromTaskId(str);
        ArrayList arrayList = new ArrayList(this.stageIdToTaskIdxToAttemptStates.get(stageIdFromTaskId).get(Integer.valueOf(indexFromTaskId)));
        arrayList.remove(attemptFromTaskId);
        return (List) arrayList.stream().map(taskState -> {
            return taskState.getStateMachine().getCurrentState();
        }).collect(Collectors.toList());
    }

    public synchronized PhysicalPlan getPhysicalPlan() {
        return this.physicalPlan;
    }

    public int getMaxScheduleAttempt() {
        return this.maxScheduleAttempt;
    }

    public synchronized boolean isInitialized() {
        return this.initialized;
    }

    public void storeJSON(String str) {
        if (this.dagDirectory.equals("")) {
            return;
        }
        File file = new File(this.dagDirectory, this.planId + "-" + this.dagLogFileIndex + "-" + str + ".json");
        file.getParentFile().mkdirs();
        try {
            try {
                PrintWriter printWriter = new PrintWriter(file);
                try {
                    printWriter.println(toStringWithPhysicalPlan());
                    LOG.debug(String.format("JSON representation of plan state for %s(%s) was saved to %s", this.planId, this.dagLogFileIndex + "-" + str, file.getPath()));
                    printWriter.close();
                    this.dagLogFileIndex++;
                } catch (Throwable th) {
                    try {
                        printWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                    throw th;
                }
            } catch (Throwable th3) {
                this.dagLogFileIndex++;
                throw th3;
            }
        } catch (IOException e) {
            LOG.warn(String.format("Cannot store JSON representation of plan state for %s(%s) to %s: %s", this.planId, this.dagLogFileIndex + "-" + str, file.getPath(), e.toString()));
            this.dagLogFileIndex++;
        }
    }

    private String toStringWithPhysicalPlan() {
        StringBuilder sb = new StringBuilder("{");
        sb.append("\"dag\": ").append(this.physicalPlan.getStageDAG().toString()).append(", ");
        sb.append("\"planState\": ").append(toString()).append("}");
        return sb.toString();
    }

    public synchronized String toString() {
        StringBuilder sb = new StringBuilder("{");
        sb.append("\"planId\": \"").append(this.planId).append("\", ");
        sb.append("\"stages\": [");
        boolean z = true;
        for (Stage stage : this.physicalPlan.getStageDAG().getVertices()) {
            if (!z) {
                sb.append(", ");
            }
            z = false;
            StageState stageState = this.stageIdToState.get(stage.getId());
            sb.append("{\"id\": \"").append(stage.getId()).append("\", ");
            sb.append("\"state\": \"").append(stageState.toString()).append("\", ");
            sb.append("\"tasks\": [");
            boolean z2 = true;
            for (Map.Entry<String, TaskState.State> entry : getTaskAttemptIdsToItsState(stage.getId()).entrySet()) {
                if (!z2) {
                    sb.append(", ");
                }
                z2 = false;
                sb.append("{\"id\": \"").append(entry.getKey()).append("\", ");
                sb.append("\"state\": \"").append(entry.getValue().toString()).append("\"}");
            }
            sb.append("]}");
        }
        sb.append("]}");
        return sb.toString();
    }
}
