package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.spark.JavaSparkProgram;
import co.cask.cdap.api.spark.ScalaSparkProgram;
import co.cask.cdap.api.spark.SparkContext;
import com.google.common.base.Throwables;
import java.lang.reflect.InvocationTargetException;
import org.apache.spark.network.ConnectionManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkProgramWrapper.class */
public class SparkProgramWrapper {
    private static final Logger LOG = LoggerFactory.getLogger(SparkProgramWrapper.class);
    private static final int PROGRAM_WRAPPER_ARGUMENTS_SIZE = 1;
    private final String[] arguments;
    private final Class userProgramClass;
    private static SparkContext sparkContext;
    private static boolean scalaProgram;
    private static boolean sparkProgramSuccessful;
    private static boolean sparkProgramRunning;

    private SparkProgramWrapper(String[] strArr) {
        this.arguments = validateArgs(strArr);
        try {
            this.userProgramClass = Class.forName(this.arguments[0], true, Thread.currentThread().getContextClassLoader());
            setSparkContext();
        } catch (ClassNotFoundException e) {
            LOG.warn("Unable to find the program class: {}", this.arguments[0], e);
            throw Throwables.propagate(e);
        }
    }

    public static void main(String[] strArr) {
        new SparkProgramWrapper(strArr).instantiateUserProgramClass();
    }

    private String[] validateArgs(String[] strArr) {
        if (strArr.length < PROGRAM_WRAPPER_ARGUMENTS_SIZE) {
            throw new IllegalArgumentException("Insufficient number of arguments. Program class name followed by its arguments (if any) should be provided");
        }
        return strArr;
    }

    private void instantiateUserProgramClass() {
        try {
            runUserProgram(this.userProgramClass.newInstance());
        } catch (IllegalAccessException e) {
            LOG.warn("Illegal access to class: {}", this.arguments[0] + "or to its constructor", e);
            throw Throwables.propagate(e);
        } catch (InstantiationException e2) {
            LOG.warn("Unable to instantiate an object of program class: {}", this.arguments[0], e2);
            throw Throwables.propagate(e2);
        }
    }

    void setSparkContext() {
        if (JavaSparkProgram.class.isAssignableFrom(this.userProgramClass)) {
            sparkContext = new JavaSparkContext();
        } else {
            if (!ScalaSparkProgram.class.isAssignableFrom(this.userProgramClass)) {
                throw new IllegalArgumentException("Spark program must implement either JavaSparkProgram or ScalaSparkProgram");
            }
            sparkContext = new ScalaSparkContext();
            setScalaProgram(true);
        }
    }

    private void runUserProgram(Object obj) {
        try {
            this.userProgramClass.getMethod("run", SparkContext.class).invoke(obj, sparkContext);
        } catch (IllegalAccessException e) {
            LOG.warn("Unable to access run method in program class: {}", obj.getClass().getName(), e);
            throw Throwables.propagate(e);
        } catch (NoSuchMethodException e2) {
            LOG.warn("Unable to find run method in program class: {}", obj.getClass().getName(), e2);
            throw Throwables.propagate(e2);
        } catch (InvocationTargetException e3) {
            LOG.warn("Program class run method threw an exception", e3);
            throw Throwables.propagate(e3);
        }
    }

    public static void stopSparkProgram() {
        sparkContextStopBugFixer();
        if (isScalaProgram()) {
            ((org.apache.spark.SparkContext) getSparkContext().getOriginalSparkContext()).stop();
        } else {
            ((org.apache.spark.api.java.JavaSparkContext) getSparkContext().getOriginalSparkContext()).stop();
        }
    }

    private static void sparkContextStopBugFixer() {
        if (closeSelector(getConnectionManager(getSparkContext()))) {
            return;
        }
        LOG.warn("Failed to get the Selector which can cause thread deadlock in SparkContext.stop()");
    }

    /* JADX WARN: Code restructure failed: missing block: B:14:0x007c, code lost:
    
        return r5;
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    private static boolean closeSelector(org.apache.spark.network.ConnectionManager r4) {
        /*
            r0 = 0
            r5 = r0
            r0 = r4
            java.lang.Class r0 = r0.getClass()
            java.lang.reflect.Field[] r0 = r0.getDeclaredFields()
            r6 = r0
            r0 = r6
            int r0 = r0.length
            r7 = r0
            r0 = 0
            r8 = r0
        L10:
            r0 = r8
            r1 = r7
            if (r0 >= r1) goto L7b
            r0 = r6
            r1 = r8
            r0 = r0[r1]
            r9 = r0
            java.lang.Class<java.nio.channels.Selector> r0 = java.nio.channels.Selector.class
            r1 = r9
            java.lang.Class r1 = r1.getType()
            boolean r0 = r0.isAssignableFrom(r1)
            if (r0 == 0) goto L75
            r0 = r9
            boolean r0 = r0.isAccessible()
            if (r0 != 0) goto L38
            r0 = r9
            r1 = 1
            r0.setAccessible(r1)
        L38:
            r0 = r9
            r1 = r4
            java.lang.Object r0 = r0.get(r1)     // Catch: java.lang.IllegalAccessException -> L4d java.io.IOException -> L61
            java.nio.channels.Selector r0 = (java.nio.channels.Selector) r0     // Catch: java.lang.IllegalAccessException -> L4d java.io.IOException -> L61
            r10 = r0
            r0 = r10
            r0.close()     // Catch: java.lang.IllegalAccessException -> L4d java.io.IOException -> L61
            r0 = 1
            r5 = r0
            goto L7b
        L4d:
            r10 = move-exception
            org.slf4j.Logger r0 = co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.LOG
            java.lang.String r1 = "Unable to access the selector field"
            r2 = r10
            r0.warn(r1, r2)
            r0 = r10
            java.lang.RuntimeException r0 = com.google.common.base.Throwables.propagate(r0)
            throw r0
        L61:
            r10 = move-exception
            org.slf4j.Logger r0 = co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.LOG
            java.lang.String r1 = "Close on Selector threw IOException"
            r2 = r10
            r0.info(r1, r2)
            r0 = r10
            java.lang.RuntimeException r0 = com.google.common.base.Throwables.propagate(r0)
            throw r0
        L75:
            int r8 = r8 + 1
            goto L10
        L7b:
            r0 = r5
            return r0
        */
        throw new UnsupportedOperationException("Method not decompiled: co.cask.cdap.internal.app.runtime.spark.SparkProgramWrapper.closeSelector(org.apache.spark.network.ConnectionManager):boolean");
    }

    private static ConnectionManager getConnectionManager(SparkContext sparkContext2) {
        return isScalaProgram() ? ((org.apache.spark.SparkContext) sparkContext2.getOriginalSparkContext()).env().blockManager().connectionManager() : ((org.apache.spark.api.java.JavaSparkContext) sparkContext2.getOriginalSparkContext()).env().blockManager().connectionManager();
    }

    public static SparkContext getSparkContext() {
        return sparkContext;
    }

    public static boolean isSparkProgramRunning() {
        return sparkProgramRunning;
    }

    public static void setSparkProgramRunning(boolean z) {
        sparkProgramRunning = z;
    }

    public static boolean isSparkProgramSuccessful() {
        return sparkProgramSuccessful;
    }

    public static void setSparkProgramSuccessful(boolean z) {
        sparkProgramSuccessful = z;
    }

    private static boolean isScalaProgram() {
        return scalaProgram;
    }

    public static void setScalaProgram(boolean z) {
        scalaProgram = z;
    }
}
