package co.cask.cdap.app.runtime.spark;

import co.cask.cdap.app.runtime.spark.SparkRuntimeEnv;
import java.lang.Thread;
import java.util.Properties;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.annotation.Nullable;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.streaming.StreamingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Function0;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.collection.Iterable;
import scala.collection.JavaConversions$;
import scala.collection.mutable.ArrayBuffer;
import scala.collection.mutable.ArrayBuffer$;
import scala.collection.mutable.ListBuffer;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SparkRuntimeEnv.scala */
/* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkRuntimeEnv$.class */
public final class SparkRuntimeEnv$ {
    public static final SparkRuntimeEnv$ MODULE$ = null;
    private final Logger co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$LOG;
    private boolean stopped;
    private final Properties properties;
    private Option<SparkContext> sparkContext;
    private Option<StreamingContext> streamingContext;
    private final ListBuffer<Object> co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$batchedWALs;
    private final ListBuffer<Object> co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$rateControllers;
    private final ListBuffer<Thread> co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$pyMonitorThreads;
    private final ConcurrentLinkedQueue<SparkListener> sparkListeners;

    static {
        new SparkRuntimeEnv$();
    }

    public Logger co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$LOG() {
        return this.co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$LOG;
    }

    private boolean stopped() {
        return this.stopped;
    }

    private void stopped_$eq(boolean z) {
        this.stopped = z;
    }

    private Properties properties() {
        return this.properties;
    }

    private Option<SparkContext> sparkContext() {
        return this.sparkContext;
    }

    private void sparkContext_$eq(Option<SparkContext> option) {
        this.sparkContext = option;
    }

    private Option<StreamingContext> streamingContext() {
        return this.streamingContext;
    }

    private void streamingContext_$eq(Option<StreamingContext> option) {
        this.streamingContext = option;
    }

    public ListBuffer<Object> co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$batchedWALs() {
        return this.co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$batchedWALs;
    }

    public ListBuffer<Object> co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$rateControllers() {
        return this.co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$rateControllers;
    }

    public ListBuffer<Thread> co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$pyMonitorThreads() {
        return this.co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$pyMonitorThreads;
    }

    private ConcurrentLinkedQueue<SparkListener> sparkListeners() {
        return this.sparkListeners;
    }

    public synchronized boolean isStopped() {
        return stopped();
    }

    public String setProperty(String str, String str2) {
        return (String) properties().setProperty(str, str2);
    }

    public String getProperty(String str) {
        return properties().getProperty(str);
    }

    public void setupSparkConf(SparkConf sparkConf) {
        sparkConf.setAll(JavaConversions$.MODULE$.propertiesAsScalaMap(properties()));
    }

    public void addSparkListener(SparkListener sparkListener) {
        sparkListeners().add(sparkListener);
    }

    public synchronized void setContext(SparkContext sparkContext) {
        if (stopped()) {
            sparkContext.stop();
            throw new IllegalStateException("Spark program is already stopped");
        }
        if (sparkContext().isDefined()) {
            throw new IllegalStateException("SparkContext was already created");
        }
        sparkContext_$eq(new Some(sparkContext));
        sparkContext.addSparkListener(new DelegatingSparkListener(sparkListeners()));
    }

    public synchronized void setContext(StreamingContext streamingContext) {
        if (stopped()) {
            streamingContext.stop(false);
            throw new IllegalStateException("Spark program is already stopped");
        }
        streamingContext_$eq(new Some(streamingContext));
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void addBatchedWriteAheadLog(Object obj) {
        synchronized (this) {
            if (stopped()) {
                co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$stopBatchedWAL(obj);
                throw new IllegalStateException("Spark program is already stopped");
            }
            co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$batchedWALs().$plus$eq(obj);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void addRateController(Object obj) {
        synchronized (this) {
            if (stopped()) {
                co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$stopRateController(obj);
                throw new IllegalStateException("Spark program is already stopped");
            }
            co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$rateControllers().$plus$eq(obj);
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    public void addPyMonitorThread(Thread thread) {
        synchronized (this) {
            thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { // from class: co.cask.cdap.app.runtime.spark.SparkRuntimeEnv$$anon$1
                @Override // java.lang.Thread.UncaughtExceptionHandler
                public void uncaughtException(Thread thread2, Throwable th) {
                    if (th instanceof InterruptedException) {
                        SparkRuntimeEnv$.MODULE$.co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$LOG().trace("Thread {} interrupted", new Object[]{thread2});
                        BoxedUnit boxedUnit = BoxedUnit.UNIT;
                    } else {
                        SparkRuntimeEnv$.MODULE$.co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$LOG().warn("Exception raised from {}", thread2, th);
                        BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
                    }
                }
            });
            if (stopped()) {
                thread.interrupt();
                throw new IllegalStateException("Spark program is already stopped");
            }
            co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$pyMonitorThreads().$plus$eq(thread);
        }
    }

    public synchronized SparkContext getContext() {
        return (SparkContext) sparkContext().getOrElse(new SparkRuntimeEnv$$anonfun$getContext$1());
    }

    public synchronized Option<StreamingContext> getStreamingContext() {
        return streamingContext();
    }

    public synchronized boolean setLocalProperty(String str, String str2) {
        return BoxesRunTime.unboxToBoolean(sparkContext().fold(new SparkRuntimeEnv$$anonfun$setLocalProperty$1(), new SparkRuntimeEnv$$anonfun$setLocalProperty$2(str, str2)));
    }

    @Nullable
    public synchronized String getLocalProperty(String str) {
        return (String) sparkContext().map(new SparkRuntimeEnv$$anonfun$getLocalProperty$1(str)).orNull(Predef$.MODULE$.conforms());
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v12 */
    /* JADX WARN: Type inference failed for: r0v17 */
    /* JADX WARN: Type inference failed for: r0v18, types: [java.lang.Throwable] */
    /* JADX WARN: Type inference failed for: r0v2 */
    /* JADX WARN: Type inference failed for: r0v23 */
    /* JADX WARN: Type inference failed for: r0v3, types: [java.lang.Throwable] */
    public Option<SparkContext> stop() {
        Option<SparkContext> option = None$.MODULE$;
        Option<StreamingContext> option2 = None$.MODULE$;
        ?? r0 = this;
        synchronized (r0) {
            if (stopped()) {
                BoxedUnit boxedUnit = BoxedUnit.UNIT;
            } else {
                stopped_$eq(true);
                option = sparkContext();
                option2 = streamingContext();
                BoxedUnit boxedUnit2 = BoxedUnit.UNIT;
            }
            r0 = r0;
            try {
                option2.foreach(new SparkRuntimeEnv$$anonfun$stop$1());
                option.foreach(new SparkRuntimeEnv$$anonfun$stop$2());
                ?? r02 = this;
                synchronized (r02) {
                    sparkListeners().clear();
                    Option<SparkContext> sparkContext = sparkContext();
                    r02 = r02;
                    return sparkContext;
                }
            } catch (Throwable th) {
                option.foreach(new SparkRuntimeEnv$$anonfun$stop$2());
                throw th;
            }
        }
    }

    public Function0<BoxedUnit> co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createCleanup(SparkContext sparkContext, Iterable<Object> iterable, Iterable<Object> iterable2, Iterable<Thread> iterable3) {
        ArrayBuffer empty = ArrayBuffer$.MODULE$.empty();
        empty.$plus$eq(co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$ObjectReflectionFunctions(sparkContext).callMethod("env").orElse(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createCleanup$5(sparkContext)).flatMap(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createCleanup$6(sparkContext)));
        empty.$plus$eq(co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$ObjectReflectionFunctions(sparkContext).callMethod("ui").orElse(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createCleanup$7(sparkContext)).flatMap(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createCleanup$8(sparkContext)));
        empty.$plus$eq(new Some(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createCleanup$1(iterable)));
        empty.$plus$eq(new Some(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createCleanup$2(iterable2)));
        empty.$plus$eq(new Some(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createCleanup$3(iterable3)));
        return new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createCleanup$4(empty);
    }

    public Option<Function0<BoxedUnit>> co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createServerCloser(Option<Object> option, SparkContext sparkContext) {
        return option.flatMap(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$createServerCloser$1(sparkContext));
    }

    public void co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$stopBatchedWAL(Object obj) {
        co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$ObjectReflectionFunctions(obj).getField("walWriteQueue").flatMap(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$stopBatchedWAL$1()).foreach(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$stopBatchedWAL$2(obj));
    }

    public void co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$stopRateController(Object obj) {
        co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$ObjectReflectionFunctions(obj).getField("executionContext").flatMap(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$stopRateController$1()).foreach(new SparkRuntimeEnv$$anonfun$co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$stopRateController$2());
    }

    public SparkRuntimeEnv.ObjectReflectionFunctions co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$ObjectReflectionFunctions(Object obj) {
        return new SparkRuntimeEnv.ObjectReflectionFunctions(obj);
    }

    private SparkRuntimeEnv$() {
        MODULE$ = this;
        this.co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$LOG = LoggerFactory.getLogger(getClass());
        this.stopped = false;
        this.properties = new Properties();
        this.sparkContext = None$.MODULE$;
        this.streamingContext = None$.MODULE$;
        this.co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$batchedWALs = new ListBuffer<>();
        this.co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$rateControllers = new ListBuffer<>();
        this.co$cask$cdap$app$runtime$spark$SparkRuntimeEnv$$pyMonitorThreads = new ListBuffer<>();
        this.sparkListeners = new ConcurrentLinkedQueue<>();
    }
}
