package org.apache.kylin.streaming.jobs.thread;

import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.streaming.app.StreamingMergeEntry;
import org.apache.kylin.streaming.jobs.impl.StreamingJobLauncher;
import org.apache.kylin.streaming.util.AwaitUtils;
import org.apache.kylin.streaming.util.StreamingTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/* loaded from: input_file:org/apache/kylin/streaming/jobs/thread/StreamingJobRunnerTest.class */
public class StreamingJobRunnerTest extends StreamingTestCase {
    private static String PROJECT = "streaming_test";

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    private StreamingJobRunner runner;

    @Before
    public void setUp() throws Exception {
        createTestMetadata(new String[0]);
    }

    @After
    public void tearDown() {
        cleanupTestMetadata();
    }

    @Test
    public void testStop() {
        StreamingMergeEntry streamingMergeEntry = new StreamingMergeEntry();
        String jobId = StreamingUtils.getJobId("e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD.name());
        Assert.assertFalse(streamingMergeEntry.isGracefulShutdown(PROJECT, jobId));
        String jobId2 = StreamingUtils.getJobId("e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE.name());
        Assert.assertFalse(streamingMergeEntry.isGracefulShutdown(PROJECT, jobId2));
        this.runner = new StreamingJobRunner(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        this.runner.init();
        this.runner.stop();
        Assert.assertTrue(streamingMergeEntry.isGracefulShutdown(PROJECT, jobId));
        this.runner = new StreamingJobRunner(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE);
        this.runner.init();
        this.runner.stop();
        Assert.assertTrue(streamingMergeEntry.isGracefulShutdown(PROJECT, jobId2));
    }

    @Test
    public void testStopWithNoInitial() {
        getTestConfig();
        this.runner = new StreamingJobRunner(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        this.runner.stop();
        StreamingMergeEntry streamingMergeEntry = new StreamingMergeEntry();
        Assert.assertFalse(streamingMergeEntry.isGracefulShutdown(PROJECT, StreamingUtils.getJobId("e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD.name())));
        Assert.assertFalse(streamingMergeEntry.isGracefulShutdown(PROJECT, StreamingUtils.getJobId("e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE.name())));
    }

    @Test
    public void testBuildJobRunner_run() {
        StreamingJobRunner streamingJobRunner = new StreamingJobRunner(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        streamingJobRunner.init();
        AwaitUtils.await(() -> {
            streamingJobRunner.run();
        }, 10000, () -> {
        });
    }

    @Test
    public void testBuildJobLauncher_launch() {
        StreamingJobLauncher streamingJobLauncher = new StreamingJobLauncher();
        streamingJobLauncher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        AwaitUtils.await(() -> {
            streamingJobLauncher.launch();
        }, 10000, () -> {
        });
    }

    @Test
    public void testMergeJobLauncher_launch() {
        StreamingJobLauncher streamingJobLauncher = new StreamingJobLauncher();
        streamingJobLauncher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE);
        AwaitUtils.await(() -> {
            streamingJobLauncher.launch();
        }, 10000, () -> {
        });
    }
}
