package co.cask.cdap.app.runtime.spark;

import co.cask.cdap.app.runtime.spark.distributed.SparkDriverService;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.jar.BundleJarUtil;
import com.google.common.io.OutputSupplier;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.Service;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.gson.Gson;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
import org.apache.spark.SparkConf;
import org.apache.twill.common.Threads;
import org.apache.twill.internal.ServiceListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.parallel.ThreadPoolTaskSupport;
import scala.collection.parallel.mutable.ParArray;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/SparkRuntimeUtils.class */
/**
 * Static helpers used when launching and running a Spark program: building the Spark
 * configuration archive, passing the set of localized resource names through
 * {@link SparkConf}, configuring the thread pool of Scala parallel collections and
 * wiring up the driver-side service life cycle.
 */
public final class SparkRuntimeUtils {
    /** Name of the environment variable read to locate the Spark execution service. */
    public static final String CDAP_SPARK_EXECUTION_SERVICE_URI = "CDAP_SPARK_EXECUTION_SERVICE_URI";
    /** Configuration key under which the JSON-encoded set of localized resource names is stored. */
    private static final String LOCALIZED_RESOURCES = "spark.cdap.localized.resources";
    private static final Logger LOG = LoggerFactory.getLogger(SparkRuntimeUtils.class);
    private static final Gson GSON = new Gson();

    /**
     * Creates the Spark configuration archive.
     *
     * <p>All entries of {@code sparkConf} are written, in {@link Properties} format, as the
     * first ZIP entry of {@code file}. The still-open ZIP stream is then handed to
     * {@link BundleJarUtil#createArchive}, together with the directory {@code str2}
     * (presumably so the directory content gets appended to the same archive and the
     * stream gets closed there; confirm against {@code BundleJarUtil}).
     *
     * @param sparkConf the Spark configuration whose entries are stored in the archive
     * @param str name of the ZIP entry that receives the serialized properties
     * @param str2 path of the directory passed to {@code BundleJarUtil.createArchive}
     * @param file the archive file to write
     * @return {@code file}
     * @throws RuntimeException wrapping any {@link IOException} raised while writing
     */
    public static File createConfArchive(SparkConf sparkConf, final String str, String str2, final File file) {
        final Properties properties = new Properties();
        for (Tuple2<String, String> entry : sparkConf.getAll()) {
            properties.setProperty(entry._1(), entry._2());
        }
        try {
            File confDir = new File(str2);
            BundleJarUtil.createArchive(confDir, new OutputSupplier<ZipOutputStream>() {
                @Override
                public ZipOutputStream getOutput() throws IOException {
                    ZipOutputStream zipOutput = new ZipOutputStream(new FileOutputStream(file));
                    try {
                        zipOutput.putNextEntry(new ZipEntry(str));
                        properties.store(zipOutput, "Spark configuration.");
                        zipOutput.closeEntry();
                    } catch (IOException | RuntimeException e) {
                        // The stream never reaches the caller on failure, so it must be closed here
                        // to avoid leaking the underlying file handle.
                        try {
                            zipOutput.close();
                        } catch (IOException closeFailure) {
                            e.addSuppressed(closeFailure);
                        }
                        throw e;
                    }
                    return zipOutput;
                }
            });
            LOG.debug("Spark config archive created at {} from {}", file, confDir);
            return file;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Installs a {@link ThreadPoolTaskSupport} on the given parallel array.
     *
     * <p>The backing executor has no core threads, an unbounded maximum and a
     * {@link SynchronousQueue}, so a thread is created whenever no idle one is available,
     * and idle threads are released after one second.
     *
     * @param parArray the parallel array to configure
     * @param <T> element type of the array
     * @return the same {@code parArray} instance
     */
    public static <T> ParArray<T> setTaskSupport(ParArray<T> parArray) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            0, Integer.MAX_VALUE, 1L, TimeUnit.SECONDS,
            new SynchronousQueue<Runnable>(),
            new ThreadFactoryBuilder().setNameFormat("task-support-%d").build());
        executor.allowCoreThreadTimeOut(true);
        parArray.tasksupport_$eq(new ThreadPoolTaskSupport(executor));
        return parArray;
    }

    /**
     * Stores the given resource names, JSON encoded, into {@code map} under the
     * {@code spark.cdap.localized.resources} key.
     *
     * @param set names of the localized resources
     * @param map the configuration map to update
     */
    public static void setLocalizedResources(Set<String> set, Map<String, String> map) {
        map.put(LOCALIZED_RESOURCES, GSON.toJson(set));
    }

    /**
     * Reads back the resource names stored by {@link #setLocalizedResources(Set, Map)} and
     * resolves each of them against a base directory.
     *
     * @param file the directory each resource name is resolved against
     * @param sparkConf the configuration to read the resource names from
     * @return a map from resource name to {@code new File(file, name)}; an immutable empty
     *     map when the configuration key is not set
     */
    public static Map<String, File> getLocalizedResources(File file, SparkConf sparkConf) {
        String encoded = sparkConf.get(LOCALIZED_RESOURCES, (String) null);
        if (encoded == null) {
            return Collections.emptyMap();
        }
        Map<String, File> resources = new HashMap<String, File>();
        Set<String> names = GSON.fromJson(encoded, new TypeToken<Set<String>>() { }.getType());
        // Gson yields null for a JSON "null" payload; treat that as "no resources".
        if (names != null) {
            for (String name : names) {
                resources.put(name, new File(file, name));
            }
        }
        return resources;
    }

    /**
     * Prepares the calling thread for running the user Spark main method.
     *
     * <p>The method locates the {@link SparkClassLoader} from the context (creating one if
     * none is found), switches the context class loader of the calling thread to the
     * program invocation class loader, and starts the driver service. A listener on that
     * service interrupts the calling thread when the service stops from another thread and
     * closes the execution context.
     *
     * @return a callback to invoke when the Spark program finishes; it restores the
     *     previous context class loader (when called from the initializing thread) and
     *     stops the driver service
     */
    public static SparkProgramCompletion initSparkMain() {
        final Thread mainThread = Thread.currentThread();
        SparkClassLoader sparkClassLoader;
        try {
            sparkClassLoader = SparkClassLoader.findFromContext();
        } catch (IllegalStateException e) {
            // No SparkClassLoader reachable from the context; fall back to creating one.
            sparkClassLoader = SparkClassLoader.create();
        }
        final ClassLoader oldClassLoader = ClassLoaders.setContextClassLoader(
            sparkClassLoader.getRuntimeContext().getProgramInvocationClassLoader());
        final DefaultSparkExecutionContext sparkExecutionContext = sparkClassLoader.getSparkExecutionContext(true);
        final SparkRuntimeContext runtimeContext = sparkClassLoader.getRuntimeContext();
        final Service driverService = createSparkDriverService(runtimeContext);
        // Released once the completion callback has been invoked from the main thread.
        final CountDownLatch mainThreadCallLatch = new CountDownLatch(1);

        driverService.addListener(new ServiceListenerAdapter() {
            @Override
            public void terminated(Service.State state) {
                handleStopped();
            }

            @Override
            public void failed(Service.State state, Throwable th) {
                handleStopped();
            }

            private void handleStopped() {
                // Never interrupt (or wait on) the main thread from within itself.
                if (Thread.currentThread() != mainThread) {
                    mainThread.interrupt();
                    // With a streaming context defined, wait until the completion callback
                    // has been invoked from the main thread before closing the context.
                    if (SparkRuntimeEnv.getStreamingContext().isDefined()) {
                        Uninterruptibles.awaitUninterruptibly(mainThreadCallLatch);
                    }
                }
                if (sparkExecutionContext instanceof AutoCloseable) {
                    try {
                        ((AutoCloseable) sparkExecutionContext).close();
                    } catch (Exception e) {
                        LOG.warn("Exception raised when calling {}.close() for program run {}.",
                                 new Object[] {sparkExecutionContext.getClass().getName(),
                                               runtimeContext.getProgramRunId(), e});
                    }
                }
            }
        }, Threads.SAME_THREAD_EXECUTOR);
        driverService.startAndWait();

        return new SparkProgramCompletion() {
            @Override
            public void completed() {
                handleCompleted(false);
            }

            @Override
            public void completedWithException(Throwable th) {
                handleCompleted(true);
            }

            private void handleCompleted(boolean failure) {
                // Only the thread that performed the initialization releases the latch and
                // gets its original context class loader back.
                if (Thread.currentThread() == mainThread) {
                    mainThreadCallLatch.countDown();
                    mainThread.setContextClassLoader(oldClassLoader);
                }
                if (failure && driverService instanceof SparkDriverService) {
                    ((SparkDriverService) driverService).stopWithoutComplete();
                } else {
                    driverService.stopAndWait();
                }
            }
        };
    }

    /**
     * Creates the service started alongside the Spark driver.
     *
     * @param sparkRuntimeContext the runtime context handed to the {@link SparkDriverService}
     * @return a {@link SparkDriverService} when the {@code CDAP_SPARK_EXECUTION_SERVICE_URI}
     *     environment variable is set; otherwise a no-op service that starts and stops
     *     immediately
     */
    private static Service createSparkDriverService(SparkRuntimeContext sparkRuntimeContext) {
        String serviceUri = System.getenv(CDAP_SPARK_EXECUTION_SERVICE_URI);
        if (serviceUri != null) {
            return new SparkDriverService(URI.create(serviceUri), sparkRuntimeContext);
        }
        return new AbstractService() {
            @Override
            protected void doStart() {
                notifyStarted();
            }

            @Override
            protected void doStop() {
                notifyStopped();
            }
        };
    }
}
