/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kylin.streaming.jobs.impl;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.kylin.common.KapConfig;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.apache.kylin.common.exception.KylinException;
import org.apache.kylin.common.exception.ServerErrorCode;
import org.apache.kylin.common.util.NLocalFileMetadataTestCase;
import org.apache.kylin.job.execution.JobTypeEnum;
import org.apache.kylin.metadata.cube.utils.StreamingUtils;
import org.apache.kylin.metadata.streaming.DataParserInfo;
import org.apache.kylin.streaming.jobs.impl.StreamingJobLauncher;
import org.apache.kylin.streaming.manager.StreamingJobManager;
import org.apache.kylin.streaming.metadata.StreamingJobMeta;
import org.apache.kylin.streaming.util.ReflectionUtils;
import org.apache.spark.launcher.SparkAppHandle;
import org.apache.spark.launcher.SparkLauncher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.springframework.test.util.ReflectionTestUtils;

@RunWith(value=PowerMockRunner.class)
@PowerMockIgnore(value={"javax.net.ssl.*", "javax.management.*", "org.apache.hadoop.*", "javax.security.*", "javax.crypto.*", "javax.script.*"})
@PrepareForTest(value={StreamingJobLauncher.class})
public class StreamingJobLauncherTest
extends NLocalFileMetadataTestCase {
    private static final String PROJECT = "streaming_test";
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    @Rule
    public TestName testName = new TestName();
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    @Before
    public void setUp() throws Exception {
        this.createTestMetadata(new String[0]);
    }

    @After
    public void tearDown() {
        this.cleanupTestMetadata();
    }

    @Test
    public void testBuildJobInit() {
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        Assert.assertFalse((boolean)launcher.isInitialized());
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        Assert.assertTrue((boolean)launcher.isInitialized());
        Object mainClazz = ReflectionUtils.getField(launcher, "mainClazz");
        Assert.assertEquals((Object)"org.apache.kylin.streaming.app.StreamingEntry", (Object)mainClazz);
        Object jobParams = ReflectionUtils.getField(launcher, "jobParams");
        Assert.assertNotNull((Object)jobParams);
        String[] appArgs = (String[])ReflectionUtils.getField(launcher, "appArgs");
        Assert.assertEquals((long)5L, (long)appArgs.length);
        Assert.assertEquals((Object)PROJECT, (Object)appArgs[0]);
        Assert.assertEquals((Object)"e78a89dd-847f-4574-8afa-8768b4228b72", (Object)appArgs[1]);
        Assert.assertEquals((Object)"30", (Object)appArgs[2]);
        Assert.assertEquals((Object)"", (Object)appArgs[3]);
        Assert.assertEquals((Object)StreamingJobLauncherTest.getTestConfig().getMetadataUrl().toString(), (Object)appArgs[4]);
    }

    @Test
    public void testMergeJobInit() {
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        Assert.assertFalse((boolean)launcher.isInitialized());
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE);
        Object mainClazz = ReflectionUtils.getField(launcher, "mainClazz");
        Assert.assertEquals((Object)"org.apache.kylin.streaming.app.StreamingMergeEntry", (Object)mainClazz);
        Object jobParams = ReflectionUtils.getField(launcher, "jobParams");
        Assert.assertNotNull((Object)jobParams);
        String[] appArgs = (String[])ReflectionUtils.getField(launcher, "appArgs");
        Assert.assertEquals((long)5L, (long)appArgs.length);
        Assert.assertEquals((Object)PROJECT, (Object)appArgs[0]);
        Assert.assertEquals((Object)"e78a89dd-847f-4574-8afa-8768b4228b72", (Object)appArgs[1]);
        Assert.assertEquals((Object)"32m", (Object)appArgs[2]);
        Assert.assertEquals((Object)"3", (Object)appArgs[3]);
        Assert.assertEquals((Object)StreamingJobLauncherTest.getTestConfig().getMetadataUrl().toString(), (Object)appArgs[4]);
    }

    @Test
    public void testStop() {
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE);
        launcher.stop();
        StreamingJobManager mgr = StreamingJobManager.getInstance((KylinConfig)config, (String)PROJECT);
        String uuid = StreamingUtils.getJobId((String)"e78a89dd-847f-4574-8afa-8768b4228b72", (String)JobTypeEnum.STREAMING_MERGE.name());
        StreamingJobMeta meta = mgr.getStreamingJobByUuid(uuid);
        Assert.assertEquals((Object)"GRACEFUL_SHUTDOWN", (Object)meta.getAction());
    }

    @Test
    public void testStartBuildJob() throws Exception {
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        MockupSparkLauncher mockup = new MockupSparkLauncher();
        ReflectionUtils.setField(launcher, "launcher", (Object)mockup);
        launcher.startYarnJob();
        Assert.assertNull(mockup.sparkConf.get("spark.kerberos.keytab"));
        Assert.assertNull(mockup.sparkConf.get("spark.kerberos.principal"));
    }

    @Test
    public void testStartMergeJob() throws Exception {
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE);
        MockupSparkLauncher mockup = new MockupSparkLauncher();
        ReflectionUtils.setField(launcher, "launcher", (Object)mockup);
        launcher.startYarnJob();
        Assert.assertNull(mockup.sparkConf.get("spark.kerberos.keytab"));
        Assert.assertNull(mockup.sparkConf.get("spark.kerberos.principal"));
    }

    @Test
    public void testStartYarnBuildJob() throws Exception {
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        config.setProperty("kylin.kerberos.enabled", "true");
        config.setProperty("kylin.tool.mount-spark-log-dir", ".");
        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        config.setProperty("kylin.kerberos.enabled", "true");
        config.setProperty("kylin.kafka-jaas.enabled", "true");
        config.setProperty("kylin.streaming.spark-conf.spark.driver.extraJavaOptions", "-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./kafka_jaas.conf");
        config.setProperty("kylin.streaming.spark-conf.spark.executor.extraJavaOptions", "-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./kafka_jaas.conf");
        config.setProperty("kylin.streaming.spark-conf.spark.am.extraJavaOptions", "-Djava.security.krb5.conf=./krb5.conf -Djava.security.auth.login.config=./kafka_jaas.conf");
        MockupSparkLauncher mockup = new MockupSparkLauncher();
        ReflectionUtils.setField(launcher, "launcher", (Object)mockup);
        FileUtils.write((File)new File(kapConfig.getKafkaJaasConfPath()), (CharSequence)"KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}", (Charset)StandardCharsets.UTF_8);
        launcher.startYarnJob();
        Assert.assertNotNull(mockup.sparkConf.get("spark.driver.extraJavaOptions"));
        Assert.assertNotNull(mockup.sparkConf.get("spark.executor.extraJavaOptions"));
        Assert.assertNotNull(mockup.sparkConf.get("spark.am.extraJavaOptions"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStartYarnMergeJob() throws Exception {
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE);
        try {
            config.setProperty("kylin.kerberos.enabled", "true");
            config.setProperty("kylin.tool.mount-spark-log-dir", ".");
            KapConfig kapConfig = KapConfig.getInstanceFromEnv();
            config.setProperty("kylin.kafka-jaas.enabled", "true");
            FileUtils.write((File)new File(kapConfig.getKafkaJaasConfPath()), (CharSequence)"KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required}", (Charset)StandardCharsets.UTF_8);
            MockupSparkLauncher mockup = new MockupSparkLauncher();
            ReflectionUtils.setField(launcher, "launcher", (Object)mockup);
            launcher.startYarnJob();
            Assert.assertNotNull(mockup.sparkConf.get("spark.kerberos.keytab"));
            Assert.assertNotNull(mockup.sparkConf.get("spark.kerberos.principal"));
            Assert.assertFalse((boolean)mockup.files.contains(kapConfig.getKafkaJaasConfPath()));
        }
        finally {
            FileUtils.deleteQuietly((File)new File(KapConfig.getInstanceFromEnv().getKafkaJaasConfPath()));
        }
    }

    @Test
    public void testLaunchMergeJobException_Local() {
        try {
            this.overwriteSystemProp("streaming.local", "true");
            KylinConfig config = StreamingJobLauncherTest.getTestConfig();
            String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
            StreamingJobLauncher launcher = new StreamingJobLauncher();
            launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE);
            config.setProperty("kylin.env", "local");
            launcher.launch();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof KylinException));
            Assert.assertEquals((Object)ServerErrorCode.JOB_START_FAILURE.toErrorCode().getCodeString(), (Object)((KylinException)e).getErrorCode().getCodeString());
        }
    }

    @Test
    public void testLaunchMergeJobException_Yarn() {
        try {
            this.overwriteSystemProp("streaming.local", "false");
            KylinConfig config = StreamingJobLauncherTest.getTestConfig();
            String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
            StreamingJobLauncher launcher = new StreamingJobLauncher();
            launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE);
            config.setProperty("kylin.env", "prod");
            launcher.launch();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof KylinException));
            Assert.assertEquals((Object)ServerErrorCode.JOB_START_FAILURE.toErrorCode().getCodeString(), (Object)((KylinException)e).getErrorCode().getCodeString());
        }
    }

    @Test
    public void testLaunchBuildJobException_Local() {
        try {
            this.overwriteSystemProp("streaming.local", "true");
            KylinConfig config = StreamingJobLauncherTest.getTestConfig();
            String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
            StreamingJobLauncher launcher = new StreamingJobLauncher();
            launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
            config.setProperty("kylin.env", "local");
            launcher.launch();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof KylinException));
            Assert.assertEquals((Object)ServerErrorCode.JOB_START_FAILURE.toErrorCode().getCodeString(), (Object)((KylinException)e).getErrorCode().getCodeString());
        }
    }

    @Test
    public void testLaunchBuildJobException_Yarn() throws Exception {
        try {
            this.overwriteSystemProp("streaming.local", "true");
            KylinConfig config = StreamingJobLauncherTest.getTestConfig();
            String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
            StreamingJobLauncher launcher = new StreamingJobLauncher();
            launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
            config.setProperty("kylin.env", "local");
            launcher.launch();
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof KylinException));
            Assert.assertEquals((Object)ServerErrorCode.JOB_START_FAILURE.toErrorCode().getCodeString(), (Object)((KylinException)e).getErrorCode().getCodeString());
        }
    }

    @Test
    public void testLaunchBuildJobLaunch_Yarn() throws Exception {
        this.overwriteSystemProp("streaming.local", "false");
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = (StreamingJobLauncher)Mockito.spy((Object)new StreamingJobLauncher());
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        config.setProperty("kylin.env", "prod");
        ((StreamingJobLauncher)Mockito.doNothing().when((Object)launcher)).startYarnJob();
        launcher.launch();
    }

    @Test
    public void testLaunchMergeJobLaunch_Yarn() throws Exception {
        this.overwriteSystemProp("streaming.local", "false");
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = (StreamingJobLauncher)Mockito.spy((Object)new StreamingJobLauncher());
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_MERGE);
        config.setProperty("kylin.env", "prod");
        ((StreamingJobLauncher)Mockito.doNothing().when((Object)launcher)).startYarnJob();
        launcher.launch();
    }

    @Test
    public void testGetJobTmpMetaStoreUrlPath() {
        this.overwriteSystemProp("streaming.local", "true");
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", "/tmp/");
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        Object jobTmpMetaStoreUrlPath = ReflectionUtils.invokeGetterMethod(launcher, "getJobTmpMetaStoreUrlPath");
        Assert.assertEquals((Object)String.format(Locale.ROOT, "%s/%s/%s/meta", config.getStreamingBaseJobsLocation(), PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72"), (Object)jobTmpMetaStoreUrlPath);
    }

    @Test
    public void testGetAvailableLatestDumpPath_Null() {
        this.overwriteSystemProp("streaming.local", "true");
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        File mainDir = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        config.setProperty("kylin.engine.streaming-jobs-location", mainDir.getAbsolutePath());
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        Object availableMetaDumpPath = ReflectionUtils.invokeGetterMethod(launcher, "getAvailableLatestDumpPath");
        Assert.assertNull((Object)availableMetaDumpPath);
    }

    @Test
    public void testGetAvailableLatestDumpPath_NotNull() throws IOException {
        File mainDir = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        FileUtils.forceMkdir((File)mainDir);
        this.overwriteSystemProp("streaming.local", "true");
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", mainDir.getAbsolutePath());
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        String metaRootPath = String.format(Locale.ROOT, "%s/%s/%s/meta", config.getStreamingBaseJobsLocation(), PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72");
        FileUtils.forceMkdir((File)new File(metaRootPath));
        FileUtils.forceMkdir((File)new File(metaRootPath, "meta_" + System.currentTimeMillis()));
        Object availableMetaDumpPath = ReflectionUtils.invokeGetterMethod(launcher, "getAvailableLatestDumpPath");
        Assert.assertNotNull((Object)availableMetaDumpPath);
    }

    @Test
    public void testGetAvailableLatestDumpPath_CleanMeta() throws IOException {
        File mainDir = new File(this.temporaryFolder.getRoot(), this.testName.getMethodName());
        FileUtils.forceMkdir((File)mainDir);
        this.overwriteSystemProp("streaming.local", "true");
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", mainDir.getAbsolutePath());
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        String metaRootPath = String.format(Locale.ROOT, "%s/%s/%s/meta", config.getStreamingBaseJobsLocation(), PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72");
        FileUtils.forceMkdir((File)new File(metaRootPath));
        long outDateTime = System.currentTimeMillis() - TimeUnit.MILLISECONDS.toMillis(config.getStreamingJobMetaRetainedTime()) * 2L;
        File outDateMetaFile = new File(metaRootPath, "meta_" + outDateTime);
        FileUtils.forceMkdir((File)outDateMetaFile);
        Assert.assertTrue((boolean)outDateMetaFile.setLastModified(outDateTime));
        Assert.assertTrue((boolean)outDateMetaFile.exists());
        Object availableMetaDumpPath = ReflectionUtils.invokeGetterMethod(launcher, "getAvailableLatestDumpPath");
        Assert.assertNull((Object)availableMetaDumpPath);
    }

    @Test
    public void testGetJobTmpHdfsMetaStorageUrl() {
        this.overwriteSystemProp("streaming.local", "true");
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", "/tmp/");
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        StorageURL jobTmpMetaStoreUrl = (StorageURL)ReflectionUtils.invokeGetterMethod(launcher, "getJobTmpHdfsMetaStorageUrl");
        Map params = jobTmpMetaStoreUrl.getAllParameters();
        Assert.assertEquals((Object)"true", params.get("zip"));
        Assert.assertEquals((Object)"true", params.get("snapshot"));
        String metaRootPath = String.format(Locale.ROOT, "%s/%s/%s/meta", config.getStreamingBaseJobsLocation(), PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72");
        String metaPath = metaRootPath + "/meta_" + ReflectionUtils.getField(launcher, "currentTimestamp");
        Assert.assertEquals((Object)metaPath, params.get("path"));
    }

    @Test
    public void testGetMetadataDumpList() {
        this.overwriteSystemProp("streaming.local", "true");
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", "/tmp/");
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        Set dumpSet = launcher.getMetadataDumpList();
        Assert.assertEquals((long)13L, (long)dumpSet.size());
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/dataflow/e78a89dd-847f-4574-8afa-8768b4228b72.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/dataflow_details/e78a89dd-847f-4574-8afa-8768b4228b72/c380dd2a-43b8-4268-b73d-2a5f76236631.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/dataflow_details/e78a89dd-847f-4574-8afa-8768b4228b72/c380dd2a-43b8-4268-b73d-2a5f76236632.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/dataflow_details/e78a89dd-847f-4574-8afa-8768b4228b72/c380dd2a-43b8-4268-b73d-2a5f76236633.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/dataflow_details/e78a89dd-847f-4574-8afa-8768b4228b72/c380dd2a-43b8-4268-b73d-2a5f76236901.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/index_plan/e78a89dd-847f-4574-8afa-8768b4228b72.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/_global/project/streaming_test.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/model_desc/e78a89dd-847f-4574-8afa-8768b4228b72.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/table/SSB.P_LINEORDER_STR.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/kafka/SSB.P_LINEORDER_STR.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/table/SSB.PART.json"));
        Assert.assertTrue((boolean)dumpSet.contains("/_image"));
        Assert.assertTrue((boolean)dumpSet.contains("/streaming_test/streaming/e78a89dd-847f-4574-8afa-8768b4228b72_build"));
    }

    @Test
    public void testInitStorageUrl() {
        this.overwriteSystemProp("streaming.local", "true");
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        config.setProperty("kylin.engine.streaming-jobs-location", "/tmp/");
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        ReflectionUtils.invokeGetterMethod(launcher, "initStorageUrl");
        StorageURL storageUrl = (StorageURL)ReflectionUtils.getField(launcher, "distMetaStorageUrl");
        Assert.assertEquals((Object)config.getMetadataUrl().getScheme(), (Object)storageUrl.getScheme());
    }

    @Test
    public void testInitStorageUrl_JobCluster() {
        this.overwriteSystemProp("streaming.local", "false");
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        config.setProperty("kylin.engine.streaming-jobs-location", "/tmp/");
        config.setMetadataUrl("xxx");
        config.setProperty("kylin.env", "utxxx");
        ReflectionUtils.invokeGetterMethod(launcher, "initStorageUrl");
        StorageURL storageUrl = (StorageURL)ReflectionUtils.getField(launcher, "distMetaStorageUrl");
        Assert.assertEquals((Object)"hdfs", (Object)storageUrl.getScheme());
    }

    @Test
    public void testAddParserJar() throws Exception {
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        MockupSparkLauncher mockup = new MockupSparkLauncher();
        ReflectionTestUtils.setField((Object)launcher, (String)"launcher", (Object)((Object)mockup));
        StreamingJobLauncher mockLaunch = (StreamingJobLauncher)PowerMockito.spy((Object)launcher);
        PowerMockito.when((Object)mockLaunch, (String)"getParserName", (Object[])new Object[0]).thenReturn((Object)"org.apache.kylin.parser.TimedJsonStreamParser2");
        DataParserInfo dataParserInfo = new DataParserInfo(PROJECT, "org.apache.kylin.parser.TimedJsonStreamParser", "default");
        PowerMockito.when((Object)mockLaunch, (String)"getDataParser", (Object[])new Object[]{Mockito.anyString()}).thenReturn((Object)dataParserInfo);
        PowerMockito.doReturn((Object)"default").when((Object)mockLaunch, "getParserJarPath", new Object[]{dataParserInfo});
        ReflectionTestUtils.invokeMethod((Object)mockLaunch, (String)"addParserJar", (Object[])new Object[]{mockup});
        mockup.startApplication(new SparkAppHandle.Listener[0]);
        Assert.assertTrue((boolean)mockup.jars.contains("default"));
    }

    @Test
    public void testStartYarnBuildJobWithoutExtraOpts() throws Exception {
        KylinConfig config = StreamingJobLauncherTest.getTestConfig();
        String modelId = "e78a89dd-847f-4574-8afa-8768b4228b72";
        StreamingJobLauncher launcher = new StreamingJobLauncher();
        launcher.init(PROJECT, "e78a89dd-847f-4574-8afa-8768b4228b72", JobTypeEnum.STREAMING_BUILD);
        config.setProperty("kylin.kerberos.enabled", "true");
        config.setProperty("kylin.tool.mount-spark-log-dir", ".");
        KapConfig kapConfig = KapConfig.getInstanceFromEnv();
        config.setProperty("kylin.kerberos.enabled", "true");
        config.setProperty("kylin.kafka-jaas.enabled", "true");
        MockupSparkLauncher mockup = new MockupSparkLauncher();
        ReflectionUtils.setField(launcher, "launcher", (Object)mockup);
        FileUtils.write((File)new File(kapConfig.getKafkaJaasConfPath()), (CharSequence)"KafkaClient{ org.apache.kafka.common.security.scram.ScramLoginModule required;}", (Charset)StandardCharsets.UTF_8);
        launcher.startYarnJob();
        Assert.assertNotNull(mockup.sparkConf.get("spark.driver.extraJavaOptions"));
        Assert.assertNotNull(mockup.sparkConf.get("spark.executor.extraJavaOptions"));
        Assert.assertNotNull(mockup.sparkConf.get("spark.yarn.am.extraJavaOptions"));
    }

    static class MockupSparkLauncher
    extends SparkLauncher {
        private Map<String, String> sparkConf;
        private List<String> files;
        private List<String> jars;

        MockupSparkLauncher() {
        }

        public SparkAppHandle startApplication(SparkAppHandle.Listener ... listeners) {
            Object builder = ReflectionUtils.getField((Object)this, "builder");
            this.sparkConf = (Map)ReflectionUtils.getField(builder, "conf");
            this.files = (List)ReflectionUtils.getField(builder, "files");
            this.jars = (List)ReflectionUtils.getField(builder, "jars");
            return null;
        }
    }
}

