package org.apache.kylin.streaming.jobs.scheduler;

import java.util.Map;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.model.NDataSegment;
import org.apache.kylin.metadata.cube.model.NDataflow;
import org.apache.kylin.metadata.cube.model.NDataflowManager;
import org.apache.kylin.metadata.cube.model.NDataflowUpdate;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/scheduler/StreamingSchedulerTest.class */
/**
 * Tests for {@link StreamingScheduler}: initialization, job submission, stopping,
 * retrying, killing and per-project shutdown, run against the test metadata of the
 * {@code streaming_test} project.
 */
public class StreamingSchedulerTest extends StreamingTestCase {
    private static final String PROJECT = "streaming_test";
    private static final String MODEL_ID = "e78a89dd-847f-4574-8afa-8768b4228b72";
    private static final String DATAFLOW_ID = MODEL_ID;

    /** Ids of the build / merge streaming jobs that the test metadata holds for {@link #MODEL_ID}. */
    private static final String BUILD_JOB_ID = MODEL_ID + "_build";
    private static final String MERGE_JOB_ID = MODEL_ID + "_merge";

    private static final String STREAMING_ENABLED_KEY = "kylin.streaming.enabled";
    private static final String JOB_RETRY_ENABLED_KEY = "kylin.streaming.job-retry-enabled";

    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Creates fresh test metadata and empties the static {@code INSTANCE_MAP} of
     * {@link StreamingScheduler}, so that schedulers created by a previous test do not
     * leak into the next one.
     */
    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
        schedulerInstances().clear();
    }

    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    /** A scheduler created with the default test configuration is initialized and started. */
    @Test
    public void testInit() {
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        Assert.assertTrue(scheduler.getInitialized().get());
        Assert.assertTrue(scheduler.getHasStarted().get());
    }

    /**
     * With {@code kylin.streaming.enabled=false} the scheduler still reports itself as
     * initialized and started, but its {@code jobPool} field stays {@code null}.
     */
    @Test
    public void testInitWithStreamingDisabled() {
        getTestConfig().setProperty(STREAMING_ENABLED_KEY, "false");
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        Assert.assertNull(ReflectionUtils.getField(scheduler, "jobPool"));
        Assert.assertTrue(scheduler.getInitialized().get());
        Assert.assertTrue(scheduler.getHasStarted().get());
    }

    /** With {@code kylin.server.mode=query} the scheduler is neither initialized nor started. */
    @Test
    public void testNoneJobNode() {
        getTestConfig().setProperty("kylin.server.mode", "query");
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        Assert.assertFalse(scheduler.getInitialized().get());
        Assert.assertFalse(scheduler.getHasStarted().get());
    }

    /**
     * Constructing a second scheduler for a project that already obtained one through
     * {@link StreamingScheduler#getInstance(String)} must fail with
     * {@link IllegalStateException}.
     */
    @Test
    public void testProjectExists() {
        StreamingScheduler registered = StreamingScheduler.getInstance(PROJECT);
        try {
            new StreamingScheduler(PROJECT);
            // Without this the test would pass silently when no exception is raised.
            Assert.fail("Expected IllegalStateException: project already has a StreamingScheduler");
        } catch (IllegalStateException expected) {
            // This is the outcome under test; nothing further to verify.
        } finally {
            // Always release the registered scheduler, even when the assertion above fails.
            registered.forceShutdown();
        }
    }

    /** Submitting the build and the merge job moves both of them to {@code RUNNING}. */
    @Test
    public void testSubmitJob() {
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        scheduler.submitJob(PROJECT, MODEL_ID, JobTypeEnum.STREAMING_BUILD);
        scheduler.submitJob(PROJECT, MODEL_ID, JobTypeEnum.STREAMING_MERGE);

        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        String buildJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
        String mergeJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.toString());
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(buildJobId).getCurrentStatus());
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(mergeJobId).getCurrentStatus());
    }

    /**
     * Submitting the build job switches it from {@code STOPPED} to {@code RUNNING}; the
     * single {@code NEW} segment (with empty additional info) that the dataflow had
     * beforehand is gone afterwards.
     */
    @Test
    public void testSubmitBuildJob() {
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
        KylinConfig config = getTestConfig();

        StreamingJobMeta jobBefore = StreamingJobManager.getInstance(config, PROJECT).getStreamingJobByUuid(jobId);
        Assert.assertEquals(JobStatusEnum.STOPPED, jobBefore.getCurrentStatus());
        jobBefore.setProcessId("9876");
        Assert.assertNotNull(jobBefore.getProcessId());

        NDataflow dataflowBefore = NDataflowManager.getInstance(config, PROJECT).getDataflow(DATAFLOW_ID);
        Assert.assertEquals(1L, dataflowBefore.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).size());
        NDataSegment newSegment = (NDataSegment) dataflowBefore
                .getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).get(0);
        Assert.assertTrue(newSegment.getAdditionalInfo().isEmpty());

        scheduler.submitJob(PROJECT, MODEL_ID, JobTypeEnum.STREAMING_BUILD);

        Assert.assertEquals(JobStatusEnum.RUNNING,
                StreamingJobManager.getInstance(config, PROJECT).getStreamingJobByUuid(jobId).getCurrentStatus());
        NDataflow dataflowAfter = NDataflowManager.getInstance(config, PROJECT).getDataflow(DATAFLOW_ID);
        Assert.assertEquals(0L, dataflowAfter.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).size());
    }

    /**
     * Submitting the merge job switches it from {@code STOPPED} to {@code RUNNING}; a
     * {@code NEW} segment tagged with {@code file_layer=1} is gone afterwards.
     */
    @Test
    public void testSubmitMergeJob() {
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.toString());
        KylinConfig config = getTestConfig();
        Assert.assertEquals(JobStatusEnum.STOPPED,
                StreamingJobManager.getInstance(config, PROJECT).getStreamingJobByUuid(jobId).getCurrentStatus());

        // Tag the NEW segment with a file layer and persist that through a dataflow update.
        NDataflowManager dataflowManager = NDataflowManager.getInstance(config, PROJECT);
        NDataflow dataflowCopy = dataflowManager.getDataflow(DATAFLOW_ID).copy();
        NDataSegment newSegment = (NDataSegment) dataflowCopy
                .getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).get(0);
        newSegment.getAdditionalInfo().put("file_layer", "1");
        NDataflowUpdate update = new NDataflowUpdate(dataflowCopy.getUuid());
        update.setToUpdateSegs(new NDataSegment[]{newSegment});
        dataflowManager.updateDataflow(update);
        Assert.assertEquals(1L, dataflowCopy.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).size());
        Assert.assertEquals("1", newSegment.getAdditionalInfo().get("file_layer"));

        scheduler.submitJob(PROJECT, MODEL_ID, JobTypeEnum.STREAMING_MERGE);

        Assert.assertEquals(JobStatusEnum.RUNNING,
                StreamingJobManager.getInstance(config, PROJECT).getStreamingJobByUuid(jobId).getCurrentStatus());
        NDataflow dataflowAfter = NDataflowManager.getInstance(config, PROJECT).getDataflow(DATAFLOW_ID);
        Assert.assertEquals(0L, dataflowAfter.getSegments(new SegmentStatusEnum[]{SegmentStatusEnum.NEW}).size());
    }

    /**
     * Walks the merge job through three submissions: one while streaming is disabled
     * (the previously fetched job meta still reads {@code STOPPED}), one from
     * {@code LAUNCHING_ERROR} (ends in {@code RUNNING}), and one while already running,
     * which is expected to raise {@link KylinException}.
     */
    @Test
    public void testSubmitMergeJobException() {
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.toString());
        KylinConfig config = getTestConfig();
        StreamingJobManager jobManager = StreamingJobManager.getInstance(config, PROJECT);
        StreamingJobMeta initialMeta = jobManager.getStreamingJobByUuid(jobId);
        Assert.assertEquals(JobStatusEnum.STOPPED, initialMeta.getCurrentStatus());

        config.setProperty(STREAMING_ENABLED_KEY, "false");
        scheduler.submitJob(PROJECT, MODEL_ID, JobTypeEnum.STREAMING_MERGE);
        // NOTE(review): this checks the meta object fetched before the submit, not a re-read.
        Assert.assertEquals(JobStatusEnum.STOPPED, initialMeta.getCurrentStatus());

        config.setProperty(STREAMING_ENABLED_KEY, "true");
        setJobStatus(jobManager, jobId, JobStatusEnum.LAUNCHING_ERROR);
        scheduler.submitJob(PROJECT, MODEL_ID, JobTypeEnum.STREAMING_MERGE);
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(jobId).getCurrentStatus());

        thrown.expect(KylinException.class);
        scheduler.submitJob(PROJECT, MODEL_ID, JobTypeEnum.STREAMING_MERGE);
    }

    /** Stopping the running build and merge jobs brings both back to {@code STOPPED}. */
    @Test
    public void testStopJob() {
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        scheduler.submitJob(PROJECT, MODEL_ID, JobTypeEnum.STREAMING_BUILD);
        scheduler.submitJob(PROJECT, MODEL_ID, JobTypeEnum.STREAMING_MERGE);

        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        String buildJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_BUILD.toString());
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(buildJobId).getCurrentStatus());
        String mergeJobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.toString());
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(mergeJobId).getCurrentStatus());

        scheduler.stopJob(MODEL_ID, JobTypeEnum.STREAMING_BUILD);
        scheduler.stopJob(MODEL_ID, JobTypeEnum.STREAMING_MERGE);

        StreamingJobMeta buildJob = jobManager.getStreamingJobByUuid(buildJobId);
        StreamingJobMeta mergeJob = jobManager.getStreamingJobByUuid(mergeJobId);
        Assert.assertEquals(JobStatusEnum.STOPPED, buildJob.getCurrentStatus());
        Assert.assertEquals(JobStatusEnum.STOPPED, mergeJob.getCurrentStatus());
    }

    /** A submitted build job goes {@code RUNNING} and returns to {@code STOPPED} on stop. */
    @Test
    public void testStopBuildJob() {
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        JobTypeEnum jobType = JobTypeEnum.STREAMING_BUILD;
        String jobId = StreamingUtils.getJobId(MODEL_ID, jobType.toString());
        scheduler.submitJob(PROJECT, MODEL_ID, jobType);

        KylinConfig config = getTestConfig();
        Assert.assertEquals(JobStatusEnum.RUNNING,
                StreamingJobManager.getInstance(config, PROJECT).getStreamingJobByUuid(jobId).getCurrentStatus());

        scheduler.stopJob(MODEL_ID, jobType);
        Assert.assertEquals(JobStatusEnum.STOPPED,
                StreamingJobManager.getInstance(config, PROJECT).getStreamingJobByUuid(jobId).getCurrentStatus());
    }

    /** A submitted merge job goes {@code RUNNING} and returns to {@code STOPPED} on stop. */
    @Test
    public void testStopMergeJob() {
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        JobTypeEnum jobType = JobTypeEnum.STREAMING_MERGE;
        String jobId = StreamingUtils.getJobId(MODEL_ID, jobType.toString());
        scheduler.submitJob(PROJECT, MODEL_ID, jobType);

        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(jobId).getCurrentStatus());

        scheduler.stopJob(MODEL_ID, jobType);
        Assert.assertEquals(JobStatusEnum.STOPPED, jobManager.getStreamingJobByUuid(jobId).getCurrentStatus());
    }

    /**
     * When {@code applicationExisted} is stubbed to report {@code true}, stopping a
     * running merge job leaves it in {@code STOPPING} rather than {@code STOPPED}.
     */
    @Test
    public void testStopYarnJob() {
        StreamingScheduler scheduler = Mockito.spy(new StreamingScheduler(PROJECT));
        JobTypeEnum jobType = JobTypeEnum.STREAMING_MERGE;
        String jobId = StreamingUtils.getJobId(MODEL_ID, jobType.toString());
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        setJobStatus(jobManager, jobId, JobStatusEnum.RUNNING);
        Mockito.when(scheduler.applicationExisted(jobId)).thenReturn(true);
        Assert.assertEquals(JobStatusEnum.RUNNING, jobManager.getStreamingJobByUuid(jobId).getCurrentStatus());

        scheduler.stopJob(MODEL_ID, jobType);
        Assert.assertEquals(JobStatusEnum.STOPPING, jobManager.getStreamingJobByUuid(jobId).getCurrentStatus());
    }

    /**
     * Jobs in {@code ERROR} with retry enabled are recorded in the scheduler's
     * {@code retryMap} on the first {@code retryJob()} call and are {@code RUNNING}
     * after five further calls.
     */
    @Test
    public void testRetryJob() {
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        jobManager.updateStreamingJob(BUILD_JOB_ID, meta -> {
            meta.getParams().put(JOB_RETRY_ENABLED_KEY, "true");
            meta.setCurrentStatus(JobStatusEnum.ERROR);
        });
        jobManager.updateStreamingJob(MERGE_JOB_ID, meta -> {
            meta.getParams().put(JOB_RETRY_ENABLED_KEY, "true");
            meta.setCurrentStatus(JobStatusEnum.ERROR);
        });

        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        scheduler.retryJob();
        Map<?, ?> retryMap = (Map<?, ?>) ReflectionUtils.getField(scheduler, "retryMap");
        Assert.assertTrue(retryMap.containsKey(BUILD_JOB_ID));
        Assert.assertTrue(retryMap.containsKey(MERGE_JOB_ID));

        for (int attempt = 0; attempt < 5; attempt++) {
            scheduler.retryJob();
        }
        StreamingJobMeta buildJob = jobManager.getStreamingJobByUuid(BUILD_JOB_ID);
        StreamingJobMeta mergeJob = jobManager.getStreamingJobByUuid(MERGE_JOB_ID);
        Assert.assertEquals(JobStatusEnum.RUNNING, buildJob.getCurrentStatus());
        Assert.assertEquals(JobStatusEnum.RUNNING, mergeJob.getCurrentStatus());
    }

    /**
     * Jobs found in {@code STARTING} when the scheduler is constructed end up
     * {@code RUNNING}, and the build job's skip-listener flag is reset to {@code false}.
     */
    @Test
    public void testResumeJobOfStartingStatus() {
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        jobManager.updateStreamingJob(BUILD_JOB_ID, meta -> {
            meta.setCurrentStatus(JobStatusEnum.STARTING);
            meta.setSkipListener(true);
        });
        setJobStatus(jobManager, MERGE_JOB_ID, JobStatusEnum.STARTING);

        new StreamingScheduler(PROJECT);

        StreamingJobMeta buildJob = jobManager.getStreamingJobByUuid(BUILD_JOB_ID);
        StreamingJobMeta mergeJob = jobManager.getStreamingJobByUuid(MERGE_JOB_ID);
        Assert.assertEquals(JobStatusEnum.RUNNING, buildJob.getCurrentStatus());
        Assert.assertFalse(buildJob.isSkipListener());
        Assert.assertEquals(JobStatusEnum.RUNNING, mergeJob.getCurrentStatus());
    }

    /** Jobs found in {@code STOPPING} when the scheduler is constructed end up in {@code ERROR}. */
    @Test
    public void testKillJobOfStoppingStatus() {
        StreamingJobManager jobManager = StreamingJobManager.getInstance(getTestConfig(), PROJECT);
        setJobStatus(jobManager, BUILD_JOB_ID, JobStatusEnum.STOPPING);
        setJobStatus(jobManager, MERGE_JOB_ID, JobStatusEnum.STOPPING);

        new StreamingScheduler(PROJECT);

        StreamingJobMeta buildJob = jobManager.getStreamingJobByUuid(BUILD_JOB_ID);
        StreamingJobMeta mergeJob = jobManager.getStreamingJobByUuid(MERGE_JOB_ID);
        Assert.assertEquals(JobStatusEnum.ERROR, buildJob.getCurrentStatus());
        Assert.assertEquals(JobStatusEnum.ERROR, mergeJob.getCurrentStatus());
    }

    /** {@code killJob} with target status {@code ERROR} marks running jobs as {@code ERROR}. */
    @Test
    public void testKillStreamingJob() {
        KylinConfig config = getTestConfig();
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        StreamingJobManager jobManager = StreamingJobManager.getInstance(config, PROJECT);
        setJobStatus(jobManager, BUILD_JOB_ID, JobStatusEnum.RUNNING);
        setJobStatus(jobManager, MERGE_JOB_ID, JobStatusEnum.RUNNING);

        scheduler.killJob(MODEL_ID, JobTypeEnum.STREAMING_MERGE, JobStatusEnum.ERROR);
        scheduler.killJob(MODEL_ID, JobTypeEnum.STREAMING_BUILD, JobStatusEnum.ERROR);

        StreamingJobMeta buildJob = jobManager.getStreamingJobByUuid(BUILD_JOB_ID);
        StreamingJobMeta mergeJob = jobManager.getStreamingJobByUuid(MERGE_JOB_ID);
        Assert.assertEquals(JobStatusEnum.ERROR, buildJob.getCurrentStatus());
        Assert.assertEquals(JobStatusEnum.ERROR, mergeJob.getCurrentStatus());
    }

    /**
     * With {@code applicationExisted} stubbed to {@code true}, {@code killYarnApplication}
     * is expected to raise {@link KylinException}.
     */
    @Test
    public void testKillYarnApplication() {
        StreamingScheduler scheduler = Mockito.spy(new StreamingScheduler(PROJECT));
        String jobId = StreamingUtils.getJobId(MODEL_ID, JobTypeEnum.STREAMING_MERGE.toString());
        setJobStatus(StreamingJobManager.getInstance(getTestConfig(), PROJECT), jobId, JobStatusEnum.RUNNING);
        Mockito.when(scheduler.applicationExisted(jobId)).thenReturn(true);

        thrown.expect(KylinException.class);
        scheduler.killYarnApplication(jobId, MODEL_ID);
    }

    /** {@code killJob} with target status {@code STOPPED} marks running jobs as {@code STOPPED}. */
    @Test
    public void testForceStopStreamingJob() {
        KylinConfig config = getTestConfig();
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        StreamingJobManager jobManager = StreamingJobManager.getInstance(config, PROJECT);
        setJobStatus(jobManager, BUILD_JOB_ID, JobStatusEnum.RUNNING);
        setJobStatus(jobManager, MERGE_JOB_ID, JobStatusEnum.RUNNING);

        scheduler.killJob(MODEL_ID, JobTypeEnum.STREAMING_MERGE, JobStatusEnum.STOPPED);
        scheduler.killJob(MODEL_ID, JobTypeEnum.STREAMING_BUILD, JobStatusEnum.STOPPED);

        StreamingJobMeta buildJob = jobManager.getStreamingJobByUuid(BUILD_JOB_ID);
        StreamingJobMeta mergeJob = jobManager.getStreamingJobByUuid(MERGE_JOB_ID);
        Assert.assertEquals(JobStatusEnum.STOPPED, buildJob.getCurrentStatus());
        Assert.assertEquals(JobStatusEnum.STOPPED, mergeJob.getCurrentStatus());
    }

    /** {@code skipJobListener} switches the build job's skip-listener flag on and off again. */
    @Test
    public void testSkipJobListener() {
        KylinConfig config = getTestConfig();
        StreamingScheduler scheduler = new StreamingScheduler(PROJECT);
        StreamingJobManager jobManager = StreamingJobManager.getInstance(config, PROJECT);
        Assert.assertFalse(jobManager.getStreamingJobByUuid(BUILD_JOB_ID).isSkipListener());

        scheduler.skipJobListener(PROJECT, BUILD_JOB_ID, true);
        Assert.assertTrue(jobManager.getStreamingJobByUuid(BUILD_JOB_ID).isSkipListener());

        scheduler.skipJobListener(PROJECT, BUILD_JOB_ID, false);
        Assert.assertFalse(jobManager.getStreamingJobByUuid(BUILD_JOB_ID).isSkipListener());
    }

    /** After {@code shutdownByProject} the static {@code INSTANCE_MAP} is empty again. */
    @Test
    public void testShutdownByProject() {
        StreamingScheduler.getInstance(PROJECT);
        StreamingScheduler.shutdownByProject(PROJECT);
        Assert.assertTrue(schedulerInstances().isEmpty());
    }

    /** Reads the static {@code INSTANCE_MAP} field of {@link StreamingScheduler} via reflection. */
    private static Map<?, ?> schedulerInstances() {
        return (Map<?, ?>) ReflectionUtils.getField(StreamingScheduler.class, "INSTANCE_MAP");
    }

    /** Overwrites the current status of the job {@code jobId} through {@code jobManager}. */
    private static void setJobStatus(StreamingJobManager jobManager, String jobId, JobStatusEnum status) {
        jobManager.updateStreamingJob(jobId, meta -> meta.setCurrentStatus(status));
    }
}
