package org.apache.zeppelin.interpreter.launcher;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.InterpreterOption;
import org.apache.zeppelin.interpreter.InterpreterRunner;
import org.apache.zeppelin.interpreter.integration.DownloadUtils;
import org.apache.zeppelin.interpreter.recovery.RecoveryStorage;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess;
import org.apache.zeppelin.util.Util;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.class */
public class SparkInterpreterLauncherTest {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) SparkInterpreterLauncher.class);
    private String sparkHome;
    private String zeppelinHome;

    @Before
    public void setUp() {
        for (ZeppelinConfiguration.ConfVars confVars : ZeppelinConfiguration.ConfVars.values()) {
            System.clearProperty(confVars.getVarName());
        }
        this.sparkHome = DownloadUtils.downloadSpark("2.3.2", "2.7");
        System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_HOME.getVarName(), new File("..").getAbsolutePath());
        this.zeppelinHome = ZeppelinConfiguration.create().getZeppelinHome();
        LOGGER.info("ZEPPELIN_HOME: " + this.zeppelinHome);
    }

    @Test
    public void testConnectTimeOut() throws IOException {
        ZeppelinConfiguration create = ZeppelinConfiguration.create();
        SparkInterpreterLauncher sparkInterpreterLauncher = new SparkInterpreterLauncher(create, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000");
        InterpreterOption interpreterOption = new InterpreterOption();
        interpreterOption.setUserImpersonate(true);
        RemoteInterpreterManagedProcess launch = sparkInterpreterLauncher.launch(new InterpreterLaunchContext(properties, interpreterOption, (InterpreterRunner) null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"));
        Assert.assertTrue(launch instanceof RemoteInterpreterManagedProcess);
        RemoteInterpreterManagedProcess remoteInterpreterManagedProcess = launch;
        Assert.assertEquals("name", remoteInterpreterManagedProcess.getInterpreterSettingName());
        Assert.assertEquals(this.zeppelinHome + "/interpreter/groupName", remoteInterpreterManagedProcess.getInterpreterDir());
        Assert.assertEquals(this.zeppelinHome + "/local-repo/groupId", remoteInterpreterManagedProcess.getLocalRepoDir());
        Assert.assertEquals(10000L, remoteInterpreterManagedProcess.getConnectTimeout());
        Assert.assertEquals(create.getInterpreterRemoteRunnerPath(), remoteInterpreterManagedProcess.getInterpreterRunner());
        Assert.assertTrue(remoteInterpreterManagedProcess.getEnv().size() >= 2);
        Assert.assertEquals(true, Boolean.valueOf(remoteInterpreterManagedProcess.isUserImpersonated()));
    }

    @Test
    public void testLocalMode() throws IOException {
        ZeppelinConfiguration create = ZeppelinConfiguration.create();
        SparkInterpreterLauncher sparkInterpreterLauncher = new SparkInterpreterLauncher(create, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("ENV_1", "");
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "local[*]");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        RemoteInterpreterManagedProcess launch = sparkInterpreterLauncher.launch(new InterpreterLaunchContext(properties, new InterpreterOption(), (InterpreterRunner) null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"));
        Assert.assertTrue(launch instanceof RemoteInterpreterManagedProcess);
        RemoteInterpreterManagedProcess remoteInterpreterManagedProcess = launch;
        Assert.assertEquals("spark", remoteInterpreterManagedProcess.getInterpreterSettingName());
        Assert.assertTrue(remoteInterpreterManagedProcess.getInterpreterDir().endsWith("/interpreter/spark"));
        Assert.assertTrue(remoteInterpreterManagedProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
        Assert.assertEquals(create.getInterpreterRemoteRunnerPath(), remoteInterpreterManagedProcess.getInterpreterRunner());
        Assert.assertTrue(remoteInterpreterManagedProcess.getEnv().size() >= 2);
        Assert.assertEquals(this.sparkHome, remoteInterpreterManagedProcess.getEnv().get("SPARK_HOME"));
        Assert.assertFalse(remoteInterpreterManagedProcess.getEnv().containsKey("ENV_1"));
        Assert.assertEquals(" --conf spark.files=file_1 --conf spark.jars=jar_1 --conf spark.app.name=intpGroupId --conf spark.master=local[*]", remoteInterpreterManagedProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
    }

    @Test
    public void testYarnClientMode_1() throws IOException {
        ZeppelinConfiguration create = ZeppelinConfiguration.create();
        SparkInterpreterLauncher sparkInterpreterLauncher = new SparkInterpreterLauncher(create, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn-client");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        RemoteInterpreterManagedProcess launch = sparkInterpreterLauncher.launch(new InterpreterLaunchContext(properties, new InterpreterOption(), (InterpreterRunner) null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"));
        Assert.assertTrue(launch instanceof RemoteInterpreterManagedProcess);
        RemoteInterpreterManagedProcess remoteInterpreterManagedProcess = launch;
        Assert.assertEquals("spark", remoteInterpreterManagedProcess.getInterpreterSettingName());
        Assert.assertTrue(remoteInterpreterManagedProcess.getInterpreterDir().endsWith("/interpreter/spark"));
        Assert.assertTrue(remoteInterpreterManagedProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
        Assert.assertEquals(create.getInterpreterRemoteRunnerPath(), remoteInterpreterManagedProcess.getInterpreterRunner());
        Assert.assertTrue(remoteInterpreterManagedProcess.getEnv().size() >= 2);
        Assert.assertEquals(this.sparkHome, remoteInterpreterManagedProcess.getEnv().get("SPARK_HOME"));
        Assert.assertEquals(" --conf spark.yarn.dist.archives=" + (this.sparkHome + "/R/lib/sparkr.zip#sparkr") + " --conf spark.files=file_1 --conf spark.jars=jar_1 --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn-client", remoteInterpreterManagedProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
    }

    @Test
    public void testYarnClientMode_2() throws IOException {
        ZeppelinConfiguration create = ZeppelinConfiguration.create();
        SparkInterpreterLauncher sparkInterpreterLauncher = new SparkInterpreterLauncher(create, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn");
        properties.setProperty("spark.submit.deployMode", "client");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        RemoteInterpreterManagedProcess launch = sparkInterpreterLauncher.launch(new InterpreterLaunchContext(properties, new InterpreterOption(), (InterpreterRunner) null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"));
        Assert.assertTrue(launch instanceof RemoteInterpreterManagedProcess);
        RemoteInterpreterManagedProcess remoteInterpreterManagedProcess = launch;
        Assert.assertEquals("spark", remoteInterpreterManagedProcess.getInterpreterSettingName());
        Assert.assertTrue(remoteInterpreterManagedProcess.getInterpreterDir().endsWith("/interpreter/spark"));
        Assert.assertTrue(remoteInterpreterManagedProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
        Assert.assertEquals(create.getInterpreterRemoteRunnerPath(), remoteInterpreterManagedProcess.getInterpreterRunner());
        Assert.assertTrue(remoteInterpreterManagedProcess.getEnv().size() >= 2);
        Assert.assertEquals(this.sparkHome, remoteInterpreterManagedProcess.getEnv().get("SPARK_HOME"));
        Assert.assertEquals(" --conf spark.yarn.dist.archives=" + (this.sparkHome + "/R/lib/sparkr.zip#sparkr") + " --conf spark.files=file_1 --conf spark.jars=jar_1 --conf spark.submit.deployMode=client --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.master=yarn", remoteInterpreterManagedProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
    }

    @Test
    public void testYarnClusterMode_1() throws IOException {
        ZeppelinConfiguration create = ZeppelinConfiguration.create();
        SparkInterpreterLauncher sparkInterpreterLauncher = new SparkInterpreterLauncher(create, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn-cluster");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        RemoteInterpreterManagedProcess launch = sparkInterpreterLauncher.launch(new InterpreterLaunchContext(properties, new InterpreterOption(), (InterpreterRunner) null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"));
        Assert.assertTrue(launch instanceof RemoteInterpreterManagedProcess);
        RemoteInterpreterManagedProcess remoteInterpreterManagedProcess = launch;
        Assert.assertEquals("spark", remoteInterpreterManagedProcess.getInterpreterSettingName());
        Assert.assertTrue(remoteInterpreterManagedProcess.getInterpreterDir().endsWith("/interpreter/spark"));
        Assert.assertTrue(remoteInterpreterManagedProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
        Assert.assertEquals(create.getInterpreterRemoteRunnerPath(), remoteInterpreterManagedProcess.getInterpreterRunner());
        Assert.assertTrue(remoteInterpreterManagedProcess.getEnv().size() >= 3);
        Assert.assertEquals(this.sparkHome, remoteInterpreterManagedProcess.getEnv().get("SPARK_HOME"));
        Assert.assertEquals("true", remoteInterpreterManagedProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
        Assert.assertEquals(" --conf spark.yarn.dist.archives=" + (this.sparkHome + "/R/lib/sparkr.zip#sparkr") + " --conf spark.yarn.maxAppAttempts=1 --conf spark.files=" + ("file_1," + this.zeppelinHome + "/conf/log4j_yarn_cluster.properties") + " --conf spark.jars=" + ("jar_1," + this.zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," + this.zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar") + " --conf spark.yarn.isPython=true --conf spark.yarn.submit.waitAppCompletion=false --conf spark.app.name=intpGroupId --conf spark.master=yarn-cluster", remoteInterpreterManagedProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
    }

    @Test
    public void testYarnClusterMode_2() throws IOException {
        ZeppelinConfiguration create = ZeppelinConfiguration.create();
        SparkInterpreterLauncher sparkInterpreterLauncher = new SparkInterpreterLauncher(create, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn");
        properties.setProperty("spark.submit.deployMode", "cluster");
        properties.setProperty("spark.files", "file_1");
        properties.setProperty("spark.jars", "jar_1");
        InterpreterOption interpreterOption = new InterpreterOption();
        interpreterOption.setUserImpersonate(true);
        InterpreterLaunchContext interpreterLaunchContext = new InterpreterLaunchContext(properties, interpreterOption, (InterpreterRunner) null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
        Path path = Paths.get(create.getInterpreterLocalRepoPath(), interpreterLaunchContext.getInterpreterSettingId());
        FileUtils.deleteDirectory(path.toFile());
        Files.createDirectories(path, new FileAttribute[0]);
        Files.createFile(Paths.get(path.toAbsolutePath().toString(), "test.jar"), new FileAttribute[0]);
        RemoteInterpreterManagedProcess launch = sparkInterpreterLauncher.launch(interpreterLaunchContext);
        Assert.assertTrue(launch instanceof RemoteInterpreterManagedProcess);
        RemoteInterpreterManagedProcess remoteInterpreterManagedProcess = launch;
        Assert.assertEquals("spark", remoteInterpreterManagedProcess.getInterpreterSettingName());
        Assert.assertTrue(remoteInterpreterManagedProcess.getInterpreterDir().endsWith("/interpreter/spark"));
        Assert.assertTrue(remoteInterpreterManagedProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
        Assert.assertEquals(create.getInterpreterRemoteRunnerPath(), remoteInterpreterManagedProcess.getInterpreterRunner());
        Assert.assertTrue(remoteInterpreterManagedProcess.getEnv().size() >= 3);
        Assert.assertEquals(this.sparkHome, remoteInterpreterManagedProcess.getEnv().get("SPARK_HOME"));
        Assert.assertEquals("true", remoteInterpreterManagedProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
        Assert.assertEquals(" --proxy-user user1 --conf spark.yarn.dist.archives=" + (this.sparkHome + "/R/lib/sparkr.zip#sparkr") + " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.yarn.maxAppAttempts=1 --conf spark.master=yarn --conf spark.files=" + ("file_1," + this.zeppelinHome + "/conf/log4j_yarn_cluster.properties") + " --conf spark.jars=" + ("jar_1," + Paths.get(path.toAbsolutePath().toString(), "test.jar").toString() + "," + this.zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," + this.zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar") + " --conf spark.submit.deployMode=cluster --conf spark.yarn.submit.waitAppCompletion=false", remoteInterpreterManagedProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
        Files.deleteIfExists(Paths.get(path.toAbsolutePath().toString(), "test.jar"));
        FileUtils.deleteDirectory(path.toFile());
    }

    @Test
    public void testYarnClusterMode_3() throws IOException {
        ZeppelinConfiguration create = ZeppelinConfiguration.create();
        SparkInterpreterLauncher sparkInterpreterLauncher = new SparkInterpreterLauncher(create, (RecoveryStorage) null);
        Properties properties = new Properties();
        properties.setProperty("SPARK_HOME", this.sparkHome);
        properties.setProperty("property_1", "value_1");
        properties.setProperty("spark.master", "yarn");
        properties.setProperty("spark.submit.deployMode", "cluster");
        properties.setProperty("spark.files", "{}");
        properties.setProperty("spark.jars", "jar_1");
        InterpreterOption interpreterOption = new InterpreterOption();
        interpreterOption.setUserImpersonate(true);
        InterpreterLaunchContext interpreterLaunchContext = new InterpreterLaunchContext(properties, interpreterOption, (InterpreterRunner) null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host");
        Path path = Paths.get(create.getInterpreterLocalRepoPath(), interpreterLaunchContext.getInterpreterSettingId());
        FileUtils.deleteDirectory(path.toFile());
        Files.createDirectories(path, new FileAttribute[0]);
        RemoteInterpreterManagedProcess launch = sparkInterpreterLauncher.launch(interpreterLaunchContext);
        Assert.assertTrue(launch instanceof RemoteInterpreterManagedProcess);
        RemoteInterpreterManagedProcess remoteInterpreterManagedProcess = launch;
        Assert.assertEquals("spark", remoteInterpreterManagedProcess.getInterpreterSettingName());
        Assert.assertTrue(remoteInterpreterManagedProcess.getInterpreterDir().endsWith("/interpreter/spark"));
        Assert.assertTrue(remoteInterpreterManagedProcess.getLocalRepoDir().endsWith("/local-repo/groupId"));
        Assert.assertEquals(create.getInterpreterRemoteRunnerPath(), remoteInterpreterManagedProcess.getInterpreterRunner());
        Assert.assertTrue(remoteInterpreterManagedProcess.getEnv().size() >= 3);
        Assert.assertEquals(this.sparkHome, remoteInterpreterManagedProcess.getEnv().get("SPARK_HOME"));
        Assert.assertEquals("true", remoteInterpreterManagedProcess.getEnv().get("ZEPPELIN_SPARK_YARN_CLUSTER"));
        Assert.assertEquals(" --proxy-user user1 --conf spark.yarn.dist.archives=" + (this.sparkHome + "/R/lib/sparkr.zip#sparkr") + " --conf spark.yarn.isPython=true --conf spark.app.name=intpGroupId --conf spark.yarn.maxAppAttempts=1 --conf spark.master=yarn --conf spark.files=" + ("{}," + this.zeppelinHome + "/conf/log4j_yarn_cluster.properties") + " --conf spark.jars=" + ("jar_1," + this.zeppelinHome + "/interpreter/spark/scala-2.11/spark-scala-2.11-" + Util.getVersion() + ".jar," + this.zeppelinHome + "/interpreter/zeppelin-interpreter-shaded-" + Util.getVersion() + ".jar") + " --conf spark.submit.deployMode=cluster --conf spark.yarn.submit.waitAppCompletion=false", remoteInterpreterManagedProcess.getEnv().get("ZEPPELIN_SPARK_CONF"));
        FileUtils.deleteDirectory(path.toFile());
    }
}
