package co.cask.cdap.app.runtime.spark.submit;

import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.app.runtime.spark.SparkClassLoader;
import co.cask.cdap.app.runtime.spark.SparkMainWrapper;
import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;
import co.cask.cdap.app.runtime.spark.SparkRuntimeUtils;
import co.cask.cdap.internal.app.runtime.distributed.LocalizeResource;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.Uninterruptibles;
import java.net.URI;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.spark.deploy.SparkSubmit;
import org.apache.twill.common.Cancellable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/submit/AbstractSparkSubmitter.class */
public abstract class AbstractSparkSubmitter implements SparkSubmitter {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractSparkSubmitter.class);
    private static final Predicate<LocalizeResource> ARCHIVE_FILTER = new Predicate<LocalizeResource>() { // from class: co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.1
        public boolean apply(LocalizeResource localizeResource) {
            return localizeResource.isArchive();
        }
    };
    private static final Function<LocalizeResource, String> RESOURCE_TO_PATH = new Function<LocalizeResource, String>() { // from class: co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.2
        public String apply(LocalizeResource localizeResource) {
            return localizeResource.getURI().toString();
        }
    };

    /* loaded from: input_file:co/cask/cdap/app/runtime/spark/submit/AbstractSparkSubmitter$SparkJobFuture.class */
    private static abstract class SparkJobFuture<V> extends AbstractFuture<V> {
        private static final Logger LOG = LoggerFactory.getLogger(SparkJobFuture.class);
        private final AtomicBoolean done = new AtomicBoolean();
        private final SparkRuntimeContext context;

        protected SparkJobFuture(SparkRuntimeContext sparkRuntimeContext) {
            this.context = sparkRuntimeContext;
        }

        protected boolean set(V v) {
            if (this.done.compareAndSet(false, true)) {
                return super.set(v);
            }
            return false;
        }

        protected boolean setException(Throwable th) {
            if (this.done.compareAndSet(false, true)) {
                return super.setException(th);
            }
            return false;
        }

        public boolean cancel(boolean z) {
            if (!this.done.compareAndSet(false, true)) {
                return false;
            }
            try {
                cancelTask();
                return super.cancel(z);
            } catch (Throwable th) {
                LOG.warn("Failed to cancel Spark execution for {}.", this.context, th);
                this.done.set(false);
                return false;
            }
        }

        protected final void interruptTask() {
        }

        protected void cancelTask() {
        }
    }

    @Override // co.cask.cdap.app.runtime.spark.submit.SparkSubmitter
    public final <V> ListenableFuture<V> submit(final SparkRuntimeContext sparkRuntimeContext, Map<String, String> map, List<LocalizeResource> list, URI uri, final V v) {
        final SparkSpecification sparkSpecification = sparkRuntimeContext.getSparkSpecification();
        final List<String> createSubmitArguments = createSubmitArguments(sparkRuntimeContext, map, list, uri);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() { // from class: co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.3
            @Override // java.util.concurrent.ThreadFactory
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable, "spark-submitter-" + sparkSpecification.getName() + "-" + sparkRuntimeContext.getRunId());
            }
        });
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        final SparkJobFuture<V> sparkJobFuture = new SparkJobFuture<V>(sparkRuntimeContext) { // from class: co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.4
            @Override // co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.SparkJobFuture
            protected void cancelTask() {
                AbstractSparkSubmitter.this.triggerShutdown();
                Uninterruptibles.awaitUninterruptibly(countDownLatch);
            }
        };
        newSingleThreadExecutor.submit(new Runnable() { // from class: co.cask.cdap.app.runtime.spark.submit.AbstractSparkSubmitter.5
            @Override // java.lang.Runnable
            public void run() {
                try {
                    try {
                        AbstractSparkSubmitter.this.submit(sparkRuntimeContext, (String[]) Iterables.toArray(Iterables.concat(createSubmitArguments, AbstractSparkSubmitter.this.beforeSubmit()), String.class));
                        AbstractSparkSubmitter.this.onCompleted(true);
                        sparkJobFuture.set(v);
                        countDownLatch.countDown();
                    } catch (Throwable th) {
                        AbstractSparkSubmitter.this.onCompleted(false);
                        sparkJobFuture.setException(th);
                        countDownLatch.countDown();
                    }
                } catch (Throwable th2) {
                    countDownLatch.countDown();
                    throw th2;
                }
            }
        });
        newSingleThreadExecutor.shutdown();
        return sparkJobFuture;
    }

    protected abstract void addMaster(Map<String, String> map, ImmutableList.Builder<String> builder);

    protected abstract void triggerShutdown();

    protected List<String> beforeSubmit() {
        return Collections.emptyList();
    }

    protected void onCompleted(boolean z) {
    }

    protected Map<String, String> getSubmitConf() {
        return Collections.emptyMap();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void submit(SparkRuntimeContext sparkRuntimeContext, String[] strArr) {
        Cancellable contextClassLoader = SparkRuntimeUtils.setContextClassLoader(new SparkClassLoader(sparkRuntimeContext));
        try {
            LOG.debug("Calling SparkSubmit for {} {}: {}", new Object[]{sparkRuntimeContext.getProgram().getId(), sparkRuntimeContext.getRunId(), Arrays.toString(strArr)});
            System.setProperty("SPARK_SUBMIT", "true");
            SparkSubmit.main(strArr);
            LOG.debug("SparkSubmit returned for {} {}", sparkRuntimeContext.getProgram().getId(), sparkRuntimeContext.getRunId());
            contextClassLoader.cancel();
        } catch (Throwable th) {
            contextClassLoader.cancel();
            throw th;
        }
    }

    private List<String> createSubmitArguments(SparkRuntimeContext sparkRuntimeContext, Map<String, String> map, List<LocalizeResource> list, URI uri) {
        SparkSpecification sparkSpecification = sparkRuntimeContext.getSparkSpecification();
        ImmutableList.Builder<String> builder = ImmutableList.builder();
        addMaster(map, builder);
        builder.add("--conf").add("spark.app.name=" + sparkSpecification.getName());
        for (Map.Entry<String, String> entry : map.entrySet()) {
            builder.add("--conf").add(entry.getKey() + "=" + entry.getValue());
        }
        for (Map.Entry<String, String> entry2 : getSubmitConf().entrySet()) {
            builder.add("--conf").add(entry2.getKey() + "=" + entry2.getValue());
        }
        String join = Joiner.on(',').join(Iterables.transform(Iterables.filter(list, ARCHIVE_FILTER), RESOURCE_TO_PATH));
        String join2 = Joiner.on(',').join(Iterables.transform(Iterables.filter(list, Predicates.not(ARCHIVE_FILTER)), RESOURCE_TO_PATH));
        if (!join.isEmpty()) {
            builder.add("--archives").add(join);
        }
        if (!join2.isEmpty()) {
            builder.add("--files").add(join2);
        }
        boolean endsWith = uri.getPath().endsWith(".py");
        if (endsWith) {
            String str = map.get("spark.submit.pyFiles");
            if (str != null) {
                builder.add("--py-files").add(str);
            }
        } else {
            builder.add("--class").add(SparkMainWrapper.class.getName());
        }
        if ("file".equals(uri.getScheme())) {
            builder.add(uri.getPath());
        } else {
            builder.add(uri.toString());
        }
        if (!endsWith) {
            builder.add("--cdap.spark.program=" + sparkRuntimeContext.getProgramRunId().toString());
            builder.add("--cdap.user.main.class=" + sparkSpecification.getMainClassName());
        }
        return builder.build();
    }
}
