package co.cask.cdap.app.runtime.spark.distributed;

import co.cask.cdap.app.runtime.spark.SparkRuntimeContextProvider;
import co.cask.cdap.app.runtime.spark.SparkRuntimeUtils;
import co.cask.cdap.app.runtime.spark.classloader.SparkContainerClassLoader;
import co.cask.cdap.app.runtime.spark.python.SparkPythonUtil;
import co.cask.cdap.common.app.MainClassLoader;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.FilterClassLoader;
import co.cask.cdap.common.logging.StandardOutErrorRedirector;
import co.cask.cdap.common.logging.common.UncaughtExceptionHandler;
import java.io.IOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Enumeration;
import java.util.LinkedHashSet;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/distributed/SparkContainerLauncher.class */
public final class SparkContainerLauncher {
    private static final Logger LOG = LoggerFactory.getLogger(SparkContainerLauncher.class);
    private static final FilterClassLoader.Filter KAFKA_FILTER = new FilterClassLoader.Filter() { // from class: co.cask.cdap.app.runtime.spark.distributed.SparkContainerLauncher.1
        public boolean acceptResource(String str) {
            return str.startsWith("kafka/");
        }

        public boolean acceptPackage(String str) {
            return str.equals("kafka") || str.startsWith("kafka.");
        }
    };

    public static void launch(String str, String[] strArr) throws Exception {
        Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler());
        ClassLoader systemClassLoader = ClassLoader.getSystemClassLoader();
        Set set = (Set) ClassLoaders.getClassLoaderURLs(systemClassLoader, new LinkedHashSet());
        set.remove(getURLByClass(systemClassLoader, str));
        removeFirstJar(systemClassLoader, "scala.language", set);
        removeFirstJar(systemClassLoader, "net.jpountz.lz4.LZ4BlockInputStream", set);
        MainClassLoader sparkContainerClassLoader = new SparkContainerClassLoader((URL[]) set.toArray(new URL[set.size()]), new FilterClassLoader(systemClassLoader, KAFKA_FILTER));
        Thread.currentThread().setContextClassLoader(sparkContainerClassLoader);
        try {
            sparkContainerClassLoader.loadClass(SLF4JBridgeHandler.class.getName()).getDeclaredMethod("install", new Class[0]).invoke(null, new Object[0]);
        } catch (Exception e) {
            LOG.warn("Failed to invoke SLF4JBridgeHandler.install() required for jul-to-slf4j bridge", e);
        }
        try {
            sparkContainerClassLoader.loadClass(SparkRuntimeContextProvider.class.getName()).getMethod("get", new Class[0]).invoke(null, new Object[0]);
            if (!isPySpark()) {
                sparkContainerClassLoader.loadClass(StandardOutErrorRedirector.class.getName()).getDeclaredMethod("redirectToLogger", String.class).invoke(null, str);
            }
            if (System.getProperty("spark.executorEnv.CDAP_LOG_DIR") != null) {
                System.setProperty("spark.executorEnv.CDAP_LOG_DIR", "<LOG_DIR>");
            }
            Runnable startGatewayServerIfNeeded = startGatewayServerIfNeeded(sparkContainerClassLoader);
            try {
                LOG.info("Launch main class {}.main({})", str, Arrays.toString(strArr));
                sparkContainerClassLoader.loadClass(str).getMethod("main", String[].class).invoke(null, strArr);
                LOG.info("Main method returned {}", str);
                startGatewayServerIfNeeded.run();
            } catch (Throwable th) {
                startGatewayServerIfNeeded.run();
                throw th;
            }
        } catch (Throwable th2) {
            LOG.error("Exception raised when calling {}.main(String[]) method", str, th2);
            throw th2;
        }
    }

    private static URL getURLByClass(ClassLoader classLoader, String str) {
        URL resource = classLoader.getResource(str.replace('.', '/') + ".class");
        if (resource == null) {
            throw new IllegalStateException("Failed to find .class file resource for class " + str);
        }
        return ClassLoaders.getClassPathURL(str, resource);
    }

    private static Runnable startGatewayServerIfNeeded(ClassLoader classLoader) {
        Runnable runnable = new Runnable() { // from class: co.cask.cdap.app.runtime.spark.distributed.SparkContainerLauncher.2
            @Override // java.lang.Runnable
            public void run() {
            }
        };
        if (!isPySpark() || System.getenv(SparkRuntimeUtils.CDAP_SPARK_EXECUTION_SERVICE_URI) != null) {
            return runnable;
        }
        Path path = Paths.get("cdap.py4j.gateway.port.txt", new String[0]);
        try {
            final Object invoke = classLoader.loadClass(SparkPythonUtil.class.getName()).getMethod("startPy4jGateway", Path.class).invoke(null, path);
            LOG.info("Py4j GatewayServer started, listening at port {}", new String(Files.readAllBytes(path), StandardCharsets.UTF_8));
            return new Runnable() { // from class: co.cask.cdap.app.runtime.spark.distributed.SparkContainerLauncher.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        invoke.getClass().getMethod("shutdown", new Class[0]).invoke(invoke, new Object[0]);
                    } catch (Exception e) {
                        SparkContainerLauncher.LOG.warn("Failed to shutdown Py4j GatewayServer", e);
                    }
                }
            };
        } catch (Exception e) {
            LOG.warn("Failed to start Py4j GatewayServer. No CDAP functionality will be available in executor", e);
            return runnable;
        }
    }

    private static boolean isPySpark() {
        return System.getenv("PYTHONPATH") != null;
    }

    private static void removeFirstJar(ClassLoader classLoader, String str, Set<URL> set) throws IOException {
        URL uRLByClass = getURLByClass(classLoader, str);
        Enumeration<URL> resources = classLoader.getResources(str.replace('.', '/') + ".class");
        int i = 0;
        while (resources.hasMoreElements()) {
            resources.nextElement();
            i++;
        }
        if (i > 1) {
            LOG.info("Removing {}", uRLByClass);
            set.remove(uRLByClass);
        }
    }
}
