package co.cask.cdap.app.runtime.spark.classloader;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.spark.SparkPackageUtils;
import co.cask.cdap.common.lang.ClassRewriter;
import co.cask.cdap.common.logging.RedirectedPrintStream;
import co.cask.cdap.internal.asm.Classes;
import co.cask.cdap.internal.asm.Methods;
import co.cask.cdap.internal.asm.Signatures;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.UnmodifiableIterator;
import com.google.common.reflect.TypeToken;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.lang.reflect.Modifier;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.objectweb.asm.ClassReader;
import org.objectweb.asm.ClassVisitor;
import org.objectweb.asm.ClassWriter;
import org.objectweb.asm.Label;
import org.objectweb.asm.MethodVisitor;
import org.objectweb.asm.Type;
import org.objectweb.asm.commons.AdviceAdapter;
import org.objectweb.asm.commons.GeneratorAdapter;
import org.objectweb.asm.commons.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/classloader/SparkClassRewriter.class */
public class SparkClassRewriter implements ClassRewriter {
    private static final Logger LOG = LoggerFactory.getLogger(SparkClassRewriter.class);
    private static final Type SPARK_RUNTIME_ENV_TYPE = Type.getObjectType("co/cask/cdap/app/runtime/spark/SparkRuntimeEnv");
    private static final Type SPARK_RUNTIME_UTILS_TYPE = Type.getObjectType("co/cask/cdap/app/runtime/spark/SparkRuntimeUtils");
    private static final Type SPARK_CONTEXT_TYPE = Type.getObjectType("org/apache/spark/SparkContext");
    private static final Type SPARK_STREAMING_CONTEXT_TYPE = Type.getObjectType("org/apache/spark/streaming/StreamingContext");
    private static final Type SPARK_CONF_TYPE = Type.getObjectType("org/apache/spark/SparkConf");
    private static final Type SPARK_SUBMIT_TYPE = Type.getObjectType("org/apache/spark/deploy/SparkSubmit$");
    private static final Type SPARK_PYTHON_RUNNER_TYPE = Type.getObjectType("org/apache/spark/deploy/PythonRunner");
    private static final Type SPARK_PYTHON_RUNNER_COMPANION_TYPE = Type.getObjectType("org/apache/spark/deploy/PythonRunner$");
    private static final Type SPARK_PYTHON_WORKER_FACTORY_TYPE = Type.getObjectType("org/apache/spark/api/python/PythonWorkerFactory");
    private static final Type SPARK_PYTHON_WORKER_MONITOR_THREAD_TYPE = Type.getObjectType("org/apache/spark/api/python/PythonWorkerFactory$MonitorThread");
    private static final Type SPARK_REDIRECT_THREAD = Type.getObjectType("org/apache/spark/util/RedirectThread");
    private static final Type SPARK_YARN_CLIENT_TYPE = Type.getObjectType("org/apache/spark/deploy/yarn/Client");
    private static final Type SPARK_DSTREAM_GRAPH_TYPE = Type.getObjectType("org/apache/spark/streaming/DStreamGraph");
    private static final Type SPARK_BATCHED_WRITE_AHEAD_LOG_TYPE = Type.getObjectType("org/apache/spark/streaming/util/BatchedWriteAheadLog");
    private static final Type RATE_CONTROLLER_TYPE = Type.getObjectType("org/apache/spark/streaming/scheduler/RateController");
    private static final Type SPARK_EXECUTOR_CLASSLOADER_TYPE = Type.getObjectType("org/apache/spark/repl/ExecutorClassLoader");
    private static final Type YARN_SPARK_HADOOP_UTIL_TYPE = Type.getObjectType("org/apache/spark/deploy/yarn/YarnSparkHadoopUtil");
    private static final Type KRYO_TYPE = Type.getObjectType("com/esotericsoftware/kryo/Kryo");
    private static final Type SCHEMA_SERIALIZER_TYPE = Type.getObjectType("co/cask/cdap/app/runtime/spark/serializer/SchemaSerializer");
    private static final Type STRUCTURED_RECORD_SERIALIZER_TYPE = Type.getObjectType("co/cask/cdap/app/runtime/spark/serializer/StructuredRecordSerializer");
    private static final Type AKKA_REMOTING_TYPE = Type.getObjectType("akka/remote/Remoting");
    private static final Type EXECUTION_CONTEXT_TYPE = Type.getObjectType("scala/concurrent/ExecutionContext");
    private static final Type EXECUTION_CONTEXT_EXECUTOR_TYPE = Type.getObjectType("scala/concurrent/ExecutionContextExecutor");
    private static final String LOCALIZED_CONF_DIR = "__spark_conf__";
    private static final String SPARK_CONF_FILE = "__spark_conf__.properties";
    private final Function<String, URL> resourceLookup;
    private final boolean rewriteYarnClient;
    private final boolean distributed = Boolean.parseBoolean(System.getenv(SparkPackageUtils.SPARK_YARN_MODE));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/classloader/SparkClassRewriter$ConstructorRewriter.class */
    public abstract class ConstructorRewriter {
        private ConstructorRewriter() {
        }

        void onMethodEnter(String str, String str2, GeneratorAdapter generatorAdapter) {
        }

        void onMethodExit(String str, String str2, GeneratorAdapter generatorAdapter) {
        }
    }

    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/classloader/SparkClassRewriter$OutputRedirectMethodVisitor.class */
    private static final class OutputRedirectMethodVisitor extends MethodVisitor {
        private static final Type SYSTEM_TYPE = Type.getType(System.class);
        private static final Type PRINT_STREAM_TYPE = Type.getType(PrintStream.class);
        private static final Type STRING_TYPE = Type.getType(String.class);
        private final GeneratorAdapter adapter;
        private final boolean distributed;

        OutputRedirectMethodVisitor(MethodVisitor methodVisitor, int i, String str, String str2, boolean z) {
            super(327680, methodVisitor);
            this.adapter = new GeneratorAdapter(methodVisitor, i, str, str2);
            this.distributed = z;
        }

        public void visitFieldInsn(int i, String str, String str2, String str3) {
            if (i != 178 || !SYSTEM_TYPE.getInternalName().equals(str) || !PRINT_STREAM_TYPE.equals(Type.getType(str3)) || (!"out".equals(str2) && !"err".equals(str2))) {
                super.visitFieldInsn(i, str, str2, str3);
                return;
            }
            Type objectType = Type.getObjectType("co/cask/cdap/app/runtime/spark/SparkRuntimeContextProvider");
            Type objectType2 = Type.getObjectType("co/cask/cdap/app/runtime/spark/SparkRuntimeContext");
            Type type = Type.getType(LoggerFactory.class);
            Type type2 = Type.getType(Logger.class);
            Type type3 = Type.getType(RedirectedPrintStream.class);
            Type type4 = Type.getType(Program.class);
            this.adapter.invokeStatic(objectType, new Method("get", objectType2, new Type[0]));
            this.adapter.invokeVirtual(objectType2, new Method("getProgram", type4, new Type[0]));
            this.adapter.invokeInterface(type4, new Method("getMainClassName", STRING_TYPE, new Type[0]));
            this.adapter.invokeStatic(type, new Method("getLogger", type2, new Type[]{STRING_TYPE}));
            if (this.distributed) {
                this.adapter.getStatic(SYSTEM_TYPE, str2, PRINT_STREAM_TYPE);
            } else {
                this.adapter.push((Type) null);
            }
            this.adapter.invokeStatic(type3, new Method("createRedirectedOutStream", type3, new Type[]{type2, PRINT_STREAM_TYPE}));
        }
    }

    public SparkClassRewriter(Function<String, URL> function, boolean z) {
        this.resourceLookup = function;
        this.rewriteYarnClient = z;
    }

    @Nullable
    public byte[] rewriteClass(String str, InputStream inputStream) throws IOException {
        if (str.equals(SPARK_CONTEXT_TYPE.getClassName())) {
            return rewriteContext(SPARK_CONTEXT_TYPE, inputStream);
        }
        if (str.equals(SPARK_STREAMING_CONTEXT_TYPE.getClassName())) {
            return rewriteContext(SPARK_STREAMING_CONTEXT_TYPE, inputStream);
        }
        if (str.equals(SPARK_CONF_TYPE.getClassName())) {
            return rewriteSparkConf(SPARK_CONF_TYPE, inputStream);
        }
        if (str.startsWith(SPARK_SUBMIT_TYPE.getClassName())) {
            return rewriteSetProperties(inputStream);
        }
        if (str.equals(SPARK_PYTHON_RUNNER_TYPE.getClassName())) {
            return rewritePythonRunner(inputStream);
        }
        if (str.equals(SPARK_PYTHON_RUNNER_COMPANION_TYPE.getClassName())) {
            return rewritePythonRunnerCompanion(inputStream);
        }
        if (str.equals(SPARK_PYTHON_WORKER_FACTORY_TYPE.getClassName())) {
            return rewritePythonWorkerFactory(inputStream);
        }
        if (str.equals(SPARK_PYTHON_WORKER_MONITOR_THREAD_TYPE.getClassName())) {
            return rewritePythonWorkerMonitorThread(inputStream);
        }
        if (str.equals(SPARK_YARN_CLIENT_TYPE.getClassName()) && this.rewriteYarnClient) {
            return rewriteClient(inputStream);
        }
        if (str.equals(SPARK_DSTREAM_GRAPH_TYPE.getClassName())) {
            return rewriteDStreamGraph(inputStream);
        }
        if (str.equals(SPARK_BATCHED_WRITE_AHEAD_LOG_TYPE.getClassName())) {
            return rewriteBatchedWriteAheadLog(inputStream);
        }
        if (str.equals(RATE_CONTROLLER_TYPE.getClassName())) {
            return rewriteRateController(inputStream);
        }
        if (str.equals(SPARK_EXECUTOR_CLASSLOADER_TYPE.getClassName())) {
            return rewriteExecutorClassLoader(inputStream);
        }
        if (str.equals(AKKA_REMOTING_TYPE.getClassName())) {
            return rewriteAkkaRemoting(inputStream);
        }
        if (str.equals(YARN_SPARK_HADOOP_UTIL_TYPE.getClassName())) {
            return rewriteSparkHadoopUtil(str, inputStream);
        }
        if (str.equals(KRYO_TYPE.getClassName())) {
            return rewriteKryo(inputStream);
        }
        return null;
    }

    private byte[] rewriteContext(final Type type, InputStream inputStream) throws IOException {
        return rewriteConstructor(type, inputStream, new ConstructorRewriter() { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.1
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            @Override // co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.ConstructorRewriter
            void onMethodEnter(String str, String str2, GeneratorAdapter generatorAdapter) {
                Type[] argumentTypes = Type.getArgumentTypes(str2);
                ArrayList arrayList = new ArrayList();
                for (int i = 0; i < argumentTypes.length; i++) {
                    if (SparkClassRewriter.SPARK_CONF_TYPE.equals(argumentTypes[i])) {
                        arrayList.add(Integer.valueOf(i));
                    }
                }
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    generatorAdapter.loadArg(((Integer) it.next()).intValue());
                    generatorAdapter.invokeStatic(SparkClassRewriter.SPARK_RUNTIME_ENV_TYPE, new Method("setupSparkConf", Type.VOID_TYPE, new Type[]{SparkClassRewriter.SPARK_CONF_TYPE}));
                }
            }

            @Override // co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.ConstructorRewriter
            public void onMethodExit(String str, String str2, GeneratorAdapter generatorAdapter) {
                generatorAdapter.loadThis();
                generatorAdapter.invokeStatic(SparkClassRewriter.SPARK_RUNTIME_ENV_TYPE, new Method("setContext", Type.VOID_TYPE, new Type[]{type}));
            }
        });
    }

    private byte[] rewriteSparkConf(final Type type, InputStream inputStream) throws IOException {
        return rewriteConstructor(type, inputStream, new ConstructorRewriter() { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.2
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super();
            }

            @Override // co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.ConstructorRewriter
            public void onMethodExit(String str, String str2, GeneratorAdapter generatorAdapter) {
                generatorAdapter.loadThis();
                generatorAdapter.invokeStatic(SparkClassRewriter.SPARK_RUNTIME_ENV_TYPE, new Method("setupSparkConf", Type.VOID_TYPE, new Type[]{type}));
            }
        });
    }

    private byte[] rewriteDStreamGraph(InputStream inputStream) throws IOException {
        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(0);
        classReader.accept(new ClassVisitor(327680, classWriter) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.3
            public MethodVisitor visitMethod(int i, String str, String str2, String str3, String[] strArr) {
                return new MethodVisitor(327680, super.visitMethod(i, str, str2, str3, strArr)) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.3.1
                    public void visitMethodInsn(int i2, String str4, String str5, String str6, boolean z) {
                        super.visitMethodInsn(i2, str4, str5, str6, z);
                        Type returnType = Type.getReturnType(str6);
                        if (i2 == 182 && str5.equals("par") && str4.equals("scala/collection/mutable/ArrayBuffer") && returnType.getClassName().equals("scala.collection.parallel.mutable.ParArray")) {
                            super.visitMethodInsn(184, SparkClassRewriter.SPARK_RUNTIME_UTILS_TYPE.getInternalName(), "setTaskSupport", Type.getMethodDescriptor(returnType, new Type[]{returnType}), false);
                        }
                    }
                };
            }
        }, 8);
        return classWriter.toByteArray();
    }

    private byte[] rewriteBatchedWriteAheadLog(InputStream inputStream) throws IOException {
        return rewriteConstructor(SPARK_BATCHED_WRITE_AHEAD_LOG_TYPE, inputStream, new ConstructorRewriter() { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.4
            @Override // co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.ConstructorRewriter
            public void onMethodExit(String str, String str2, GeneratorAdapter generatorAdapter) {
                generatorAdapter.loadThis();
                generatorAdapter.invokeStatic(SparkClassRewriter.SPARK_RUNTIME_ENV_TYPE, new Method("addBatchedWriteAheadLog", Type.VOID_TYPE, new Type[]{Type.getType(Object.class)}));
            }
        });
    }

    private byte[] rewriteRateController(InputStream inputStream) throws IOException {
        return rewriteConstructor(RATE_CONTROLLER_TYPE, inputStream, new ConstructorRewriter() { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.5
            @Override // co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.ConstructorRewriter
            void onMethodExit(String str, String str2, GeneratorAdapter generatorAdapter) {
                generatorAdapter.loadThis();
                generatorAdapter.invokeStatic(SparkClassRewriter.SPARK_RUNTIME_ENV_TYPE, new Method("addRateController", Type.VOID_TYPE, new Type[]{Type.getType(Object.class)}));
            }
        });
    }

    private byte[] rewriteExecutorClassLoader(InputStream inputStream) throws IOException {
        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(1);
        final Type type = Type.getType(ClassLoader.class);
        final Type objectType = Type.getObjectType("org/apache/spark/util/ParentClassLoader");
        final Method method = new Method("parentLoader", objectType, new Type[0]);
        final HashMap hashMap = new HashMap();
        hashMap.put(new Method("getResource", Type.getType(URL.class), new Type[]{Type.getType(String.class)}), null);
        Method method2 = new Method("getResources", Type.getType(Enumeration.class), new Type[]{Type.getType(String.class)});
        hashMap.put(method2, Signatures.getMethodSignature(method2, new TypeToken<Enumeration<URL>>() { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.6
        }, new TypeToken[]{TypeToken.of(String.class)}));
        hashMap.put(new Method("getResourceAsStream", Type.getType(InputStream.class), new Type[]{Type.getType(String.class)}), null);
        classReader.accept(new ClassVisitor(327680, classWriter) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.7
            private boolean hasParentLoader;
            private boolean rewriteInit;

            public void visit(int i, int i2, String str, String str2, String str3, String[] strArr) {
                if (type.getInternalName().equals(str3)) {
                    this.rewriteInit = true;
                }
                super.visit(i, i2, str, str2, str3, strArr);
            }

            public MethodVisitor visitMethod(int i, String str, String str2, String str3, String[] strArr) {
                MethodVisitor visitMethod = super.visitMethod(i, str, str2, str3, strArr);
                Method method3 = new Method(str, str2);
                hashMap.remove(method3);
                this.hasParentLoader = this.hasParentLoader || method.equals(method3);
                return (this.rewriteInit && "<init>".equals(str)) ? new GeneratorAdapter(327680, visitMethod, i, str, str2) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.7.1
                    public void visitMethodInsn(int i2, String str4, String str5, String str6, boolean z) {
                        if (i2 != 183 || !Type.getObjectType(str4).equals(type) || !str5.equals("<init>") || Type.getArgumentTypes(str6).length != 0 || !Type.getReturnType(str6).equals(Type.VOID_TYPE)) {
                            super.visitMethodInsn(i2, str4, str5, str6, z);
                        } else {
                            push((Type) null);
                            invokeConstructor(type, new Method("<init>", Type.VOID_TYPE, new Type[]{type}));
                        }
                    }
                } : visitMethod;
            }

            public void visitEnd() {
                if (!this.hasParentLoader) {
                    super.visitEnd();
                    return;
                }
                for (Map.Entry entry : hashMap.entrySet()) {
                    Method method3 = (Method) entry.getKey();
                    GeneratorAdapter generatorAdapter = new GeneratorAdapter(1, method3, super.visitMethod(1, method3.getName(), method3.getDescriptor(), (String) entry.getValue(), (String[]) null));
                    generatorAdapter.loadThis();
                    generatorAdapter.invokeVirtual(SparkClassRewriter.SPARK_EXECUTOR_CLASSLOADER_TYPE, method);
                    generatorAdapter.loadArg(0);
                    generatorAdapter.invokeVirtual(objectType, method3);
                    generatorAdapter.returnValue();
                    generatorAdapter.endMethod();
                }
            }
        }, 8);
        return classWriter.toByteArray();
    }

    private byte[] rewriteKryo(InputStream inputStream) throws IOException {
        return rewriteConstructor(KRYO_TYPE, inputStream, new ConstructorRewriter() { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.8
            @Override // co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.ConstructorRewriter
            public void onMethodExit(String str, String str2, GeneratorAdapter generatorAdapter) {
                generatorAdapter.loadThis();
                generatorAdapter.push(Type.getType(Schema.class));
                generatorAdapter.push(SparkClassRewriter.SCHEMA_SERIALIZER_TYPE);
                generatorAdapter.invokeVirtual(SparkClassRewriter.KRYO_TYPE, new Method("addDefaultSerializer", Type.VOID_TYPE, new Type[]{Type.getType(Class.class), Type.getType(Class.class)}));
                generatorAdapter.loadThis();
                generatorAdapter.push(Type.getType(StructuredRecord.class));
                generatorAdapter.push(SparkClassRewriter.STRUCTURED_RECORD_SERIALIZER_TYPE);
                generatorAdapter.invokeVirtual(SparkClassRewriter.KRYO_TYPE, new Method("addDefaultSerializer", Type.VOID_TYPE, new Type[]{Type.getType(Class.class), Type.getType(Class.class)}));
            }
        });
    }

    private byte[] rewriteConstructor(final Type type, InputStream inputStream, final ConstructorRewriter constructorRewriter) throws IOException {
        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(0);
        classReader.accept(new ClassVisitor(327680, classWriter) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.9
            public MethodVisitor visitMethod(int i, final String str, final String str2, String str3, String[] strArr) {
                MethodVisitor visitMethod = super.visitMethod(i, str, str2, str3, strArr);
                return !"<init>".equals(str) ? visitMethod : new AdviceAdapter(327680, visitMethod, i, str, str2) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.9.1
                    boolean calledThis;

                    public void visitMethodInsn(int i2, String str4, String str5, String str6, boolean z) {
                        this.calledThis = this.calledThis || (i2 == 183 && Type.getObjectType(str4).equals(type) && str5.equals("<init>") && Type.getReturnType(str6).equals(Type.VOID_TYPE));
                        super.visitMethodInsn(i2, str4, str5, str6, z);
                    }

                    protected void onMethodEnter() {
                        if (this.calledThis) {
                            return;
                        }
                        constructorRewriter.onMethodEnter(str, str2, this);
                    }

                    protected void onMethodExit(int i2) {
                        if (!this.calledThis && i2 == 177) {
                            constructorRewriter.onMethodExit(str, str2, this);
                        }
                    }
                };
            }
        }, 8);
        return classWriter.toByteArray();
    }

    private byte[] rewriteSetProperties(InputStream inputStream) throws IOException {
        final Type type = Type.getType(System.class);
        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(0);
        classReader.accept(new ClassVisitor(327680, classWriter) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.10
            public MethodVisitor visitMethod(int i, String str, String str2, String str3, String[] strArr) {
                return new MethodVisitor(327680, super.visitMethod(i, str, str2, str3, strArr)) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.10.1
                    public void visitMethodInsn(int i2, String str4, String str5, String str6, boolean z) {
                        if (i2 == 184 && str5.equals("setProperty") && str4.equals(type.getInternalName())) {
                            super.visitMethodInsn(i2, SparkClassRewriter.SPARK_RUNTIME_ENV_TYPE.getInternalName(), str5, str6, false);
                        } else {
                            super.visitMethodInsn(i2, str4, str5, str6, z);
                        }
                    }
                };
            }
        }, 8);
        return classWriter.toByteArray();
    }

    private byte[] rewritePythonRunner(InputStream inputStream) throws IOException {
        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(1);
        final Method method = new Method("main", Type.VOID_TYPE, new Type[]{Type.getType(String[].class)});
        classReader.accept(new ClassVisitor(327680, classWriter) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.11
            public MethodVisitor visitMethod(int i, String str, String str2, String str3, String[] strArr) {
                MethodVisitor visitMethod = super.visitMethod(i, str, str2, str3, strArr);
                return (method.equals(new Method(str, str2)) && Modifier.isStatic(i)) ? new AdviceAdapter(327680, visitMethod, i, str, str2) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.11.1
                    final Type sparkUserAppExceptionType = Type.getObjectType("org/apache/spark/SparkUserAppException");
                    final Type cancellableType = Type.getObjectType("org/apache/twill/common/Cancellable");
                    final Label tryLabel = newLabel();
                    final Label tryEndLabel = newLabel();
                    final Label catchLabel = newLabel();
                    final Label finallyLabel = newLabel();
                    int cancellable;

                    protected void onMethodEnter() {
                        this.cancellable = newLocal(this.cancellableType);
                        invokeStatic(SparkClassRewriter.SPARK_RUNTIME_UTILS_TYPE, new Method("initSparkMain", this.cancellableType, new Type[0]));
                        storeLocal(this.cancellable);
                        visitTryCatchBlock(this.tryLabel, this.tryEndLabel, this.catchLabel, this.sparkUserAppExceptionType.getInternalName());
                        visitLabel(this.tryLabel);
                    }

                    protected void onMethodExit(int i2) {
                        visitLabel(this.tryEndLabel);
                        goTo(this.finallyLabel);
                        visitLabel(this.catchLabel);
                        int newLocal = newLocal(this.sparkUserAppExceptionType);
                        storeLocal(newLocal);
                        newInstance(Type.getType(RuntimeException.class));
                        dup();
                        loadLocal(newLocal);
                        invokeConstructor(Type.getType(RuntimeException.class), Methods.getMethod(Void.TYPE, "<init>", new Class[]{Throwable.class}));
                        throwException();
                        visitLabel(this.finallyLabel);
                        loadLocal(this.cancellable);
                        invokeInterface(this.cancellableType, new Method("cancel", Type.VOID_TYPE, new Type[0]));
                    }
                } : visitMethod;
            }
        }, 8);
        return classWriter.toByteArray();
    }

    private byte[] rewritePythonRunnerCompanion(InputStream inputStream) throws IOException {
        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(0);
        classReader.accept(new ClassVisitor(327680, classWriter) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.12
            public MethodVisitor visitMethod(int i, String str, String str2, String str3, String[] strArr) {
                return new OutputRedirectMethodVisitor(super.visitMethod(i, str, str2, str3, strArr), i, str, str2, SparkClassRewriter.this.distributed);
            }
        }, 8);
        return classWriter.toByteArray();
    }

    private byte[] rewritePythonWorkerFactory(InputStream inputStream) throws IOException {
        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(0);
        classReader.accept(new ClassVisitor(327680, classWriter) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.13
            public MethodVisitor visitMethod(int i, String str, String str2, String str3, String[] strArr) {
                OutputRedirectMethodVisitor outputRedirectMethodVisitor = new OutputRedirectMethodVisitor(super.visitMethod(i, str, str2, str3, strArr), i, str, str2, SparkClassRewriter.this.distributed);
                final GeneratorAdapter generatorAdapter = new GeneratorAdapter(outputRedirectMethodVisitor, i, str, str2);
                return new MethodVisitor(327680, outputRedirectMethodVisitor) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.13.1
                    public void visitFieldInsn(int i2, String str4, String str5, String str6) {
                        if (i2 == 181 && "pythonPath".equals(str5)) {
                            Type type = Type.getType(String.class);
                            Label newLabel = generatorAdapter.newLabel();
                            generatorAdapter.push("cdap.spark.pyFiles");
                            generatorAdapter.invokeStatic(SparkClassRewriter.SPARK_RUNTIME_ENV_TYPE, Methods.getMethod(String.class, "getProperty", new Class[]{String.class}));
                            generatorAdapter.ifNull(newLabel);
                            generatorAdapter.push(File.pathSeparator);
                            generatorAdapter.invokeVirtual(type, Methods.getMethod(String.class, "concat", new Class[]{String.class}));
                            generatorAdapter.push("cdap.spark.pyFiles");
                            generatorAdapter.invokeStatic(SparkClassRewriter.SPARK_RUNTIME_ENV_TYPE, Methods.getMethod(String.class, "getProperty", new Class[]{String.class}));
                            generatorAdapter.invokeVirtual(type, Methods.getMethod(String.class, "concat", new Class[]{String.class}));
                            generatorAdapter.mark(newLabel);
                        }
                        super.visitFieldInsn(i2, str4, str5, str6);
                    }
                };
            }
        }, 8);
        return classWriter.toByteArray();
    }

    @Nullable
    private byte[] rewritePythonWorkerMonitorThread(InputStream inputStream) throws IOException {
        if (this.distributed) {
            return null;
        }
        return rewriteConstructor(SPARK_PYTHON_WORKER_MONITOR_THREAD_TYPE, inputStream, new ConstructorRewriter() { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.14
            @Override // co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.ConstructorRewriter
            void onMethodEnter(String str, String str2, GeneratorAdapter generatorAdapter) {
                generatorAdapter.loadThis();
                generatorAdapter.invokeStatic(SparkClassRewriter.SPARK_RUNTIME_ENV_TYPE, new Method("addPyMonitorThread", Type.VOID_TYPE, new Type[]{Type.getType(Thread.class)}));
            }
        });
    }

    @Nullable
    private byte[] rewriteAkkaRemoting(InputStream inputStream) throws IOException {
        final Type determineAkkaDispatcherReturnType = determineAkkaDispatcherReturnType();
        if (determineAkkaDispatcherReturnType == null) {
            LOG.warn("Failed to determine ActorSystem.dispatcher() return type. No rewriting of akka.remote.Remoting class. ClassLoader leakage might happen in SDK.");
            return null;
        }
        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(0);
        classReader.accept(new ClassVisitor(327680, classWriter) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.15
            public MethodVisitor visitMethod(int i, String str, String str2, String str3, String[] strArr) {
                MethodVisitor visitMethod = super.visitMethod(i, str, str2, str3, strArr);
                return !"shutdown".equals(str) ? visitMethod : new MethodVisitor(327680, visitMethod) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.15.1
                    public void visitMethodInsn(int i2, String str4, String str5, String str6, boolean z) {
                        if (i2 != 182 || !"global".equals(str5) || !"scala/concurrent/ExecutionContext$Implicits$".equals(str4) || !Type.getMethodDescriptor(SparkClassRewriter.EXECUTION_CONTEXT_EXECUTOR_TYPE, new Type[0]).equals(str6)) {
                            super.visitMethodInsn(i2, str4, str5, str6, z);
                            return;
                        }
                        super.visitInsn(87);
                        Type objectType = Type.getObjectType("akka/actor/ExtendedActorSystem");
                        super.visitVarInsn(25, 0);
                        super.visitMethodInsn(182, "akka/remote/Remoting", "system", Type.getMethodDescriptor(objectType, new Type[0]), false);
                        super.visitMethodInsn(182, objectType.getInternalName(), "dispatcher", Type.getMethodDescriptor(determineAkkaDispatcherReturnType, new Type[0]), false);
                    }
                };
            }
        }, 8);
        return classWriter.toByteArray();
    }

    @Nullable
    private Type determineAkkaDispatcherReturnType() {
        URL url = (URL) this.resourceLookup.apply("akka/actor/ActorSystem.class");
        if (url == null) {
            return null;
        }
        try {
            InputStream openStream = url.openStream();
            Throwable th = null;
            try {
                try {
                    final AtomicReference atomicReference = new AtomicReference();
                    new ClassReader(openStream).accept(new ClassVisitor(327680) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.16
                        public MethodVisitor visitMethod(int i, String str, String str2, String str3, String[] strArr) {
                            if (str.equals("dispatcher") && Type.getArgumentTypes(str2).length == 0) {
                                Type returnType = Type.getReturnType(str2);
                                if (returnType.equals(SparkClassRewriter.EXECUTION_CONTEXT_TYPE) || returnType.equals(SparkClassRewriter.EXECUTION_CONTEXT_EXECUTOR_TYPE)) {
                                    atomicReference.set(returnType);
                                } else {
                                    SparkClassRewriter.LOG.warn("Unsupported return type of ActorSystem.dispatcher(): {}", returnType.getClassName());
                                }
                            }
                            return super.visitMethod(i, str, str2, str3, strArr);
                        }
                    }, 7);
                    Type type = (Type) atomicReference.get();
                    if (openStream != null) {
                        if (0 != 0) {
                            try {
                                openStream.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            openStream.close();
                        }
                    }
                    return type;
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            LOG.warn("Failed to determine ActorSystem dispatcher() return type.", e);
            return null;
        }
    }

    @Nullable
    private byte[] rewriteClient(InputStream inputStream) throws IOException {
        boolean z = false;
        UnmodifiableIterator it = ImmutableList.of("HADOOP_CONF_DIR", "YARN_CONF_DIR").iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            String str = System.getenv((String) it.next());
            if (str != null) {
                File file = new File(str);
                if (file.isDirectory() && file.listFiles() == null) {
                    z = true;
                    break;
                }
            }
        }
        if (!z) {
            return null;
        }
        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(1);
        classReader.accept(new ClassVisitor(327680, classWriter) { // from class: co.cask.cdap.app.runtime.spark.classloader.SparkClassRewriter.17
            public MethodVisitor visitMethod(int i, String str2, String str3, String str4, String[] strArr) {
                MethodVisitor visitMethod = super.visitMethod(i, str2, str3, str4, strArr);
                if (!"createConfArchive".equals(str2)) {
                    return visitMethod;
                }
                Type type = Type.getType(File.class);
                Type type2 = Type.getType(String.class);
                boolean equals = Type.getReturnType(str3).equals(type);
                Type objectType = Type.getObjectType("scala/Option");
                if (!equals && !Type.getReturnType(str3).equals(objectType)) {
                    return visitMethod;
                }
                GeneratorAdapter generatorAdapter = new GeneratorAdapter(visitMethod, i, str2, str3);
                generatorAdapter.loadThis();
                generatorAdapter.getField(Type.getObjectType("org/apache/spark/deploy/yarn/Client"), "sparkConf", SparkClassRewriter.SPARK_CONF_TYPE);
                generatorAdapter.visitLdcInsn(SparkClassRewriter.SPARK_CONF_FILE);
                generatorAdapter.visitLdcInsn("__spark_conf__");
                generatorAdapter.visitLdcInsn("__spark_conf__");
                generatorAdapter.visitLdcInsn(".zip");
                generatorAdapter.invokeStatic(type, new Method("createTempFile", type, new Type[]{type2, type2}));
                generatorAdapter.invokeStatic(SparkClassRewriter.SPARK_RUNTIME_UTILS_TYPE, new Method("createConfArchive", type, new Type[]{SparkClassRewriter.SPARK_CONF_TYPE, type2, type2, type}));
                if (equals) {
                    generatorAdapter.returnValue();
                    generatorAdapter.endMethod();
                    return null;
                }
                generatorAdapter.invokeStatic(objectType, new Method("apply", objectType, new Type[]{Type.getType(Object.class)}));
                generatorAdapter.checkCast(objectType);
                generatorAdapter.returnValue();
                generatorAdapter.endMethod();
                return null;
            }
        }, 8);
        return classWriter.toByteArray();
    }

    private byte[] rewriteSparkHadoopUtil(String str, InputStream inputStream) throws IOException {
        return Classes.rewriteMethodToNoop(str, inputStream, ImmutableSet.of("obtainTokensForNamenodes", "obtainTokenForHBase"));
    }
}
