package co.cask.cdap.app.runtime.spark.distributed;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.common.RuntimeArguments;
import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.app.guice.ClusterMode;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramClassLoaderProvider;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.spark.SparkPackageUtils;
import co.cask.cdap.app.runtime.spark.SparkProgramRuntimeProvider;
import co.cask.cdap.app.runtime.spark.SparkResourceFilters;
import co.cask.cdap.app.runtime.spark.SparkRuntimeContextConfig;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.lang.FilterClassLoader;
import co.cask.cdap.common.twill.HadoopClassExcluder;
import co.cask.cdap.internal.app.runtime.batch.distributed.MapReduceContainerHelper;
import co.cask.cdap.internal.app.runtime.distributed.DistributedProgramRunner;
import co.cask.cdap.internal.app.runtime.distributed.ProgramLaunchConfig;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.runtime.spi.SparkCompat;
import co.cask.cdap.security.TokenSecureStoreRenewer;
import co.cask.cdap.security.impersonation.Impersonator;
import co.cask.cdap.security.impersonation.SecurityUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.ClassAcceptor;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/app/runtime/spark/distributed/DistributedSparkProgramRunner.class */
public final class DistributedSparkProgramRunner extends DistributedProgramRunner implements ProgramClassLoaderProvider {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedSparkProgramRunner.class);
    private final LocationFactory locationFactory;
    private final SparkCompat sparkCompat;

    @VisibleForTesting
    @Inject
    public DistributedSparkProgramRunner(SparkCompat sparkCompat, CConfiguration cConfiguration, YarnConfiguration yarnConfiguration, Impersonator impersonator, LocationFactory locationFactory, ClusterMode clusterMode, @Constants.AppFabric.ProgramRunner TwillRunner twillRunner) {
        super(cConfiguration, yarnConfiguration, impersonator, clusterMode, twillRunner);
        this.sparkCompat = sparkCompat;
        this.locationFactory = locationFactory;
    }

    public ProgramController createProgramController(TwillController twillController, ProgramId programId, RunId runId) {
        return new SparkTwillProgramController(programId, twillController, runId).startListen();
    }

    protected void validateOptions(Program program, ProgramOptions programOptions) {
        super.validateOptions(program, programOptions);
        ApplicationSpecification applicationSpecification = program.getApplicationSpecification();
        Preconditions.checkNotNull(applicationSpecification, "Missing application specification for %s", new Object[]{program.getId()});
        ProgramType type = program.getType();
        Preconditions.checkNotNull(type, "Missing processor type for %s", new Object[]{program.getId()});
        Preconditions.checkArgument(type == ProgramType.SPARK, "Only SPARK process type is supported. Program type is %s for %s", new Object[]{type, program.getId()});
        Preconditions.checkNotNull((SparkSpecification) applicationSpecification.getSpark().get(program.getName()), "Missing SparkSpecification for %s", new Object[]{program.getId()});
    }

    protected void setupLaunchConfig(ProgramLaunchConfig programLaunchConfig, Program program, ProgramOptions programOptions, CConfiguration cConfiguration, Configuration configuration, File file) throws IOException {
        if (this.clusterMode == ClusterMode.ON_PREMISE) {
            configuration.set("hive.metastore.token.signature", "hive.metastore.service");
            if (SecurityUtil.isKerberosEnabled(cConfiguration)) {
                programLaunchConfig.addExtraSystemArgument(SparkRuntimeContextConfig.CREDENTIALS_UPDATE_INTERVAL_MS, Long.toString((long) ((TokenSecureStoreRenewer.calculateUpdateInterval(cConfiguration, configuration) + 5000) / 0.8d)));
            }
        }
        SparkSpecification sparkSpecification = (SparkSpecification) program.getApplicationSpecification().getSpark().get(program.getName());
        programLaunchConfig.addRunnable(sparkSpecification.getName(), new SparkTwillRunnable(sparkSpecification.getName()), 1, RuntimeArguments.extractScope("task", "client", programOptions.getUserArguments().asMap()), sparkSpecification.getClientResources(), 0);
        HashMap hashMap = new HashMap();
        hashMap.put("SPARK_COMPAT", this.sparkCompat.getCompat());
        if (this.clusterMode == ClusterMode.ON_PREMISE) {
            hashMap.putAll(SparkPackageUtils.getSparkClientEnv());
            HashMap hashMap2 = new HashMap();
            SparkPackageUtils.prepareSparkResources(this.sparkCompat, this.locationFactory, file, hashMap2, hashMap);
            MapReduceContainerHelper.localizeFramework(configuration, hashMap2);
            programLaunchConfig.addExtraResources(hashMap2).addExtraClasspath(MapReduceContainerHelper.addMapReduceClassPath(configuration, new ArrayList()));
        } else {
            cConfiguration.setBoolean("app.program.spark.yarn.client.rewrite.enabled", false);
            programLaunchConfig.addExtraClasspath(Collections.singletonList("$HADOOP_CLASSPATH"));
            hashMap.put(SparkPackageUtils.SPARK_YARN_MODE, "true");
        }
        programLaunchConfig.addExtraEnv(hashMap).addExtraDependencies(new Class[]{SparkProgramRuntimeProvider.class}).addExtraSystemArgument(SparkRuntimeContextConfig.DISTRIBUTED_MODE, Boolean.TRUE.toString()).setClassAcceptor(createBundlerClassAcceptor());
    }

    private ClassAcceptor createBundlerClassAcceptor() throws MalformedURLException {
        final HashSet hashSet = new HashSet();
        Iterator<File> it = SparkPackageUtils.getLocalSparkLibrary(this.sparkCompat).iterator();
        while (it.hasNext()) {
            hashSet.add(it.next().toURI().toURL());
        }
        return new HadoopClassExcluder() { // from class: co.cask.cdap.app.runtime.spark.distributed.DistributedSparkProgramRunner.1
            public boolean accept(String str, URL url, URL url2) {
                return (hashSet.contains(url2) || !super.accept(str, url, url2) || str.startsWith("org.apache.spark.")) ? false : true;
            }
        };
    }

    public ClassLoader createProgramClassLoaderParent() {
        return new FilterClassLoader(getClass().getClassLoader(), SparkResourceFilters.SPARK_PROGRAM_CLASS_LOADER_FILTER);
    }
}
