package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.spark.Spark;
import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.internal.app.runtime.spark.metrics.SparkMetricsSink;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.tephra.DefaultTransactionExecutor;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.concurrent.Executor;
import java.util.jar.JarEntry;
import java.util.jar.JarOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.deploy.SparkSubmit;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkRuntimeService.class */
final class SparkRuntimeService extends AbstractExecutionThreadService {
    static final String SPARK_HCONF_FILENAME = "spark_hconf.xml";
    private static final Logger LOG = LoggerFactory.getLogger(SparkRuntimeService.class);
    private final CConfiguration cConf;
    private final Configuration hConf;
    private final Spark spark;
    private final SparkSpecification sparkSpecification;
    private final Location programJarLocation;
    private final BasicSparkContext context;
    private final LocationFactory locationFactory;
    private final TransactionSystemClient txClient;
    private Transaction transaction;
    private Runnable cleanupTask;
    private String[] sparkSubmitArgs;
    private volatile boolean stopRequested;

    /* JADX INFO: Access modifiers changed from: package-private */
    public SparkRuntimeService(CConfiguration cConfiguration, Configuration configuration, Spark spark, SparkSpecification sparkSpecification, BasicSparkContext basicSparkContext, Location location, LocationFactory locationFactory, TransactionSystemClient transactionSystemClient) {
        this.cConf = cConfiguration;
        this.hConf = configuration;
        this.spark = spark;
        this.sparkSpecification = sparkSpecification;
        this.programJarLocation = location;
        this.context = basicSparkContext;
        this.locationFactory = locationFactory;
        this.txClient = transactionSystemClient;
    }

    protected String getServiceName() {
        return "Spark - " + this.sparkSpecification.getName();
    }

    protected void startUp() throws Exception {
        Configuration configuration = new Configuration(this.hConf);
        beforeSubmit();
        try {
            Location copyProgramJar = copyProgramJar(this.programJarLocation, this.context);
            try {
                Transaction startLong = this.txClient.startLong();
                try {
                    SparkContextConfig.set(configuration, this.context, this.cConf, startLong, copyProgramJar);
                    Location buildDependencyJar = buildDependencyJar(this.context, SparkContextConfig.getHConf());
                    try {
                        File createTempFile = File.createTempFile(SparkMetricsSink.SPARK_METRICS_PROPERTIES_FILENAME, ".tmp", new File(new File(this.cConf.get("local.data.dir")), this.cConf.get("app.temp.dir")).getAbsoluteFile());
                        try {
                            SparkMetricsSink.generateSparkMetricsConfig(createTempFile);
                            this.context.setMetricsPropertyFile(createTempFile);
                            this.sparkSubmitArgs = prepareSparkSubmitArgs(this.sparkSpecification, configuration, copyProgramJar, buildDependencyJar);
                            LOG.info("Submitting Spark program: {} with arguments {}", this.context, Arrays.toString(this.sparkSubmitArgs));
                            this.transaction = startLong;
                            this.cleanupTask = createCleanupTask(ImmutableList.of(createTempFile), ImmutableList.of(buildDependencyJar, copyProgramJar));
                        } catch (Throwable th) {
                            createTempFile.delete();
                            throw th;
                        }
                    } catch (Throwable th2) {
                        LOG.error("Exception while creating metrics properties file for Spark", th2);
                        Locations.deleteQuietly(buildDependencyJar);
                        throw th2;
                    }
                } catch (Throwable th3) {
                    Transactions.invalidateQuietly(this.txClient, startLong);
                    throw th3;
                }
            } catch (Throwable th4) {
                Locations.deleteQuietly(copyProgramJar);
                throw th4;
            }
        } catch (Throwable th5) {
            LOG.error("Exception while preparing for submitting Spark Job: {}", this.context, th5);
            throw Throwables.propagate(th5);
        }
    }

    protected void run() throws Exception {
        try {
            ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
            try {
                try {
                    SparkProgramWrapper.setBasicSparkContext(this.context);
                    SparkProgramWrapper.setSparkProgramRunning(true);
                    SparkSubmit.main(this.sparkSubmitArgs);
                    SparkProgramWrapper.setSparkProgramRunning(false);
                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                } catch (Throwable th) {
                    SparkProgramWrapper.setSparkProgramRunning(false);
                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                    throw th;
                }
            } catch (Exception e) {
                LOG.error("Failed to submit Spark program {}", this.context, e);
                SparkProgramWrapper.setSparkProgramRunning(false);
                Thread.currentThread().setContextClassLoader(contextClassLoader);
            }
            if (this.stopRequested) {
                return;
            }
            Preconditions.checkState(!SparkProgramWrapper.isSparkProgramRunning() && SparkProgramWrapper.isSparkProgramSuccessful(), "Spark program execution failed.");
        } catch (Exception e2) {
            LOG.warn("Failed to set the classloader for submitting spark program");
            throw Throwables.propagate(e2);
        }
    }

    protected void shutDown() throws Exception {
        boolean isSparkProgramSuccessful = SparkProgramWrapper.isSparkProgramSuccessful();
        try {
            if (isSparkProgramSuccessful) {
                LOG.info("Committing Spark Program transaction: {}", this.context);
                if (!this.txClient.commit(this.transaction)) {
                    LOG.warn("Spark Job transaction failed to commit");
                    throw new TransactionFailureException("Failed to commit transaction for Spark " + this.context.toString());
                }
            } else {
                this.txClient.invalidate(this.transaction.getWritePointer());
            }
            try {
                onFinish(isSparkProgramSuccessful);
                this.context.close();
                this.cleanupTask.run();
            } finally {
            }
        } catch (Throwable th) {
            try {
                onFinish(isSparkProgramSuccessful);
                this.context.close();
                this.cleanupTask.run();
                throw th;
            } finally {
            }
        }
    }

    protected void triggerShutdown() {
        try {
            this.stopRequested = true;
            if (SparkProgramWrapper.isSparkProgramRunning()) {
                SparkProgramWrapper.stopSparkProgram();
            }
        } catch (Exception e) {
            LOG.error("Failed to stop Spark job {}", this.sparkSpecification.getName(), e);
            throw Throwables.propagate(e);
        }
    }

    protected Executor executor() {
        return new Executor() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService.1
            @Override // java.util.concurrent.Executor
            public void execute(final Runnable runnable) {
                Thread thread = new Thread(new Runnable() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService.1.1
                    @Override // java.lang.Runnable
                    public void run() {
                        LoggingContextAccessor.setLoggingContext(SparkRuntimeService.this.context.getLoggingContext());
                        runnable.run();
                    }
                });
                thread.setDaemon(true);
                thread.setName(SparkRuntimeService.this.getServiceName());
                thread.start();
            }
        };
    }

    private void beforeSubmit() throws TransactionFailureException, InterruptedException {
        createTransactionExecutor().execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService.2
            public void apply() throws Exception {
                ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
                Thread.currentThread().setContextClassLoader(SparkRuntimeService.this.spark.getClass().getClassLoader());
                try {
                    SparkRuntimeService.this.spark.beforeSubmit(SparkRuntimeService.this.context);
                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                } catch (Throwable th) {
                    Thread.currentThread().setContextClassLoader(contextClassLoader);
                    throw th;
                }
            }
        });
    }

    private void onFinish(final boolean z) throws TransactionFailureException, InterruptedException {
        createTransactionExecutor().execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService.3
            public void apply() throws Exception {
                SparkRuntimeService.this.spark.onFinish(z, SparkRuntimeService.this.context);
            }
        });
    }

    private TransactionExecutor createTransactionExecutor() {
        return new DefaultTransactionExecutor(this.txClient, this.context.getDatasetInstantiator().getTransactionAware());
    }

    private String[] prepareSparkSubmitArgs(SparkSpecification sparkSpecification, Configuration configuration, Location location, Location location2) throws Exception {
        ArrayList newArrayList = Lists.newArrayList();
        newArrayList.add(location2.toURI().getPath());
        ClassLoader classLoader = this.spark.getClass().getClassLoader();
        if (classLoader instanceof URLClassLoader) {
            for (URL url : ((URLClassLoader) classLoader).getURLs()) {
                File absoluteFile = new File(url.toURI()).getAbsoluteFile();
                if (absoluteFile.isFile() && absoluteFile.getName().endsWith(".jar")) {
                    newArrayList.add(absoluteFile.getAbsolutePath());
                }
            }
        }
        return new String[]{"--class", SparkProgramWrapper.class.getCanonicalName(), "--jars", Joiner.on(',').join(newArrayList), "--master", configuration.get("mapreduce.framework.name"), location.toURI().getPath(), sparkSpecification.getMainClassName()};
    }

    private Location buildDependencyJar(BasicSparkContext basicSparkContext, Configuration configuration) throws IOException {
        Id.Program id = basicSparkContext.getProgram().getId();
        Location create = this.locationFactory.create(String.format("%s.%s.%s.%s.%s.jar", ProgramType.SPARK.name().toLowerCase(), id.getNamespaceId(), id.getApplicationId(), id.getId(), basicSparkContext.getRunId().getId()));
        LOG.debug("Creating Spark Job Jar: {}", create.toURI());
        JarOutputStream jarOutputStream = new JarOutputStream(create.getOutputStream());
        try {
            jarOutputStream.putNextEntry(new JarEntry(SPARK_HCONF_FILENAME));
            configuration.writeXml(jarOutputStream);
            Closeables.closeQuietly(jarOutputStream);
            return create;
        } catch (Throwable th) {
            Closeables.closeQuietly(jarOutputStream);
            throw th;
        }
    }

    private Location copyProgramJar(Location location, BasicSparkContext basicSparkContext) throws IOException {
        Id.Program id = basicSparkContext.getProgram().getId();
        Location create = this.locationFactory.create(String.format("%s.%s.%s.%s.%s.program.jar", ProgramType.SPARK.name().toLowerCase(), id.getNamespaceId(), id.getApplicationId(), id.getId(), basicSparkContext.getRunId().getId()));
        ByteStreams.copy(Locations.newInputSupplier(location), Locations.newOutputSupplier(create));
        return create;
    }

    private Runnable createCleanupTask(final Iterable<File> iterable, final Iterable<Location> iterable2) {
        return new Runnable() { // from class: co.cask.cdap.internal.app.runtime.spark.SparkRuntimeService.4
            @Override // java.lang.Runnable
            public void run() {
                Iterator it = iterable.iterator();
                while (it.hasNext()) {
                    ((File) it.next()).delete();
                }
                Iterator it2 = iterable2.iterator();
                while (it2.hasNext()) {
                    Locations.deleteQuietly((Location) it2.next());
                }
            }
        };
    }
}
