/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs.scheduler;

import java.util.Map;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.streaming.jobs.scheduler.StreamingScheduler;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

public class StreamingSchedulerTest
extends StreamingTestCase {
    private static String PROJECT = "streaming_test";
    private static String modelId;
    private static String dataflowId;
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setUp() throws Exception {
        this.createTestMetadata(new String[0]);
        Map map = (Map)ReflectionUtils.getField(StreamingScheduler.class, "INSTANCE_MAP");
        map.clear();
    }

    @After
    public void tearDown() {
        this.cleanupTestMetadata();
    }

    @Test
    public void testInit() {
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        Assert.assertTrue((boolean)streamingScheduler.getInitialized().get());
        Assert.assertTrue((boolean)streamingScheduler.getHasStarted().get());
    }

    @Test
    public void testInitWithStreamingDisabled() {
        StreamingSchedulerTest.getTestConfig().setProperty("kylin.streaming.enabled", "false");
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        Object jobPool = ReflectionUtils.getField(streamingScheduler, "jobPool");
        Assert.assertNull((Object)jobPool);
        Assert.assertEquals((Object)true, (Object)streamingScheduler.getInitialized().get());
        Assert.assertEquals((Object)true, (Object)streamingScheduler.getHasStarted().get());
    }

    @Test
    public void testNoneJobNode() {
        KylinConfig testConfig = StreamingSchedulerTest.getTestConfig();
        testConfig.setProperty("kylin.server.mode", "query");
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        Assert.assertEquals((Object)false, (Object)streamingScheduler.getInitialized().get());
        Assert.assertEquals((Object)false, (Object)streamingScheduler.getHasStarted().get());
    }

    @Test
    public void testProjectExists() {
        StreamingScheduler streamingScheduler = StreamingScheduler.getInstance((String)PROJECT);
        try {
            StreamingScheduler streamingScheduler2 = new StreamingScheduler(PROJECT);
        }
        catch (Exception e) {
            Assert.assertEquals((Object)true, (Object)(e instanceof IllegalStateException));
        }
        streamingScheduler.forceShutdown();
    }

    @Test
    public void testSubmitJob() {
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        streamingScheduler.submitJob(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
        streamingScheduler.submitJob(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);
        KylinConfig testConfig = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        String buildJobId = StreamingUtils.getJobId((String)modelId, (String)JobTypeEnum.STREAMING_BUILD.toString());
        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid(buildJobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)buildJobMeta.getCurrentStatus());
        String mergeJobId = StreamingUtils.getJobId((String)modelId, (String)JobTypeEnum.STREAMING_MERGE.toString());
        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid(mergeJobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)mergeJobMeta.getCurrentStatus());
    }

    @Test
    public void testSubmitBuildJob() {
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        String jobId = StreamingUtils.getJobId((String)modelId, (String)JobTypeEnum.STREAMING_BUILD.toString());
        KylinConfig testConfig = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)jobMeta.getCurrentStatus());
        jobMeta.setProcessId("9876");
        Assert.assertNotNull((Object)jobMeta.getProcessId());
        NDataflowManager dfMgr = NDataflowManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        NDataflow df = dfMgr.getDataflow(dataflowId);
        Assert.assertEquals((long)1L, (long)df.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).size());
        Assert.assertTrue((boolean)((NDataSegment)df.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).get(0)).getAdditionalInfo().isEmpty());
        streamingScheduler.submitJob(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
        mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)jobMeta.getCurrentStatus());
        dfMgr = NDataflowManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        df = dfMgr.getDataflow(dataflowId);
        Assert.assertEquals((long)0L, (long)df.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).size());
    }

    @Test
    public void testSubmitMergeJob() {
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        String jobId = StreamingUtils.getJobId((String)modelId, (String)JobTypeEnum.STREAMING_MERGE.toString());
        KylinConfig testConfig = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)jobMeta.getCurrentStatus());
        NDataflowManager dfMgr = NDataflowManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        NDataflow df = dfMgr.getDataflow(dataflowId).copy();
        NDataSegment seg = (NDataSegment)df.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).get(0);
        seg.getAdditionalInfo().put("file_layer", "1");
        NDataflowUpdate update = new NDataflowUpdate(df.getUuid());
        update.setToUpdateSegs(new NDataSegment[]{seg});
        dfMgr.updateDataflow(update);
        Assert.assertEquals((long)1L, (long)df.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).size());
        Assert.assertEquals((Object)"1", seg.getAdditionalInfo().get("file_layer"));
        streamingScheduler.submitJob(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);
        mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)jobMeta.getCurrentStatus());
        dfMgr = NDataflowManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        df = dfMgr.getDataflow(dataflowId);
        Assert.assertEquals((long)0L, (long)df.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).size());
    }

    @Test
    public void testSubmitMergeJobException() {
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        String jobId = StreamingUtils.getJobId((String)modelId, (String)JobTypeEnum.STREAMING_MERGE.toString());
        KylinConfig testConfig = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)jobMeta.getCurrentStatus());
        testConfig.setProperty("kylin.streaming.enabled", "false");
        streamingScheduler.submitJob(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)jobMeta.getCurrentStatus());
        testConfig.setProperty("kylin.streaming.enabled", "true");
        mgr.updateStreamingJob(jobId, updater -> updater.setCurrentStatus(JobStatusEnum.LAUNCHING_ERROR));
        streamingScheduler.submitJob(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)jobMeta.getCurrentStatus());
        this.thrown.expect(KylinException.class);
        streamingScheduler.submitJob(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);
    }

    @Test
    public void testStopJob() {
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        streamingScheduler.submitJob(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
        streamingScheduler.submitJob(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);
        KylinConfig testConfig = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        String buildJobId = StreamingUtils.getJobId((String)modelId, (String)JobTypeEnum.STREAMING_BUILD.toString());
        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid(buildJobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)buildJobMeta.getCurrentStatus());
        String mergeJobId = StreamingUtils.getJobId((String)modelId, (String)JobTypeEnum.STREAMING_MERGE.toString());
        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid(mergeJobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)mergeJobMeta.getCurrentStatus());
        streamingScheduler.stopJob(modelId, JobTypeEnum.STREAMING_BUILD);
        streamingScheduler.stopJob(modelId, JobTypeEnum.STREAMING_MERGE);
        buildJobMeta = mgr.getStreamingJobByUuid(buildJobId);
        mergeJobMeta = mgr.getStreamingJobByUuid(mergeJobId);
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)buildJobMeta.getCurrentStatus());
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)mergeJobMeta.getCurrentStatus());
    }

    @Test
    public void testStopBuildJob() {
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        JobTypeEnum jobType = JobTypeEnum.STREAMING_BUILD;
        String jobId = StreamingUtils.getJobId((String)modelId, (String)jobType.toString());
        streamingScheduler.submitJob(PROJECT, modelId, jobType);
        KylinConfig testConfig = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)jobMeta.getCurrentStatus());
        streamingScheduler.stopJob(modelId, jobType);
        mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)jobMeta.getCurrentStatus());
    }

    @Test
    public void testStopMergeJob() {
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        JobTypeEnum jobType = JobTypeEnum.STREAMING_MERGE;
        String jobId = StreamingUtils.getJobId((String)modelId, (String)jobType.toString());
        streamingScheduler.submitJob(PROJECT, modelId, jobType);
        KylinConfig testConfig = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)testConfig, (String)PROJECT);
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)jobMeta.getCurrentStatus());
        streamingScheduler.stopJob(modelId, jobType);
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)jobMeta.getCurrentStatus());
    }

    @Test
    public void testStopYarnJob() {
        StreamingScheduler streamingScheduler = (StreamingScheduler)Mockito.spy((Object)new StreamingScheduler(PROJECT));
        JobTypeEnum jobType = JobTypeEnum.STREAMING_MERGE;
        String jobId = StreamingUtils.getJobId((String)modelId, (String)jobType.toString());
        KylinConfig config = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        Mockito.when((Object)streamingScheduler.applicationExisted(jobId)).thenReturn((Object)true);
        StreamingJobMeta jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)jobMeta.getCurrentStatus());
        streamingScheduler.stopJob(modelId, jobType);
        jobMeta = mgr.getStreamingJobByUuid(jobId);
        Assert.assertEquals((Object)JobStatusEnum.STOPPING, (Object)jobMeta.getCurrentStatus());
    }

    @Test
    public void testRetryJob() {
        String buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        String mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
        KylinConfig config = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_build", copyForWrite -> {
            copyForWrite.getParams().put("kylin.streaming.job-retry-enabled", "true");
            copyForWrite.setCurrentStatus(JobStatusEnum.ERROR);
        });
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_merge", copyForWrite -> {
            copyForWrite.getParams().put("kylin.streaming.job-retry-enabled", "true");
            copyForWrite.setCurrentStatus(JobStatusEnum.ERROR);
        });
        StreamingScheduler streamingScheduler = new StreamingScheduler(PROJECT);
        streamingScheduler.retryJob();
        Map retryMap = (Map)ReflectionUtils.getField(streamingScheduler, "retryMap");
        Assert.assertTrue((boolean)retryMap.containsKey("e78a89dd-847f-4574-8afa-8768b4228b72_build"));
        Assert.assertTrue((boolean)retryMap.containsKey("e78a89dd-847f-4574-8afa-8768b4228b72_merge"));
        for (int i = 0; i < 5; ++i) {
            streamingScheduler.retryJob();
        }
        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)buildJobMeta.getCurrentStatus());
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)mergeJobMeta.getCurrentStatus());
    }

    @Test
    public void testResumeJobOfStartingStatus() {
        String buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        String mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
        KylinConfig config = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_build", copyForWrite -> {
            copyForWrite.setCurrentStatus(JobStatusEnum.STARTING);
            copyForWrite.setSkipListener(true);
        });
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_merge", copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.STARTING));
        StreamingScheduler instance = new StreamingScheduler(PROJECT);
        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)buildJobMeta.getCurrentStatus());
        Assert.assertFalse((boolean)buildJobMeta.isSkipListener());
        Assert.assertEquals((Object)JobStatusEnum.RUNNING, (Object)mergeJobMeta.getCurrentStatus());
    }

    @Test
    public void testKillJobOfStoppingStatus() {
        String buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        String mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
        KylinConfig config = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_build", copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.STOPPING));
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_merge", copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.STOPPING));
        StreamingScheduler instance = new StreamingScheduler(PROJECT);
        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertEquals((Object)JobStatusEnum.ERROR, (Object)buildJobMeta.getCurrentStatus());
        Assert.assertEquals((Object)JobStatusEnum.ERROR, (Object)mergeJobMeta.getCurrentStatus());
    }

    @Test
    public void testKillStreamingJob() {
        String buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        String mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
        KylinConfig config = StreamingSchedulerTest.getTestConfig();
        StreamingScheduler instance = new StreamingScheduler(PROJECT);
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_build", copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_merge", copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        instance.killJob(modelId, JobTypeEnum.STREAMING_MERGE, JobStatusEnum.ERROR);
        instance.killJob(modelId, JobTypeEnum.STREAMING_BUILD, JobStatusEnum.ERROR);
        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertEquals((Object)JobStatusEnum.ERROR, (Object)buildJobMeta.getCurrentStatus());
        Assert.assertEquals((Object)JobStatusEnum.ERROR, (Object)mergeJobMeta.getCurrentStatus());
    }

    @Test
    public void testKillYarnApplication() {
        StreamingScheduler streamingScheduler = (StreamingScheduler)Mockito.spy((Object)new StreamingScheduler(PROJECT));
        JobTypeEnum jobType = JobTypeEnum.STREAMING_MERGE;
        String jobId = StreamingUtils.getJobId((String)modelId, (String)jobType.toString());
        KylinConfig config = StreamingSchedulerTest.getTestConfig();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        mgr.updateStreamingJob(jobId, copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        Mockito.when((Object)streamingScheduler.applicationExisted(jobId)).thenReturn((Object)true);
        this.thrown.expect(KylinException.class);
        streamingScheduler.killYarnApplication(jobId, modelId);
    }

    @Test
    public void testForceStopStreamingJob() {
        String buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        String mergeJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_merge";
        KylinConfig config = StreamingSchedulerTest.getTestConfig();
        StreamingScheduler instance = new StreamingScheduler(PROJECT);
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_build", copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        mgr.updateStreamingJob("e78a89dd-847f-4574-8afa-8768b4228b72_merge", copyForWrite -> copyForWrite.setCurrentStatus(JobStatusEnum.RUNNING));
        instance.killJob(modelId, JobTypeEnum.STREAMING_MERGE, JobStatusEnum.STOPPED);
        instance.killJob(modelId, JobTypeEnum.STREAMING_BUILD, JobStatusEnum.STOPPED);
        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        StreamingJobMeta mergeJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_merge");
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)buildJobMeta.getCurrentStatus());
        Assert.assertEquals((Object)JobStatusEnum.STOPPED, (Object)mergeJobMeta.getCurrentStatus());
    }

    @Test
    public void testSkipJobListener() {
        String buildJobId = "e78a89dd-847f-4574-8afa-8768b4228b72_build";
        KylinConfig config = StreamingSchedulerTest.getTestConfig();
        StreamingScheduler instance = new StreamingScheduler(PROJECT);
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        StreamingJobMeta buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        Assert.assertFalse((boolean)buildJobMeta.isSkipListener());
        instance.skipJobListener(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72_build", true);
        buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        Assert.assertTrue((boolean)buildJobMeta.isSkipListener());
        instance.skipJobListener(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72_build", false);
        buildJobMeta = mgr.getStreamingJobByUuid("e78a89dd-847f-4574-8afa-8768b4228b72_build");
        Assert.assertFalse((boolean)buildJobMeta.isSkipListener());
    }

    @Test
    public void testShutdownByProject() {
        StreamingScheduler.getInstance((String)PROJECT);
        StreamingScheduler.shutdownByProject((String)PROJECT);
        Map map = (Map)ReflectionUtils.getField(StreamingScheduler.class, "INSTANCE_MAP");
        Assert.assertTrue((boolean)map.isEmpty());
    }

    static {
        dataflowId = modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
    }
}

