package org.apache.kylin.job.execution;

import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.scheduler.EventBusFactory;
import org.apache.kylin.common.scheduler.JobFinishedNotifier;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.dao.ExecutablePO;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.exception.ExecuteRuntimeException;
import org.apache.kylin.job.exception.JobStoppedException;
import org.apache.kylin.job.exception.PersistentException;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;

/* loaded from: input_file:org/apache/kylin/job/execution/DefaultExecutable.class */
public class DefaultExecutable extends AbstractExecutable implements ChainedExecutable, DagExecutable {
    // Child steps of this job in insertion order; addTask() uses the current list size as the
    // new step's id suffix and step id, so a step's index equals its step id.
    private final List<AbstractExecutable> subTasks;

    /**
     * Compiler-generated lookup tables that back {@code switch} statements over enums, as
     * reconstructed by the decompiler (originally the synthetic class {@code DefaultExecutable$1}).
     *
     * <p>Each table is indexed by an enum constant's {@code ordinal()} and stores the case label
     * used by the compiled switch. Slots that are never assigned stay 0, which matches no case
     * label and therefore reaches the switch's {@code default} branch.
     *
     * <p>Every assignment is guarded on its own so that a failure to resolve one constant only
     * leaves that constant's slot unmapped.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kylin$job$execution$ExecutableState = new int[ExecutableState.values().length];
        // This table was assigned in the static block but never declared, which does not compile.
        static final /* synthetic */ int[] $SwitchMap$org$apache$kylin$job$execution$JobSchedulerModeEnum = new int[JobSchedulerModeEnum.values().length];

        static {
            try {
                $SwitchMap$org$apache$kylin$job$execution$ExecutableState[ExecutableState.RUNNING.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$kylin$job$execution$ExecutableState[ExecutableState.ERROR.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$kylin$job$execution$ExecutableState[ExecutableState.DISCARDED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$kylin$job$execution$ExecutableState[ExecutableState.SUICIDAL.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$kylin$job$execution$ExecutableState[ExecutableState.PAUSED.ordinal()] = 5;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$kylin$job$execution$ExecutableState[ExecutableState.SUCCEED.ordinal()] = 6;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$kylin$job$execution$ExecutableState[ExecutableState.SKIP.ordinal()] = 7;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$kylin$job$execution$ExecutableState[ExecutableState.WARNING.ordinal()] = 8;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$kylin$job$execution$ExecutableState[ExecutableState.READY.ordinal()] = 9;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$kylin$job$execution$JobSchedulerModeEnum[JobSchedulerModeEnum.DAG.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
            try {
                $SwitchMap$org$apache$kylin$job$execution$JobSchedulerModeEnum[JobSchedulerModeEnum.CHAIN.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant could not be resolved: leave its slot at 0 (default branch).
            }
        }
    }

    /** Creates a job with an empty, mutable list of sub-tasks. */
    public DefaultExecutable() {
        this.subTasks = Lists.newArrayList();
    }

    /**
     * Creates a job with an empty, mutable list of sub-tasks.
     *
     * @param obj forwarded unchanged to the superclass constructor
     */
    public DefaultExecutable(Object obj) {
        super(obj);
        this.subTasks = Lists.newArrayList();
    }

    /**
     * Returns the metadata dump list for this job; this implementation always returns an
     * immutable empty set and does not read {@code kylinConfig}.
     *
     * @param kylinConfig unused here
     * @return an empty set
     */
    public Set<String> getMetadataDumpList(KylinConfig kylinConfig) {
        return Collections.emptySet();
    }

    /**
     * Runs all sub-tasks of this job using the scheduling mode reported by
     * {@code getJobSchedulerMode()}: DAG mode goes through {@link #dagSchedule}, every other
     * mode (including CHAIN) through {@link #chainedSchedule}.
     *
     * @param executableContext context handed to every executed sub-task
     * @return a "succeed" result whenever scheduling returns without throwing
     * @throws ExecuteException if scheduling or a sub-task fails
     */
    @Override // org.apache.kylin.job.execution.AbstractExecutable
    public ExecuteResult doWork(ExecutableContext executableContext) throws ExecuteException {
        // View the sub-tasks through the Executable interface expected by the schedulers.
        List<Executable> steps = getTasks().stream()
                .map(Executable.class::cast)
                .collect(Collectors.toList());
        switch (getJobSchedulerMode()) {
            case DAG:
                logger.info("Execute in DAG mode.");
                dagSchedule(steps, executableContext);
                break;
            case CHAIN:
            default:
                logger.info("Execute in CHAIN mode.");
                chainedSchedule(steps, executableContext);
                break;
        }
        return ExecuteResult.createSucceed();
    }

    /**
     * Executes the given steps one after another, in list order, on the calling thread.
     *
     * @param list steps to run
     * @param executableContext context handed to each step
     * @throws ExecuteException if a step fails; later steps are then not attempted
     */
    @Override // org.apache.kylin.job.execution.ChainedExecutable
    public void chainedSchedule(List<Executable> list, ExecutableContext executableContext) throws ExecuteException {
        for (Executable step : list) {
            executeStep(step, executableContext);
        }
    }

    /**
     * Schedules the given steps as a DAG: clears the job's stored error, starts from the steps
     * that declare no previous step, and then blocks until the polling in
     * {@link #waitAllDagExecutablesFinished} ends.
     *
     * @param list all steps of the job
     * @param executableContext context handed to each step
     * @throws ExecuteException if starting the DAG fails
     */
    @Override // org.apache.kylin.job.execution.DagExecutable
    public void dagSchedule(List<Executable> list, ExecutableContext executableContext) throws ExecuteException {
        // Entry points of the DAG: steps with a blank previous-step id.
        List<Executable> rootSteps = list.stream()
                .filter(step -> StringUtils.isBlank(step.getPreviousStep()))
                .collect(Collectors.toList());
        // Lookup used to resolve previous/next step ids while walking the DAG.
        Map<String, Executable> stepsById = list.stream()
                .collect(Collectors.toMap(Executable::getId, step -> step));
        resetJobErrorMessage();
        dagExecute(rootSteps, stepsById, executableContext);
        waitAllDagExecutablesFinished(list);
    }

    /**
     * Clears the error information stored for this job by calling
     * {@code updateJobError} with all-null error fields, inside a checked transaction on this
     * job's project that is invoked with a retry argument of 3.
     */
    private void resetJobErrorMessage() {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            getManager().updateJobError(getId(), null, null, null, null);
            return null;
        }, this.project, 3, getEpochId(), getTempLockName());
    }

    /**
     * Executes a group of DAG steps. A single step runs inline on the calling thread; any other
     * number of steps is fanned out, one {@code ExecutableThread} per step.
     *
     * @param list steps to start now
     * @param map all steps of the job keyed by id
     * @param executableContext context handed to each step
     * @throws ExecuteException wrapping any exception raised while starting or running the steps
     */
    public void dagExecute(List<Executable> list, Map<String, Executable> map, ExecutableContext executableContext) throws ExecuteException {
        try {
            if (list.size() == 1) {
                Executable onlyStep = list.get(0);
                logger.info("dagExecute execute single : {}", onlyStep.getDisplayName());
                executeDagExecutable(map, onlyStep, executableContext);
                return;
            }
            list.forEach(step -> new ExecutableThread(this, map, executableContext, step).start());
        } catch (Exception e) {
            throw new ExecuteException(e);
        }
    }

    /**
     * Runs one DAG step on the current thread: verifies its predecessor, executes the step and
     * then schedules its successors.
     *
     * @param map all steps of the job keyed by id
     * @param executable the step to run
     * @param executableContext context handed to the step
     * @throws ExecuteRuntimeException wrapping any exception raised along the way
     */
    public void executeDagExecutable(Map<String, Executable> map, Executable executable, ExecutableContext executableContext) {
        Thread worker = Thread.currentThread();
        try {
            logger.info("execute dag executable : {}-{} -> {}", worker.getName(), worker.getId(),
                    executable.getDisplayName());
            checkPreviousStep(map, executable);
            executeStep(executable, executableContext);
            executeNextSteps(map, executable, executableContext);
        } catch (Exception e) {
            throw new ExecuteRuntimeException(e);
        }
    }

    /**
     * Schedules the successors of a finished DAG step, if it declares any.
     *
     * @param map all steps of the job keyed by id; used to resolve the successor ids
     * @param executable the step whose successors should run
     * @param executableContext context handed to each successor
     * @throws ExecuteException if starting the successors fails
     */
    private void executeNextSteps(Map<String, Executable> map, Executable executable, ExecutableContext executableContext) throws ExecuteException {
        Set<String> nextSteps = executable.getNextSteps();
        if (CollectionUtils.isNotEmpty(nextSteps)) {
            // Resolve successor ids to steps; an id missing from the map yields a null entry.
            List<Executable> successors = nextSteps.stream()
                    .map(map::get)
                    .collect(Collectors.toList());
            dagExecute(successors, map, executableContext);
        }
    }

    /**
     * Blocks until no step is RUNNING or READY any more, or until at least one step is in
     * ERROR, PAUSED or DISCARDED, polling the step states every 10 seconds.
     *
     * <p>An interrupt does not end the wait: it is logged and remembered, and the thread's
     * interrupt status is restored when the method returns. Restoring the flag only on exit
     * avoids the hot loop that results from re-setting it before the next {@code sleep}.
     *
     * @param list all steps of the job; an empty list returns immediately
     */
    public void waitAllDagExecutablesFinished(List<Executable> list) {
        if (list.isEmpty()) {
            // Nothing to wait for; also avoids list.get(0) below failing on an empty job.
            return;
        }
        Set<ExecutableState> abortStates = Sets.newHashSet(ExecutableState.ERROR, ExecutableState.PAUSED,
                ExecutableState.DISCARDED);
        boolean interrupted = false;
        try {
            while (true) {
                boolean anyActive = list.stream().anyMatch(step -> {
                    // Read the status once so a READY -> RUNNING transition cannot slip between two reads.
                    ExecutableState status = step.getStatus();
                    return status == ExecutableState.RUNNING || status == ExecutableState.READY;
                });
                if (!anyActive) {
                    logger.debug("{} all next step finished", list.get(0).getPreviousStep());
                    return;
                }
                boolean anyAborted = list.stream().anyMatch(step -> abortStates.contains(step.getStatus()));
                if (anyAborted) {
                    logger.debug("{} next step has error", list.get(0).getPreviousStep());
                    return;
                }
                try {
                    TimeUnit.SECONDS.sleep(10L);
                } catch (InterruptedException e) {
                    logger.error("wait all next step success has error : {}", e.getMessage());
                    interrupted = true;
                }
            }
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Executes a step if it is runnable; a non-runnable step is skipped when its status is
     * "not bad" and rejected otherwise.
     *
     * @param executable the step to run
     * @param executableContext context handed to the step
     * @throws ExecuteException if the step's execution fails
     * @throws IllegalStateException if the step is neither runnable nor in a "not bad" state
     */
    private void executeStep(Executable executable, ExecutableContext executableContext) throws ExecuteException {
        if (executable.isRunnable()) {
            executable.execute(executableContext);
            return;
        }
        if (executable.getStatus().isNotBad()) {
            logger.info("step {} is already succeed, skip it.", executable.getDisplayName());
            return;
        }
        throw new IllegalStateException("invalid subtask state, sub task:" + executable.getDisplayName()
                + ", state:" + executable.getStatus());
    }

    /**
     * Verifies that the predecessor declared by a DAG step has succeeded. Steps without a
     * previous step pass unconditionally.
     *
     * @param map all steps of the job keyed by id
     * @param executable the step about to run
     * @throws IllegalStateException if the declared predecessor is unknown or not in SUCCEED
     */
    private void checkPreviousStep(Map<String, Executable> map, Executable executable) {
        String previousStepId = executable.getPreviousStep();
        if (StringUtils.isBlank(previousStepId)) {
            return;
        }
        Executable previousStep = map.get(previousStepId);
        if (previousStep == null) {
            // Fail with a message instead of a bare NullPointerException on the lookup result.
            throw new IllegalStateException("invalid subtask state, previous step not found:" + previousStepId);
        }
        // Read the status once so the state that is checked is the state that is reported.
        ExecutableState previousState = previousStep.getStatus();
        if (ExecutableState.SUCCEED != previousState) {
            throw new IllegalStateException("invalid subtask state, sub task:" + previousStep.getDisplayName()
                    + ", state:" + previousState);
        }
    }

    /**
     * Always returns {@code false} for this job type.
     * NOTE(review): presumably this disables a state check in the superclass — confirm there.
     */
    @Override // org.apache.kylin.job.execution.AbstractExecutable
    protected boolean needCheckState() {
        return false;
    }

    /**
     * Marks the job as RUNNING inside a checked transaction. The update is skipped only when the
     * job was stopped non-voluntarily and its stored state is something other than READY.
     *
     * @throws JobStoppedException declared by the overridden method
     */
    @Override // org.apache.kylin.job.execution.AbstractExecutable
    protected void onExecuteStart() throws JobStoppedException {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            boolean mayMarkRunning = !isStoppedNonVoluntarily()
                    || ExecutableState.READY == getOutput().getState();
            if (mayMarkRunning) {
                updateJobOutput(this.project, getId(), ExecutableState.RUNNING, null, null, null);
            }
            return null;
        }, this.project, 3, getEpochId(), getTempLockName());
    }

    /**
     * Persists the job's final state, derived from its sub-tasks by {@link #checkState()}, and
     * then synchronously posts a {@code JobFinishedNotifier} event.
     *
     * <p>State handling inside the transaction:
     * <ul>
     *   <li>ERROR, PAUSED, READY: nothing is written if the job was stopped non-voluntarily;
     *       otherwise the state is stored, and for ERROR the result's extra info and error
     *       messages are stored too and a status-change notification is sent.</li>
     *   <li>DISCARDED, SUICIDAL, SUCCEED: stored via {@link #updateToFinalState} with the
     *       matching hook, followed by a status-change notification.</li>
     *   <li>Any other state: rejected with {@link IllegalArgumentException}.</li>
     * </ul>
     *
     * @param executeResult result of {@code doWork}; supplies error details and the success flag
     * @throws JobStoppedException declared by the overridden method
     */
    @Override // org.apache.kylin.job.execution.AbstractExecutable
    protected void onExecuteFinished(ExecuteResult executeResult) throws JobStoppedException {
        ExecutableState finalState = checkState();
        logger.info("Job finished {}, state:{}", getDisplayName(), finalState);
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            // Switch on the enum itself rather than on a synthetic ordinal table.
            switch (finalState) {
                case ERROR:
                case PAUSED:
                case READY:
                    if (isStoppedNonVoluntarily()) {
                        logger.info("Execute finished  {} which is stopped nonvoluntarily, state: {}", getDisplayName(), getOutput().getState());
                        return null;
                    }
                    Consumer<String> hook = null;
                    Map<String, String> extraInfo = null;
                    String errorMsg = null;
                    String shortErrMsg = null;
                    if (finalState == ExecutableState.ERROR) {
                        logger.warn("[UNEXPECTED_THINGS_HAPPENED] Unexpected ERROR state discovered here!!!");
                        extraInfo = executeResult.getExtraInfo();
                        errorMsg = executeResult.getErrorMsg();
                        hook = this::onExecuteErrorHook;
                        shortErrMsg = executeResult.getShortErrMsg();
                    }
                    updateJobOutput(getProject(), getId(), finalState, extraInfo, errorMsg, shortErrMsg, hook);
                    if (finalState == ExecutableState.ERROR) {
                        onStatusChange(ExecutableState.ERROR);
                    }
                    return null;
                case DISCARDED:
                    updateToFinalState(ExecutableState.DISCARDED, this::onExecuteDiscardHook, executeResult.getShortErrMsg());
                    onStatusChange(ExecutableState.DISCARDED);
                    return null;
                case SUICIDAL:
                    updateToFinalState(ExecutableState.SUICIDAL, this::onExecuteSuicidalHook, executeResult.getShortErrMsg());
                    onStatusChange(ExecutableState.SUICIDAL);
                    return null;
                case SUCCEED:
                    updateToFinalState(ExecutableState.SUCCEED, this::afterUpdateOutput, executeResult.getShortErrMsg());
                    onStatusChange(ExecutableState.SUCCEED);
                    return null;
                default:
                    throw new IllegalArgumentException("Illegal state when job finished: " + finalState);
            }
        }, this.project, 3, getEpochId(), getTempLockName());
        EventBusFactory.getInstance().postSync(new JobFinishedNotifier(getId(), getProject(), getTargetSubject(), getDuration(), finalState.toString(), getJobType().toString(), getSegmentIds(), getLayoutIds(), getTargetPartitions(), getWaitTime(), getClass().getName(), getSubmitter(), executeResult.succeed(), getJobStartTime(), getJobEndTime(), getTag(), executeResult.getThrowable()));
    }

    /**
     * Derives the state of the whole job from the states of its sub-tasks.
     *
     * <p>The job is SUCCEED only if every sub-task is SUCCEED, SKIP or WARNING (a job without
     * sub-tasks therefore counts as SUCCEED). Otherwise the first matching rule wins:
     * any DISCARDED sub-task gives DISCARDED, any SUICIDAL gives SUICIDAL, any ERROR or
     * still-RUNNING sub-task gives ERROR, any PAUSED gives PAUSED, and everything else READY.
     *
     * @return the aggregated job state, never {@code null}
     */
    private ExecutableState checkState() {
        boolean allSucceed = true;
        boolean hasError = false;
        boolean hasDiscarded = false;
        boolean hasSuicidal = false;
        boolean hasPaused = false;
        for (AbstractExecutable task : getTasks()) {
            logger.info("Sub-task finished {}, state: {}", task.getDisplayName(), task.getStatus());
            boolean taskSucceed = false;
            switch (task.getStatus()) {
                case RUNNING:
                    // A sub-task still running when the job finishes is treated as an error.
                case ERROR:
                    hasError = true;
                    break;
                case DISCARDED:
                    hasDiscarded = true;
                    break;
                case SUICIDAL:
                    hasSuicidal = true;
                    break;
                case PAUSED:
                    hasPaused = true;
                    break;
                case SUCCEED:
                case SKIP:
                case WARNING:
                    taskSucceed = true;
                    break;
                default:
                    // READY and any other state: not succeeded, no dedicated flag.
                    break;
            }
            allSucceed &= taskSucceed;
        }
        if (allSucceed) {
            return ExecutableState.SUCCEED;
        }
        if (hasDiscarded) {
            return ExecutableState.DISCARDED;
        }
        if (hasSuicidal) {
            return ExecutableState.SUICIDAL;
        }
        if (hasError) {
            return ExecutableState.ERROR;
        }
        if (hasPaused) {
            return ExecutableState.PAUSED;
        }
        return ExecutableState.READY;
    }

    /**
     * Returns the earliest non-zero start time among the sub-tasks.
     *
     * @return the smallest non-zero sub-task start time, or 0 if no sub-task has one
     */
    private long getJobStartTime() {
        return this.subTasks.stream()
                .mapToLong(AbstractExecutable::getStartTime)
                .filter(startTime -> startTime != 0L)
                .min()
                .orElse(0L);
    }

    /**
     * Returns the latest non-zero end time among the sub-tasks.
     *
     * @return the largest non-zero sub-task end time, or the current system time in
     *         milliseconds if no sub-task has one
     */
    private long getJobEndTime() {
        return this.subTasks.stream()
                .mapToLong(AbstractExecutable::getEndTime)
                .filter(endTime -> endTime != 0L)
                .max()
                .orElse(System.currentTimeMillis());
    }

    /**
     * Returns the job's wait time as the sum of the wait times of all sub-tasks.
     *
     * @return total sub-task wait time; 0 when there are no sub-tasks
     */
    @Override // org.apache.kylin.job.execution.AbstractExecutable
    public long getWaitTime() {
        return this.subTasks.stream()
                .mapToLong(AbstractExecutable::getWaitTime)
                .sum();
    }

    /**
     * Returns the job's duration as the sum of the durations of all sub-tasks.
     *
     * @return total sub-task duration; 0 when there are no sub-tasks
     */
    @Override // org.apache.kylin.job.execution.AbstractExecutable
    public long getDuration() {
        return this.subTasks.stream()
                .mapToLong(AbstractExecutable::getDuration)
                .sum();
    }

    /**
     * Looks up a sub-task by its position in the sub-task list.
     *
     * @param i zero-based step id
     * @return the sub-task at that position, or empty if {@code i} is out of range or the slot
     *         holds {@code null}
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public Optional<AbstractExecutable> getSubTaskByStepId(int i) {
        boolean outOfRange = i < 0 || i >= this.subTasks.size();
        if (outOfRange) {
            return Optional.empty();
        }
        return Optional.ofNullable(this.subTasks.get(i));
    }

    /**
     * Hook handed to {@code updateToFinalState} when the job finishes as DISCARDED; this
     * implementation does nothing.
     *
     * @param str value supplied by the caller of the hook; unused here
     */
    protected void onExecuteDiscardHook(String str) {
    }

    /**
     * Hook handed to {@code updateToFinalState} when the job finishes as SUICIDAL; this
     * implementation does nothing.
     *
     * @param str value supplied by the caller of the hook; unused here
     */
    protected void onExecuteSuicidalHook(String str) {
    }

    /**
     * Stores the given state for this job unless the stored state is already final.
     *
     * @param executableState state to persist
     * @param consumer hook forwarded to {@code updateJobOutput}
     * @param str short error message forwarded to {@code updateJobOutput}
     * @throws PersistentException declared for callers; see {@code updateJobOutput}
     * @throws ExecuteException declared for callers; see {@code updateJobOutput}
     * @throws InterruptedException declared for callers; see {@code updateJobOutput}
     */
    private void updateToFinalState(ExecutableState executableState, Consumer<String> consumer, String str) throws PersistentException, ExecuteException, InterruptedException {
        boolean alreadyFinal = getOutput().getState().isFinalState();
        if (!alreadyFinal) {
            updateJobOutput(getProject(), getId(), executableState, null, null, str, consumer);
        }
    }

    /**
     * Returns the sub-tasks of this job in insertion order.
     *
     * @return the internal list itself, not a copy; changes made through it affect this job
     */
    @Override // org.apache.kylin.job.execution.ChainedExecutable, org.apache.kylin.job.execution.DagExecutable
    public List<AbstractExecutable> getTasks() {
        return this.subTasks;
    }

    /**
     * Always returns {@code false} for this job type.
     * NOTE(review): presumably this disables retry handling in the superclass — confirm there.
     */
    @Override // org.apache.kylin.job.execution.AbstractExecutable
    protected boolean needRetry() {
        return false;
    }

    /**
     * Appends a sub-task to this job. The task gets the id {@code <jobId>_<NN>}, where
     * {@code NN} is its zero-based position padded to at least two digits, this job as its
     * parent, and that position as its step id.
     *
     * @param abstractExecutable the sub-task to append
     */
    @Override // org.apache.kylin.job.execution.ChainedExecutable, org.apache.kylin.job.execution.DagExecutable
    public void addTask(AbstractExecutable abstractExecutable) {
        int stepIndex = this.subTasks.size();
        String childId = String.format(Locale.ROOT, "%s_%02d", getId(), stepIndex);
        abstractExecutable.setId(childId);
        abstractExecutable.setParent(this);
        abstractExecutable.setStepId(stepIndex);
        this.subTasks.add(abstractExecutable);
    }

    /**
     * Finds the first sub-task whose runtime class is exactly {@code cls}; subclasses of
     * {@code cls} do not match.
     *
     * @param cls exact class of the wanted sub-task
     * @return the first matching sub-task, or {@code null} if there is none
     */
    @Override // org.apache.kylin.job.execution.ChainedExecutable, org.apache.kylin.job.execution.DagExecutable
    public <T extends AbstractExecutable> T getTask(Class<T> cls) {
        for (AbstractExecutable task : getTasks()) {
            if (task.getClass().equals(cls)) {
                return cls.cast(task);
            }
        }
        return null;
    }

    /**
     * Hook handed to {@code updateToFinalState} when the job finishes as SUCCEED; this
     * implementation does nothing.
     *
     * @param str value supplied by the caller of the hook; unused here
     */
    protected void afterUpdateOutput(String str) {
    }

    /**
     * Called by {@code onExecuteFinished} after the job's state has been stored as ERROR,
     * DISCARDED, SUICIDAL or SUCCEED; forwards the state to the superclass's
     * {@code notifyUserStatusChange}.
     *
     * @param executableState the state the job has just reached
     */
    protected void onStatusChange(ExecutableState executableState) {
        super.notifyUserStatusChange(executableState);
    }
}
