package org.apache.kylin.streaming.jobs.impl;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.streaming.DataParserInfo;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.test.util.ReflectionTestUtils;

@PrepareForTest({StreamingJobLauncher.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.net.ssl.*", "javax.management.*", "org.apache.hadoop.*", "javax.security.*", "javax.crypto.*", "javax.script.*"})
/* Unit tests for StreamingJobLauncher (decompiled from org/apache/kylin/streaming/jobs/impl/StreamingJobLauncherTest.class). */
public class StreamingJobLauncherTest extends NLocalFileMetadataTestCase {
    private static final String PROJECT = "streaming_test";

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public TestName testName = new TestName();

    /**
     * Test double for {@link SparkLauncher} whose {@link #startApplication} does not
     * start anything. It reads the launcher's internal {@code builder} via reflection
     * and keeps references to its {@code conf}, {@code files} and {@code jars} fields
     * so that tests can assert on what was configured.
     */
    static class MockupSparkLauncher extends SparkLauncher {
        /** The builder's {@code conf} field; {@code null} until {@link #startApplication} is called. */
        private Map<String, String> sparkConf;
        /** The builder's {@code files} field; {@code null} until {@link #startApplication} is called. */
        private List<String> files;
        /** The builder's {@code jars} field; {@code null} until {@link #startApplication} is called. */
        private List<String> jars;

        MockupSparkLauncher() {
        }

        /**
         * Records the builder state instead of launching a Spark application.
         *
         * @param listenerArr ignored
         * @return always {@code null}; no application handle is created
         */
        @SuppressWarnings("unchecked") // values come from reflection; element types are assumed to match the field declarations above
        public SparkAppHandle startApplication(SparkAppHandle.Listener... listenerArr) {
            final Object builder = ReflectionUtils.getField(this, "builder");
            this.sparkConf = (Map<String, String>) ReflectionUtils.getField(builder, "conf");
            this.files = (List<String>) ReflectionUtils.getField(builder, "files");
            this.jars = (List<String>) ReflectionUtils.getField(builder, "jars");
            return null;
        }
    }

    /** Creates the test metadata (with no overlays) before every test. */
    @Before
    public void setUp() throws Exception {
        final String[] noOverlays = new String[0];
        this.createTestMetadata(noOverlays);
    }

    /** Cleans up the test metadata after every test. */
    @After
    public void tearDown() {
        this.cleanupTestMetadata();
    }

    /**
     * Checks that {@code init} for a STREAMING_BUILD job flips the initialized flag and
     * fills in the main class, the job parameters and the five application arguments.
     */
    @Test
    public void testBuildJobInit() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        Assert.assertFalse(launcher.isInitialized());

        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
        Assert.assertTrue(launcher.isInitialized());

        final Object mainClazz = ReflectionUtils.getField(launcher, "mainClazz");
        Assert.assertEquals("org.apache.kylin.streaming.app.StreamingEntry", mainClazz);
        final Object jobParams = ReflectionUtils.getField(launcher, "jobParams");
        Assert.assertNotNull(jobParams);

        final String[] appArgs = (String[]) ReflectionUtils.getField(launcher, "appArgs");
        Assert.assertEquals(5L, appArgs.length);
        Assert.assertEquals(PROJECT, appArgs[0]);
        Assert.assertEquals(modelId, appArgs[1]);
        Assert.assertEquals("30", appArgs[2]);
        Assert.assertEquals("", appArgs[3]);
        final String metadataUrl = getTestConfig().getMetadataUrl().toString();
        Assert.assertEquals(metadataUrl, appArgs[4]);
    }

    /**
     * Checks that {@code init} for a STREAMING_MERGE job fills in the merge entry class,
     * the job parameters and the five application arguments.
     */
    @Test
    public void testMergeJobInit() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        Assert.assertFalse(launcher.isInitialized());

        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);

        final Object mainClazz = ReflectionUtils.getField(launcher, "mainClazz");
        Assert.assertEquals("org.apache.kylin.streaming.app.StreamingMergeEntry", mainClazz);
        final Object jobParams = ReflectionUtils.getField(launcher, "jobParams");
        Assert.assertNotNull(jobParams);

        final String[] appArgs = (String[]) ReflectionUtils.getField(launcher, "appArgs");
        Assert.assertEquals(5L, appArgs.length);
        Assert.assertEquals(PROJECT, appArgs[0]);
        Assert.assertEquals(modelId, appArgs[1]);
        Assert.assertEquals("32m", appArgs[2]);
        Assert.assertEquals("3", appArgs[3]);
        final String metadataUrl = getTestConfig().getMetadataUrl().toString();
        Assert.assertEquals(metadataUrl, appArgs[4]);
    }

    /**
     * Checks that stopping a merge-job launcher records the {@code GRACEFUL_SHUTDOWN}
     * action on the corresponding streaming job.
     */
    @Test
    public void testStop() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        final KylinConfig config = getTestConfig();
        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);

        launcher.stop();

        final StreamingJobManager jobManager = StreamingJobManager.getInstance(config, PROJECT);
        final String mergeJobId = StreamingUtils.getJobId(modelId, JobTypeEnum.STREAMING_MERGE.name());
        Assert.assertEquals("GRACEFUL_SHUTDOWN", jobManager.getStreamingJobByUuid(mergeJobId).getAction());
    }

    /**
     * Starts a build job through the mock Spark launcher with default settings and
     * checks that no Kerberos keytab or principal ends up in the Spark configuration.
     */
    @Test
    public void testStartBuildJob() throws Exception {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);

        final MockupSparkLauncher sparkLauncher = new MockupSparkLauncher();
        ReflectionUtils.setField(launcher, "launcher", sparkLauncher);
        launcher.startYarnJob();

        final Object keytab = sparkLauncher.sparkConf.get("spark.kerberos.keytab");
        Assert.assertNull(keytab);
        final Object principal = sparkLauncher.sparkConf.get("spark.kerberos.principal");
        Assert.assertNull(principal);
    }

    /**
     * Starts a merge job through the mock Spark launcher with default settings and
     * checks that no Kerberos keytab or principal ends up in the Spark configuration.
     */
    @Test
    public void testStartMergeJob() throws Exception {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);

        final MockupSparkLauncher sparkLauncher = new MockupSparkLauncher();
        ReflectionUtils.setField(launcher, "launcher", sparkLauncher);
        launcher.startYarnJob();

        final Object keytab = sparkLauncher.sparkConf.get("spark.kerberos.keytab");
        Assert.assertNull(keytab);
        final Object principal = sparkLauncher.sparkConf.get("spark.kerberos.principal");
        Assert.assertNull(principal);
    }

    /**
     * Starts a build job with Kerberos and Kafka JAAS enabled and explicit
     * {@code extraJavaOptions} configured for driver, executor and AM, then checks that
     * the driver/executor/AM option keys are present in the Spark configuration.
     *
     * <p>The JAAS file written for the test is removed afterwards so it does not leak
     * into other tests.
     */
    @Test
    public void testStartYarnBuildJob() throws Exception {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        final String extraJavaOptions = "-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./kafka_jaas.conf";

        KylinConfig testConfig = getTestConfig();
        StreamingJobLauncher streamingJobLauncher = new StreamingJobLauncher();
        streamingJobLauncher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
        testConfig.setProperty("kylin.kerberos.enabled", "true");
        testConfig.setProperty("kylin.tool.mount-spark-log-dir", ".");
        KapConfig instanceFromEnv = KapConfig.getInstanceFromEnv();
        testConfig.setProperty("kylin.kafka-jaas.enabled", "true");
        testConfig.setProperty("kylin.streaming.spark-conf.spark.driver.extraJavaOptions", extraJavaOptions);
        testConfig.setProperty("kylin.streaming.spark-conf.spark.executor.extraJavaOptions", extraJavaOptions);
        testConfig.setProperty("kylin.streaming.spark-conf.spark.am.extraJavaOptions", extraJavaOptions);
        MockupSparkLauncher mockupSparkLauncher = new MockupSparkLauncher();
        ReflectionUtils.setField(streamingJobLauncher, "launcher", mockupSparkLauncher);

        File jaasFile = new File(instanceFromEnv.getKafkaJaasConfPath());
        try {
            FileUtils.write(jaasFile, "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}", StandardCharsets.UTF_8);
            streamingJobLauncher.startYarnJob();
            Assert.assertNotNull(mockupSparkLauncher.sparkConf.get("spark.driver.extraJavaOptions"));
            Assert.assertNotNull(mockupSparkLauncher.sparkConf.get("spark.executor.extraJavaOptions"));
            Assert.assertNotNull(mockupSparkLauncher.sparkConf.get("spark.am.extraJavaOptions"));
        } finally {
            // Always remove the JAAS file, even when the launch or an assertion fails.
            FileUtils.deleteQuietly(jaasFile);
        }
    }

    /**
     * Starts a merge job with Kerberos and Kafka JAAS enabled and checks that the Kerberos
     * keytab and principal are set in the Spark configuration while the JAAS file is not
     * among the files handed to the launcher.
     *
     * <p>The JAAS file written for the test is always deleted in the {@code finally} block.
     */
    @Test
    public void testStartYarnMergeJob() throws Exception {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        KylinConfig testConfig = getTestConfig();
        StreamingJobLauncher streamingJobLauncher = new StreamingJobLauncher();
        streamingJobLauncher.init(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);
        try {
            testConfig.setProperty("kylin.kerberos.enabled", "true");
            testConfig.setProperty("kylin.tool.mount-spark-log-dir", ".");
            KapConfig instanceFromEnv = KapConfig.getInstanceFromEnv();
            testConfig.setProperty("kylin.kafka-jaas.enabled", "true");
            FileUtils.write(new File(instanceFromEnv.getKafkaJaasConfPath()), "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required}", StandardCharsets.UTF_8);
            MockupSparkLauncher mockupSparkLauncher = new MockupSparkLauncher();
            ReflectionUtils.setField(streamingJobLauncher, "launcher", mockupSparkLauncher);
            streamingJobLauncher.startYarnJob();
            Assert.assertNotNull(mockupSparkLauncher.sparkConf.get("spark.kerberos.keytab"));
            Assert.assertNotNull(mockupSparkLauncher.sparkConf.get("spark.kerberos.principal"));
            Assert.assertFalse(mockupSparkLauncher.files.contains(instanceFromEnv.getKafkaJaasConfPath()));
        } finally {
            // Single cleanup point for both the success and the failure path.
            FileUtils.deleteQuietly(new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
        }
    }

    /**
     * Launches a merge job with {@code streaming.local=true} and {@code kylin.env=local}.
     * If the launch throws, the exception must be a {@link KylinException} carrying the
     * {@code JOB_START_FAILURE} error code.
     *
     * <p>Note: the test also passes when {@code launch()} throws nothing.
     */
    @Test
    public void testLaunchMergeJobException_Local() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        try {
            overwriteSystemProp("streaming.local", "true");
            KylinConfig testConfig = getTestConfig();
            StreamingJobLauncher streamingJobLauncher = new StreamingJobLauncher();
            streamingJobLauncher.init(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);
            testConfig.setProperty("kylin.env", "local");
            streamingJobLauncher.launch();
        } catch (Exception e) {
            Assert.assertTrue(e instanceof KylinException);
            // java.lang.Exception has no getErrorCode(); the cast is needed for this to compile.
            Assert.assertEquals(ServerErrorCode.JOB_START_FAILURE.toErrorCode().getCodeString(),
                    ((KylinException) e).getErrorCode().getCodeString());
        }
    }

    /**
     * Launches a merge job with {@code streaming.local=false} and {@code kylin.env=prod}.
     * If the launch throws, the exception must be a {@link KylinException} carrying the
     * {@code JOB_START_FAILURE} error code.
     *
     * <p>Note: the test also passes when {@code launch()} throws nothing.
     */
    @Test
    public void testLaunchMergeJobException_Yarn() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        try {
            overwriteSystemProp("streaming.local", "false");
            KylinConfig testConfig = getTestConfig();
            StreamingJobLauncher streamingJobLauncher = new StreamingJobLauncher();
            streamingJobLauncher.init(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);
            testConfig.setProperty("kylin.env", "prod");
            streamingJobLauncher.launch();
        } catch (Exception e) {
            Assert.assertTrue(e instanceof KylinException);
            // java.lang.Exception has no getErrorCode(); the cast is needed for this to compile.
            Assert.assertEquals(ServerErrorCode.JOB_START_FAILURE.toErrorCode().getCodeString(),
                    ((KylinException) e).getErrorCode().getCodeString());
        }
    }

    /**
     * Launches a build job with {@code streaming.local=true} and {@code kylin.env=local}.
     * If the launch throws, the exception must be a {@link KylinException} carrying the
     * {@code JOB_START_FAILURE} error code.
     *
     * <p>Note: the test also passes when {@code launch()} throws nothing.
     */
    @Test
    public void testLaunchBuildJobException_Local() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        try {
            overwriteSystemProp("streaming.local", "true");
            KylinConfig testConfig = getTestConfig();
            StreamingJobLauncher streamingJobLauncher = new StreamingJobLauncher();
            streamingJobLauncher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
            testConfig.setProperty("kylin.env", "local");
            streamingJobLauncher.launch();
        } catch (Exception e) {
            Assert.assertTrue(e instanceof KylinException);
            // java.lang.Exception has no getErrorCode(); the cast is needed for this to compile.
            Assert.assertEquals(ServerErrorCode.JOB_START_FAILURE.toErrorCode().getCodeString(),
                    ((KylinException) e).getErrorCode().getCodeString());
        }
    }

    /**
     * Launches a build job and, if the launch throws, requires the exception to be a
     * {@link KylinException} carrying the {@code JOB_START_FAILURE} error code.
     *
     * <p>Note: the test also passes when {@code launch()} throws nothing.
     *
     * <p>NOTE(review): despite the {@code _Yarn} suffix this test uses
     * {@code streaming.local=true} and {@code kylin.env=local}, i.e. exactly the same
     * settings as {@code testLaunchBuildJobException_Local}. The merge counterpart uses
     * {@code false}/{@code prod}; this looks like a copy-paste slip - confirm before changing.
     */
    @Test
    public void testLaunchBuildJobException_Yarn() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        try {
            overwriteSystemProp("streaming.local", "true");
            KylinConfig testConfig = getTestConfig();
            StreamingJobLauncher streamingJobLauncher = new StreamingJobLauncher();
            streamingJobLauncher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
            testConfig.setProperty("kylin.env", "local");
            streamingJobLauncher.launch();
        } catch (Exception e) {
            Assert.assertTrue(e instanceof KylinException);
            // java.lang.Exception has no getErrorCode(); the cast is needed for this to compile.
            Assert.assertEquals(ServerErrorCode.JOB_START_FAILURE.toErrorCode().getCodeString(),
                    ((KylinException) e).getErrorCode().getCodeString());
        }
    }

    /**
     * Launches a build job with {@code streaming.local=false} and {@code kylin.env=prod}
     * on a spied launcher whose {@code startYarnJob()} is stubbed to do nothing; the test
     * passes when {@code launch()} completes without throwing.
     */
    @Test
    public void testLaunchBuildJobLaunch_Yarn() throws Exception {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        overwriteSystemProp("streaming.local", "false");
        final KylinConfig config = getTestConfig();

        final StreamingJobLauncher spiedLauncher = Mockito.spy(new StreamingJobLauncher());
        spiedLauncher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
        config.setProperty("kylin.env", "prod");

        // Stub out the actual YARN submission.
        Mockito.doNothing().when(spiedLauncher).startYarnJob();
        spiedLauncher.launch();
    }

    /**
     * Launches a merge job with {@code streaming.local=false} and {@code kylin.env=prod}
     * on a spied launcher whose {@code startYarnJob()} is stubbed to do nothing; the test
     * passes when {@code launch()} completes without throwing.
     */
    @Test
    public void testLaunchMergeJobLaunch_Yarn() throws Exception {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        overwriteSystemProp("streaming.local", "false");
        final KylinConfig config = getTestConfig();

        final StreamingJobLauncher spiedLauncher = Mockito.spy(new StreamingJobLauncher());
        spiedLauncher.init(PROJECT, modelId, JobTypeEnum.STREAMING_MERGE);
        config.setProperty("kylin.env", "prod");

        // Stub out the actual YARN submission.
        Mockito.doNothing().when(spiedLauncher).startYarnJob();
        spiedLauncher.launch();
    }

    /**
     * Checks that {@code getJobTmpMetaStoreUrlPath} yields
     * {@code <streaming base jobs location>/<project>/<model id>/meta}.
     */
    @Test
    public void testGetJobTmpMetaStoreUrlPath() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        overwriteSystemProp("streaming.local", "true");
        final KylinConfig config = getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", "/tmp/");

        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);

        final String expectedPath = String.format(Locale.ROOT, "%s/%s/%s/meta",
                config.getStreamingBaseJobsLocation(), PROJECT, modelId);
        final Object actualPath = ReflectionUtils.invokeGetterMethod(launcher, "getJobTmpMetaStoreUrlPath");
        Assert.assertEquals(expectedPath, actualPath);
    }

    /**
     * Points the streaming jobs location at a per-test path under the temporary folder
     * (the directory itself is not created here) and checks that
     * {@code getAvailableLatestDumpPath} returns {@code null}.
     */
    @Test
    public void testGetAvailableLatestDumpPath_Null() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        overwriteSystemProp("streaming.local", "true");

        final File jobsLocation = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        getTestConfig().setProperty("kylin.engine.streaming-jobs-location", jobsLocation.getAbsolutePath());

        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);

        final Object latestDumpPath = ReflectionUtils.invokeGetterMethod(launcher, "getAvailableLatestDumpPath");
        Assert.assertNull(latestDumpPath);
    }

    /**
     * Creates a {@code meta_<current millis>} directory under the job's meta path and
     * checks that {@code getAvailableLatestDumpPath} returns a non-null value.
     */
    @Test
    public void testGetAvailableLatestDumpPath_NotNull() throws IOException {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        final File jobsLocation = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        FileUtils.forceMkdir(jobsLocation);

        overwriteSystemProp("streaming.local", "true");
        final KylinConfig config = getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", jobsLocation.getAbsolutePath());

        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);

        final String metaPath = String.format(Locale.ROOT, "%s/%s/%s/meta",
                config.getStreamingBaseJobsLocation(), PROJECT, modelId);
        FileUtils.forceMkdir(new File(metaPath));
        final File freshDump = new File(metaPath, "meta_" + System.currentTimeMillis());
        FileUtils.forceMkdir(freshDump);

        final Object latestDumpPath = ReflectionUtils.invokeGetterMethod(launcher, "getAvailableLatestDumpPath");
        Assert.assertNotNull(latestDumpPath);
    }

    /**
     * Creates a {@code meta_<timestamp>} directory whose timestamp and last-modified time
     * lie twice the configured retained time in the past, and checks that
     * {@code getAvailableLatestDumpPath} returns {@code null} for it.
     */
    @Test
    public void testGetAvailableLatestDumpPath_CleanMeta() throws IOException {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        final File jobsLocation = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        FileUtils.forceMkdir(jobsLocation);

        overwriteSystemProp("streaming.local", "true");
        final KylinConfig config = getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", jobsLocation.getAbsolutePath());

        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);

        final String metaPath = String.format(Locale.ROOT, "%s/%s/%s/meta",
                config.getStreamingBaseJobsLocation(), PROJECT, modelId);
        FileUtils.forceMkdir(new File(metaPath));

        final long expiredTimestamp = System.currentTimeMillis()
                - (TimeUnit.MILLISECONDS.toMillis(config.getStreamingJobMetaRetainedTime()) * 2);
        final File expiredDump = new File(metaPath, "meta_" + expiredTimestamp);
        FileUtils.forceMkdir(expiredDump);
        Assert.assertTrue(expiredDump.setLastModified(expiredTimestamp));
        Assert.assertTrue(expiredDump.exists());

        final Object latestDumpPath = ReflectionUtils.invokeGetterMethod(launcher, "getAvailableLatestDumpPath");
        Assert.assertNull(latestDumpPath);
    }

    /**
     * Checks the parameters of the storage URL returned by
     * {@code getJobTmpHdfsMetaStorageUrl}: {@code zip} and {@code snapshot} are
     * {@code "true"} and {@code path} is the job meta path followed by
     * {@code /meta_<currentTimestamp>}.
     */
    @Test
    public void testGetJobTmpHdfsMetaStorageUrl() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        overwriteSystemProp("streaming.local", "true");
        final KylinConfig config = getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", "/tmp/");

        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);

        final StorageURL storageUrl = (StorageURL) ReflectionUtils.invokeGetterMethod(launcher, "getJobTmpHdfsMetaStorageUrl");
        final Map<?, ?> urlParams = storageUrl.getAllParameters();
        Assert.assertEquals("true", urlParams.get("zip"));
        Assert.assertEquals("true", urlParams.get("snapshot"));

        final String metaPath = String.format(Locale.ROOT, "%s/%s/%s/meta",
                config.getStreamingBaseJobsLocation(), PROJECT, modelId);
        final String expectedPath = metaPath + "/meta_" + ReflectionUtils.getField(launcher, "currentTimestamp");
        Assert.assertEquals(expectedPath, urlParams.get("path"));
    }

    /**
     * Checks that the metadata dump list of a build job has exactly the expected
     * thirteen resource paths.
     */
    @Test
    public void testGetMetadataDumpList() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        overwriteSystemProp("streaming.local", "true");
        getTestConfig().setProperty("kylin.engine.streaming-jobs-location", "/tmp/");

        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);

        final String[] expectedEntries = {
            "/streaming_test/dataflow/e78a89dd-847f-4574-8afa-8768b4228b72.json",
            "/streaming_test/dataflow_details/e78a89dd-847f-4574-8afa-8768b4228b72/c380dd2a-43b8-4268-b73d-2a5f76236631.json",
            "/streaming_test/dataflow_details/e78a89dd-847f-4574-8afa-8768b4228b72/c380dd2a-43b8-4268-b73d-2a5f76236632.json",
            "/streaming_test/dataflow_details/e78a89dd-847f-4574-8afa-8768b4228b72/c380dd2a-43b8-4268-b73d-2a5f76236633.json",
            "/streaming_test/dataflow_details/e78a89dd-847f-4574-8afa-8768b4228b72/c380dd2a-43b8-4268-b73d-2a5f76236901.json",
            "/streaming_test/index_plan/e78a89dd-847f-4574-8afa-8768b4228b72.json",
            "/_global/project/streaming_test.json",
            "/streaming_test/model_desc/e78a89dd-847f-4574-8afa-8768b4228b72.json",
            "/streaming_test/table/SSB.P_LINEORDER_STR.json",
            "/streaming_test/kafka/SSB.P_LINEORDER_STR.json",
            "/streaming_test/table/SSB.PART.json",
            "/_image",
            "/streaming_test/streaming/e78a89dd-847f-4574-8afa-8768b4228b72_build",
        };

        final Set<?> dumpList = launcher.getMetadataDumpList();
        Assert.assertEquals(expectedEntries.length, dumpList.size());
        for (String expectedEntry : expectedEntries) {
            Assert.assertTrue(dumpList.contains(expectedEntry));
        }
    }

    /**
     * With {@code streaming.local=true}, checks that after {@code initStorageUrl} the
     * launcher's {@code distMetaStorageUrl} has the same scheme as the configured
     * metadata URL.
     */
    @Test
    public void testInitStorageUrl() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        overwriteSystemProp("streaming.local", "true");
        final KylinConfig config = getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", "/tmp/");

        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
        ReflectionUtils.invokeGetterMethod(launcher, "initStorageUrl");

        final String expectedScheme = config.getMetadataUrl().getScheme();
        final StorageURL distMetaStorageUrl = (StorageURL) ReflectionUtils.getField(launcher, "distMetaStorageUrl");
        Assert.assertEquals(expectedScheme, distMetaStorageUrl.getScheme());
    }

    /**
     * With {@code streaming.local=false}, metadata URL {@code xxx} and
     * {@code kylin.env=utxxx}, checks that after {@code initStorageUrl} the launcher's
     * {@code distMetaStorageUrl} uses the {@code hdfs} scheme.
     */
    @Test
    public void testInitStorageUrl_JobCluster() {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        overwriteSystemProp("streaming.local", "false");
        final KylinConfig config = getTestConfig();

        final StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);

        // These settings are applied only after init(), as in the original test.
        config.setProperty("kylin.engine.streaming-jobs-location", "/tmp/");
        config.setMetadataUrl("xxx");
        config.setProperty("kylin.env", "utxxx");
        ReflectionUtils.invokeGetterMethod(launcher, "initStorageUrl");

        final StorageURL distMetaStorageUrl = (StorageURL) ReflectionUtils.getField(launcher, "distMetaStorageUrl");
        Assert.assertEquals("hdfs", distMetaStorageUrl.getScheme());
    }

    /**
     * Stubs the parser lookups on a PowerMock spy of the launcher so that the parser jar
     * path resolves to {@code "default"}, invokes {@code addParserJar} and checks that
     * {@code "default"} shows up in the jars recorded by the mock Spark launcher.
     */
    @Test
    public void testAddParserJar() throws Exception {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        final StreamingJobLauncher realLauncher = new StreamingJobLauncher();
        realLauncher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);

        final MockupSparkLauncher sparkLauncher = new MockupSparkLauncher();
        ReflectionTestUtils.setField(realLauncher, "launcher", sparkLauncher);

        final StreamingJobLauncher spiedLauncher = PowerMockito.spy(realLauncher);
        PowerMockito.when(spiedLauncher, "getParserName", new Object[0])
                .thenReturn("org.apache.kylin.parser.TimedJsonStreamParser2");
        final DataParserInfo parserInfo = new DataParserInfo(PROJECT, "org.apache.kylin.parser.TimedJsonStreamParser", "default");
        PowerMockito.when(spiedLauncher, "getDataParser", new Object[]{Mockito.anyString()})
                .thenReturn(parserInfo);
        PowerMockito.doReturn("default").when(spiedLauncher, "getParserJarPath", new Object[]{parserInfo});

        ReflectionTestUtils.invokeMethod(spiedLauncher, "addParserJar", new Object[]{sparkLauncher});

        // startApplication() on the mock only records the builder state, including the jars.
        sparkLauncher.startApplication(new SparkAppHandle.Listener[0]);
        Assert.assertTrue(sparkLauncher.jars.contains("default"));
    }

    /**
     * Starts a build job with Kerberos and Kafka JAAS enabled but without any explicitly
     * configured {@code extraJavaOptions}, then checks that the driver, executor and
     * YARN AM option keys are nevertheless present in the Spark configuration.
     *
     * <p>The JAAS file written for the test is removed afterwards so it does not leak
     * into other tests.
     */
    @Test
    public void testStartYarnBuildJobWithoutExtraOpts() throws Exception {
        final String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        KylinConfig testConfig = getTestConfig();
        StreamingJobLauncher streamingJobLauncher = new StreamingJobLauncher();
        streamingJobLauncher.init(PROJECT, modelId, JobTypeEnum.STREAMING_BUILD);
        testConfig.setProperty("kylin.kerberos.enabled", "true");
        testConfig.setProperty("kylin.tool.mount-spark-log-dir", ".");
        KapConfig instanceFromEnv = KapConfig.getInstanceFromEnv();
        testConfig.setProperty("kylin.kafka-jaas.enabled", "true");
        MockupSparkLauncher mockupSparkLauncher = new MockupSparkLauncher();
        ReflectionUtils.setField(streamingJobLauncher, "launcher", mockupSparkLauncher);

        File jaasFile = new File(instanceFromEnv.getKafkaJaasConfPath());
        try {
            FileUtils.write(jaasFile, "KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}", StandardCharsets.UTF_8);
            streamingJobLauncher.startYarnJob();
            Assert.assertNotNull(mockupSparkLauncher.sparkConf.get("spark.driver.extraJavaOptions"));
            Assert.assertNotNull(mockupSparkLauncher.sparkConf.get("spark.executor.extraJavaOptions"));
            Assert.assertNotNull(mockupSparkLauncher.sparkConf.get("spark.yarn.am.extraJavaOptions"));
        } finally {
            // Always remove the JAAS file, even when the launch or an assertion fails.
            FileUtils.deleteQuietly(jaasFile);
        }
    }
}
