package co.cask.cdap.spark.app;

import co.cask.cdap.api.flow.flowlet.StreamEvent;
import co.cask.cdap.api.spark.AbstractExtendedSpark;
import co.cask.cdap.api.spark.SparkExecutionContext;
import co.cask.cdap.api.spark.SparkMain;
import co.cask.cdap.api.spark.SparkMain$Transaction$;
import co.cask.cdap.api.spark.dynamic.SparkCompiler;
import co.cask.cdap.api.spark.dynamic.SparkInterpreter;
import com.google.common.io.BaseEncoding;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import org.apache.spark.SparkContext;
import org.apache.spark.rdd.RDD;
import scala.Function1;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.JavaConversions$;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.BoxedUnit;

/* compiled from: ScalaDynamicSpark.scala */
@ScalaSignature(bytes = "\u0006\u0001m2A!\u0001\u0002\u0001\u001b\t\t2kY1mC\u0012Kh.Y7jGN\u0003\u0018M]6\u000b\u0005\r!\u0011aA1qa*\u0011QAB\u0001\u0006gB\f'o\u001b\u0006\u0003\u000f!\tAa\u00193ba*\u0011\u0011BC\u0001\u0005G\u0006\u001c8NC\u0001\f\u0003\t\u0019wn\u0001\u0001\u0014\u0007\u0001qQ\u0003\u0005\u0002\u0010'5\t\u0001C\u0003\u0002\u0006#)\u0011!CB\u0001\u0004CBL\u0017B\u0001\u000b\u0011\u0005U\t%m\u001d;sC\u000e$X\t\u001f;f]\u0012,Gm\u00159be.\u0004\"a\u0004\f\n\u0005]\u0001\"!C*qCJ\\W*Y5o\u0011\u0015I\u0002\u0001\"\u0001\u001b\u0003\u0019a\u0014N\\5u}Q\t1\u0004\u0005\u0002\u001d\u00015\t!\u0001C\u0004\u001f\u0001\t\u0007I\u0011A\u0010\u0002\u0017\rd\u0017m]:T_V\u00148-Z\u000b\u0002AA\u0011\u0011EJ\u0007\u0002E)\u00111\u0005J\u0001\u0005Y\u0006twMC\u0001&\u0003\u0011Q\u0017M^1\n\u0005\u001d\u0012#AB*ue&tw\r\u0003\u0004*\u0001\u0001\u0006I\u0001I\u0001\rG2\f7o]*pkJ\u001cW\r\t\u0005\u0006W\u0001!\t\u0006L\u0001\nG>tg-[4ve\u0016$\u0012!\f\t\u0003]Ej\u0011a\f\u0006\u0002a\u0005)1oY1mC&\u0011!g\f\u0002\u0005+:LG\u000fC\u00035\u0001\u0011\u0005S'A\u0002sk:$\"!\f\u001c\t\u000b]\u001a\u00049\u0001\u001d\u0002\u0007M,7\r\u0005\u0002\u0010s%\u0011!\b\u0005\u0002\u0016'B\f'o[#yK\u000e,H/[8o\u0007>tG/\u001a=u\u0001")
/* loaded from: input_file:co/cask/cdap/spark/app/ScalaDynamicSpark.class */
public class ScalaDynamicSpark extends AbstractExtendedSpark implements SparkMain {
    private final String classSource;
    private final Function1<StreamEvent, Tuple2<Object, String>> timestampStringStreamDecoder;
    private final Function1<StreamEvent, String> stringStreamDecoder;
    private volatile SparkMain$Transaction$ Transaction$module;

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v0 */
    /* JADX WARN: Type inference failed for: r0v1, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v5 */
    private SparkMain$Transaction$ Transaction$lzycompute() {
        ?? r0 = this;
        synchronized (r0) {
            if (this.Transaction$module == null) {
                this.Transaction$module = new SparkMain$Transaction$(this);
            }
            BoxedUnit boxedUnit = BoxedUnit.UNIT;
            r0 = r0;
            return this.Transaction$module;
        }
    }

    public SparkMain$Transaction$ Transaction() {
        return this.Transaction$module == null ? Transaction$lzycompute() : this.Transaction$module;
    }

    public Function1<StreamEvent, Tuple2<Object, String>> timestampStringStreamDecoder() {
        return this.timestampStringStreamDecoder;
    }

    public Function1<StreamEvent, String> stringStreamDecoder() {
        return this.stringStreamDecoder;
    }

    public void co$cask$cdap$api$spark$SparkMain$_setter_$timestampStringStreamDecoder_$eq(Function1 function1) {
        this.timestampStringStreamDecoder = function1;
    }

    public void co$cask$cdap$api$spark$SparkMain$_setter_$stringStreamDecoder_$eq(Function1 function1) {
        this.stringStreamDecoder = function1;
    }

    public <K, V> SparkMain.SparkProgramRDDFunctions<K, V> SparkProgramRDDFunctions(RDD<Tuple2<K, V>> rdd, ClassTag<K> classTag, ClassTag<V> classTag2) {
        return SparkMain.class.SparkProgramRDDFunctions(this, rdd, classTag, classTag2);
    }

    public SparkMain.SparkProgramContextFunctions SparkProgramContextFunctions(SparkContext sparkContext) {
        return SparkMain.class.SparkProgramContextFunctions(this, sparkContext);
    }

    public String classSource() {
        return this.classSource;
    }

    public void configure() {
        setMainClass(ScalaDynamicSpark.class);
        SparkCompiler createSparkCompiler = getConfigurer().createSparkCompiler();
        try {
            createSparkCompiler.compile(classSource());
            ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
            try {
                createSparkCompiler.saveAsJar(byteArrayOutputStream);
                byteArrayOutputStream.close();
                setProperties(JavaConversions$.MODULE$.mapAsJavaMap(Predef$.MODULE$.Map().apply(Predef$.MODULE$.wrapRefArray(new Tuple2[]{new Tuple2("compiled.jar", BaseEncoding.base64().encode(byteArrayOutputStream.toByteArray()))}))));
            } catch (Throwable th) {
                byteArrayOutputStream.close();
                throw th;
            }
        } finally {
            createSparkCompiler.close();
        }
    }

    public void run(SparkExecutionContext sparkExecutionContext) {
        SparkContext sparkContext = new SparkContext();
        File file = new File((String) sparkExecutionContext.getRuntimeArguments().get("tmpdir"), "compiled.jar");
        Files.write(file.toPath(), BaseEncoding.base64().decode(sparkExecutionContext.getSpecification().getProperty("compiled.jar")), new OpenOption[0]);
        SparkInterpreter createInterpreter = sparkExecutionContext.createInterpreter();
        try {
            createInterpreter.addDependencies(Predef$.MODULE$.wrapRefArray(new File[]{file}));
            createInterpreter.addImports(Predef$.MODULE$.wrapRefArray(new String[]{"test.dynamic.Compute"}));
            createInterpreter.bind("sc", sparkContext, package$.MODULE$.universe().TypeTag().apply(package$.MODULE$.universe().runtimeMirror(ScalaDynamicSpark.class.getClassLoader()), new TypeCreator(this) { // from class: co.cask.cdap.spark.app.ScalaDynamicSpark$$typecreator1$1
                public <U extends Universe> Types.TypeApi apply(Mirror<U> mirror) {
                    mirror.universe();
                    return mirror.staticClass("org.apache.spark.SparkContext").asType().toTypeConstructor();
                }
            }), ClassTag$.MODULE$.apply(SparkContext.class));
            createInterpreter.bind("sec", sparkExecutionContext.getClass().getName(), sparkExecutionContext, Predef$.MODULE$.wrapRefArray(new String[]{"implicit"}));
            createInterpreter.interpret("Compute.run(sc)");
        } finally {
            createInterpreter.close();
        }
    }

    public ScalaDynamicSpark() {
        SparkMain.class.$init$(this);
        this.classSource = "\n      package test.dynamic\n\n      import co.cask.cdap.api.common._\n      import co.cask.cdap.api.spark._\n      import org.apache.spark._\n\n      object Compute {\n       def run(sc: SparkContext)(implicit sec: SparkExecutionContext) {\n         // Creates a dummy SparkMain instance for importing implicits\n         val sparkMain = new SparkMain() { override def run(implicit sec: SparkExecutionContext): Unit = { } }\n         import sparkMain._\n\n         val args = sec.getRuntimeArguments()\n         sc.fromStream[String](args.get(\"input\"))\n           .flatMap(_.split(\"\\\\s+\"))\n           .map((_, 1))\n           .reduceByKey(_ + _)\n           .map(t => (Bytes.toBytes(t._1), Bytes.toBytes(t._2)))\n           .saveAsDataset(args.get(\"output\")\n         )\n       }\n      }\n    ";
    }
}
