package org.apache.kylin.job.impl.threadpool;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.guava30.shaded.common.collect.Lists;
import org.apache.kylin.guava30.shaded.common.collect.Sets;
import org.apache.kylin.job.dao.ExecutableOutputPO;
import org.apache.kylin.job.dao.ExecutablePO;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.exception.JobStoppedException;
import org.apache.kylin.job.exception.JobStoppedNonVoluntarilyException;
import org.apache.kylin.job.exception.PersistentException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultExecutable;
import org.apache.kylin.job.execution.DefaultExecutableOnModel;
import org.apache.kylin.job.execution.ErrorTestExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.execution.FailedTestExecutable;
import org.apache.kylin.job.execution.FiveSecondErrorTestExecutable;
import org.apache.kylin.job.execution.FiveSecondSucceedTestExecutable;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.job.execution.LongRunningTestExecutable;
import org.apache.kylin.job.execution.NExecutableManager;
import org.apache.kylin.job.execution.NoErrorStatusExecutableOnModel;
import org.apache.kylin.job.execution.SucceedTestExecutable;
import org.apache.kylin.job.handler.AbstractJobHandlerTest;
import org.apache.kylin.junit.rule.Repeat;
import org.apache.kylin.junit.rule.RepeatRule;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.model.NIndexPlanManager;
import org.apache.kylin.metadata.epoch.EpochManager;
import org.apache.kylin.metadata.model.ManagementType;
import org.apache.kylin.metadata.model.NDataModelManager;
import org.apache.kylin.metadata.project.EnhancedUnitOfWork;
import org.apache.kylin.metadata.project.NProjectManager;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.awaitility.core.ConditionTimeoutException;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.RuleChain;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestRule;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kylin/job/impl/threadpool/NDefaultSchedulerTest.class */
public class NDefaultSchedulerTest extends BaseSchedulerTest {
    private static final Logger logger = LoggerFactory.getLogger(NDefaultSchedulerTest.class);

    @Rule
    public TemporaryFolder temporaryFolder;
    public ExpectedException thrown;

    @Rule
    public TestRule chain;

    public NDefaultSchedulerTest() {
        super(AbstractJobHandlerTest.DEFAULT_PROJECT);
        this.temporaryFolder = new TemporaryFolder();
        this.thrown = ExpectedException.none();
        this.chain = RuleChain.outerRule(new RepeatRule()).around(this.thrown);
    }

    @Override // org.apache.kylin.job.impl.threadpool.BaseSchedulerTest
    public void setup() throws Exception {
        overwriteSystemProp("kylin.job.auto-set-concurrent-jobs", "true");
        overwriteSystemProp("kylin.env", "UT");
        overwriteSystemProp("kylin.storage.check-quota-enabled", "true");
        super.setup();
    }

    @Test
    public void testSingleTaskJob() {
        logger.info("testSingleTaskJob");
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(defaultExecutableOnModel.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(succeedTestExecutable.getId()).getState());
    }

    @Test
    public void testGetLogStream() throws IOException {
        File newFile = this.temporaryFolder.newFile("execute_output.json." + System.currentTimeMillis() + ".log");
        for (int i = 0; i < 200; i++) {
            Files.write(newFile.toPath(), String.format(Locale.ROOT, "lines: %s\n", Integer.valueOf(i)).getBytes(Charset.defaultCharset()), StandardOpenOption.APPEND);
        }
        InputStream logStream = executableManager.getLogStream(newFile.getAbsolutePath());
        Assert.assertNotNull(logStream);
        Assert.assertTrue(logStream instanceof FSDataInputStream);
        Assert.assertNull(executableManager.getLogStream(newFile.getAbsolutePath() + "/123"));
    }

    @Test
    public void testGetOutputFromHDFSByJobId() throws IOException, PersistentException {
        File newFile = this.temporaryFolder.newFile("execute_output.json." + System.currentTimeMillis() + ".log");
        for (int i = 0; i < 200; i++) {
            Files.write(newFile.toPath(), String.format(Locale.ROOT, "lines: %s\n", Integer.valueOf(i)).getBytes(Charset.defaultCharset()), StandardOpenOption.APPEND);
        }
        String[] strArr = (String[]) Files.readAllLines(newFile.toPath()).toArray(new String[0]);
        NExecutableManager nExecutableManager = executableManager;
        ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
        executableOutputPO.setStatus("SUCCEED");
        executableOutputPO.setContent("succeed");
        executableOutputPO.setLogPath(newFile.getAbsolutePath());
        nExecutableManager.updateJobOutputToHDFS(KylinConfig.getInstanceFromEnv().getJobTmpOutputStorePath(AbstractJobHandlerTest.DEFAULT_PROJECT, "e1ad7bb0-522e-456a-859d-2eab1df448de"), executableOutputPO);
        InputStream verboseMsgStream = executableManager.getOutputFromHDFSByJobId("e1ad7bb0-522e-456a-859d-2eab1df448de", "e1ad7bb0-522e-456a-859d-2eab1df448de", Integer.MAX_VALUE).getVerboseMsgStream();
        Throwable th = null;
        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(verboseMsgStream, Charset.defaultCharset()));
            Throwable th2 = null;
            try {
                try {
                    StringBuilder sb = new StringBuilder();
                    while (true) {
                        String readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            break;
                        }
                        if (sb.length() > 0) {
                            sb.append('\n');
                        }
                        sb.append(readLine);
                    }
                    String sb2 = sb.toString();
                    if (bufferedReader != null) {
                        if (0 != 0) {
                            try {
                                bufferedReader.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            bufferedReader.close();
                        }
                    }
                    Assert.assertTrue(Arrays.deepEquals(strArr, StringUtils.splitByWholeSeparatorPreserveAllTokens(sb2, "\n")));
                    String[] splitByWholeSeparatorPreserveAllTokens = StringUtils.splitByWholeSeparatorPreserveAllTokens(executableManager.getOutputFromHDFSByJobId("e1ad7bb0-522e-456a-859d-2eab1df448de", "e1ad7bb0-522e-456a-859d-2eab1df448de", 100).getVerboseMsg(), "\n");
                    ArrayList newArrayList = Lists.newArrayList(strArr);
                    newArrayList.add("================================================================");
                    Assert.assertTrue(Sets.newHashSet(newArrayList).containsAll(Sets.newHashSet(splitByWholeSeparatorPreserveAllTokens)));
                } finally {
                }
            } catch (Throwable th4) {
                if (bufferedReader != null) {
                    if (th2 != null) {
                        try {
                            bufferedReader.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        bufferedReader.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (verboseMsgStream != null) {
                if (0 != 0) {
                    try {
                        verboseMsgStream.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    verboseMsgStream.close();
                }
            }
        }
    }

    @Test
    public void testSucceed() {
        logger.info("testSucceed");
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable2 = new SucceedTestExecutable();
        succeedTestExecutable2.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable2.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        defaultExecutableOnModel.addTask(succeedTestExecutable2);
        executableManager.addJob(defaultExecutableOnModel);
        assertMemoryRestore(currentAvailableMem);
        long createTime = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        assertTimeLegal(defaultExecutableOnModel.getId());
        waitForJobFinish(defaultExecutableOnModel.getId());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(defaultExecutableOnModel.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(succeedTestExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(succeedTestExecutable2.getId()).getState());
        getConditionFactory().untilAsserted(() -> {
            Assertions.assertThat(executableManager.getOutputFromHDFSByJobId(defaultExecutableOnModel.getId(), succeedTestExecutable.getId()).getVerboseMsg()).contains(new CharSequence[]{"succeed"});
            Assertions.assertThat(executableManager.getOutputFromHDFSByJobId(defaultExecutableOnModel.getId(), succeedTestExecutable2.getId()).getVerboseMsg()).contains(new CharSequence[]{"succeed"});
        });
        assertTimeSucceed(createTime, defaultExecutableOnModel.getId());
        testJobStopped(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
    }

    private void assertJobTime(AbstractExecutable abstractExecutable) {
        assertJobTime(abstractExecutable, 0L);
    }

    private void assertJobTime(AbstractExecutable abstractExecutable, long j) {
        if (abstractExecutable instanceof DefaultExecutable) {
            DefaultExecutable defaultExecutable = (DefaultExecutable) abstractExecutable;
            long createTime = defaultExecutable.getOutput().getCreateTime();
            long j2 = 0;
            long j3 = 0;
            ExecutableState status = defaultExecutable.getStatus();
            for (AbstractExecutable abstractExecutable2 : defaultExecutable.getTasks()) {
                long startTime = abstractExecutable2.getStartTime();
                int stepId = abstractExecutable2.getStepId();
                if (abstractExecutable2.getStatus().isFinalState()) {
                    Assert.assertTrue(startTime > 0);
                    Assert.assertTrue(abstractExecutable2.getEndTime() > 0);
                }
                if (stepId > 0 && (createTime == 0 || status != ExecutableState.SUCCEED)) {
                    Assert.assertEquals(0L, abstractExecutable2.getWaitTime());
                } else if (abstractExecutable2.getStartTime() == 0) {
                    Assert.assertEquals((float) (System.currentTimeMillis() - createTime), (float) abstractExecutable2.getWaitTime(), (float) j);
                } else {
                    Assert.assertEquals(abstractExecutable2.getStartTime() - createTime, abstractExecutable2.getWaitTime());
                }
                createTime = abstractExecutable2.getOutput().getEndTime();
                status = abstractExecutable2.getStatus();
                j2 += abstractExecutable2.getWaitTime();
                j3 += abstractExecutable2.getDuration();
            }
            Assert.assertEquals((float) defaultExecutable.getWaitTime(), (float) j2, (float) j);
            Assert.assertEquals((float) defaultExecutable.getDuration(), (float) j3, (float) j);
            Assert.assertEquals((float) defaultExecutable.getTotalDurationTime(), (float) (defaultExecutable.getWaitTime() + defaultExecutable.getDuration()), (float) j);
        }
    }

    private void assertTimeSucceed(long j, String str) {
        DefaultExecutable job = executableManager.getJob(str);
        assertJobRun(j, job);
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(job.getId()).getState());
        if (job instanceof DefaultExecutable) {
            Assert.assertTrue(job.getTasks().stream().allMatch(abstractExecutable -> {
                return ExecutableState.SUCCEED == executableManager.getOutput(abstractExecutable.getId()).getState();
            }));
        }
        assertJobTime(job);
    }

    private void assertTimeError(long j, String str) {
        DefaultExecutable job = executableManager.getJob(str);
        assertJobRun(j, job);
        Assert.assertEquals(ExecutableState.ERROR, executableManager.getOutput(job.getId()).getState());
        if (job instanceof DefaultExecutable) {
            Assert.assertTrue(job.getTasks().stream().anyMatch(abstractExecutable -> {
                return ExecutableState.ERROR == executableManager.getOutput(abstractExecutable.getId()).getState();
            }));
        }
        assertJobTime(job);
    }

    private void assertTimeSuicide(long j, String str) {
        assertTimeFinalState(j, str, ExecutableState.SUICIDAL);
        assertJobTime(executableManager.getJob(str));
    }

    private void assertTimeDiscard(long j, String str) {
        assertTimeFinalState(j, str, ExecutableState.DISCARDED);
        assertJobTime(executableManager.getJob(str));
    }

    private void assertTimeFinalState(long j, String str, ExecutableState executableState) {
        AbstractExecutable job = executableManager.getJob(str);
        Assert.assertNotNull(job);
        Assert.assertEquals(executableState, executableManager.getOutput(job.getId()).getState());
        Assert.assertEquals(j, job.getCreateTime());
        Assert.assertTrue(job.getStartTime() > 0);
        Assert.assertTrue(job.getEndTime() > 0);
        Assert.assertTrue(job.getDuration() >= 0);
        Assert.assertTrue(job.getWaitTime() >= 0);
    }

    private void assertTimeRunning(long j, String str) {
        AbstractExecutable job = executableManager.getJob(str);
        Assert.assertNotNull(job);
        Assert.assertEquals(j, job.getCreateTime());
        Assert.assertEquals(ExecutableState.RUNNING, job.getStatus());
        assertJobTime(job, 100L);
    }

    private void assertTimeLegal(String str) {
        DefaultExecutable job = executableManager.getJob(str);
        assertTimeLegal((AbstractExecutable) job);
        if (job instanceof DefaultExecutable) {
            Iterator it = job.getTasks().iterator();
            while (it.hasNext()) {
                assertTimeLegal((AbstractExecutable) it.next());
            }
        }
    }

    private void assertTimeLegal(AbstractExecutable abstractExecutable) {
        Assert.assertNotNull(abstractExecutable);
        Assert.assertTrue(abstractExecutable.getCreateTime() > 0);
        Assert.assertTrue(abstractExecutable.getStartTime() >= 0);
        Assert.assertTrue(abstractExecutable.getEndTime() >= 0);
        Assert.assertTrue(abstractExecutable.getDuration() >= 0);
        Assert.assertTrue(abstractExecutable.getWaitTime() >= 0);
    }

    private void assertJobRun(long j, AbstractExecutable abstractExecutable) {
        Assert.assertNotNull(abstractExecutable);
        Assert.assertEquals(j, abstractExecutable.getCreateTime());
        Assert.assertTrue(abstractExecutable.getStartTime() > 0);
        Assert.assertTrue(abstractExecutable.getEndTime() > 0);
        Assert.assertTrue(abstractExecutable.getDuration() > 0);
        Assert.assertTrue(abstractExecutable.getWaitTime() >= 0);
    }

    private void assertMemoryRestore(double d) {
        getConditionFactory().untilAsserted(() -> {
            Assert.assertEquals(d, NDefaultScheduler.currentAvailableMem(), 0.1d);
        });
    }

    @Test
    @Repeat(3)
    public void testSucceedAndFailed() {
        logger.info("testSucceedAndFailed");
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        FailedTestExecutable failedTestExecutable = new FailedTestExecutable();
        failedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        failedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        defaultExecutableOnModel.addTask(failedTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        long createTime = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(ExecutableState.ERROR, executableManager.getOutput(defaultExecutableOnModel.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(succeedTestExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.ERROR, executableManager.getOutput(failedTestExecutable.getId()).getState());
        getConditionFactory().untilAsserted(() -> {
            Assertions.assertThat(executableManager.getOutputFromHDFSByJobId(defaultExecutableOnModel.getId()).getVerboseMsg()).contains(new CharSequence[]{"org.apache.kylin.job.execution.MockJobException"});
            Assertions.assertThat(executableManager.getOutputFromHDFSByJobId(defaultExecutableOnModel.getId(), succeedTestExecutable.getId()).getVerboseMsg()).contains(new CharSequence[]{"succeed"});
            Assertions.assertThat(executableManager.getOutputFromHDFSByJobId(defaultExecutableOnModel.getId(), failedTestExecutable.getId()).getVerboseMsg()).contains(new CharSequence[]{"org.apache.kylin.job.execution.MockJobException"});
        });
        assertTimeError(createTime, defaultExecutableOnModel.getId());
        testJobPending(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        executableManager.updateJobOutput(failedTestExecutable.getId(), ExecutableState.READY);
        executableManager.updateJobOutput(failedTestExecutable.getId(), ExecutableState.RUNNING);
        ((NExecutableManager) Mockito.doReturn(failedTestExecutable).when(executableManager)).getJob(Mockito.anyString());
        ExecutableOutputPO executableOutputPO = new ExecutableOutputPO();
        executableOutputPO.setLogPath("/kylin/null.log");
        ((NExecutableManager) Mockito.doReturn(executableOutputPO).when(executableManager)).getJobOutputFromHDFS(Mockito.anyString());
        failedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        Assert.assertEquals("Wait a moment ... ", executableManager.getOutputFromHDFSByJobId(failedTestExecutable.getId()).getVerboseMsg());
    }

    @Test
    @Repeat(3)
    public void testSucceedAndError() {
        logger.info("testSucceedAndError");
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        ErrorTestExecutable errorTestExecutable = new ErrorTestExecutable();
        errorTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        errorTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        defaultExecutableOnModel.addTask(errorTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        assertMemoryRestore(currentAvailableMem);
        long createTime = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        waitForJobFinish(defaultExecutableOnModel.getId());
        Assert.assertEquals(ExecutableState.ERROR, executableManager.getOutput(defaultExecutableOnModel.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(succeedTestExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.ERROR, executableManager.getOutput(errorTestExecutable.getId()).getState());
        getConditionFactory().untilAsserted(() -> {
            Assertions.assertThat(executableManager.getOutputFromHDFSByJobId(defaultExecutableOnModel.getId()).getVerboseMsg()).contains(new CharSequence[]{"test error"});
            Assertions.assertThat(executableManager.getOutputFromHDFSByJobId(defaultExecutableOnModel.getId(), errorTestExecutable.getId()).getVerboseMsg()).contains(new CharSequence[]{"test error"});
        });
        testJobPending(defaultExecutableOnModel.getId());
        assertTimeError(createTime, defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
    }

    @Test
    public void testDiscard() {
        logger.info("testDiscard");
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable2 = new SucceedTestExecutable();
        succeedTestExecutable2.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable2.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        defaultExecutableOnModel.addTask(succeedTestExecutable2);
        executableManager.addJob(defaultExecutableOnModel);
        long createTime = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        Assert.assertTrue(createTime > 0);
        getConditionFactory().until(() -> {
            return Boolean.valueOf(defaultExecutableOnModel.getStatus() == ExecutableState.RUNNING);
        });
        discardJobWithLock(defaultExecutableOnModel.getId());
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(ExecutableState.DISCARDED, executableManager.getOutput(defaultExecutableOnModel.getId()).getState());
        getConditionFactory().until(() -> {
            return Boolean.valueOf(((AbstractExecutable) getManager().getJob(defaultExecutableOnModel.getId()).getTasks().get(0)).getStatus().isFinalState());
        });
        assertTimeDiscard(createTime, defaultExecutableOnModel.getId());
        testJobStopped(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(1L, this.killProcessCount.get());
    }

    @Test
    public void testDiscardJobBeforeSchedule() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        NDataflow dataflow = nDataflowManager.getDataflow(noErrorStatusExecutableOnModel.getTargetSubject());
        noErrorStatusExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        noErrorStatusExecutableOnModel.addTask(succeedTestExecutable);
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NDataModelManager nDataModelManager = NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project);
            nDataModelManager.dropModel(nDataModelManager.getDataModelDesc("89af4ee2-2cdb-4b07-b39e-4c29856309aa"));
            return null;
        }, this.project, 3, -1L, noErrorStatusExecutableOnModel.getId());
        executableManager.addJob(noErrorStatusExecutableOnModel);
        getConditionFactory().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.SUICIDAL, executableManager.getJob(noErrorStatusExecutableOnModel.getId()).getStatus());
        });
        assertTimeSuicide(noErrorStatusExecutableOnModel.getCreateTime(), noErrorStatusExecutableOnModel.getId());
        testJobStopped(noErrorStatusExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
    }

    @Test
    public void testDiscardErrorJobBeforeSchedule() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        NDataflow dataflow = nDataflowManager.getDataflow(noErrorStatusExecutableOnModel.getTargetSubject());
        noErrorStatusExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        ErrorTestExecutable errorTestExecutable = new ErrorTestExecutable();
        errorTestExecutable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        errorTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        noErrorStatusExecutableOnModel.addTask(errorTestExecutable);
        executableManager.addJob(noErrorStatusExecutableOnModel);
        getConditionFactory().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.ERROR, executableManager.getJob(noErrorStatusExecutableOnModel.getId()).getStatus());
        });
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NDataModelManager nDataModelManager = NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project);
            nDataModelManager.dropModel(nDataModelManager.getDataModelDesc("89af4ee2-2cdb-4b07-b39e-4c29856309aa"));
            return null;
        }, this.project, 3, -1L, noErrorStatusExecutableOnModel.getId());
        getConditionFactory().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.SUICIDAL, executableManager.getJob(noErrorStatusExecutableOnModel.getId()).getStatus());
        });
        assertTimeSuicide(noErrorStatusExecutableOnModel.getCreateTime(), noErrorStatusExecutableOnModel.getId());
        testJobStopped(noErrorStatusExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
    }

    @Test
    public void testDiscardPausedJobBeforeSchedule() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        NDataflow dataflow = nDataflowManager.getDataflow(noErrorStatusExecutableOnModel.getTargetSubject());
        noErrorStatusExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable = new FiveSecondSucceedTestExecutable();
        fiveSecondSucceedTestExecutable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        fiveSecondSucceedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        noErrorStatusExecutableOnModel.addTask(fiveSecondSucceedTestExecutable);
        FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable2 = new FiveSecondSucceedTestExecutable();
        fiveSecondSucceedTestExecutable2.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        fiveSecondSucceedTestExecutable2.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        noErrorStatusExecutableOnModel.addTask(fiveSecondSucceedTestExecutable2);
        executableManager.addJob(noErrorStatusExecutableOnModel);
        getConditionFactory().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.RUNNING, executableManager.getJob(noErrorStatusExecutableOnModel.getId()).getStatus());
        });
        pauseJobWithLock(noErrorStatusExecutableOnModel.getId());
        getConditionFactory().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.PAUSED, ((AbstractExecutable) executableManager.getJob(noErrorStatusExecutableOnModel.getId()).getTasks().get(0)).getStatus());
        });
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            NDataModelManager nDataModelManager = NDataModelManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project);
            nDataModelManager.dropModel(nDataModelManager.getDataModelDesc("89af4ee2-2cdb-4b07-b39e-4c29856309aa"));
            return null;
        }, this.project, 1, -1L, noErrorStatusExecutableOnModel.getId());
        getConditionFactory().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.SUICIDAL, executableManager.getJob(noErrorStatusExecutableOnModel.getId()).getStatus());
        });
        assertTimeSuicide(noErrorStatusExecutableOnModel.getCreateTime(), noErrorStatusExecutableOnModel.getId());
        testJobStopped(noErrorStatusExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
    }

    private void testJobStopped(String str) {
        long[] allDurations = getAllDurations(str);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Assert.assertArrayEquals(allDurations, getAllDurations(str));
    }

    private long[] getAllDurations(String str) {
        DefaultExecutable job = executableManager.getJob(str);
        long[] jArr = new long[2 * (job.getTasks().size() + 1)];
        jArr[0] = job.getDuration();
        jArr[1] = job.getWaitTime();
        int i = 2;
        for (AbstractExecutable abstractExecutable : job.getTasks()) {
            jArr[i] = abstractExecutable.getDuration();
            jArr[i + 1] = abstractExecutable.getWaitTime();
            i += 2;
        }
        return jArr;
    }

    private void testJobPending(String str) {
        long[] durationByJobId = getDurationByJobId(str);
        double[] waitTimeByJobId = getWaitTimeByJobId(str);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        long[] durationByJobId2 = getDurationByJobId(str);
        double[] waitTimeByJobId2 = getWaitTimeByJobId(str);
        Assert.assertArrayEquals(durationByJobId, durationByJobId2);
        executableManager.getJob(str);
        Assert.assertArrayEquals(waitTimeByJobId, waitTimeByJobId2, 50.0d);
    }

    private long[] getDurationByJobId(String str) {
        DefaultExecutable job = executableManager.getJob(str);
        long[] jArr = new long[job.getTasks().size() + 1];
        jArr[0] = job.getDuration();
        int i = 1;
        Iterator it = job.getTasks().iterator();
        while (it.hasNext()) {
            jArr[i] = ((AbstractExecutable) it.next()).getDuration();
            i++;
        }
        return jArr;
    }

    private double[] getWaitTimeByJobId(String str) {
        DefaultExecutable job = executableManager.getJob(str);
        double[] dArr = new double[job.getTasks().size() + 1];
        dArr[0] = job.getWaitTime();
        int i = 1;
        Iterator it = job.getTasks().iterator();
        while (it.hasNext()) {
            dArr[i] = ((AbstractExecutable) it.next()).getWaitTime();
            i++;
        }
        return dArr;
    }

    @Test
    public void testIllegalState() {
        logger.info("testIllegalState");
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable2 = new SucceedTestExecutable();
        succeedTestExecutable2.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable2.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        defaultExecutableOnModel.addTask(succeedTestExecutable2);
        executableManager.addJob(defaultExecutableOnModel);
        NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project).updateJobOutput(succeedTestExecutable2.getId(), ExecutableState.RUNNING);
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(ExecutableState.ERROR, executableManager.getOutput(defaultExecutableOnModel.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(succeedTestExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.RUNNING, executableManager.getOutput(succeedTestExecutable2.getId()).getState());
    }

    @Test
    public void testSuicide_RemoveSegment() {
        changeSchedulerInterval();
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        NDataflow dataflow = nDataflowManager.getDataflow(noErrorStatusExecutableOnModel.getTargetSubject());
        noErrorStatusExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        noErrorStatusExecutableOnModel.addTask(succeedTestExecutable);
        executableManager.addJob(noErrorStatusExecutableOnModel);
        long createTime = executableManager.getJob(noErrorStatusExecutableOnModel.getId()).getCreateTime();
        NDataflowUpdate nDataflowUpdate = new NDataflowUpdate(dataflow.getUuid());
        nDataflowUpdate.setToRemoveSegs((NDataSegment[]) dataflow.getSegments().toArray(new NDataSegment[0]));
        nDataflowManager.updateDataflow(nDataflowUpdate);
        waitForJobFinish(noErrorStatusExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        getConditionFactory().untilAsserted(() -> {
            DefaultExecutable job = executableManager.getJob(noErrorStatusExecutableOnModel.getId());
            Assert.assertEquals(ExecutableState.SUICIDAL, job.getStatus());
            Assert.assertEquals(ExecutableState.SUICIDAL, ((AbstractExecutable) job.getTasks().get(0)).getStatus());
        });
        assertTimeSuicide(createTime, noErrorStatusExecutableOnModel.getId());
        testJobStopped(noErrorStatusExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
    }

    private void changeSchedulerInterval() {
        changeSchedulerInterval(30);
    }

    private void changeSchedulerInterval(int i) {
        NDefaultScheduler.shutdownByProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        overwriteSystemProp("kylin.job.scheduler.poll-interval-second", String.valueOf(i));
        startScheduler();
    }

    @Test
    @Ignore
    public void testSuicide_RemoveLayout() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        AbstractExecutable initNoErrorJob = initNoErrorJob("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        NIndexPlanManager.getInstance(getTestConfig(), this.project).updateIndexPlan("89af4ee2-2cdb-4b07-b39e-4c29856309aa", indexPlan -> {
            indexPlan.removeLayouts(Sets.newHashSet(new Long[]{1L, 10001L}), true, true);
        });
        waitForJobFinish(initNoErrorJob.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(ExecutableState.SUICIDAL, executableManager.getOutput(initNoErrorJob.getId()).getState());
    }

    @Test
    public void testSuccess_RemoveSomeLayout() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        AbstractExecutable initNoErrorJob = initNoErrorJob("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        NIndexPlanManager.getInstance(getTestConfig(), this.project).updateIndexPlan("89af4ee2-2cdb-4b07-b39e-4c29856309aa", indexPlan -> {
            indexPlan.removeLayouts(Sets.newHashSet(new Long[]{1L}), true, true);
        });
        waitForJobFinish(initNoErrorJob.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(initNoErrorJob.getId()).getState());
    }

    private AbstractExecutable initNoErrorJob(String str) {
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel.setTargetSubject(str);
        noErrorStatusExecutableOnModel.setName("NO_ERROR_STATUS_EXECUTABLE");
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,10001");
        NDataflow dataflow = nDataflowManager.getDataflow(noErrorStatusExecutableOnModel.getTargetSubject());
        noErrorStatusExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(str);
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        succeedTestExecutable.setParam("layoutIds", "1,10001");
        noErrorStatusExecutableOnModel.addTask(succeedTestExecutable);
        executableManager.addJob(noErrorStatusExecutableOnModel);
        return noErrorStatusExecutableOnModel;
    }

    @Test
    public void testSuicide_JobCuttingIn() {
        changeSchedulerInterval();
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        noErrorStatusExecutableOnModel.setName(JobTypeEnum.INDEX_BUILD.toString());
        noErrorStatusExecutableOnModel.setJobType(JobTypeEnum.INDEX_BUILD);
        NDataflow dataflow = nDataflowManager.getDataflow(noErrorStatusExecutableOnModel.getTargetSubject());
        noErrorStatusExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable = new FiveSecondSucceedTestExecutable();
        fiveSecondSucceedTestExecutable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        fiveSecondSucceedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        noErrorStatusExecutableOnModel.addTask(fiveSecondSucceedTestExecutable);
        executableManager.addJob(noErrorStatusExecutableOnModel);
        assertMemoryRestore(currentAvailableMem);
        getConditionFactory().until(() -> {
            return Boolean.valueOf(noErrorStatusExecutableOnModel.getStatus() == ExecutableState.RUNNING);
        });
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel2 = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel2.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel2.setJobType(JobTypeEnum.INC_BUILD);
        noErrorStatusExecutableOnModel2.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        noErrorStatusExecutableOnModel2.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        noErrorStatusExecutableOnModel2.addTask(succeedTestExecutable);
        executableManager.addJob(noErrorStatusExecutableOnModel2);
        waitForJobFinish(noErrorStatusExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        getConditionFactory().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getJob(noErrorStatusExecutableOnModel.getId()).getStatus());
        });
    }

    @Test
    public void testJobDiscard_AfterSuccess() throws InterruptedException {
        changeSchedulerInterval();
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        NDataflow dataflow = nDataflowManager.getDataflow(noErrorStatusExecutableOnModel.getTargetSubject());
        noErrorStatusExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        noErrorStatusExecutableOnModel.addTask(succeedTestExecutable);
        executableManager.addJob(noErrorStatusExecutableOnModel);
        getConditionFactory().until(() -> {
            return Boolean.valueOf(noErrorStatusExecutableOnModel.getStatus() == ExecutableState.RUNNING);
        });
        discardJobWithLock(noErrorStatusExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(ExecutableState.DISCARDED, executableManager.getOutput(noErrorStatusExecutableOnModel.getId()).getState());
        Assert.assertEquals(ExecutableState.DISCARDED, noErrorStatusExecutableOnModel.getStatus());
    }

    @Test
    public void testIncBuildJobError_ModelBasedDataFlowOnline() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        DefaultExecutable testDataflowStatusWhenJobError = testDataflowStatusWhenJobError(ManagementType.MODEL_BASED, JobTypeEnum.INC_BUILD);
        waitForJobFinish(testDataflowStatusWhenJobError.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(RealizationStatusEnum.ONLINE, nDataflowManager.getDataflow(testDataflowStatusWhenJobError.getTargetSubject()).getStatus());
    }

    @Test
    public void testIncBuildJobError_TableOrientedDataFlowLagBehind() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        DefaultExecutable testDataflowStatusWhenJobError = testDataflowStatusWhenJobError(ManagementType.TABLE_ORIENTED, JobTypeEnum.INC_BUILD);
        waitForJobFinish(testDataflowStatusWhenJobError.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(RealizationStatusEnum.LAG_BEHIND, nDataflowManager.getDataflow(testDataflowStatusWhenJobError.getTargetSubject()).getStatus());
    }

    @Test
    public void testIndexBuildJobError_TableOrientedDataFlowOnline() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        DefaultExecutable testDataflowStatusWhenJobError = testDataflowStatusWhenJobError(ManagementType.TABLE_ORIENTED, JobTypeEnum.INDEX_BUILD);
        waitForJobFinish(testDataflowStatusWhenJobError.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(RealizationStatusEnum.ONLINE, nDataflowManager.getDataflow(testDataflowStatusWhenJobError.getTargetSubject()).getStatus());
    }

    @Test
    public void testIndexBuildJobError_ModelBasedDataFlowOnline() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        DefaultExecutable testDataflowStatusWhenJobError = testDataflowStatusWhenJobError(ManagementType.MODEL_BASED, JobTypeEnum.INDEX_BUILD);
        waitForJobFinish(testDataflowStatusWhenJobError.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(RealizationStatusEnum.ONLINE, nDataflowManager.getDataflow(testDataflowStatusWhenJobError.getTargetSubject()).getStatus());
    }

    private DefaultExecutable testDataflowStatusWhenJobError(ManagementType managementType, JobTypeEnum jobTypeEnum) {
        NDataflowManager nDataflowManager = NDataflowManager.getInstance(getTestConfig(), this.project);
        NDataModelManager.getInstance(getTestConfig(), this.project).updateDataModel("89af4ee2-2cdb-4b07-b39e-4c29856309aa", nDataModel -> {
            nDataModel.setManagementType(managementType);
        });
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        noErrorStatusExecutableOnModel.setName(jobTypeEnum.toString());
        noErrorStatusExecutableOnModel.setJobType(jobTypeEnum);
        NDataflow dataflow = nDataflowManager.getDataflow(noErrorStatusExecutableOnModel.getTargetSubject());
        noErrorStatusExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        ErrorTestExecutable errorTestExecutable = new ErrorTestExecutable();
        errorTestExecutable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        errorTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        noErrorStatusExecutableOnModel.addTask(errorTestExecutable);
        executableManager.addJob(noErrorStatusExecutableOnModel);
        return noErrorStatusExecutableOnModel;
    }

    @Test
    @Repeat(3)
    public void testCheckJobStopped_TaskSucceed() throws JobStoppedException {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        List list = (List) NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel.setTargetSegments(list);
        noErrorStatusExecutableOnModel.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        succeedTestExecutable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        succeedTestExecutable.setTargetSegments(list);
        noErrorStatusExecutableOnModel.addTask(succeedTestExecutable);
        executableManager.addJob(noErrorStatusExecutableOnModel);
        getConditionFactory().until(() -> {
            String str = (String) NExecutableManager.getInstance(getTestConfig(), this.project).getOutput(succeedTestExecutable.getId()).getExtra().get("runningStatus");
            return Boolean.valueOf(noErrorStatusExecutableOnModel.getStatus() == ExecutableState.RUNNING && StringUtils.isNotEmpty(str) && str.equals("inRunning"));
        });
        assertMemoryRestore(currentAvailableMem - noErrorStatusExecutableOnModel.computeStepDriverMemory());
        pauseJobWithLock(noErrorStatusExecutableOnModel.getId());
        getConditionFactory().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.PAUSED, noErrorStatusExecutableOnModel.getStatus());
            Assert.assertEquals(ExecutableState.PAUSED, succeedTestExecutable.getStatus());
        });
        this.thrown.expect(JobStoppedNonVoluntarilyException.class);
        succeedTestExecutable.abortIfJobStopped(true);
        assertMemoryRestore(currentAvailableMem);
    }

    @Test
    public void testCheckJobStopped_TaskError() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        List list = (List) NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa").getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList());
        NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
        noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        noErrorStatusExecutableOnModel.setTargetSegments(list);
        noErrorStatusExecutableOnModel.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        ErrorTestExecutable errorTestExecutable = new ErrorTestExecutable();
        errorTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        errorTestExecutable.setTargetSubject("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        errorTestExecutable.setTargetSegments(list);
        noErrorStatusExecutableOnModel.addTask(errorTestExecutable);
        executableManager.addJob(noErrorStatusExecutableOnModel);
        Awaitility.await().pollInterval(50L, TimeUnit.MILLISECONDS).atMost(60000L, TimeUnit.MILLISECONDS).until(() -> {
            String str = (String) NExecutableManager.getInstance(getTestConfig(), this.project).getOutput(errorTestExecutable.getId()).getExtra().get("runningStatus");
            return Boolean.valueOf(noErrorStatusExecutableOnModel.getStatus() == ExecutableState.RUNNING && StringUtils.isNotEmpty(str) && str.equals("inRunning"));
        });
        pauseJobWithLock(noErrorStatusExecutableOnModel.getId());
        getConditionFactory().untilAsserted(() -> {
            Assert.assertEquals(ExecutableState.PAUSED, noErrorStatusExecutableOnModel.getStatus());
            Assert.assertEquals(ExecutableState.PAUSED, errorTestExecutable.getStatus());
        });
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(1L, this.killProcessCount.get());
    }

    @Test
    public void testSchedulerStop() {
        logger.info("testSchedulerStop");
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        getConditionFactory().until(() -> {
            return Boolean.valueOf(defaultExecutableOnModel.getStatus() == ExecutableState.RUNNING);
        });
        this.scheduler.shutdown();
        Assert.assertFalse(this.scheduler.hasStarted());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getJob(defaultExecutableOnModel.getId()).getStatus());
    }

    @Test
    public void testSchedulerStopCase2() {
        logger.info("testSchedulerStop case 2");
        this.thrown.expect(ConditionTimeoutException.class);
        this.scheduler.shutdown();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        waitForJobFinish(defaultExecutableOnModel.getId());
    }

    @Test
    @Repeat(3)
    public void testSchedulerRestart() {
        logger.info("testSchedulerRestart");
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable = new FiveSecondSucceedTestExecutable();
        fiveSecondSucceedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        fiveSecondSucceedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        fiveSecondSucceedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(fiveSecondSucceedTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        long createTime = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        Assert.assertTrue(createTime > 0);
        getConditionFactory().until(() -> {
            return Boolean.valueOf(((AbstractExecutable) executableManager.getJob(defaultExecutableOnModel.getId()).getTasks().get(0)).getStatus() == ExecutableState.RUNNING);
        });
        assertMemoryRestore(currentAvailableMem - defaultExecutableOnModel.computeStepDriverMemory());
        NDefaultScheduler.shutdownByProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        try {
            Thread.sleep(10000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        executableDao.updateJob(executableManager.getJob(defaultExecutableOnModel.getId()).getId(), executablePO -> {
            executablePO.getOutput().setStatus(ExecutableState.RUNNING.toString());
            executablePO.getOutput().setEndTime(0L);
            ((ExecutablePO) executablePO.getTasks().get(0)).getOutput().setStatus(ExecutableState.RUNNING.toString());
            ((ExecutablePO) executablePO.getTasks().get(0)).getOutput().setEndTime(0L);
            ((ExecutablePO) executablePO.getTasks().get(1)).getOutput().setStatus(ExecutableState.READY.toString());
            ((ExecutablePO) executablePO.getTasks().get(1)).getOutput().setStartTime(0L);
            ((ExecutablePO) executablePO.getTasks().get(1)).getOutput().setWaitTime(0L);
            ((ExecutablePO) executablePO.getTasks().get(1)).getOutput().setEndTime(0L);
            return true;
        });
        Assert.assertEquals(ExecutableState.RUNNING, executableManager.getOutput(defaultExecutableOnModel.getId()).getState());
        Assert.assertEquals(ExecutableState.RUNNING, executableManager.getOutput(succeedTestExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.READY, executableManager.getOutput(fiveSecondSucceedTestExecutable.getId()).getState());
        startScheduler();
        double currentAvailableMem2 = NDefaultScheduler.currentAvailableMem();
        assertMemoryRestore(currentAvailableMem2 - defaultExecutableOnModel.computeStepDriverMemory());
        getConditionFactory().until(() -> {
            return Boolean.valueOf(((AbstractExecutable) executableManager.getJob(defaultExecutableOnModel.getId()).getTasks().get(1)).getStatus() == ExecutableState.RUNNING);
        });
        assertTimeRunning(createTime, defaultExecutableOnModel.getId());
        waitForJobFinish(defaultExecutableOnModel.getId());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(defaultExecutableOnModel.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(succeedTestExecutable.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(fiveSecondSucceedTestExecutable.getId()).getState());
        assertTimeSucceed(createTime, defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem2);
    }

    @Test
    @Repeat(3)
    public void testJobPauseAndResume() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable = new FiveSecondSucceedTestExecutable();
        fiveSecondSucceedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        fiveSecondSucceedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        fiveSecondSucceedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(fiveSecondSucceedTestExecutable);
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        long createTime = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        Assert.assertTrue(createTime > 0);
        assertMemoryRestore(currentAvailableMem - defaultExecutableOnModel.computeStepDriverMemory());
        getConditionFactory().until(() -> {
            return Boolean.valueOf(((AbstractExecutable) executableManager.getJob(defaultExecutableOnModel.getId()).getTasks().get(0)).getStatus() == ExecutableState.RUNNING);
        });
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        pauseJobWithLock(defaultExecutableOnModel.getId());
        try {
            Thread.sleep(7000L);
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        ExecutableDurationContext executableDurationContext = new ExecutableDurationContext(this.project, defaultExecutableOnModel.getId());
        assertPausedState(executableDurationContext, 3000L);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e3) {
            e3.printStackTrace();
        }
        ExecutableDurationContext executableDurationContext2 = new ExecutableDurationContext(this.project, defaultExecutableOnModel.getId());
        assertPausedPending(executableDurationContext, executableDurationContext2, 1000L);
        assertMemoryRestore(currentAvailableMem);
        resumeJobWithLock(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem - defaultExecutableOnModel.computeStepDriverMemory());
        getConditionFactory().until(() -> {
            return Boolean.valueOf(((AbstractExecutable) executableManager.getJob(defaultExecutableOnModel.getId()).getTasks().get(1)).getStatus() == ExecutableState.RUNNING);
        });
        DefaultExecutable job = executableManager.getJob(defaultExecutableOnModel.getId());
        long duration = job.getDuration();
        long duration2 = ((AbstractExecutable) job.getTasks().get(0)).getDuration();
        long duration3 = ((AbstractExecutable) job.getTasks().get(1)).getDuration();
        long waitTime = job.getWaitTime();
        long waitTime2 = ((AbstractExecutable) job.getTasks().get(0)).getWaitTime();
        Assert.assertTrue(executableDurationContext2.getRecord().getDuration() < duration);
        ExecutableState state = executableDurationContext.getStepRecords().get(0).getState();
        if (state == ExecutableState.READY) {
            Assert.assertEquals((float) (executableDurationContext2.getStepRecords().get(0).getDuration() + 5000), (float) duration2, 1000.0f);
        } else if (state == ExecutableState.SUCCEED) {
            Assert.assertEquals(executableDurationContext2.getStepRecords().get(0).getDuration(), duration2);
        }
        Assert.assertTrue(0 < duration3);
        Assert.assertTrue(executableDurationContext2.getRecord().getWaitTime() <= waitTime);
        Assert.assertTrue(executableDurationContext2.getStepRecords().get(0).getWaitTime() <= waitTime2);
        assertTimeRunning(createTime, defaultExecutableOnModel.getId());
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        assertTimeSucceed(createTime, defaultExecutableOnModel.getId());
        Assert.assertEquals(1L, this.killProcessCount.get());
    }

    @Test
    public void testJobRestart() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable = new FiveSecondSucceedTestExecutable();
        fiveSecondSucceedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        fiveSecondSucceedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        fiveSecondSucceedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(fiveSecondSucceedTestExecutable);
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        long createTime = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        Assert.assertTrue(createTime > 0);
        assertMemoryRestore(currentAvailableMem - defaultExecutableOnModel.computeStepDriverMemory());
        getConditionFactory().until(() -> {
            return Boolean.valueOf(((AbstractExecutable) executableManager.getJob(defaultExecutableOnModel.getId()).getTasks().get(0)).getStatus() == ExecutableState.RUNNING);
        });
        DefaultExecutable job = executableManager.getJob(defaultExecutableOnModel.getId());
        Assert.assertEquals(ExecutableState.RUNNING, job.getStatus());
        Assert.assertEquals(ExecutableState.RUNNING, ((AbstractExecutable) job.getTasks().get(0)).getStatus());
        Assert.assertEquals(ExecutableState.READY, ((AbstractExecutable) job.getTasks().get(1)).getStatus());
        long duration = job.getDuration();
        long duration2 = ((AbstractExecutable) job.getTasks().get(0)).getDuration();
        long duration3 = ((AbstractExecutable) job.getTasks().get(1)).getDuration();
        long waitTime = job.getWaitTime();
        Assert.assertTrue(duration > 0);
        Assert.assertTrue(duration2 > 0);
        Assert.assertEquals(0L, duration3);
        Assert.assertTrue(waitTime >= 0);
        restartJobWithLock(defaultExecutableOnModel.getId());
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        DefaultExecutable job2 = executableManager.getJob(defaultExecutableOnModel.getId());
        long createTime2 = job2.getCreateTime();
        Assert.assertTrue(createTime2 > createTime);
        assertTimeLegal(defaultExecutableOnModel.getId());
        getConditionFactory().until(() -> {
            return Boolean.valueOf(executableManager.getJob(defaultExecutableOnModel.getId()).getStatus() == ExecutableState.RUNNING);
        });
        assertTimeRunning(createTime2, defaultExecutableOnModel.getId());
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertTimeSucceed(createTime2, defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertFalse(job2.isResumable());
    }

    @Test
    public void testJobPauseAndRestart() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable = new FiveSecondSucceedTestExecutable();
        fiveSecondSucceedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        fiveSecondSucceedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        fiveSecondSucceedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(fiveSecondSucceedTestExecutable);
        FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable2 = new FiveSecondSucceedTestExecutable();
        fiveSecondSucceedTestExecutable2.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        fiveSecondSucceedTestExecutable2.setTargetSubject(dataflow.getModel().getUuid());
        fiveSecondSucceedTestExecutable2.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(fiveSecondSucceedTestExecutable2);
        executableManager.addJob(defaultExecutableOnModel);
        assertMemoryRestore(currentAvailableMem - defaultExecutableOnModel.computeStepDriverMemory());
        long createTime = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        Assert.assertTrue(createTime > 0);
        getConditionFactory().until(() -> {
            return Boolean.valueOf(((AbstractExecutable) executableManager.getJob(defaultExecutableOnModel.getId()).getTasks().get(0)).getStatus() == ExecutableState.RUNNING);
        });
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        pauseJobWithLock(defaultExecutableOnModel.getId());
        try {
            Thread.sleep(7000L);
        } catch (InterruptedException e2) {
            e2.printStackTrace();
        }
        ExecutableDurationContext executableDurationContext = new ExecutableDurationContext(this.project, defaultExecutableOnModel.getId());
        assertPausedState(executableDurationContext, 3000L);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e3) {
            e3.printStackTrace();
        }
        assertPausedPending(executableDurationContext, new ExecutableDurationContext(this.project, defaultExecutableOnModel.getId()), 1000L);
        assertMemoryRestore(currentAvailableMem);
        restartJobWithLock(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem - defaultExecutableOnModel.computeStepDriverMemory());
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e4) {
            e4.printStackTrace();
        }
        long createTime2 = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        Assert.assertTrue(createTime2 > createTime);
        assertTimeLegal(defaultExecutableOnModel.getId());
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        getConditionFactory().until(() -> {
            if (executableManager.getJob(defaultExecutableOnModel.getId()).getStatus() != ExecutableState.SUCCEED) {
                return Boolean.valueOf(executableManager.getJob(defaultExecutableOnModel.getId()).getStatus() == ExecutableState.RUNNING);
            }
            atomicBoolean.set(true);
            return true;
        });
        if (!atomicBoolean.get()) {
            assertTimeRunning(createTime2, defaultExecutableOnModel.getId());
        }
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertTimeSucceed(createTime2, defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(1L, this.killProcessCount.get());
    }

    @Test
    @Repeat(3)
    public void testConcurrentJobLimit() {
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance("heterogeneous_segment");
        NExecutableManager nExecutableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), "heterogeneous_segment");
        NExecutableManager nExecutableManager2 = (NExecutableManager) Mockito.spy(nExecutableManager);
        nExecutableManager2.deleteAllJob();
        ((NExecutableManager) Mockito.doAnswer(invocationOnMock -> {
            nExecutableManager.destroyProcess((String) invocationOnMock.getArgument(0));
            return null;
        }).when(nExecutableManager2)).destroyProcess(Mockito.anyString());
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        instanceFromEnv.setProperty("kylin.job.max-concurrent-jobs", "1");
        nDefaultScheduler.init(new JobEngineConfig(instanceFromEnv));
        if (!nDefaultScheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
        int availablePermits = NDefaultScheduler.getMemoryRemaining().availablePermits();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), "heterogeneous_segment").getDataflow("747f864b-9721-4b97-acde-0aa8e8656cba");
        DefaultExecutable generateJob = generateJob(dataflow, "heterogeneous_segment");
        DefaultExecutable generatePartial = generatePartial(dataflow, "heterogeneous_segment");
        nExecutableManager2.addJob(generateJob);
        nExecutableManager2.addJob(generatePartial);
        waitForJobByStatus(generateJob.getId(), 60000, ExecutableState.RUNNING, nExecutableManager2);
        instanceFromEnv.setProperty("kylin.job.max-concurrent-jobs", "0");
        Assert.assertNotEquals(availablePermits, NDefaultScheduler.getMemoryRemaining().availablePermits());
        List runningExecutables = nExecutableManager2.getRunningExecutables("heterogeneous_segment", "747f864b-9721-4b97-acde-0aa8e8656cba");
        runningExecutables.sort(Comparator.comparing((v0) -> {
            return v0.getCreateTime();
        }));
        Assert.assertEquals(ExecutableState.RUNNING, ((AbstractExecutable) runningExecutables.get(0)).getStatus());
        Assert.assertEquals(ExecutableState.READY, ((AbstractExecutable) runningExecutables.get(1)).getStatus());
        instanceFromEnv.setProperty("kylin.job.max-concurrent-jobs", "1");
        waitForJobByStatus(generateJob.getId(), 60000, null, nExecutableManager2);
        waitForJobByStatus(generatePartial.getId(), 60000, null, nExecutableManager2);
        Assert.assertEquals(ExecutableState.SUCCEED, nExecutableManager2.getOutput(generateJob.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, nExecutableManager2.getOutput(generatePartial.getId()).getState());
        nDefaultScheduler.shutdown();
        Assert.assertEquals(availablePermits, NDefaultScheduler.getMemoryRemaining().availablePermits());
    }

    @Test
    @Repeat(3)
    public void testConcurrentJobWithPriority() {
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance("heterogeneous_segment");
        NExecutableManager nExecutableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), "heterogeneous_segment");
        NExecutableManager nExecutableManager2 = (NExecutableManager) Mockito.spy(nExecutableManager);
        ((NExecutableManager) Mockito.doAnswer(invocationOnMock -> {
            nExecutableManager.destroyProcess((String) invocationOnMock.getArgument(0));
            return null;
        }).when(nExecutableManager2)).destroyProcess(Mockito.anyString());
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        instanceFromEnv.setProperty("kylin.job.max-concurrent-jobs", "1");
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), "heterogeneous_segment").getDataflow("747f864b-9721-4b97-acde-0aa8e8656cba");
        DefaultExecutable generateJob = generateJob(dataflow, "heterogeneous_segment", 4);
        DefaultExecutable generatePartial = generatePartial(dataflow, "heterogeneous_segment", 3);
        DefaultExecutable generatePartial2 = generatePartial(dataflow, "heterogeneous_segment", 2);
        DefaultExecutable generateJob2 = generateJob(dataflow, "heterogeneous_segment", 1);
        DefaultExecutable generatePartial3 = generatePartial(dataflow, "heterogeneous_segment", 0);
        DefaultExecutable generateJob3 = generateJob(dataflow, "heterogeneous_segment", 3);
        nExecutableManager2.addJob(generateJob);
        nExecutableManager2.addJob(generatePartial);
        nExecutableManager2.addJob(generatePartial2);
        nExecutableManager2.addJob(generateJob2);
        nExecutableManager2.addJob(generatePartial3);
        nExecutableManager2.addJob(generateJob3);
        nDefaultScheduler.init(new JobEngineConfig(instanceFromEnv));
        if (!nDefaultScheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
        waitForJobByStatus(generatePartial3.getId(), 60000, ExecutableState.SUCCEED, nExecutableManager2);
        Assert.assertEquals(5L, nExecutableManager2.getRunningExecutables("heterogeneous_segment", "747f864b-9721-4b97-acde-0aa8e8656cba").size());
        waitForJobByStatus(generatePartial2.getId(), 60000, ExecutableState.SUCCEED, nExecutableManager2);
        DefaultExecutable generateJob4 = generateJob(dataflow, "heterogeneous_segment", 0);
        nExecutableManager2.addJob(generateJob4);
        Assert.assertEquals(4L, nExecutableManager2.getRunningExecutables("heterogeneous_segment", "747f864b-9721-4b97-acde-0aa8e8656cba").size());
        waitForJobByStatus(generateJob4.getId(), 60000, ExecutableState.SUCCEED, nExecutableManager2);
        Assert.assertEquals(3L, nExecutableManager2.getRunningExecutables("heterogeneous_segment", "747f864b-9721-4b97-acde-0aa8e8656cba").size());
        waitForJobByStatus(generateJob.getId(), 60000, ExecutableState.SUCCEED, nExecutableManager2);
        Assert.assertEquals(0L, nExecutableManager2.getRunningExecutables("heterogeneous_segment", "747f864b-9721-4b97-acde-0aa8e8656cba").size());
        nDefaultScheduler.shutdown();
    }

    private DefaultExecutable generateJob(NDataflow nDataflow, String str, int i) {
        DefaultExecutable generateJob = generateJob(nDataflow, str);
        generateJob.setPriority(i);
        return generateJob;
    }

    private DefaultExecutable generateJob(NDataflow nDataflow, String str) {
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(str);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(nDataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) nDataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(nDataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) nDataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        return defaultExecutableOnModel;
    }

    private DefaultExecutable generatePartial(NDataflow nDataflow, String str, int i) {
        DefaultExecutable generatePartial = generatePartial(nDataflow, str);
        generatePartial.setPriority(i);
        return generatePartial;
    }

    private DefaultExecutable generatePartial(NDataflow nDataflow, String str) {
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(str);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        String uuid = nDataflow.getModel().getUuid();
        defaultExecutableOnModel.setId(defaultExecutableOnModel.getId() + "-" + uuid);
        defaultExecutableOnModel.setTargetSubject(uuid);
        defaultExecutableOnModel.setTargetSegments((List) nDataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setTargetSubject(nDataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) nDataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        return defaultExecutableOnModel;
    }

    private void assertPausedState(ExecutableDurationContext executableDurationContext, long j) {
        Assert.assertEquals(ExecutableState.PAUSED, executableDurationContext.getRecord().getState());
        ExecutableState state = executableDurationContext.getStepRecords().get(0).getState();
        Assert.assertTrue(state == ExecutableState.SUCCEED || state == ExecutableState.PAUSED);
        Assert.assertEquals(ExecutableState.READY, executableDurationContext.getStepRecords().get(1).getState());
        Assert.assertTrue(executableDurationContext.getRecord().getDuration() > 0);
        Assert.assertTrue(executableDurationContext.getStepRecords().get(0).getDuration() > 0);
        Assert.assertEquals(0L, executableDurationContext.getStepRecords().get(1).getDuration());
        Assert.assertTrue(executableDurationContext.getRecord().getWaitTime() >= 0);
        if (state == ExecutableState.SUCCEED) {
            Assert.assertEquals(0L, executableDurationContext.getStepRecords().get(0).getWaitTime());
        }
        Assert.assertEquals(0L, executableDurationContext.getStepRecords().get(1).getWaitTime());
    }

    private void assertPausedPending(ExecutableDurationContext executableDurationContext, ExecutableDurationContext executableDurationContext2, long j) {
        assertContextStateEquals(executableDurationContext, executableDurationContext2);
        ExecutableState state = executableDurationContext2.getStepRecords().get(0).getState();
        Assert.assertEquals(executableDurationContext.getRecord().getDuration(), executableDurationContext2.getRecord().getDuration());
        Assert.assertEquals(executableDurationContext.getStepRecords().get(0).getDuration(), executableDurationContext2.getStepRecords().get(0).getDuration());
        Assert.assertEquals(0L, executableDurationContext2.getStepRecords().get(1).getDuration());
        Assert.assertEquals((float) executableDurationContext.getRecord().getWaitTime(), (float) executableDurationContext2.getRecord().getWaitTime(), 100.0f);
        if (state == ExecutableState.READY) {
            Assert.assertEquals((float) (executableDurationContext.getStepRecords().get(0).getWaitTime() + j), (float) executableDurationContext2.getStepRecords().get(0).getWaitTime(), 100.0f);
        } else if (state == ExecutableState.SUCCEED) {
            Assert.assertEquals(0L, executableDurationContext2.getStepRecords().get(0).getWaitTime());
        }
        Assert.assertEquals(0L, executableDurationContext2.getStepRecords().get(1).getWaitTime());
    }

    private void assertContextStateEquals(ExecutableDurationContext executableDurationContext, ExecutableDurationContext executableDurationContext2) {
        Assert.assertEquals(executableDurationContext.getRecord().getState(), executableDurationContext2.getRecord().getState());
        Assert.assertEquals(executableDurationContext.getStepRecords().size(), executableDurationContext2.getStepRecords().size());
        for (int i = 0; i < executableDurationContext.getStepRecords().size(); i++) {
            Assert.assertEquals(executableDurationContext.getStepRecords().get(i).getState(), executableDurationContext2.getStepRecords().get(i).getState());
        }
    }

    private void assertErrorState(ExecutableDurationContext executableDurationContext) {
        Assert.assertEquals(ExecutableState.ERROR, executableDurationContext.getRecord().getState());
        Assert.assertEquals(ExecutableState.SUCCEED, executableDurationContext.getStepRecords().get(0).getState());
        Assert.assertEquals(ExecutableState.ERROR, executableDurationContext.getStepRecords().get(1).getState());
        Assert.assertTrue(executableDurationContext.getRecord().getDuration() > 0);
        Assert.assertTrue(executableDurationContext.getStepRecords().get(0).getDuration() > 0);
        Assert.assertTrue(executableDurationContext.getStepRecords().get(1).getDuration() > 0);
        Assert.assertTrue(executableDurationContext.getRecord().getWaitTime() >= 0);
    }

    private void assertErrorPending(ExecutableDurationContext executableDurationContext, ExecutableDurationContext executableDurationContext2) {
        assertContextStateEquals(executableDurationContext, executableDurationContext2);
        Assert.assertEquals(executableDurationContext.getRecord().getDuration(), executableDurationContext2.getRecord().getDuration());
        Assert.assertEquals(executableDurationContext.getStepRecords().get(0).getDuration(), executableDurationContext2.getStepRecords().get(0).getDuration());
        Assert.assertTrue(executableDurationContext2.getStepRecords().get(1).getDuration() > 0);
        Assert.assertEquals((float) executableDurationContext.getRecord().getWaitTime(), (float) executableDurationContext2.getRecord().getWaitTime(), 100.0f);
        Assert.assertEquals((float) executableDurationContext.getStepRecords().get(0).getWaitTime(), (float) executableDurationContext2.getStepRecords().get(0).getWaitTime(), 100.0f);
        Assert.assertEquals(executableDurationContext2.getStepRecords().get(1).getWaitTime(), executableDurationContext2.getStepRecords().get(1).getWaitTime());
    }

    private void restartJobWithLock(String str) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            getManager().restartJob(str);
            return null;
        }, this.project, 1, -1L, str);
    }

    private void resumeJobWithLock(String str) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            getManager().resumeJob(str);
            return null;
        }, this.project, 1, -1L, str);
    }

    private void pauseJobWithLock(String str) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            getManager().pauseJob(str);
            return null;
        }, this.project, 1, -1L, str);
    }

    private void discardJobWithLock(String str) {
        EnhancedUnitOfWork.doInTransactionWithCheckAndRetry(() -> {
            getManager().discardJob(str);
            return null;
        }, this.project, 1, -1L, str);
    }

    private NExecutableManager getManager() {
        NExecutableManager nExecutableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), this.project);
        NExecutableManager nExecutableManager2 = (NExecutableManager) Mockito.spy(nExecutableManager);
        ((NExecutableManager) Mockito.doAnswer(invocationOnMock -> {
            nExecutableManager.destroyProcess((String) invocationOnMock.getArgument(0));
            this.killProcessCount.incrementAndGet();
            return null;
        }).when(nExecutableManager2)).destroyProcess(Mockito.anyString());
        return nExecutableManager2;
    }

    @Test
    @Repeat(3)
    public void testJobErrorAndResume() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        ErrorTestExecutable errorTestExecutable = new ErrorTestExecutable();
        errorTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        errorTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        errorTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(errorTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        long createTime = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        Assert.assertTrue(createTime > 0);
        assertMemoryRestore(currentAvailableMem - defaultExecutableOnModel.computeStepDriverMemory());
        getConditionFactory().until(() -> {
            return Boolean.valueOf(executableManager.getJob(defaultExecutableOnModel.getId()).getStatus() == ExecutableState.ERROR);
        });
        ExecutableDurationContext executableDurationContext = new ExecutableDurationContext(this.project, defaultExecutableOnModel.getId());
        assertErrorState(executableDurationContext);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        ExecutableDurationContext executableDurationContext2 = new ExecutableDurationContext(this.project, defaultExecutableOnModel.getId());
        assertErrorPending(executableDurationContext, executableDurationContext2);
        assertMemoryRestore(currentAvailableMem);
        resumeJobWithLock(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem - defaultExecutableOnModel.computeStepDriverMemory());
        getConditionFactory().until(() -> {
            return Boolean.valueOf(((AbstractExecutable) executableManager.getJob(defaultExecutableOnModel.getId()).getTasks().get(1)).getStatus() == ExecutableState.RUNNING);
        });
        DefaultExecutable job = executableManager.getJob(defaultExecutableOnModel.getId());
        long duration = job.getDuration();
        ((AbstractExecutable) job.getTasks().get(0)).getDuration();
        long duration2 = ((AbstractExecutable) job.getTasks().get(1)).getDuration();
        long waitTime = job.getWaitTime();
        long waitTime2 = ((AbstractExecutable) job.getTasks().get(0)).getWaitTime();
        ((AbstractExecutable) job.getTasks().get(1)).getWaitTime();
        Assert.assertTrue(executableDurationContext2.getRecord().getDuration() < duration);
        Assert.assertEquals(executableDurationContext2.getStepRecords().get(0).getWaitTime(), waitTime2);
        Assert.assertTrue(executableDurationContext2.getStepRecords().get(1).getDuration() < duration2);
        Assert.assertTrue(executableDurationContext2.getRecord().getWaitTime() <= waitTime);
        assertTimeRunning(createTime, defaultExecutableOnModel.getId());
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertTimeError(createTime, defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
    }

    @Test
    @Repeat(3)
    public void testJobErrorAndRestart() {
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
        succeedTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        succeedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        succeedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(succeedTestExecutable);
        FiveSecondErrorTestExecutable fiveSecondErrorTestExecutable = new FiveSecondErrorTestExecutable();
        fiveSecondErrorTestExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        fiveSecondErrorTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        fiveSecondErrorTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(fiveSecondErrorTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        long createTime = executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime();
        Assert.assertTrue(createTime > 0);
        getConditionFactory().until(() -> {
            return Boolean.valueOf(executableManager.getJob(defaultExecutableOnModel.getId()).getStatus() == ExecutableState.ERROR);
        });
        ExecutableDurationContext executableDurationContext = new ExecutableDurationContext(this.project, defaultExecutableOnModel.getId());
        assertErrorState(executableDurationContext);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        assertErrorPending(executableDurationContext, new ExecutableDurationContext(this.project, defaultExecutableOnModel.getId()));
        assertMemoryRestore(currentAvailableMem);
        restartJobWithLock(defaultExecutableOnModel.getId());
        getConditionFactory().until(() -> {
            return Boolean.valueOf(executableManager.getJob(defaultExecutableOnModel.getId()).getStatus() == ExecutableState.READY && executableManager.getJob(defaultExecutableOnModel.getId()).getCreateTime() > createTime);
        });
        DefaultExecutable job = executableManager.getJob(defaultExecutableOnModel.getId());
        long createTime2 = job.getCreateTime();
        assertTimeLegal(defaultExecutableOnModel.getId());
        Assert.assertEquals(ExecutableState.READY, job.getStatus());
        Assert.assertEquals(ExecutableState.READY, ((AbstractExecutable) job.getTasks().get(0)).getStatus());
        Assert.assertEquals(ExecutableState.READY, ((AbstractExecutable) job.getTasks().get(1)).getStatus());
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        getConditionFactory().until(() -> {
            if (executableManager.getJob(defaultExecutableOnModel.getId()).getStatus() != ExecutableState.ERROR) {
                return Boolean.valueOf(executableManager.getJob(defaultExecutableOnModel.getId()).getStatus() == ExecutableState.RUNNING);
            }
            atomicBoolean.set(true);
            return true;
        });
        if (!atomicBoolean.get()) {
            assertTimeRunning(createTime2, defaultExecutableOnModel.getId());
        }
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertTimeError(createTime2, defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
    }

    @Test
    @Repeat
    public void testRetryableException() {
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutable defaultExecutable = new DefaultExecutable();
        defaultExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutable.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutable.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        ErrorTestExecutable errorTestExecutable = new ErrorTestExecutable();
        errorTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        errorTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutable.addTask(errorTestExecutable);
        overwriteSystemProp("kylin.job.retry", "3");
        Assert.assertFalse(defaultExecutable.needRetry(1, new Exception("")));
        Assert.assertTrue(errorTestExecutable.needRetry(1, new Exception("")));
        Assert.assertFalse(errorTestExecutable.needRetry(1, null));
        Assert.assertFalse(errorTestExecutable.needRetry(4, new Exception("")));
        overwriteSystemProp("kylin.job.retry-exception-classes", "java.io.FileNotFoundException");
        Assert.assertTrue(errorTestExecutable.needRetry(1, new FileNotFoundException()));
        Assert.assertFalse(errorTestExecutable.needRetry(1, new Exception("")));
    }

    @Test
    public void testJobRunningTimeout() {
        overwriteSystemProp("kylin.scheduler.schedule-job-timeout-minute", "1");
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutable defaultExecutable = new DefaultExecutable();
        defaultExecutable.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutable.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutable.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        LongRunningTestExecutable longRunningTestExecutable = new LongRunningTestExecutable();
        longRunningTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        longRunningTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutable.addTask(longRunningTestExecutable);
        executableManager.addJob(defaultExecutable);
        waitForJobFinish(defaultExecutable.getId(), 200000);
        Assert.assertEquals(ExecutableState.ERROR, executableManager.getOutput(defaultExecutable.getId()).getState());
    }

    @Test
    public void testSubmitParallelTasksSucceed() {
        logger.info("testSubmitParallelTasksSuccessed");
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setJobType(JobTypeEnum.INDEX_BUILD);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable = new FiveSecondSucceedTestExecutable();
        fiveSecondSucceedTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        fiveSecondSucceedTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(fiveSecondSucceedTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        assertMemoryRestore(currentAvailableMem - defaultExecutableOnModel.computeStepDriverMemory());
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(defaultExecutableOnModel.getId()).getState());
        Assert.assertEquals(ExecutableState.SUCCEED, executableManager.getOutput(fiveSecondSucceedTestExecutable.getId()).getState());
    }

    @Test
    public void testSubmitParallelTasksError() throws InterruptedException {
        logger.info("testSubmitParallelTasksError");
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setJobType(JobTypeEnum.INDEX_BUILD);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        FiveSecondErrorTestExecutable fiveSecondErrorTestExecutable = new FiveSecondErrorTestExecutable();
        fiveSecondErrorTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        fiveSecondErrorTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(fiveSecondErrorTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        assertMemoryRestore(currentAvailableMem - defaultExecutableOnModel.computeStepDriverMemory());
        waitForJobFinish(defaultExecutableOnModel.getId());
        assertMemoryRestore(currentAvailableMem);
        Assert.assertEquals(ExecutableState.ERROR, executableManager.getOutput(defaultExecutableOnModel.getId()).getState());
        Assert.assertEquals(ExecutableState.ERROR, executableManager.getOutput(fiveSecondErrorTestExecutable.getId()).getState());
    }

    @Test
    public void testSubmitParallelTasksReachMemoryQuota() throws Exception {
        logger.info("testSubmitParallelTasksByMemoryQuota");
        ((ConcurrentHashMap) getInstanceByProject().get(NExecutableManager.class)).put(this.project, (NExecutableManager) Mockito.spy(NExecutableManager.getInstance(getTestConfig(), this.project)));
        double currentAvailableMem = NDefaultScheduler.currentAvailableMem();
        ArrayList newArrayList = Lists.newArrayList(NDataflowManager.getInstance(getTestConfig(), this.project).listAllDataflows());
        long max = Math.max(Math.round(currentAvailableMem / newArrayList.size()), 1024L) * 2;
        getTestConfig().setProperty("kylin.engine.driver-memory-base", Long.valueOf(max).toString());
        getTestConfig().setProperty("kylin.engine.driver-memory-maximum", "102400");
        addParallelTasksForJob(newArrayList, executableManager);
        getConditionFactory().until(() -> {
            return Boolean.valueOf(NDefaultScheduler.currentAvailableMem() <= ((double) max));
        });
        assertMemoryRestore(currentAvailableMem);
    }

    @Test
    public void testMarkJobError_AfterUpdateJobStateFailed() {
        changeSchedulerInterval(1);
        DefaultExecutableOnModel defaultExecutableOnModel = new DefaultExecutableOnModel();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), this.project).getDataflow("89af4ee2-2cdb-4b07-b39e-4c29856309aa");
        defaultExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
        defaultExecutableOnModel.setJobType(JobTypeEnum.INDEX_BUILD);
        defaultExecutableOnModel.setParam("layoutIds", "1,2,3,4,5");
        defaultExecutableOnModel.setTargetSubject(dataflow.getModel().getUuid());
        defaultExecutableOnModel.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        FiveSecondErrorTestExecutable fiveSecondErrorTestExecutable = new FiveSecondErrorTestExecutable();
        fiveSecondErrorTestExecutable.setTargetSubject(dataflow.getModel().getUuid());
        fiveSecondErrorTestExecutable.setTargetSegments((List) dataflow.getSegments().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toList()));
        defaultExecutableOnModel.addTask(fiveSecondErrorTestExecutable);
        executableManager.addJob(defaultExecutableOnModel);
        executableManager.updateJobOutput(defaultExecutableOnModel.getId(), ExecutableState.RUNNING);
        getConditionFactory().untilAsserted(() -> {
            DefaultExecutable job = executableManager.getJob(defaultExecutableOnModel.getId());
            Assert.assertEquals(ExecutableState.ERROR, job.getStatus());
            Assert.assertEquals(ExecutableState.ERROR, ((AbstractExecutable) job.getTasks().get(0)).getStatus());
        });
    }

    private void addParallelTasksForJob(List<NDataflow> list, NExecutableManager nExecutableManager) {
        for (NDataflow nDataflow : list) {
            NoErrorStatusExecutableOnModel noErrorStatusExecutableOnModel = new NoErrorStatusExecutableOnModel();
            noErrorStatusExecutableOnModel.setProject(AbstractJobHandlerTest.DEFAULT_PROJECT);
            noErrorStatusExecutableOnModel.setJobType(JobTypeEnum.INDEX_BUILD);
            noErrorStatusExecutableOnModel.setParam("layoutIds", "1,2");
            noErrorStatusExecutableOnModel.setTargetSubject(nDataflow.getModel().getUuid());
            noErrorStatusExecutableOnModel.setTargetSegments((List) nDataflow.getSegments().stream().map((v0) -> {
                return v0.getId();
            }).collect(Collectors.toList()));
            FiveSecondSucceedTestExecutable fiveSecondSucceedTestExecutable = new FiveSecondSucceedTestExecutable(10);
            fiveSecondSucceedTestExecutable.setTargetSubject(nDataflow.getModel().getUuid());
            fiveSecondSucceedTestExecutable.setTargetSegments((List) nDataflow.getSegments().stream().map((v0) -> {
                return v0.getId();
            }).collect(Collectors.toList()));
            noErrorStatusExecutableOnModel.addTask(fiveSecondSucceedTestExecutable);
            nExecutableManager.addJob(noErrorStatusExecutableOnModel);
        }
    }

    @Test
    public void testSchedulerShutdown() throws Exception {
        overwriteSystemProp("kylin.env", "dev");
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance(this.project);
        Assert.assertTrue(nDefaultScheduler.hasStarted());
        Thread.sleep(2000L);
        EpochManager epochManager = EpochManager.getInstance();
        epochManager.tryUpdateEpoch("_global", false);
        epochManager.updateEpochWithNotifier(this.project, true);
        nDefaultScheduler.fetchJobsImmediately();
        getConditionFactory().untilAsserted(() -> {
            Assert.assertFalse(nDefaultScheduler.hasStarted());
        });
    }

    @Test
    @Ignore("TODO: move it")
    public void testStorageQuotaLimitReached() {
        try {
            this.scheduler.getContext().setReachQuotaLimit(true);
            overwriteSystemProp("kylin.storage.quota-in-giga-bytes", "0");
            DefaultExecutable defaultExecutable = new DefaultExecutable();
            defaultExecutable.setProject(this.project);
            SucceedTestExecutable succeedTestExecutable = new SucceedTestExecutable();
            succeedTestExecutable.setProject(this.project);
            defaultExecutable.addTask(succeedTestExecutable);
            executableManager.addJob(defaultExecutable);
            waitForJobFinish(defaultExecutable.getId());
            Assert.assertEquals(ExecutableState.PAUSED, executableManager.getJob(defaultExecutable.getId()).getStatus());
            this.scheduler.getContext().setReachQuotaLimit(true);
            overwriteSystemProp("kylin.storage.quota-in-giga-bytes", Integer.toString(Integer.MAX_VALUE));
            DefaultExecutable defaultExecutable2 = new DefaultExecutable();
            defaultExecutable2.setProject(this.project);
            LongRunningTestExecutable longRunningTestExecutable = new LongRunningTestExecutable();
            longRunningTestExecutable.setProject(this.project);
            defaultExecutable2.addTask(longRunningTestExecutable);
            executableManager.addJob(defaultExecutable2);
            waitForJobByStatus(defaultExecutable2.getId(), 60000, ExecutableState.RUNNING, executableManager);
            overwriteSystemProp("kylin.storage.quota-in-giga-bytes", "0");
            waitForJobFinish(defaultExecutable2.getId());
            Assert.assertEquals(ExecutableState.PAUSED, executableManager.getJob(defaultExecutable2.getId()).getStatus());
        } finally {
            overwriteSystemProp("kylin.storage.quota-in-giga-bytes", Integer.toString(Integer.MAX_VALUE));
            this.scheduler.getContext().setReachQuotaLimit(false);
        }
    }

    @Test
    public void testDiscardPendingJobDuration() {
        try {
            logger.info("testDiscardPendingJobDuration");
            overwriteSystemProp("kylin.job.max-concurrent-jobs", "1");
            overwriteSystemProp("kylin.storage.quota-in-giga-bytes", Integer.toString(Integer.MAX_VALUE));
            DefaultExecutable defaultExecutable = new DefaultExecutable();
            defaultExecutable.setProject(this.project);
            LongRunningTestExecutable longRunningTestExecutable = new LongRunningTestExecutable();
            longRunningTestExecutable.setProject(this.project);
            defaultExecutable.addTask(longRunningTestExecutable);
            executableManager.addJob(defaultExecutable);
            waitForJobByStatus(defaultExecutable.getId(), 60000, ExecutableState.RUNNING, executableManager);
            DefaultExecutable defaultExecutable2 = new DefaultExecutable();
            defaultExecutable2.setProject(this.project);
            LongRunningTestExecutable longRunningTestExecutable2 = new LongRunningTestExecutable();
            longRunningTestExecutable2.setProject(this.project);
            defaultExecutable2.addTask(longRunningTestExecutable2);
            executableManager.addJob(defaultExecutable2);
            executableManager.discardJob(defaultExecutable2.getId());
            long currentTimeMillis = System.currentTimeMillis();
            try {
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            Assert.assertTrue(defaultExecutable2.getWaitTime() < System.currentTimeMillis() - currentTimeMillis);
            overwriteSystemProp("kylin.storage.quota-in-giga-bytes", Integer.toString(Integer.MAX_VALUE));
            this.scheduler.getContext().setReachQuotaLimit(false);
        } catch (Throwable th) {
            overwriteSystemProp("kylin.storage.quota-in-giga-bytes", Integer.toString(Integer.MAX_VALUE));
            this.scheduler.getContext().setReachQuotaLimit(false);
            throw th;
        }
    }

    @Test
    @Repeat(3)
    public void testProjectConcurrentJobLimit() {
        KylinConfig instanceFromEnv = KylinConfig.getInstanceFromEnv();
        instanceFromEnv.setProperty("kylin.job.max-concurrent-jobs", "1");
        instanceFromEnv.setProperty("kylin.engine.driver-memory-base", "512");
        NDefaultScheduler nDefaultScheduler = NDefaultScheduler.getInstance("heterogeneous_segment");
        NExecutableManager nExecutableManager = NExecutableManager.getInstance(KylinConfig.getInstanceFromEnv(), "heterogeneous_segment");
        NExecutableManager nExecutableManager2 = (NExecutableManager) Mockito.spy(nExecutableManager);
        nExecutableManager2.deleteAllJob();
        ((NExecutableManager) Mockito.doAnswer(invocationOnMock -> {
            nExecutableManager.destroyProcess((String) invocationOnMock.getArgument(0));
            return null;
        }).when(nExecutableManager2)).destroyProcess(Mockito.anyString());
        nDefaultScheduler.init(new JobEngineConfig(instanceFromEnv));
        NProjectManager nProjectManager = NProjectManager.getInstance(instanceFromEnv);
        if (!nDefaultScheduler.hasStarted()) {
            throw new RuntimeException("scheduler has not been started");
        }
        int availablePermits = NDefaultScheduler.getMemoryRemaining().availablePermits();
        NDataflow dataflow = NDataflowManager.getInstance(getTestConfig(), "heterogeneous_segment").getDataflow("747f864b-9721-4b97-acde-0aa8e8656cba");
        DefaultExecutable generateJob = generateJob(dataflow, "heterogeneous_segment");
        DefaultExecutable generatePartial = generatePartial(dataflow, "heterogeneous_segment");
        nExecutableManager2.addJob(generateJob);
        nExecutableManager2.addJob(generatePartial);
        waitForJobByStatus(generateJob.getId(), 60000, ExecutableState.RUNNING, nExecutableManager2);
        Assert.assertNotEquals(availablePermits, NDefaultScheduler.getMemoryRemaining().availablePermits());
        List runningExecutables = nExecutableManager2.getRunningExecutables("heterogeneous_segment", "747f864b-9721-4b97-acde-0aa8e8656cba");
        runningExecutables.sort(Comparator.comparing((v0) -> {
            return v0.getCreateTime();
        }));
        Assert.assertEquals(ExecutableState.RUNNING, ((AbstractExecutable) runningExecutables.get(0)).getStatus());
        Assert.assertEquals(ExecutableState.READY, ((AbstractExecutable) runningExecutables.get(1)).getStatus());
        nProjectManager.getProject("heterogeneous_segment").getConfig().setProperty("kylin.job.max-concurrent-jobs", "2");
        Assert.assertNotEquals(availablePermits, NDefaultScheduler.getMemoryRemaining().availablePermits());
        nExecutableManager2.addJob(generateJob(dataflow, "heterogeneous_segment"));
        waitForJobByStatus(generateJob.getId(), 60000, ExecutableState.RUNNING, nExecutableManager2);
        waitForJobByStatus(generatePartial.getId(), 60000, ExecutableState.RUNNING, nExecutableManager2);
        List runningExecutables2 = nExecutableManager2.getRunningExecutables("heterogeneous_segment", "747f864b-9721-4b97-acde-0aa8e8656cba");
        runningExecutables2.sort(Comparator.comparing((v0) -> {
            return v0.getCreateTime();
        }));
        Assert.assertEquals(ExecutableState.RUNNING, ((AbstractExecutable) runningExecutables2.get(0)).getStatus());
        Assert.assertEquals(ExecutableState.RUNNING, ((AbstractExecutable) runningExecutables2.get(1)).getStatus());
        Assert.assertEquals(ExecutableState.READY, ((AbstractExecutable) runningExecutables2.get(2)).getStatus());
        nProjectManager.getProject("heterogeneous_segment").getConfig().setProperty("kylin.job.max-concurrent-jobs", "1");
        waitForJobByStatus(generateJob.getId(), 60000, ExecutableState.RUNNING, nExecutableManager2);
        waitForJobByStatus(generatePartial.getId(), 60000, ExecutableState.RUNNING, nExecutableManager2);
        runningExecutables2.sort(Comparator.comparing((v0) -> {
            return v0.getCreateTime();
        }));
        Assert.assertEquals(ExecutableState.RUNNING, ((AbstractExecutable) runningExecutables2.get(0)).getStatus());
        Assert.assertEquals(ExecutableState.RUNNING, ((AbstractExecutable) runningExecutables2.get(1)).getStatus());
        Assert.assertEquals(ExecutableState.READY, ((AbstractExecutable) runningExecutables2.get(2)).getStatus());
        waitForJobByStatus(generateJob.getId(), 60000, null, nExecutableManager2);
        List runningExecutables3 = nExecutableManager2.getRunningExecutables("heterogeneous_segment", "747f864b-9721-4b97-acde-0aa8e8656cba");
        Assert.assertEquals(2L, runningExecutables3.size());
        runningExecutables3.sort(Comparator.comparing((v0) -> {
            return v0.getCreateTime();
        }));
        Assert.assertEquals(ExecutableState.RUNNING, ((AbstractExecutable) runningExecutables3.get(0)).getStatus());
        Assert.assertEquals(ExecutableState.READY, ((AbstractExecutable) runningExecutables3.get(1)).getStatus());
        nDefaultScheduler.shutdown();
        Assert.assertEquals(availablePermits, NDefaultScheduler.getMemoryRemaining().availablePermits());
        Assert.assertEquals(1L, nDefaultScheduler.getMaxConcurrentJobLimitByProject(instanceFromEnv, nDefaultScheduler.getJobEngineConfig(), "xxxxx"));
    }
}
