package co.cask.cdap.app.runtime.spark.classloader;

import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.common.lang.ClassRewriter;
import co.cask.cdap.internal.asm.Classes;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nullable;
import org.objectweb.asm.ClassReader;
import org.objectweb.asm.ClassVisitor;
import org.objectweb.asm.ClassWriter;
import org.objectweb.asm.MethodVisitor;
import org.objectweb.asm.Opcodes;
import org.objectweb.asm.Type;
import org.objectweb.asm.commons.AdviceAdapter;
import org.objectweb.asm.commons.GeneratorAdapter;
import org.objectweb.asm.commons.Method;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/classloader/SparkClassRewriter.class */
public class SparkClassRewriter implements ClassRewriter {
    private static final Logger LOG = LoggerFactory.getLogger(SparkClassRewriter.class);
    // CDAP runtime classes that the injected bytecode calls through static methods
    // (setContext / setupSparkConf / setProperty on the env, setTaskSupport / createConfArchive on the utils).
    private static final Type SPARK_RUNTIME_ENV_TYPE = Type.getObjectType("co/cask/cdap/app/runtime/spark/SparkRuntimeEnv");
    private static final Type SPARK_RUNTIME_UTILS_TYPE = Type.getObjectType("co/cask/cdap/app/runtime/spark/SparkRuntimeUtils");
    // Classes whose names are matched in rewriteClass to decide which rewrite to apply.
    private static final Type SPARK_CONTEXT_TYPE = Type.getObjectType("org/apache/spark/SparkContext");
    private static final Type SPARK_STREAMING_CONTEXT_TYPE = Type.getObjectType("org/apache/spark/streaming/StreamingContext");
    private static final Type SPARK_CONF_TYPE = Type.getObjectType("org/apache/spark/SparkConf");
    // Matched with startsWith in rewriteClass, so classes whose names begin with "SparkSubmit$" are covered too.
    private static final Type SPARK_SUBMIT_TYPE = Type.getObjectType("org/apache/spark/deploy/SparkSubmit$");
    private static final Type SPARK_YARN_CLIENT_TYPE = Type.getObjectType("org/apache/spark/deploy/yarn/Client");
    private static final Type SPARK_DSTREAM_GRAPH_TYPE = Type.getObjectType("org/apache/spark/streaming/DStreamGraph");
    private static final Type YARN_SPARK_HADOOP_UTIL_TYPE = Type.getObjectType("org/apache/spark/deploy/yarn/YarnSparkHadoopUtil");
    private static final Type KRYO_TYPE = Type.getObjectType("com/esotericsoftware/kryo/Kryo");
    // Serializer classes pushed as class constants into the Kryo.addDefaultSerializer calls injected by rewriteKryo.
    private static final Type SCHEMA_SERIALIZER_TYPE = Type.getObjectType("co/cask/cdap/app/runtime/spark/serializer/SchemaSerializer");
    private static final Type STRUCTURED_RECORD_SERIALIZER_TYPE = Type.getObjectType("co/cask/cdap/app/runtime/spark/serializer/StructuredRecordSerializer");
    private static final Type AKKA_REMOTING_TYPE = Type.getObjectType("akka/remote/Remoting");
    // The two return types of ActorSystem.dispatcher() that determineAkkaDispatcherReturnType accepts.
    private static final Type EXECUTION_CONTEXT_TYPE = Type.getObjectType("scala/concurrent/ExecutionContext");
    private static final Type EXECUTION_CONTEXT_EXECUTOR_TYPE = Type.getObjectType("scala/concurrent/ExecutionContextExecutor");
    // String constants loaded as arguments of the SparkRuntimeUtils.createConfArchive call generated by rewriteClient.
    private static final String LOCALIZED_CONF_DIR = "__spark_conf__";
    private static final String LOCALIZED_CONF_DIR_ZIP = "__spark_conf__.zip";
    private static final String SPARK_CONF_FILE = "__spark_conf__.properties";
    // Resolves a resource path (e.g. "akka/actor/ActorSystem.class") to a URL; may return null when not found.
    private final Function<String, URL> resourceLookup;
    // When false, the Spark YARN Client class is never rewritten.
    private final boolean rewriteYarnClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/classloader/SparkClassRewriter$ConstructorRewriter.class */
    /**
     * Callback used by {@code rewriteConstructor} to emit extra bytecode at the exit of a constructor
     * that is being rewritten.
     */
    public interface ConstructorRewriter {
        /**
         * Generates the instructions to insert at the constructor exit.
         *
         * @param generatorAdapter the adapter wrapping the constructor being rewritten; instructions emitted
         *                         through it become part of the constructor body
         */
        void onMethodExit(GeneratorAdapter generatorAdapter);
    }

    /**
     * Creates a rewriter.
     *
     * @param function lookup from a resource path to its URL; used to locate
     *                 {@code akka/actor/ActorSystem.class} when rewriting Akka remoting
     * @param z        {@code true} to allow rewriting of the Spark YARN {@code Client} class
     */
    public SparkClassRewriter(Function<String, URL> function, boolean z) {
        this.rewriteYarnClient = z;
        this.resourceLookup = function;
    }

    /**
     * Rewrites the bytecode of a class when its name matches one of the classes this rewriter handles.
     *
     * @param str         class name in dotted form (it is compared against {@link Type#getClassName()})
     * @param inputStream stream providing the original bytecode of the class
     * @return the rewritten bytecode, or {@code null} when the class is left untouched
     * @throws IOException propagated from the individual rewrite routines
     */
    @Nullable
    public byte[] rewriteClass(String str, InputStream inputStream) throws IOException {
        final byte[] rewritten;
        if (str.equals(SPARK_CONTEXT_TYPE.getClassName())) {
            rewritten = rewriteContext(SPARK_CONTEXT_TYPE, inputStream);
        } else if (str.equals(SPARK_STREAMING_CONTEXT_TYPE.getClassName())) {
            rewritten = rewriteContext(SPARK_STREAMING_CONTEXT_TYPE, inputStream);
        } else if (str.equals(SPARK_CONF_TYPE.getClassName())) {
            rewritten = rewriteSparkConf(SPARK_CONF_TYPE, inputStream);
        } else if (str.startsWith(SPARK_SUBMIT_TYPE.getClassName())) {
            // Prefix match: every class whose name starts with "...SparkSubmit$" is handled here.
            rewritten = rewriteSetProperties(inputStream);
        } else if (this.rewriteYarnClient && str.equals(SPARK_YARN_CLIENT_TYPE.getClassName())) {
            rewritten = rewriteClient(inputStream);
        } else if (str.equals(SPARK_DSTREAM_GRAPH_TYPE.getClassName())) {
            rewritten = rewriteDStreamGraph(inputStream);
        } else if (str.equals(AKKA_REMOTING_TYPE.getClassName())) {
            rewritten = rewriteAkkaRemoting(inputStream);
        } else if (str.equals(YARN_SPARK_HADOOP_UTIL_TYPE.getClassName())) {
            rewritten = rewriteSparkHadoopUtil(str, inputStream);
        } else if (str.equals(KRYO_TYPE.getClassName())) {
            rewritten = rewriteKryo(inputStream);
        } else {
            // Not a class this rewriter knows about.
            rewritten = null;
        }
        return rewritten;
    }

    /**
     * Rewrites the constructors of a context class so that, on exit, they pass {@code this} to the static
     * {@code SparkRuntimeEnv.setContext} method.
     *
     * @param type        the context type being rewritten; also the parameter type of {@code setContext}
     * @param inputStream stream providing the original bytecode
     * @return the rewritten bytecode
     * @throws IOException propagated from {@code rewriteConstructor}
     */
    private byte[] rewriteContext(final Type type, InputStream inputStream) throws IOException {
        final Method setContext = new Method("setContext", Type.VOID_TYPE, new Type[]{type});
        ConstructorRewriter registerContext = new ConstructorRewriter() {
            @Override
            public void onMethodExit(GeneratorAdapter generatorAdapter) {
                // SparkRuntimeEnv.setContext(this)
                generatorAdapter.loadThis();
                generatorAdapter.invokeStatic(SPARK_RUNTIME_ENV_TYPE, setContext);
            }
        };
        return rewriteConstructor(type, inputStream, registerContext);
    }

    /**
     * Rewrites the constructors of the Spark configuration class so that, on exit, they pass {@code this}
     * to the static {@code SparkRuntimeEnv.setupSparkConf} method.
     *
     * @param type        the configuration type being rewritten; also the parameter type of {@code setupSparkConf}
     * @param inputStream stream providing the original bytecode
     * @return the rewritten bytecode
     * @throws IOException propagated from {@code rewriteConstructor}
     */
    private byte[] rewriteSparkConf(final Type type, InputStream inputStream) throws IOException {
        final Method setupSparkConf = new Method("setupSparkConf", Type.VOID_TYPE, new Type[]{type});
        ConstructorRewriter setupConf = new ConstructorRewriter() {
            @Override
            public void onMethodExit(GeneratorAdapter generatorAdapter) {
                // SparkRuntimeEnv.setupSparkConf(this)
                generatorAdapter.loadThis();
                generatorAdapter.invokeStatic(SPARK_RUNTIME_ENV_TYPE, setupSparkConf);
            }
        };
        return rewriteConstructor(type, inputStream, setupConf);
    }

    /**
     * Rewrites the class so that every {@code ArrayBuffer.par()} call returning a {@code ParArray} is
     * immediately followed by a call to the static {@code SparkRuntimeUtils.setTaskSupport}, which takes the
     * {@code ParArray} and returns a {@code ParArray}.
     *
     * @param inputStream stream providing the original bytecode
     * @return the rewritten bytecode
     * @throws IOException if the bytecode cannot be read
     */
    private byte[] rewriteDStreamGraph(InputStream inputStream) throws IOException {
        ClassReader classReader = new ClassReader(inputStream);
        // No COMPUTE_* flags: the injected static call consumes one value and pushes one, so the
        // original max stack/locals remain valid.
        ClassWriter classWriter = new ClassWriter(0);
        classReader.accept(new ClassVisitor(Opcodes.ASM5, classWriter) {
            @Override
            public MethodVisitor visitMethod(int access, String name, String desc, String signature,
                                             String[] exceptions) {
                MethodVisitor delegate = super.visitMethod(access, name, desc, signature, exceptions);
                return new MethodVisitor(Opcodes.ASM5, delegate) {
                    @Override
                    public void visitMethodInsn(int opcode, String owner, String methodName, String methodDesc,
                                                boolean itf) {
                        // Always emit the original instruction first.
                        super.visitMethodInsn(opcode, owner, methodName, methodDesc, itf);

                        // Cheap checks first; only parse the descriptor for actual ArrayBuffer.par() calls.
                        if (opcode != Opcodes.INVOKEVIRTUAL
                            || !"par".equals(methodName)
                            || !"scala/collection/mutable/ArrayBuffer".equals(owner)) {
                            return;
                        }
                        Type returnType = Type.getReturnType(methodDesc);
                        if (!"scala.collection.parallel.mutable.ParArray".equals(returnType.getClassName())) {
                            return;
                        }
                        // SparkRuntimeUtils.setTaskSupport(ParArray): ParArray, applied to the value on the stack
                        super.visitMethodInsn(Opcodes.INVOKESTATIC, SPARK_RUNTIME_UTILS_TYPE.getInternalName(),
                                              "setTaskSupport",
                                              Type.getMethodDescriptor(returnType, new Type[]{returnType}), false);
                    }
                };
            }
        }, ClassReader.EXPAND_FRAMES);
        return classWriter.toByteArray();
    }

    /**
     * Rewrites the Kryo constructors so that, on exit, they call
     * {@code this.addDefaultSerializer(Class, Class)} twice: once for {@link Schema} with the schema
     * serializer and once for {@link StructuredRecord} with the structured record serializer.
     *
     * @param inputStream stream providing the original Kryo bytecode
     * @return the rewritten bytecode
     * @throws IOException propagated from {@code rewriteConstructor}
     */
    private byte[] rewriteKryo(InputStream inputStream) throws IOException {
        final Type classType = Type.getType(Class.class);
        final Method addDefaultSerializer =
            new Method("addDefaultSerializer", Type.VOID_TYPE, new Type[]{classType, classType});

        return rewriteConstructor(KRYO_TYPE, inputStream, new ConstructorRewriter() {
            @Override
            public void onMethodExit(GeneratorAdapter generatorAdapter) {
                register(generatorAdapter, Type.getType(Schema.class), SCHEMA_SERIALIZER_TYPE);
                register(generatorAdapter, Type.getType(StructuredRecord.class), STRUCTURED_RECORD_SERIALIZER_TYPE);
            }

            // Emits: this.addDefaultSerializer(<dataType>.class, <serializerType>.class)
            private void register(GeneratorAdapter adapter, Type dataType, Type serializerType) {
                adapter.loadThis();
                adapter.push(dataType);
                adapter.push(serializerType);
                adapter.invokeVirtual(KRYO_TYPE, addDefaultSerializer);
            }
        });
    }

    /**
     * Rewrites every constructor of the given class by letting {@code constructorRewriter} emit extra
     * instructions at the constructor's normal ({@code RETURN}) exit.
     *
     * <p>A constructor in which an {@code INVOKESPECIAL <init>} on {@code type} itself has been seen
     * (for example a {@code this(...)} delegation) is left unmodified.
     *
     * @param type                the type of the class being rewritten
     * @param inputStream         stream providing the original bytecode
     * @param constructorRewriter callback that generates the instructions to insert
     * @return the rewritten bytecode
     * @throws IOException if the bytecode cannot be read
     */
    private byte[] rewriteConstructor(final Type type, InputStream inputStream, final ConstructorRewriter constructorRewriter) throws IOException {
        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(0);
        classReader.accept(new ClassVisitor(Opcodes.ASM5, classWriter) {
            @Override
            public MethodVisitor visitMethod(int access, String name, String desc, String signature,
                                             String[] exceptions) {
                // Call super first so the method is registered with the ClassWriter.
                MethodVisitor delegate = super.visitMethod(access, name, desc, signature, exceptions);
                if (!"<init>".equals(name)) {
                    // Only constructors are rewritten.
                    return delegate;
                }
                return new AdviceAdapter(Opcodes.ASM5, delegate, access, name, desc) {
                    // Set once this constructor has invoked a constructor of the same class.
                    boolean calledThis;

                    @Override
                    public void visitMethodInsn(int opcode, String owner, String methodName, String methodDesc,
                                                boolean itf) {
                        if (!this.calledThis
                            && opcode == Opcodes.INVOKESPECIAL
                            && Type.getObjectType(owner).equals(type)
                            && "<init>".equals(methodName)
                            && Type.getReturnType(methodDesc).equals(Type.VOID_TYPE)) {
                            this.calledThis = true;
                        }
                        super.visitMethodInsn(opcode, owner, methodName, methodDesc, itf);
                    }

                    @Override
                    protected void onMethodExit(int opcode) {
                        // Only the normal return path is instrumented; other exit opcodes are ignored.
                        if (!this.calledThis && opcode == Opcodes.RETURN) {
                            constructorRewriter.onMethodExit(this);
                        }
                    }
                };
            }
        }, ClassReader.EXPAND_FRAMES);
        return classWriter.toByteArray();
    }

    /**
     * Rewrites the class so that every static call to {@code System.setProperty} is redirected to a static
     * method with the same name and descriptor on {@code SparkRuntimeEnv}.
     *
     * @param inputStream stream providing the original bytecode
     * @return the rewritten bytecode
     * @throws IOException if the bytecode cannot be read
     */
    private byte[] rewriteSetProperties(InputStream inputStream) throws IOException {
        final String systemInternalName = Type.getType(System.class).getInternalName();
        final String runtimeEnvInternalName = SPARK_RUNTIME_ENV_TYPE.getInternalName();
        ClassReader classReader = new ClassReader(inputStream);
        // Owner substitution does not change stack usage, so no COMPUTE_* flags are needed.
        ClassWriter classWriter = new ClassWriter(0);
        classReader.accept(new ClassVisitor(Opcodes.ASM5, classWriter) {
            @Override
            public MethodVisitor visitMethod(int access, String name, String desc, String signature,
                                             String[] exceptions) {
                MethodVisitor delegate = super.visitMethod(access, name, desc, signature, exceptions);
                return new MethodVisitor(Opcodes.ASM5, delegate) {
                    @Override
                    public void visitMethodInsn(int opcode, String owner, String methodName, String methodDesc,
                                                boolean itf) {
                        boolean isSystemSetProperty = opcode == Opcodes.INVOKESTATIC
                            && "setProperty".equals(methodName)
                            && systemInternalName.equals(owner);
                        if (isSystemSetProperty) {
                            // Same opcode, name and descriptor; only the owner class is replaced.
                            super.visitMethodInsn(opcode, runtimeEnvInternalName, methodName, methodDesc, false);
                        } else {
                            super.visitMethodInsn(opcode, owner, methodName, methodDesc, itf);
                        }
                    }
                };
            }
        }, ClassReader.EXPAND_FRAMES);
        return classWriter.toByteArray();
    }

    /**
     * Rewrites the {@code shutdown} method(s) of {@code akka.remote.Remoting} so that a call to
     * {@code ExecutionContext$Implicits$.global()} is replaced by {@code this.system().dispatcher()}.
     *
     * @param inputStream stream providing the original bytecode
     * @return the rewritten bytecode, or {@code null} when the return type of
     *         {@code ActorSystem.dispatcher()} cannot be determined (the class is then left untouched)
     * @throws IOException if the bytecode cannot be read
     */
    @Nullable
    private byte[] rewriteAkkaRemoting(InputStream inputStream) throws IOException {
        final Type dispatcherReturnType = determineAkkaDispatcherReturnType();
        if (dispatcherReturnType == null) {
            LOG.warn("Failed to determine ActorSystem.dispatcher() return type. No rewriting of akka.remote.Remoting class. ClassLoader leakage might happen in SDK.");
            return null;
        }

        // Invariants of the instruction matching/generation, computed once instead of per instruction.
        final String globalDescriptor = Type.getMethodDescriptor(EXECUTION_CONTEXT_EXECUTOR_TYPE, new Type[0]);
        final Type extendedActorSystemType = Type.getObjectType("akka/actor/ExtendedActorSystem");
        final String systemDescriptor = Type.getMethodDescriptor(extendedActorSystemType, new Type[0]);
        final String dispatcherDescriptor = Type.getMethodDescriptor(dispatcherReturnType, new Type[0]);

        ClassReader classReader = new ClassReader(inputStream);
        ClassWriter classWriter = new ClassWriter(0);
        classReader.accept(new ClassVisitor(Opcodes.ASM5, classWriter) {
            @Override
            public MethodVisitor visitMethod(int access, String name, String desc, String signature,
                                             String[] exceptions) {
                MethodVisitor delegate = super.visitMethod(access, name, desc, signature, exceptions);
                if (!"shutdown".equals(name)) {
                    return delegate;
                }
                return new MethodVisitor(Opcodes.ASM5, delegate) {
                    @Override
                    public void visitMethodInsn(int opcode, String owner, String methodName, String methodDesc,
                                                boolean itf) {
                        boolean isGlobalContextCall = opcode == Opcodes.INVOKEVIRTUAL
                            && "global".equals(methodName)
                            && "scala/concurrent/ExecutionContext$Implicits$".equals(owner)
                            && globalDescriptor.equals(methodDesc);
                        if (!isGlobalContextCall) {
                            super.visitMethodInsn(opcode, owner, methodName, methodDesc, itf);
                            return;
                        }
                        // Drop the receiver that was pushed for the global() call being replaced.
                        super.visitInsn(Opcodes.POP);
                        // this.system().dispatcher()
                        super.visitVarInsn(Opcodes.ALOAD, 0);
                        super.visitMethodInsn(Opcodes.INVOKEVIRTUAL, AKKA_REMOTING_TYPE.getInternalName(),
                                              "system", systemDescriptor, false);
                        super.visitMethodInsn(Opcodes.INVOKEVIRTUAL, extendedActorSystemType.getInternalName(),
                                              "dispatcher", dispatcherDescriptor, false);
                    }
                };
            }
        }, ClassReader.EXPAND_FRAMES);
        return classWriter.toByteArray();
    }

    /**
     * Inspects {@code akka/actor/ActorSystem.class}, located through the resource lookup, to find the
     * return type of its no-argument {@code dispatcher()} method.
     *
     * @return the return type when it is either {@code scala.concurrent.ExecutionContext} or
     *         {@code scala.concurrent.ExecutionContextExecutor}; {@code null} when the class resource is
     *         not found, cannot be read, or no such method with a supported return type exists
     */
    @Nullable
    private Type determineAkkaDispatcherReturnType() {
        URL url = this.resourceLookup.apply("akka/actor/ActorSystem.class");
        if (url == null) {
            return null;
        }

        final AtomicReference<Type> dispatcherType = new AtomicReference<>();
        // try-with-resources guarantees the stream is closed even when parsing the class fails.
        try (InputStream classStream = url.openStream()) {
            ClassVisitor dispatcherFinder = new ClassVisitor(Opcodes.ASM5) {
                @Override
                public MethodVisitor visitMethod(int access, String name, String desc, String signature,
                                                 String[] exceptions) {
                    if (name.equals("dispatcher") && Type.getArgumentTypes(desc).length == 0) {
                        Type returnType = Type.getReturnType(desc);
                        if (returnType.equals(EXECUTION_CONTEXT_TYPE)
                            || returnType.equals(EXECUTION_CONTEXT_EXECUTOR_TYPE)) {
                            dispatcherType.set(returnType);
                        } else {
                            LOG.warn("Unsupported return type of ActorSystem.dispatcher(): {}", returnType.getClassName());
                        }
                    }
                    return super.visitMethod(access, name, desc, signature, exceptions);
                }
            };
            // Only method declarations are needed, so code, debug info and stack frames are skipped.
            new ClassReader(classStream).accept(
                dispatcherFinder, ClassReader.SKIP_CODE | ClassReader.SKIP_DEBUG | ClassReader.SKIP_FRAMES);
            return dispatcherType.get();
        } catch (IOException e) {
            LOG.warn("Failed to determine ActorSystem dispatcher() return type.", e);
            return null;
        }
    }

    /**
     * Rewrites the Spark YARN {@code Client} class by replacing the body of {@code createConfArchive} with
     * a call to the static {@code SparkRuntimeUtils.createConfArchive(SparkConf, String, String, String)}.
     *
     * <p>The rewrite only happens when one of the directories named by the {@code HADOOP_CONF_DIR} or
     * {@code YARN_CONF_DIR} environment variables is a directory that cannot be listed.
     *
     * @param inputStream stream providing the original bytecode
     * @return the rewritten bytecode, or {@code null} when no rewrite is needed
     * @throws IOException if the bytecode cannot be read
     */
    @Nullable
    private byte[] rewriteClient(InputStream inputStream) throws IOException {
        if (!hasUnlistableConfDir()) {
            return null;
        }

        ClassReader classReader = new ClassReader(inputStream);
        // The method body is generated from scratch, so let ASM compute max stack/locals.
        ClassWriter classWriter = new ClassWriter(ClassWriter.COMPUTE_MAXS);
        classReader.accept(new ClassVisitor(Opcodes.ASM5, classWriter) {
            @Override
            public MethodVisitor visitMethod(int access, String name, String desc, String signature,
                                             String[] exceptions) {
                MethodVisitor delegate = super.visitMethod(access, name, desc, signature, exceptions);
                if (!"createConfArchive".equals(name)) {
                    return delegate;
                }

                // Only two return types are recognised: java.io.File and scala.Option.
                Type declaredReturnType = Type.getReturnType(desc);
                Type optionType = Type.getObjectType("scala/Option");
                boolean returnsFile = declaredReturnType.equals(Type.getType(File.class));
                if (!returnsFile && !declaredReturnType.equals(optionType)) {
                    // Unknown signature; leave the method as it is.
                    return delegate;
                }

                GeneratorAdapter adapter = new GeneratorAdapter(delegate, access, name, desc);

                // SparkRuntimeUtils.createConfArchive(this.sparkConf, SPARK_CONF_FILE,
                //                                     LOCALIZED_CONF_DIR, LOCALIZED_CONF_DIR_ZIP)
                adapter.loadThis();
                adapter.getField(SPARK_YARN_CLIENT_TYPE, "sparkConf", SPARK_CONF_TYPE);
                adapter.visitLdcInsn(SPARK_CONF_FILE);
                adapter.visitLdcInsn(LOCALIZED_CONF_DIR);
                adapter.visitLdcInsn(LOCALIZED_CONF_DIR_ZIP);
                Type stringType = Type.getType(String.class);
                adapter.invokeStatic(SPARK_RUNTIME_UTILS_TYPE,
                                     new Method("createConfArchive", Type.getType(File.class),
                                                new Type[]{SPARK_CONF_TYPE, stringType, stringType, stringType}));

                if (!returnsFile) {
                    // Wrap the File left on the stack: Option.apply(<file>)
                    adapter.invokeStatic(optionType,
                                         new Method("apply", optionType, new Type[]{Type.getType(Object.class)}));
                    adapter.checkCast(optionType);
                }
                adapter.returnValue();
                adapter.endMethod();

                // Returning null makes the ClassReader skip the original method body.
                return null;
            }
        }, ClassReader.EXPAND_FRAMES);
        return classWriter.toByteArray();
    }

    /**
     * Tells whether {@code HADOOP_CONF_DIR} or {@code YARN_CONF_DIR} names a directory for which
     * {@link File#listFiles()} returns {@code null}.
     */
    private static boolean hasUnlistableConfDir() {
        for (String envName : ImmutableList.of("HADOOP_CONF_DIR", "YARN_CONF_DIR")) {
            String confDir = System.getenv(envName);
            if (confDir == null) {
                continue;
            }
            File dir = new File(confDir);
            if (dir.isDirectory() && dir.listFiles() == null) {
                return true;
            }
        }
        return false;
    }

    /**
     * Delegates to {@code Classes.rewriteMethodToNoop} for the three token-obtaining methods of the class.
     * Presumably this turns those methods into no-ops, as the helper's name suggests; the helper is not
     * visible here.
     *
     * @param str         name of the class being rewritten, passed through to the helper
     * @param inputStream stream providing the original bytecode
     * @return whatever bytecode the helper produces
     * @throws IOException propagated from the helper
     */
    private byte[] rewriteSparkHadoopUtil(String str, InputStream inputStream) throws IOException {
        ImmutableSet<String> tokenMethods =
            ImmutableSet.of("obtainTokensForNamenodes", "obtainTokenForHiveMetastore", "obtainTokenForHBase");
        return Classes.rewriteMethodToNoop(str, inputStream, tokenMethods);
    }
}
