package org.apache.zeppelin.interpreter;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.codehaus.plexus.util.xml.pull.XmlPullParserException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/interpreter/SparkIntegrationTest.class */
public abstract class SparkIntegrationTest {
    private static Logger LOGGER = LoggerFactory.getLogger(SparkIntegrationTest.class);
    private static MiniHadoopCluster hadoopCluster;
    private static MiniZeppelin zeppelin;
    private static InterpreterFactory interpreterFactory;
    private static InterpreterSettingManager interpreterSettingManager;
    private String sparkVersion;
    private String sparkHome;

    public SparkIntegrationTest(String str) {
        LOGGER.info("Testing SparkVersion: " + str);
        this.sparkVersion = str;
        this.sparkHome = SparkDownloadUtils.downloadSpark(str);
    }

    @BeforeClass
    public static void setUp() throws IOException {
        hadoopCluster = new MiniHadoopCluster();
        hadoopCluster.start();
        zeppelin = new MiniZeppelin();
        zeppelin.start();
        interpreterFactory = zeppelin.getInterpreterFactory();
        interpreterSettingManager = zeppelin.getInterpreterSettingManager();
    }

    @AfterClass
    public static void tearDown() throws IOException {
        if (zeppelin != null) {
            zeppelin.stop();
        }
        if (hadoopCluster != null) {
            hadoopCluster.stop();
        }
    }

    private void testInterpreterBasics() throws IOException, InterpreterException, XmlPullParserException {
        InterpreterSetting interpreterSettingByName = interpreterSettingManager.getInterpreterSettingByName("spark");
        interpreterSettingByName.setProperty("spark.jars.packages", "com.maxmind.geoip2:geoip2:2.5.0");
        interpreterSettingByName.setProperty("spark.jars", new File("target/zeppelin-zengine-" + new MavenXpp3Reader().read(new FileReader("pom.xml")).getVersion() + ".jar").getAbsolutePath());
        interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
        Interpreter interpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
        InterpreterContext build = new InterpreterContext.Builder().setNoteId("note1").setParagraphId("paragraph_1").build();
        InterpreterResult interpret = interpreter.interpret("sc.version", build);
        Assert.assertEquals(InterpreterResult.Code.SUCCESS, interpret.code);
        String data = ((InterpreterResultMessage) interpret.message().get(0)).getData();
        Assert.assertTrue(data + " doesn't contain " + this.sparkVersion, data.contains(this.sparkVersion));
        InterpreterResult interpret2 = interpreter.interpret("sc.range(1,10).sum()", build);
        Assert.assertEquals(InterpreterResult.Code.SUCCESS, interpret2.code);
        Assert.assertTrue(((InterpreterResultMessage) interpret2.msg.get(0)).getData().contains("45"));
        Assert.assertEquals(InterpreterResult.Code.SUCCESS, interpreter.interpret("import org.apache.zeppelin.interpreter.install.InstallInterpreter\nimport com.maxmind.geoip2._", build).code());
        Assert.assertEquals(InterpreterResult.Code.SUCCESS, interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark").interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", build).code);
        InterpreterResult interpret3 = interpreterFactory.getInterpreter("user1", "note1", "spark.ipyspark").interpret("sqlContext.table('test').show()", build);
        Assert.assertEquals(interpret3.toString(), InterpreterResult.Code.SUCCESS, interpret3.code);
        InterpreterResult interpret4 = interpreterFactory.getInterpreter("user1", "note1", "spark.sql").interpret("select count(1) as c from test", build);
        Assert.assertEquals(InterpreterResult.Code.SUCCESS, interpret4.code);
        Assert.assertEquals(InterpreterResult.Type.TABLE, ((InterpreterResultMessage) interpret4.message().get(0)).getType());
        Assert.assertEquals("c\n2\n", ((InterpreterResultMessage) interpret4.message().get(0)).getData());
        Interpreter interpreter2 = interpreterFactory.getInterpreter("user1", "note1", "spark.r");
        InterpreterResult interpret5 = isSpark2() ? interpreter2.interpret("df <- as.DataFrame(faithful)\nhead(df)", build) : interpreter2.interpret("df <- createDataFrame(sqlContext, faithful)\nhead(df)", build);
        Assert.assertEquals(InterpreterResult.Code.SUCCESS, interpret5.code);
        Assert.assertEquals(InterpreterResult.Type.TEXT, ((InterpreterResultMessage) interpret5.message().get(0)).getType());
        Assert.assertTrue(((InterpreterResultMessage) interpret5.message().get(0)).getData().contains("eruptions waiting"));
    }

    @Test
    public void testLocalMode() throws IOException, YarnException, InterpreterException, InterruptedException, XmlPullParserException {
        InterpreterSetting interpreterSettingByName = interpreterSettingManager.getInterpreterSettingByName("spark");
        interpreterSettingByName.setProperty("master", "local[*]");
        interpreterSettingByName.setProperty("SPARK_HOME", this.sparkHome);
        interpreterSettingByName.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
        interpreterSettingByName.setProperty("zeppelin.spark.useHiveContext", "false");
        interpreterSettingByName.setProperty("zeppelin.pyspark.useIPython", "false");
        interpreterSettingByName.setProperty("spark.pyspark.python", getPythonExec());
        testInterpreterBasics();
        Assert.assertEquals(0L, hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING))).getApplicationList().size());
        interpreterSettingManager.close();
    }

    @Test
    public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException {
        InterpreterSetting interpreterSettingByName = interpreterSettingManager.getInterpreterSettingByName("spark");
        interpreterSettingByName.setProperty("master", "yarn-client");
        interpreterSettingByName.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
        interpreterSettingByName.setProperty("SPARK_HOME", this.sparkHome);
        interpreterSettingByName.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
        interpreterSettingByName.setProperty("zeppelin.spark.useHiveContext", "false");
        interpreterSettingByName.setProperty("zeppelin.pyspark.useIPython", "false");
        interpreterSettingByName.setProperty("spark.pyspark.python", getPythonExec());
        interpreterSettingByName.setProperty("spark.driver.memory", "512m");
        testInterpreterBasics();
        Assert.assertEquals(1L, hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING))).getApplicationList().size());
        interpreterSettingManager.close();
    }

    @Test
    public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException {
        InterpreterSetting interpreterSettingByName = interpreterSettingManager.getInterpreterSettingByName("spark");
        interpreterSettingByName.setProperty("master", "yarn-cluster");
        interpreterSettingByName.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
        interpreterSettingByName.setProperty("SPARK_HOME", this.sparkHome);
        interpreterSettingByName.setProperty("ZEPPELIN_CONF_DIR", zeppelin.getZeppelinConfDir().getAbsolutePath());
        interpreterSettingByName.setProperty("zeppelin.spark.useHiveContext", "false");
        interpreterSettingByName.setProperty("zeppelin.pyspark.useIPython", "false");
        interpreterSettingByName.setProperty("spark.pyspark.python", getPythonExec());
        interpreterSettingByName.setProperty("spark.driver.memory", "512m");
        testInterpreterBasics();
        Assert.assertEquals(1L, hadoopCluster.getYarnCluster().getResourceManager().getClientRMService().getApplications(GetApplicationsRequest.newInstance(EnumSet.of(YarnApplicationState.RUNNING))).getApplicationList().size());
        interpreterSettingManager.close();
    }

    private boolean isSpark2() {
        return this.sparkVersion.startsWith("2.");
    }

    private String getPythonExec() throws IOException, InterruptedException {
        Process exec = Runtime.getRuntime().exec(new String[]{"which", "python"});
        if (exec.waitFor() != 0) {
            throw new RuntimeException("Fail to run command: which python.");
        }
        return IOUtils.toString(exec.getInputStream()).trim();
    }
}
