package org.apache.zeppelin.interpreter.launcher;

import com.google.common.collect.Lists;
import com.google.common.io.Files;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.apache.zeppelin.conf.ZeppelinConfiguration;
import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/zeppelin/interpreter/launcher/YarnRemoteInterpreterProcess.class */
public class YarnRemoteInterpreterProcess extends RemoteInterpreterProcess {
    // Host and port recorded by processStarted(); port stays -1 until then.
    private String host;
    private int port;
    private ZeppelinConfiguration zConf;
    private final InterpreterLaunchContext launchContext;
    private final Properties properties;
    // Environment handed to the YARN container; this class adds entries to it while building the launch context.
    private final Map<String, String> envs;
    // Set to true by start() once YARN reports the application as RUNNING. Assigned exactly once in the constructor.
    private final AtomicBoolean isYarnAppRunning;
    // Diagnostics reported by YARN when the application ended in a non-RUNNING state; null otherwise.
    private String errorMessage;
    private Configuration hadoopConf;
    // File system from FileSystem.get(hadoopConf) (hosts the staging directory) and the local file system.
    private FileSystem fs;
    private FileSystem localFs;
    private YarnClient yarnClient;
    private ApplicationId appId;
    // Per-application upload directory; null until the launch context has been built.
    private Path stagingDir;
    private static final Logger LOGGER = LoggerFactory.getLogger(YarnRemoteInterpreterProcess.class);
    // rw-r--r-- for uploaded resources.
    private static final FsPermission APP_FILE_PERMISSION = FsPermission.createImmutable(Short.parseShort("644", 8));

    /**
     * Builds the Hadoop configuration, the file systems and a started {@link YarnClient} used to launch
     * the interpreter as a YARN application.
     *
     * <p>When the {@code HADOOP_CONF_DIR} property is set to a non-blank value, {@code core-site.xml} and
     * {@code yarn-site.xml} from that directory are added on top of the default {@link YarnConfiguration}.
     *
     * @param interpreterLaunchContext launch context; also supplies the event server host/port passed to the superclass
     * @param properties interpreter properties ({@code HADOOP_CONF_DIR}, {@code zeppelin.interpreter.yarn.*})
     * @param map environment variables for the YARN container; stored by reference and modified later
     * @param i first timeout value, passed through to the superclass constructor
     * @param i2 second timeout value, passed through to the superclass constructor
     * @throws RuntimeException if {@code HADOOP_CONF_DIR} is set but is not an existing directory, or if the
     *     file systems cannot be created
     */
    public YarnRemoteInterpreterProcess(InterpreterLaunchContext interpreterLaunchContext, Properties properties, Map<String, String> map, int i, int i2) {
        super(i, i2, interpreterLaunchContext.getIntpEventServerHost(), interpreterLaunchContext.getIntpEventServerPort());
        this.port = -1;
        this.isYarnAppRunning = new AtomicBoolean(false);
        this.zConf = ZeppelinConfiguration.create();
        this.launchContext = interpreterLaunchContext;
        this.properties = properties;
        this.envs = map;
        this.yarnClient = YarnClient.createYarnClient();
        this.hadoopConf = new YarnConfiguration();
        if (properties.containsKey("HADOOP_CONF_DIR") && !StringUtils.isBlank(properties.getProperty("HADOOP_CONF_DIR"))) {
            File hadoopConfDir = new File(properties.getProperty("HADOOP_CONF_DIR"));
            if (!hadoopConfDir.exists() || !hadoopConfDir.isDirectory()) {
                throw new RuntimeException("HADOOP_CONF_DIR: " + hadoopConfDir.getAbsolutePath() + " doesn't exist or is not a directory");
            }
            for (String siteFileName : new String[] {"core-site.xml", "yarn-site.xml"}) {
                File siteFile = new File(hadoopConfDir, siteFileName);
                try {
                    this.hadoopConf.addResource(siteFile.toURI().toURL());
                } catch (MalformedURLException e) {
                    LOGGER.warn("Fail to add " + siteFileName + ": " + siteFile.getAbsolutePath(), e);
                }
            }
        }
        // Create the file systems BEFORE starting the YARN client: if this step fails there is no
        // started client left behind that nobody would ever stop.
        try {
            this.fs = FileSystem.get(this.hadoopConf);
            this.localFs = FileSystem.getLocal(this.hadoopConf);
        } catch (IOException e) {
            throw new RuntimeException("Fail to create FileSystem", e);
        }
        this.yarnClient.init(this.hadoopConf);
        this.yarnClient.start();
    }

    /**
     * Records the endpoint later returned by {@link #getHost()} and {@link #getPort()}.
     *
     * @param i the port to remember
     * @param str the host to remember
     */
    public void processStarted(int i, String str) {
        host = str;
        port = i;
    }

    /**
     * @return the YARN diagnostics captured by {@code start} when the application ended up in a
     *     non-RUNNING state, or {@code null} if nothing was captured
     */
    public String getErrorMessage() {
        return errorMessage;
    }

    /** @return the interpreter group id taken from the launch context */
    public String getInterpreterGroupId() {
        return launchContext.getInterpreterGroupId();
    }

    /** @return the interpreter setting name taken from the launch context */
    public String getInterpreterSettingName() {
        return launchContext.getInterpreterSettingName();
    }

    /**
     * Submits the interpreter as a YARN application and blocks until YARN reports it as RUNNING.
     *
     * <p>The application state is polled every two seconds. If it has not reached RUNNING, FAILED,
     * FINISHED or KILLED within {@code getConnectTimeout()} milliseconds the application is killed and
     * the launch fails. If it ends in any state other than RUNNING, the YARN diagnostics are stored and
     * made available through {@link #getErrorMessage()}.
     *
     * <p>The staging directory is removed on every exit path. That cleanup is best-effort: a failure to
     * delete it is logged, so it can neither fail an otherwise successful launch nor hide the exception
     * that caused a failed one.
     *
     * @param str not used by this implementation
     * @throws IOException if submission fails, the launch times out, the application does not reach
     *     RUNNING, or the waiting thread is interrupted (the interrupt flag is restored in that case)
     */
    public void start(String str) throws IOException {
        try {
            LOGGER.info("Submitting zeppelin-interpreter app to yarn");
            YarnClientApplication application = this.yarnClient.createApplication();
            this.appId = application.getNewApplicationResponse().getApplicationId();
            this.yarnClient.submitApplication(createApplicationSubmissionContext(application.getApplicationSubmissionContext()));

            long submittedAt = System.currentTimeMillis();
            ApplicationReport report = getApplicationReport(this.appId);
            YarnApplicationState state = report.getYarnApplicationState();
            while (state != YarnApplicationState.FAILED
                    && state != YarnApplicationState.FINISHED
                    && state != YarnApplicationState.KILLED
                    && state != YarnApplicationState.RUNNING) {
                LOGGER.info("Wait for zeppelin interpreter yarn app to be started");
                Thread.sleep(2000L);
                if (System.currentTimeMillis() - submittedAt > getConnectTimeout()) {
                    this.yarnClient.killApplication(this.appId);
                    throw new IOException("Launching zeppelin interpreter in yarn is time out, kill it now");
                }
                report = getApplicationReport(this.appId);
                state = report.getYarnApplicationState();
            }

            if (state != YarnApplicationState.RUNNING) {
                this.errorMessage = report.getDiagnostics();
                throw new Exception("Failed to submit application to YARN, applicationId=" + this.appId + ", diagnostics=" + report.getDiagnostics());
            }
            this.isYarnAppRunning.set(true);
        } catch (InterruptedException e) {
            // Restore the flag so callers further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            LOGGER.error("Fail to launch yarn interpreter process", e);
            throw new IOException(e);
        } catch (Exception e) {
            LOGGER.error("Fail to launch yarn interpreter process", e);
            throw new IOException(e);
        } finally {
            deleteStagingDirQuietly();
        }
    }

    /** Removes the staging directory if one was created; failures are logged and never propagated. */
    private void deleteStagingDirQuietly() {
        if (this.stagingDir == null) {
            return;
        }
        try {
            this.fs.delete(this.stagingDir, true);
        } catch (IOException | RuntimeException e) {
            LOGGER.warn("Fail to delete staging dir: " + this.stagingDir, e);
        }
    }

    /**
     * Fetches the current report of the given application from YARN.
     *
     * @param applicationId the application to look up
     * @return the report; its application state is guaranteed to be non-null
     * @throws ApplicationNotFoundException if the returned report carries no application state
     * @throws YarnException if the YARN client call fails
     * @throws IOException if the YARN client call fails
     */
    private ApplicationReport getApplicationReport(ApplicationId applicationId) throws YarnException, IOException {
        ApplicationReport report = this.yarnClient.getApplicationReport(applicationId);
        if (report.getYarnApplicationState() != null) {
            return report;
        }
        throw new ApplicationNotFoundException("YARN reports no state for application " + applicationId);
    }

    /**
     * Fills in the submission context of a freshly created YARN application: resources, priority,
     * queue, id, name, type, a single application attempt, the container launch spec, and token
     * cancellation on completion.
     *
     * @param applicationSubmissionContext the context to populate
     * @return the same instance that was passed in
     * @throws Exception if building the container launch context fails
     */
    private ApplicationSubmissionContext createApplicationSubmissionContext(ApplicationSubmissionContext applicationSubmissionContext) throws Exception {
        ApplicationSubmissionContext context = applicationSubmissionContext;

        // Scheduling-related settings.
        setResources(context);
        setPriority(context);
        setQueue(context);

        // Identity of the application.
        context.setApplicationId(this.appId);
        setApplicationName(context);
        context.setApplicationType("ZEPPELIN INTERPRETER");
        context.setMaxAppAttempts(1);

        // Container spec; building it uploads the required archives to the staging directory.
        ContainerLaunchContext amSpec = setUpAMLaunchContext();
        context.setAMContainerSpec(amSpec);
        context.setCancelTokensWhenComplete(true);
        return context;
    }

    /**
     * Builds the launch context of the YARN container that runs the interpreter.
     *
     * <p>Side effects: sets {@code stagingDir} to {@code <home>/.zeppelinStaging/<appId>}, uploads the
     * zeppelin archive (plus the flink and, if {@code HIVE_CONF_DIR} is set, hive-conf archives for the
     * {@code flink} setting group) to it, and adds classpath, flink and memory entries to {@code envs}.
     * Every temporary zip created locally is deleted again, whether or not its upload succeeded.
     *
     * @return the launch context carrying local resources, the {@code interpreter.sh} command line and
     *     the environment
     * @throws IOException if an archive cannot be created or uploaded
     */
    private ContainerLaunchContext setUpAMLaunchContext() throws IOException {
        ContainerLaunchContext amContainer = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);
        this.stagingDir = new Path(this.fs.getHomeDirectory() + "/.zeppelinStaging", this.appId.toString());

        String settingGroup = this.launchContext.getInterpreterSettingGroup();
        // Constant-first comparison: a null setting group is simply "not flink" instead of an NPE.
        boolean isFlink = "flink".equals(settingGroup);

        Map<String, LocalResource> localResources = new HashMap<>();
        uploadArchive(createInterpreterZip(), "zeppelin", localResources);
        if (isFlink) {
            uploadArchive(createFlinkZip(), "flink", localResources);
            String hiveConfDir = this.launchContext.getProperties().getProperty("HIVE_CONF_DIR");
            if (!StringUtils.isBlank(hiveConfDir)) {
                uploadArchive(createHiveConfZip(new File(hiveConfDir)), "hive_conf", localResources);
            }
        }
        amContainer.setLocalResources(localResources);

        // Paths below are relative to the container working directory, where the archives are unpacked
        // under their link names ("zeppelin", "flink", "hive_conf").
        String pwd = ApplicationConstants.Environment.PWD.$();
        ArrayList<String> commands = new ArrayList<>();
        commands.add(pwd + "/zeppelin/bin/interpreter.sh");
        commands.add("-d");
        commands.add(pwd + "/zeppelin/interpreter/" + settingGroup);
        commands.add("-c");
        commands.add(this.launchContext.getIntpEventServerHost());
        commands.add("-p");
        commands.add(this.launchContext.getIntpEventServerPort() + "");
        commands.add("-r");
        commands.add(this.zConf.getInterpreterPortRange() + "");
        commands.add("-i");
        commands.add(this.launchContext.getInterpreterGroupId());
        commands.add("-l");
        commands.add(pwd + "/zeppelin/" + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_LOCALREPO.getStringValue() + "/" + this.launchContext.getInterpreterSettingName());
        commands.add("-g");
        commands.add(this.launchContext.getInterpreterSettingName());
        commands.add("1><LOG_DIR>" + File.separator + "stdout");
        commands.add("2><LOG_DIR>" + File.separator + "stderr");
        amContainer.setCommands(commands);

        populateHadoopClasspath(this.envs);
        if (isFlink) {
            this.envs.put("FLINK_HOME", pwd + "/flink");
            this.envs.put("FLINK_CONF_DIR", pwd + "/flink/conf");
            this.envs.put("FLINK_LIB_DIR", pwd + "/flink/lib");
            this.envs.put("FLINK_PLUGINS_DIR", pwd + "/flink/plugins");
            this.envs.put("HIVE_CONF_DIR", pwd + "/hive_conf");
        }
        int memoryMb = Integer.parseInt(this.properties.getProperty("zeppelin.interpreter.yarn.resource.memory", "1024"));
        this.envs.put("ZEPPELIN_INTP_MEM", "-Xmx" + memoryMb + "m");
        amContainer.setEnvironment(this.envs);
        return amContainer;
    }

    /**
     * Uploads a locally created archive to the staging directory, registers it as an ARCHIVE local
     * resource under {@code linkName}, and always deletes the local file afterwards.
     */
    private void uploadArchive(File archive, String linkName, Map<String, LocalResource> localResources) throws IOException {
        try {
            Path localPath = this.localFs.makeQualified(new Path(archive.toURI()));
            Path remotePath = copyFileToRemote(this.stagingDir, localPath, (short) 1);
            addResource(this.fs, remotePath, localResources, LocalResourceType.ARCHIVE, linkName);
        } finally {
            // Best-effort: a leftover temp file must not fail the launch or mask an upload error.
            FileUtils.deleteQuietly(archive);
        }
    }

    /**
     * Appends the YARN and MapReduce application classpath entries to the {@code CLASSPATH} variable of
     * the given environment, and sets {@code HADOOP_MAPRED_HOME} to {@code ${HADOOP_HOME}} in it.
     *
     * <p>Entries are joined with the literal {@code <CPS>} token. An existing {@code CLASSPATH} value
     * is kept as the prefix.
     *
     * @param map the container environment to update
     */
    private void populateHadoopClasspath(Map<String, String> map) {
        ArrayList<String> entries = Lists.newArrayList(getYarnAppClasspath());
        entries.addAll(Lists.newArrayList(getMRAppClasspath()));
        LOGGER.info("Adding hadoop classpath: " + StringUtils.join(entries, ":"));

        if (!entries.isEmpty()) {
            String classpathKey = ApplicationConstants.Environment.CLASSPATH.name();
            String appended = StringUtils.join(entries, "<CPS>");
            if (map.containsKey(classpathKey)) {
                appended = map.get(classpathKey) + "<CPS>" + appended;
            }
            map.put(classpathKey, appended);
        }
        // Write to the map that was passed in, not to the field, so the method honours its parameter.
        map.put("HADOOP_MAPRED_HOME", "${HADOOP_HOME}");
    }

    /**
     * @return the entries configured under {@code yarn.application.classpath}, or the default YARN
     *     application classpath when that key is unset or empty
     */
    private String[] getYarnAppClasspath() {
        String[] configured = this.hadoopConf.getStrings("yarn.application.classpath");
        if (configured != null && configured.length != 0) {
            return configured;
        }
        return getDefaultYarnApplicationClasspath();
    }

    /**
     * @return the entries configured under {@code mapreduce.application.classpath}, or the default
     *     MapReduce application classpath when that key is unset or empty
     */
    private String[] getMRAppClasspath() {
        String[] configured = this.hadoopConf.getStrings("mapreduce.application.classpath");
        if (configured != null && configured.length != 0) {
            return configured;
        }
        return getDefaultMRApplicationClasspath();
    }

    /** @return {@link YarnConfiguration#DEFAULT_YARN_APPLICATION_CLASSPATH} */
    private String[] getDefaultYarnApplicationClasspath() {
        String[] defaults = YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH;
        return defaults;
    }

    /**
     * @return the entries of {@link MRJobConfig#DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH}, split by
     *     Hadoop's {@code StringUtils.getStrings}
     */
    private String[] getDefaultMRApplicationClasspath() {
        String rawDefault = MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH;
        return org.apache.hadoop.util.StringUtils.getStrings(rawDefault);
    }

    /**
     * Sets the container resource request from the interpreter properties.
     *
     * <p>Requested memory is {@code zeppelin.interpreter.yarn.resource.memory} (default 1024) plus
     * {@code zeppelin.interpreter.yarn.resource.memoryOverhead} (default 384). When the overhead is
     * below 10% of the memory it is raised to at least 384; a configured overhead is never lowered.
     * Cores come from {@code zeppelin.interpreter.yarn.resource.cores} (default 1).
     *
     * @param applicationSubmissionContext the context to update
     * @throws NumberFormatException if any of the three properties is not an integer
     */
    private void setResources(ApplicationSubmissionContext applicationSubmissionContext) {
        int memoryMb = Integer.parseInt(this.properties.getProperty("zeppelin.interpreter.yarn.resource.memory", "1024"));
        int overheadMb = Integer.parseInt(this.properties.getProperty("zeppelin.interpreter.yarn.resource.memoryOverhead", "384"));
        if (overheadMb < memoryMb * 0.1d) {
            // Previously this assigned 384 unconditionally, which REDUCED any configured overhead that
            // was larger than 384 but still under 10% of the memory (e.g. 900 with 10000 memory).
            overheadMb = Math.max(overheadMb, 384);
        }
        int cores = Integer.parseInt(this.properties.getProperty("zeppelin.interpreter.yarn.resource.cores", "1"));
        applicationSubmissionContext.setResource(Resource.newInstance(memoryMb + overheadMb, cores));
    }

    /**
     * Assigns priority 1 to the application.
     *
     * @param applicationSubmissionContext the context to update
     */
    private void setPriority(ApplicationSubmissionContext applicationSubmissionContext) {
        Priority appPriority = (Priority) Records.newRecord(Priority.class);
        appPriority.setPriority(1);
        applicationSubmissionContext.setPriority(appPriority);
    }

    /**
     * Selects the YARN queue from {@code zeppelin.interpreter.yarn.queue}, falling back to
     * {@code default}.
     *
     * @param applicationSubmissionContext the context to update
     */
    private void setQueue(ApplicationSubmissionContext applicationSubmissionContext) {
        String queueName = this.properties.getProperty("zeppelin.interpreter.yarn.queue", "default");
        applicationSubmissionContext.setQueue(queueName);
    }

    /**
     * Names the application {@code "Zeppelin Interpreter <interpreterGroupId>"}.
     *
     * @param applicationSubmissionContext the context to update
     */
    private void setApplicationName(ApplicationSubmissionContext applicationSubmissionContext) {
        String applicationName = "Zeppelin Interpreter " + this.launchContext.getInterpreterGroupId();
        applicationSubmissionContext.setApplicationName(applicationName);
    }

    /**
     * Recursively adds a file or directory to a zip stream.
     *
     * <p>The entry name is {@code file.getName()}, prefixed with {@code str + "/"} when {@code str} is
     * non-empty. Directories produce no entry of their own; only the regular files beneath them are
     * written. A {@code null} or non-existent {@code file} is silently skipped.
     *
     * @param zipOutputStream the stream to write entries to
     * @param file the file or directory to add
     * @param str parent path inside the zip, or {@code null}/empty for the zip root
     * @throws IOException if a file cannot be copied or a directory cannot be listed
     */
    private void addFileToZipStream(ZipOutputStream zipOutputStream, File file, String str) throws IOException {
        if (file == null || !file.exists()) {
            return;
        }
        String entryName = file.getName();
        if (str != null && !str.isEmpty()) {
            entryName = str + "/" + file.getName();
        }
        if (!file.isDirectory()) {
            zipOutputStream.putNextEntry(new ZipEntry(entryName));
            Files.copy(file, zipOutputStream);
            zipOutputStream.closeEntry();
            return;
        }
        // File.listFiles() returns null (not an empty array) when the directory cannot be read.
        File[] children = file.listFiles();
        if (children == null) {
            throw new IOException("Unable to list directory " + file.getAbsolutePath());
        }
        for (File child : children) {
            addFileToZipStream(zipOutputStream, child, entryName);
        }
    }

    /**
     * Packs the parts of the Zeppelin installation needed by the interpreter into an uncompressed temp zip.
     *
     * <p>Contents, relative to {@code ZEPPELIN_HOME}: {@code bin}, {@code conf},
     * {@code interpreter/<settingGroup>}, the setting's local-repo directory (if present, under
     * {@code local-repo}) and the single {@code zeppelin-interpreter-shaded*.jar} found directly under
     * {@code interpreter}.
     *
     * <p>All preconditions are checked before the temp file is created, the stream is always closed,
     * and the temp file is removed again if writing fails.
     *
     * @return the created zip file; the caller is responsible for deleting it
     * @throws IOException if {@code ZEPPELIN_HOME} is not set, if zero or several shaded jars are found,
     *     or if writing the zip fails
     */
    private File createInterpreterZip() throws IOException {
        String zeppelinHomeValue = System.getenv("ZEPPELIN_HOME");
        if (StringUtils.isBlank(zeppelinHomeValue)) {
            throw new IOException("ZEPPELIN_HOME is not specified");
        }
        File zeppelinHome = new File(zeppelinHomeValue);

        File[] shadedJars = new File(zeppelinHome, "interpreter").listFiles(candidate ->
                candidate.getName().startsWith("zeppelin-interpreter-shaded") && candidate.getName().endsWith(".jar"));
        // listFiles() returns null when the interpreter directory is missing or unreadable.
        if (shadedJars == null || shadedJars.length == 0) {
            throw new IOException("No zeppelin-interpreter-shaded jar found under " + zeppelinHome.getAbsolutePath() + "/interpreter");
        }
        if (shadedJars.length > 1) {
            throw new IOException("More than 1 zeppelin-interpreter-shaded jars found under " + zeppelinHome.getAbsolutePath() + "/interpreter");
        }

        File zipFile = File.createTempFile("zeppelin_interpreter_", ".zip", Files.createTempDir());
        try (ZipOutputStream zipStream = new ZipOutputStream(new FileOutputStream(zipFile))) {
            zipStream.setLevel(0);
            addFileToZipStream(zipStream, new File(zeppelinHome, "bin"), null);
            addFileToZipStream(zipStream, new File(zeppelinHome, "conf"), null);
            addFileToZipStream(zipStream, new File(zeppelinHome, "interpreter/" + this.launchContext.getInterpreterSettingGroup()), "interpreter");
            File localRepoDir = new File(this.zConf.getInterpreterLocalRepoPath() + "/" + this.launchContext.getInterpreterSettingName());
            if (localRepoDir.exists() && localRepoDir.isDirectory()) {
                LOGGER.debug("Adding localRepoDir {} to interpreter zip: ", localRepoDir.getAbsolutePath());
                addFileToZipStream(zipStream, localRepoDir, "local-repo");
            }
            addFileToZipStream(zipStream, shadedJars[0], "interpreter");
        } catch (IOException | RuntimeException e) {
            // The stream is already closed here; do not leave a partial archive behind.
            FileUtils.deleteQuietly(zipFile);
            throw e;
        }
        return zipFile;
    }

    /**
     * Packs the contents of the directory named by the {@code FLINK_HOME} entry of {@code envs} into
     * an uncompressed temp zip. Children of {@code FLINK_HOME} end up at the zip root.
     *
     * <p>Preconditions are checked before the temp file is created, the stream is always closed, and
     * the temp file is removed again if writing fails.
     *
     * @return the created zip file; the caller is responsible for deleting it
     * @throws IOException if {@code FLINK_HOME} is unset, is not an existing directory, cannot be
     *     listed, or if writing the zip fails
     */
    private File createFlinkZip() throws IOException {
        String flinkHomeValue = this.envs.get("FLINK_HOME");
        if (StringUtils.isBlank(flinkHomeValue)) {
            // new File((String) null) would otherwise fail with a bare NullPointerException.
            throw new IOException("FLINK_HOME is not specified");
        }
        File flinkHome = new File(flinkHomeValue);
        if (!flinkHome.exists() || !flinkHome.isDirectory()) {
            throw new IOException("FLINK_HOME " + flinkHome.getAbsolutePath() + " doesn't exist or is not a directory.");
        }
        File[] children = flinkHome.listFiles();
        if (children == null) {
            throw new IOException("Unable to list FLINK_HOME " + flinkHome.getAbsolutePath());
        }

        File zipFile = File.createTempFile("flink_", ".zip", Files.createTempDir());
        try (ZipOutputStream zipStream = new ZipOutputStream(new FileOutputStream(zipFile))) {
            zipStream.setLevel(0);
            for (File child : children) {
                addFileToZipStream(zipStream, child, null);
            }
        } catch (IOException | RuntimeException e) {
            // The stream is already closed here; do not leave a partial archive behind.
            FileUtils.deleteQuietly(zipFile);
            throw e;
        }
        return zipFile;
    }

    /**
     * Packs the contents of a Hive configuration directory into an uncompressed temp zip. Children of
     * the directory end up at the zip root.
     *
     * <p>Preconditions are checked before the temp file is created, the stream is always closed, and
     * the temp file is removed again if writing fails.
     *
     * @param file the Hive configuration directory
     * @return the created zip file; the caller is responsible for deleting it
     * @throws IOException if {@code file} does not exist, is not a directory, cannot be listed, or if
     *     writing the zip fails
     */
    private File createHiveConfZip(File file) throws IOException {
        if (!file.exists()) {
            throw new IOException("HIVE_CONF_DIR " + file.getAbsolutePath() + " doesn't exist");
        }
        // listFiles() returns null for a regular file or an unreadable directory; fail with a clear
        // message instead of a NullPointerException in the loop below.
        if (!file.isDirectory()) {
            throw new IOException("HIVE_CONF_DIR " + file.getAbsolutePath() + " is not a directory");
        }
        File[] children = file.listFiles();
        if (children == null) {
            throw new IOException("Unable to list HIVE_CONF_DIR " + file.getAbsolutePath());
        }

        File zipFile = File.createTempFile("hive_conf", ".zip", Files.createTempDir());
        try (ZipOutputStream zipStream = new ZipOutputStream(new FileOutputStream(zipFile))) {
            zipStream.setLevel(0);
            for (File child : children) {
                addFileToZipStream(zipStream, child, null);
            }
        } catch (IOException | RuntimeException e) {
            // The stream is already closed here; do not leave a partial archive behind.
            FileUtils.deleteQuietly(zipFile);
            throw e;
        }
        return zipFile;
    }

    /**
     * Copies a file into a destination directory, keeping its file name, then applies the requested
     * replication factor and {@code APP_FILE_PERMISSION} to the copy. The source is not deleted.
     *
     * @param path destination directory
     * @param path2 source file
     * @param sh replication factor for the copied file; must not be {@code null}
     * @return the path of the copied file inside {@code path}
     * @throws IOException if resolving a file system, copying, or updating file attributes fails
     */
    private Path copyFileToRemote(Path path, Path path2, Short sh) throws IOException {
        FileSystem targetFs = path.getFileSystem(this.hadoopConf);
        FileSystem sourceFs = path2.getFileSystem(this.hadoopConf);
        Path target = new Path(path, path2.getName());

        LOGGER.info("Uploading resource " + path2 + " to " + target);
        FileUtil.copy(sourceFs, path2, targetFs, target, false, this.hadoopConf);

        short replication = sh.shortValue();
        targetFs.setReplication(target, replication);
        targetFs.setPermission(target, APP_FILE_PERMISSION);
        return target;
    }

    /**
     * Registers an already uploaded file as a YARN local resource with PUBLIC visibility.
     *
     * <p>Size and timestamp are read from the file's current status on {@code fileSystem}.
     *
     * @param fileSystem file system holding {@code path}
     * @param path location of the uploaded file
     * @param map local resources of the container, keyed by link name; receives the new entry
     * @param localResourceType resource type to set on the entry
     * @param str key under which the resource is stored in {@code map}
     * @throws IOException if the file status cannot be read
     */
    private void addResource(FileSystem fileSystem, Path path, Map<String, LocalResource> map, LocalResourceType localResourceType, String str) throws IOException {
        FileStatus status = fileSystem.getFileStatus(path);

        LocalResource resource = (LocalResource) Records.newRecord(LocalResource.class);
        resource.setResource(ConverterUtils.getYarnUrlFromPath(path));
        resource.setSize(status.getLen());
        resource.setTimestamp(status.getModificationTime());
        resource.setType(localResourceType);
        resource.setVisibility(LocalResourceVisibility.PUBLIC);

        map.put(str, resource);
    }

    /**
     * Shuts the interpreter down and stops the YARN client.
     *
     * <p>If the application is marked as running, the remote side is asked to shut down (failures are
     * logged and ignored), the superclass {@code shutdown()} is invoked, and the running flag is
     * cleared so that {@link #isRunning()} no longer reports a stopped process as alive and a repeated
     * call does not attempt the remote shutdown again.
     */
    public void stop() {
        if (isRunning()) {
            LOGGER.info("Kill interpreter process");
            try {
                callRemoteFunction(client -> {
                    client.shutdown();
                    return null;
                });
            } catch (Exception e) {
                LOGGER.warn("ignore the exception when shutting down", e);
            }
            try {
                shutdown();
            } finally {
                // Clear the flag even if shutdown() throws: this handle no longer manages the process.
                this.isYarnAppRunning.set(false);
            }
        }
        this.yarnClient.stop();
        LOGGER.info("Remote process terminated");
    }

    /** @return the host recorded by {@code processStarted}, or {@code null} if it was never called */
    public String getHost() {
        return host;
    }

    /** @return the port recorded by {@code processStarted}, or {@code -1} if it was never called */
    public int getPort() {
        return port;
    }

    /** @return the current value of the running flag, which {@code start} sets once YARN reports RUNNING */
    public boolean isRunning() {
        return isYarnAppRunning.get();
    }
}
