package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.WeakReferenceDelegatorClassLoader;
import co.cask.cdap.internal.app.runtime.distributed.LocalizeResource;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import org.apache.spark.deploy.SparkSubmit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/AbstractSparkSubmitter.class */
/**
 * Base implementation of {@link SparkSubmitter} that launches a Spark program by calling
 * {@link SparkSubmit#main(String[])} on a dedicated single-thread executor.
 *
 * <p>Subclasses provide the value of the {@code --master} argument via {@link #getMaster()} and the
 * action performed by the future's {@code cancelTask()} hook via
 * {@link #triggerShutdown(ExecutionSparkContext)}.
 */
public abstract class AbstractSparkSubmitter implements SparkSubmitter {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkSubmitter.class);

    /** Matches resources for which {@link LocalizeResource#isArchive()} returns {@code true}. */
    private static final Predicate<LocalizeResource> ARCHIVE_FILTER = new Predicate<LocalizeResource>() {
        public boolean apply(LocalizeResource localizeResource) {
            return localizeResource.isArchive();
        }
    };

    /** Converts a resource to the string form of its URI. */
    private static final Function<LocalizeResource, String> RESOURCE_TO_PATH = new Function<LocalizeResource, String>() {
        public String apply(LocalizeResource localizeResource) {
            return localizeResource.getURI().toString();
        }
    };

    /**
     * Builds the SparkSubmit argument list and runs the submission asynchronously on a newly created
     * single-thread executor.
     *
     * @param executionSparkContext context of the run; supplies the specification and the run id
     * @param map entries that are passed to SparkSubmit as {@code --conf key=value}
     * @param list resources passed as {@code --archives} or {@code --files}
     * @param file file whose absolute path is passed as a positional argument
     * @param v value the returned future is completed with when the submission returns normally
     * @return a future that is set to {@code v} on normal return, or failed with the thrown
     *     {@link Throwable} otherwise
     */
    @Override // co.cask.cdap.internal.app.runtime.spark.SparkSubmitter
    public final <V> ExecutionFuture<V> submit(final ExecutionSparkContext executionSparkContext, Map<String, String> map, List<LocalizeResource> list, File file, final V v) {
        final SparkSpecification spec = executionSparkContext.getSpecification();
        final List<String> args = createSubmitArguments(spec.getMainClassName(), map, list, file);

        // A dedicated executor lets the hook below interrupt the submitting thread via shutdownNow().
        final ExecutorService executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "spark-submitter-" + spec.getName() + "-" + executionSparkContext.getRunId());
            }
        });

        // Released exactly once when the submission task finishes, whether normally or with a failure.
        final CountDownLatch completion = new CountDownLatch(1);
        final SettableExecutionFuture<V> future = new SettableExecutionFuture<V>(executionSparkContext) {
            @Override // co.cask.cdap.internal.app.runtime.spark.SettableExecutionFuture
            protected void cancelTask() {
                // NOTE(review): presumably invoked by SettableExecutionFuture on cancellation - confirm.
                AbstractSparkSubmitter.this.triggerShutdown(executionSparkContext);
                // Interrupt the submitting thread, then block until the submission task has finished.
                executor.shutdownNow();
                Uninterruptibles.awaitUninterruptibly(completion);
            }
        };

        executor.submit(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                try {
                    AbstractSparkSubmitter.this.submit(executionSparkContext, args.toArray(new String[args.size()]));
                    future.set(v);
                } catch (Throwable t) {
                    future.setException(t);
                } finally {
                    // Always release the latch so that cancelTask() cannot wait forever.
                    completion.countDown();
                }
            }
        });

        // The executor serves this single submission only; no further tasks are accepted.
        executor.shutdown();
        return future;
    }

    /**
     * Returns the value passed to SparkSubmit after the {@code --master} option.
     */
    protected abstract String getMaster();

    /**
     * Called from the returned future's {@code cancelTask()} hook before the submitting thread is
     * interrupted.
     *
     * @param executionSparkContext context of the run being shut down
     */
    protected abstract void triggerShutdown(ExecutionSparkContext executionSparkContext);

    /**
     * Calls {@link SparkSubmit#main(String[])} with the given arguments while a
     * {@link WeakReferenceDelegatorClassLoader} wrapping a {@link SparkClassLoader} is installed through
     * {@link ClassLoaders#setContextClassLoader}. The value returned by that installation call
     * (presumably the previous context class loader) is set back afterwards, on both normal and
     * exceptional exit.
     *
     * @param executionSparkContext context used to create the {@link SparkClassLoader}
     * @param strArr arguments handed to SparkSubmit
     */
    protected void submit(ExecutionSparkContext executionSparkContext, String[] strArr) {
        ClassLoader previousClassLoader = ClassLoaders.setContextClassLoader(
            new WeakReferenceDelegatorClassLoader(new SparkClassLoader(executionSparkContext)));
        try {
            LOG.debug("Calling SparkSubmit for {}: {}", executionSparkContext, Arrays.toString(strArr));
            SparkSubmit.main(strArr);
            LOG.debug("SparkSubmit returned for {}", executionSparkContext);
        } finally {
            ClassLoaders.setContextClassLoader(previousClassLoader);
        }
    }

    /**
     * Assembles the SparkSubmit argument list in this order: {@code --master}, {@code --class} (always
     * {@link SparkProgramWrapper}), one {@code --conf key=value} pair per config entry, optional
     * {@code --archives} and {@code --files} (comma-separated URIs, omitted when empty), the absolute
     * path of {@code file}, and finally {@code mainClassName}.
     *
     * @param mainClassName main class name taken from the Spark specification; appended as the last argument
     * @param configs entries emitted as {@code --conf} options
     * @param resources resources split into archives and plain files by {@link #ARCHIVE_FILTER}
     * @param file file whose absolute path is appended as a positional argument
     * @return immutable list of arguments
     */
    private List<String> createSubmitArguments(String mainClassName, Map<String, String> configs, List<LocalizeResource> resources, File file) {
        ImmutableList.Builder<String> builder = ImmutableList.builder();
        builder.add("--master").add(getMaster());
        builder.add("--class").add(SparkProgramWrapper.class.getName());
        for (Map.Entry<String, String> entry : configs.entrySet()) {
            builder.add("--conf").add(entry.getKey() + "=" + entry.getValue());
        }

        String archives = Joiner.on(',').join(Iterables.transform(Iterables.filter(resources, ARCHIVE_FILTER), RESOURCE_TO_PATH));
        String files = Joiner.on(',').join(Iterables.transform(Iterables.filter(resources, Predicates.not(ARCHIVE_FILTER)), RESOURCE_TO_PATH));
        if (!archives.isEmpty()) {
            builder.add("--archives").add(archives);
        }
        if (!files.isEmpty()) {
            builder.add("--files").add(files);
        }
        return builder.add(file.getAbsolutePath()).add(mainClassName).build();
    }
}
