package org.apache.zeppelin.interpreter.launcher;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Arrays;
import java.util.Properties;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterRunner;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.ExecRemoteInterpreterProcess;
import org.apache.zeppelin.test.DownloadUtils;
import org.apache.zeppelin.util.Util;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.class */
/**
 * Tests for {@link SparkInterpreterLauncher}.
 *
 * <p>Each test builds an {@link InterpreterLaunchContext} from a set of interpreter properties,
 * launches it, and then checks the resulting {@link ExecRemoteInterpreterProcess}: its
 * directories, runner path, environment, and (for the Spark-specific tests) the contents of the
 * {@code ZEPPELIN_SPARK_CONF} environment variable.
 */
class SparkInterpreterLauncherTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(SparkInterpreterLauncherTest.class);

    /** Name of the environment variable holding the '|'-separated spark-submit arguments. */
    private static final String SPARK_CONF_ENV = "ZEPPELIN_SPARK_CONF";

    private String sparkHome;
    private String zeppelinHome;
    private ZeppelinConfiguration zConf;

    /**
     * Obtains a Spark home via {@link DownloadUtils#downloadSpark()}, loads the Zeppelin
     * configuration and points ZEPPELIN_HOME at the parent of the working directory.
     */
    @BeforeEach
    public void setUp() {
        this.sparkHome = DownloadUtils.downloadSpark();
        this.zConf = ZeppelinConfiguration.load();
        this.zConf.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("..").getAbsolutePath());
        this.zeppelinHome = this.zConf.getZeppelinHome();
        LOGGER.info("ZEPPELIN_HOME: {}", this.zeppelinHome);
    }

    /** A connect timeout given as an interpreter property is reflected by the launched process. */
    @Test
    void testConnectTimeOut() throws IOException {
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(this.zConf, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
        InterpreterOption option = new InterpreterOption();
        option.setUserImpersonate(true);
        InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, (InterpreterRunner) null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host");

        try (ExecRemoteInterpreterProcess process = launch(launcher, context)) {
            Assertions.assertEquals("name", process.getInterpreterSettingName());
            Assertions.assertEquals(this.zeppelinHome + "/interpreter/groupName", process.getInterpreterDir());
            Assertions.assertEquals(this.zeppelinHome + "/local-repo/groupId", process.getLocalRepoDir());
            Assertions.assertEquals(10000, process.getConnectTimeout());
            Assertions.assertEquals(this.zConf.getInterpreterRemoteRunnerPath(), process.getInterpreterRunner());
            Assertions.assertTrue(process.getEnv().size() >= 2);
            Assertions.assertTrue(process.isUserImpersonated());
        }
    }

    /** Local master: an empty-valued property must not show up in the process environment. */
    @Test
    void testLocalMode() throws IOException {
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(this.zConf, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("ENV_1", "");
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "local[*]");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");

        try (ExecRemoteInterpreterProcess process = launch(launcher, newSparkContext(properties, new InterpreterOption()))) {
            assertSparkProcessBasics(process, 2);
            Assertions.assertFalse(process.getEnv().containsKey("ENV_1"));
            assertSparkConf("--conf|spark.files=file_1|--conf|spark.jars=jar_1|--conf|spark.app.name=intpGroupId|--conf|spark.master=local[*]", process);
        }
    }

    /** Yarn client mode selected through the {@code yarn-client} master value. */
    @Test
    void testYarnClientMode_1() throws IOException {
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(this.zConf, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn-client");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");

        try (ExecRemoteInterpreterProcess process = launch(launcher, newSparkContext(properties, new InterpreterOption()))) {
            assertSparkProcessBasics(process, 2);
            String expectedConf = "--conf|spark.yarn.dist.archives=" + sparkrArchive()
                + "|--conf|spark.files=" + "file_1"
                + "|--conf|spark.jars=" + "jar_1"
                + "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn-client";
            assertSparkConf(expectedConf, process);
        }
    }

    /** Yarn client mode selected through {@code spark.master=yarn} plus an explicit deploy mode. */
    @Test
    void testYarnClientMode_2() throws IOException {
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(this.zConf, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn");
        properties.setProperty("spark.submit.deployMode", "client");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");

        try (ExecRemoteInterpreterProcess process = launch(launcher, newSparkContext(properties, new InterpreterOption()))) {
            assertSparkProcessBasics(process, 2);
            String expectedConf = "--conf|spark.yarn.dist.archives=" + sparkrArchive()
                + "|--conf|spark.files=" + "file_1"
                + "|--conf|spark.jars=" + "jar_1"
                + "|--conf|spark.submit.deployMode=client|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn";
            assertSparkConf(expectedConf, process);
        }
    }

    /** Yarn cluster mode selected through the {@code yarn-cluster} master value. */
    @Test
    void testYarnClusterMode_1() throws IOException {
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(this.zConf, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn-cluster");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");

        try (ExecRemoteInterpreterProcess process = launch(launcher, newSparkContext(properties, new InterpreterOption()))) {
            assertSparkProcessBasics(process, 3);
            Assertions.assertEquals("true", process.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
            String sparkFiles = "file_1," + yarnClusterLog4jFile();
            String sparkJars = "jar_1," + zeppelinRuntimeJars();
            String expectedConf = "--conf|spark.yarn.dist.archives=" + sparkrArchive()
                + "|--conf|spark.yarn.maxAppAttempts=1|--conf|spark.files=" + sparkFiles
                + "|--conf|spark.jars=" + sparkJars
                + "|--conf|spark.yarn.isPython=true|--conf|spark.yarn.submit.waitAppCompletion=false|--conf|spark.app.name=intpGroupId|--conf|spark.master=yarn-cluster";
            assertSparkConf(expectedConf, process);
        }
    }

    /**
     * Yarn cluster mode with user impersonation and a jar present in the interpreter's local
     * repository directory; the jar is expected to appear in {@code spark.jars}.
     */
    @Test
    void testYarnClusterMode_2() throws IOException {
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(this.zConf, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn");
        properties.setProperty("spark.submit.deployMode", "cluster");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        InterpreterOption option = new InterpreterOption();
        option.setUserImpersonate(true);
        InterpreterLaunchContext context = newSparkContext(properties, option);

        Path localRepoDir = Paths.get(this.zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId());
        Path testJar = Paths.get(localRepoDir.toAbsolutePath().toString(), "test.jar");
        // Start from a clean directory so leftovers from earlier runs cannot leak into spark.jars.
        FileUtils.deleteDirectory(localRepoDir.toFile());
        try {
            Files.createDirectories(localRepoDir);
            Files.createFile(testJar);

            try (ExecRemoteInterpreterProcess process = launch(launcher, context)) {
                assertSparkProcessBasics(process, 3);
                Assertions.assertEquals("true", process.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
                String sparkFiles = "file_1," + yarnClusterLog4jFile();
                String sparkJars = "jar_1," + testJar.toString() + "," + zeppelinRuntimeJars();
                String expectedConf = "--proxy-user|user1|--conf|spark.yarn.dist.archives=" + sparkrArchive()
                    + "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.yarn.maxAppAttempts=1|--conf|spark.master=yarn|--conf|spark.files=" + sparkFiles
                    + "|--conf|spark.jars=" + sparkJars
                    + "|--conf|spark.submit.deployMode=cluster|--conf|spark.yarn.submit.waitAppCompletion=false";
                assertSparkConf(expectedConf, process);
                Assertions.assertTrue(sparkConfOf(process).startsWith("--proxy-user|user1"));
            }
        } finally {
            // Runs after the process is closed, and also when an assertion fails, so test.jar
            // never survives into other tests that share the same local-repo directory.
            FileUtils.deleteDirectory(localRepoDir.toFile());
        }
    }

    /**
     * Yarn cluster mode with user impersonation, an empty local repository directory and a
     * {@code spark.files} value of <code>{}</code>, which is expected to be passed through as-is.
     */
    @Test
    void testYarnClusterMode_3() throws IOException {
        SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(this.zConf, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn");
        properties.setProperty("spark.submit.deployMode", "cluster");
        properties.setProperty("spark.files", "{}");
        properties.setProperty("spark.jars", "jar_1");
        InterpreterOption option = new InterpreterOption();
        option.setUserImpersonate(true);
        InterpreterLaunchContext context = newSparkContext(properties, option);

        Path localRepoDir = Paths.get(this.zConf.getInterpreterLocalRepoPath(), context.getInterpreterSettingId());
        // Start from a clean directory so leftovers from earlier runs cannot leak into spark.jars.
        FileUtils.deleteDirectory(localRepoDir.toFile());
        try {
            Files.createDirectories(localRepoDir);

            try (ExecRemoteInterpreterProcess process = launch(launcher, context)) {
                assertSparkProcessBasics(process, 3);
                Assertions.assertEquals("true", process.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
                String sparkFiles = "{}," + yarnClusterLog4jFile();
                String sparkJars = "jar_1," + zeppelinRuntimeJars();
                String expectedConf = "--proxy-user|user1|--conf|spark.yarn.dist.archives=" + sparkrArchive()
                    + "|--conf|spark.yarn.isPython=true|--conf|spark.app.name=intpGroupId|--conf|spark.yarn.maxAppAttempts=1|--conf|spark.master=yarn|--conf|spark.files=" + sparkFiles
                    + "|--conf|spark.jars=" + sparkJars
                    + "|--conf|spark.submit.deployMode=cluster|--conf|spark.yarn.submit.waitAppCompletion=false";
                assertSparkConf(expectedConf, process);
                Assertions.assertTrue(sparkConfOf(process).startsWith("--proxy-user|user1"));
            }
        } finally {
            // Clean up even when an assertion above fails.
            FileUtils.deleteDirectory(localRepoDir.toFile());
        }
    }

    /** Builds the launch context shared by the Spark-specific tests (group and name "spark"). */
    private static InterpreterLaunchContext newSparkContext(Properties properties, InterpreterOption option) {
        return new InterpreterLaunchContext(properties, option, (InterpreterRunner) null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
    }

    /**
     * Launches the context and returns the result as an {@link ExecRemoteInterpreterProcess},
     * failing the test if the launcher produced anything else.
     */
    private static ExecRemoteInterpreterProcess launch(SparkInterpreterLauncher launcher, InterpreterLaunchContext context) throws IOException {
        // Held as Object so the instanceof assertion is a real check of the runtime type.
        Object client = launcher.launch(context);
        Assertions.assertTrue(client instanceof ExecRemoteInterpreterProcess);
        return (ExecRemoteInterpreterProcess) client;
    }

    /** Assertions common to every test that launches the "spark" interpreter setting. */
    private void assertSparkProcessBasics(ExecRemoteInterpreterProcess process, int minEnvSize) {
        Assertions.assertEquals("spark", process.getInterpreterSettingName());
        Assertions.assertTrue(process.getInterpreterDir().endsWith("/interpreter/spark"));
        Assertions.assertTrue(process.getLocalRepoDir().endsWith("/local-repo/groupId"));
        Assertions.assertEquals(this.zConf.getInterpreterRemoteRunnerPath(), process.getInterpreterRunner());
        Assertions.assertTrue(process.getEnv().size() >= minEnvSize);
        Assertions.assertEquals(this.sparkHome, process.getEnv().get("SPARK_HOME"));
    }

    /** Reads the raw {@code ZEPPELIN_SPARK_CONF} value from the process environment. */
    private static String sparkConfOf(ExecRemoteInterpreterProcess process) {
        return (String) process.getEnv().get(SPARK_CONF_ENV);
    }

    /**
     * Asserts that {@code ZEPPELIN_SPARK_CONF} holds the same '|'-separated tokens as
     * {@code expected}. Token order is ignored; token multiplicity is not.
     */
    private static void assertSparkConf(String expected, ExecRemoteInterpreterProcess process) {
        String actual = sparkConfOf(process);
        boolean sameTokens = CollectionUtils.isEqualCollection(
            Arrays.asList(expected.split("\\|")),
            Arrays.asList(actual.split("\\|")));
        Assertions.assertTrue(sameTokens, SPARK_CONF_ENV + " mismatch, expected tokens of <" + expected + "> but was <" + actual + ">");
    }

    /** Expected {@code spark.yarn.dist.archives} value for the SparkR archive under SPARK_HOME. */
    private String sparkrArchive() {
        return this.sparkHome + "/R/lib/sparkr.zip#sparkr";
    }

    /** Expected log4j properties file appended to {@code spark.files} in yarn cluster mode. */
    private String yarnClusterLog4jFile() {
        return this.zeppelinHome + "/conf/log4j_yarn_cluster.properties";
    }

    /** Expected Zeppelin jars appended to {@code spark.jars} in yarn cluster mode. */
    private String zeppelinRuntimeJars() {
        return this.zeppelinHome + "/interpreter/spark/scala-2.12/spark-scala-2.12-" + Util.getVersion() + ".jar,"
            + this.zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar";
    }
}
