package co.cask.cdap.internal.app.runtime.spark;

import co.cask.cdap.api.spark.Spark;
import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.app.program.ManifestFields;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.lang.CombineClassLoader;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.HBaseTableUtilFactory;
import co.cask.cdap.internal.app.runtime.spark.dataset.SparkDatasetInputFormat;
import co.cask.cdap.internal.app.runtime.spark.dataset.SparkDatasetOutputFormat;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.tephra.DefaultTransactionExecutor;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.inject.ProvisionException;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.Arrays;
import java.util.HashSet;
import java.util.concurrent.Executor;
import java.util.jar.JarEntry;
import java.util.jar.JarInputStream;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.deploy.SparkSubmit;
import org.apache.twill.filesystem.Location;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.internal.ApplicationBundler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/spark/SparkRuntimeService.class */
/**
 * Runs a single Spark program as an {@link AbstractExecutionThreadService}.
 *
 * <p>Lifecycle as implemented below:
 * <ul>
 *   <li>{@code startUp()} invokes the user's {@code beforeSubmit}, copies the program jar, starts a long
 *       transaction, builds the dependency jar and prepares the {@code SparkSubmit} arguments;</li>
 *   <li>{@code run()} calls {@code SparkSubmit.main} with the program's class loader installed as the
 *       thread context class loader;</li>
 *   <li>{@code shutDown()} commits or invalidates the transaction, invokes the user's {@code onFinish},
 *       closes the context and deletes the temporary jars.</li>
 * </ul>
 */
final class SparkRuntimeService extends AbstractExecutionThreadService {
    static final String SPARK_HCONF_FILENAME = "spark_hconf.xml";
    private static final Logger LOG = LoggerFactory.getLogger(SparkRuntimeService.class);

    private final CConfiguration cConf;
    private final Configuration hConf;
    private final Spark spark;
    private final SparkSpecification sparkSpecification;
    private final Location programJarLocation;
    private final BasicSparkContext context;
    private final LocationFactory locationFactory;
    private final TransactionSystemClient txClient;

    // State populated by startUp() and consumed by run()/shutDown().
    private Transaction transaction;
    private Runnable cleanupTask;
    private Configuration sparkHConf;
    private String[] sparkSubmitArgs;

    // Set by triggerShutdown(); volatile because it is read in run().
    private volatile boolean stopRequested;

    /**
     * Creates the service. All arguments are stored as-is; no work is done until the service is started.
     *
     * @param cConfiguration CDAP configuration handed to {@code SparkContextConfig}
     * @param configuration base Hadoop configuration; a copy of it is made in {@link #startUp()}
     * @param spark the user Spark program whose {@code beforeSubmit}/{@code onFinish} hooks are invoked
     * @param sparkSpecification specification providing the program name and main class name
     * @param basicSparkContext runtime context of this run
     * @param location location of the program jar to copy and submit
     * @param locationFactory factory used to create the temporary jar/config locations
     * @param transactionSystemClient client used to start, commit and invalidate the long transaction
     */
    public SparkRuntimeService(CConfiguration cConfiguration, Configuration configuration, Spark spark, SparkSpecification sparkSpecification, BasicSparkContext basicSparkContext, Location location, LocationFactory locationFactory, TransactionSystemClient transactionSystemClient) {
        this.cConf = cConfiguration;
        this.hConf = configuration;
        this.spark = spark;
        this.sparkSpecification = sparkSpecification;
        this.programJarLocation = location;
        this.context = basicSparkContext;
        this.locationFactory = locationFactory;
        this.txClient = transactionSystemClient;
    }

    /**
     * @return the service name, {@code "Spark - "} followed by the specification name; also used as the
     *     name of the thread created by {@link #executor()}
     */
    protected String getServiceName() {
        return "Spark - " + this.sparkSpecification.getName();
    }

    /**
     * Prepares everything needed to submit the Spark program.
     *
     * <p>Each resource acquired here (program jar copy, long transaction, dependency jar) is released
     * again if a later preparation step fails, so a failed start-up leaves nothing behind.
     *
     * @throws Exception if the user's {@code beforeSubmit} fails or any preparation step fails
     */
    protected void startUp() throws Exception {
        this.sparkHConf = new Configuration(this.hConf);
        this.sparkHConf.setClassLoader(new CombineClassLoader((ClassLoader) Objects.firstNonNull(Thread.currentThread().getContextClassLoader(), ClassLoader.getSystemClassLoader()), ImmutableList.of(this.context.getProgram().getClassLoader())));

        beforeSubmit();

        try {
            Location programJarCopy = copyProgramJar(this.programJarLocation, this.context);
            try {
                Transaction tx = this.txClient.startLong();
                try {
                    SparkContextConfig.set(this.sparkHConf, this.context, this.cConf, tx, programJarCopy);
                    Location dependencyJar = buildDependencyJar(this.context, SparkContextConfig.getHConf());
                    try {
                        this.sparkSubmitArgs = prepareSparkSubmitArgs(this.sparkSpecification, this.sparkHConf, programJarCopy, dependencyJar);
                        LOG.info("Submitting Spark program: {} with arguments {}", this.context, Arrays.toString(this.sparkSubmitArgs));
                        this.transaction = tx;
                        this.cleanupTask = createCleanupTask(dependencyJar, programJarCopy);
                    } catch (Throwable t) {
                        Locations.deleteQuietly(dependencyJar);
                        throw Throwables.propagate(t);
                    }
                } catch (Throwable t) {
                    // The long transaction would otherwise stay open until it times out.
                    invalidateQuietly(tx);
                    throw Throwables.propagate(t);
                }
            } catch (Throwable t) {
                Locations.deleteQuietly(programJarCopy);
                throw Throwables.propagate(t);
            }
        } catch (Throwable t) {
            LOG.error("Exception while preparing for submitting Spark Job: {}", this.context, t);
            throw Throwables.propagate(t);
        }
    }

    /**
     * Submits the Spark program through {@code SparkSubmit.main} and waits for it to return.
     *
     * <p>An exception thrown by the submission is logged, not rethrown; failure is instead detected
     * afterwards through the {@code SparkProgramWrapper} status flags.
     *
     * @throws IllegalStateException if the program did not finish successfully and no stop was requested
     * @throws Exception if the context class loader could not be switched
     */
    protected void run() throws Exception {
        try {
            ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(this.sparkHConf.getClassLoader());
            try {
                SparkProgramWrapper.setSparkProgramRunning(true);
                SparkSubmit.main(this.sparkSubmitArgs);
            } catch (Exception e) {
                LOG.error("Failed to submit Spark program {}", this.context, e);
            } finally {
                SparkProgramWrapper.setSparkProgramRunning(false);
                Thread.currentThread().setContextClassLoader(previousClassLoader);
            }
        } catch (Exception e) {
            LOG.warn("Failed to set the classloader for submitting spark program", e);
            throw Throwables.propagate(e);
        }

        // A stop requested through triggerShutdown() is not reported as a failure.
        if (this.stopRequested) {
            return;
        }
        // Kept outside the try above so a failed job is not logged as a class loader problem.
        Preconditions.checkState(!SparkProgramWrapper.isSparkProgramRunning() && SparkProgramWrapper.isSparkProgramSuccessful(), "Spark program execution failed.");
    }

    /**
     * Finishes the run: commits the transaction if the program succeeded, invalidates it otherwise, then
     * calls the user's {@code onFinish}, closes the context and runs the cleanup task.
     *
     * <p>The clean-up steps are chained with {@code finally} so that each one runs even if an earlier
     * step throws.
     *
     * @throws TransactionFailureException if the transaction could not be committed
     * @throws Exception if {@code onFinish}, closing the context or the cleanup fails
     */
    protected void shutDown() throws Exception {
        boolean success = SparkProgramWrapper.isSparkProgramSuccessful();
        try {
            if (success) {
                LOG.info("Committing Spark Program transaction: {}", this.context);
                if (!this.txClient.commit(this.transaction)) {
                    LOG.warn("Spark Job transaction failed to commit");
                    throw new TransactionFailureException("Failed to commit transaction for Spark " + this.context.toString());
                }
            } else {
                this.txClient.invalidate(this.transaction.getWritePointer());
            }
        } finally {
            try {
                onFinish(success);
            } finally {
                try {
                    this.context.close();
                } finally {
                    this.cleanupTask.run();
                }
            }
        }
    }

    /**
     * Records that a stop was requested and, if the Spark program is currently running, asks
     * {@code SparkProgramWrapper} to stop it.
     */
    protected void triggerShutdown() {
        try {
            this.stopRequested = true;
            if (SparkProgramWrapper.isSparkProgramRunning()) {
                SparkProgramWrapper.stopSparkProgram();
            }
        } catch (Exception e) {
            LOG.error("Failed to stop Spark job {}", this.sparkSpecification.getName(), e);
            throw Throwables.propagate(e);
        }
    }

    /**
     * @return an executor that runs each task on a new daemon thread named after the service, with the
     *     context's logging context installed before the task runs
     */
    protected Executor executor() {
        return new Executor() {
            @Override
            public void execute(final Runnable runnable) {
                Thread serviceThread = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        LoggingContextAccessor.setLoggingContext(SparkRuntimeService.this.context.getLoggingContext());
                        runnable.run();
                    }
                });
                serviceThread.setDaemon(true);
                serviceThread.setName(SparkRuntimeService.this.getServiceName());
                serviceThread.start();
            }
        };
    }

    /**
     * Invokes the user's {@code beforeSubmit} through a transaction executor, with the Spark program's own
     * class loader set as the thread context class loader for the duration of the call.
     */
    private void beforeSubmit() throws TransactionFailureException, InterruptedException {
        createTransactionExecutor().execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
                Thread.currentThread().setContextClassLoader(SparkRuntimeService.this.spark.getClass().getClassLoader());
                try {
                    SparkRuntimeService.this.spark.beforeSubmit(SparkRuntimeService.this.context);
                } finally {
                    Thread.currentThread().setContextClassLoader(previousClassLoader);
                }
            }
        });
    }

    /**
     * Invokes the user's {@code onFinish} through a transaction executor.
     *
     * @param z whether the Spark program finished successfully
     */
    private void onFinish(final boolean z) throws TransactionFailureException, InterruptedException {
        createTransactionExecutor().execute(new TransactionExecutor.Subroutine() {
            public void apply() throws Exception {
                SparkRuntimeService.this.spark.onFinish(z, SparkRuntimeService.this.context);
            }
        });
    }

    /** Creates a transaction executor over the transaction-aware objects of the context's dataset instantiator. */
    private TransactionExecutor createTransactionExecutor() {
        return new DefaultTransactionExecutor(this.txClient, this.context.getDatasetInstantiator().getTransactionAware());
    }

    /**
     * Best-effort invalidation of a transaction during failure handling. Any error is logged and swallowed
     * so that it does not mask the failure that triggered the invalidation.
     */
    private void invalidateQuietly(Transaction tx) {
        try {
            this.txClient.invalidate(tx.getWritePointer());
        } catch (Exception e) {
            LOG.warn("Failed to invalidate transaction {}", tx, e);
        }
    }

    /**
     * Builds the {@code SparkSubmit} argument array.
     *
     * @param sparkSpecification supplies the user's main class name, passed as the last argument
     * @param configuration supplies the {@code --master} value from {@code mapreduce.framework.name}
     * @param location the program jar copy
     * @param location2 the dependency jar, passed via {@code --jars}
     */
    private String[] prepareSparkSubmitArgs(SparkSpecification sparkSpecification, Configuration configuration, Location location, Location location2) {
        String dependencyJarPath = location2.toURI().getPath();
        String programJarPath = location.toURI().getPath();
        String master = configuration.get("mapreduce.framework.name");
        return new String[]{"--class", SparkProgramWrapper.class.getCanonicalName(), "--jars", dependencyJarPath, "--master", master, programJarPath, sparkSpecification.getMainClassName()};
    }

    /**
     * Bundles the runtime classes and the serialized Hadoop configuration into a jar, then returns the
     * result of {@link #updateDependencyJar} on that bundle.
     *
     * <p>The local Hadoop configuration directory is deleted whether or not bundling succeeds, and the
     * thread context class loader is always restored.
     *
     * @throws RuntimeException wrapping any checked exception raised while bundling (via
     *     {@code Throwables.propagate})
     */
    private Location buildDependencyJar(BasicSparkContext basicSparkContext, Configuration configuration) throws IOException {
        ApplicationBundler bundler = new ApplicationBundler(Lists.newArrayList(new String[]{"org.apache.hadoop", "org.apache.spark"}), Lists.newArrayList(new String[]{"org.apache.hadoop.hbase", "org.apache.hadoop.hive"}));
        Id.Program programId = basicSparkContext.getProgram().getId();
        Location bundleJar = this.locationFactory.create(String.format("%s.%s.%s.%s.%s_temp.jar", ProgramType.SPARK.name().toLowerCase(), programId.getAccountId(), programId.getApplicationId(), programId.getId(), basicSparkContext.getRunId().getId()));
        LOG.debug("Creating Spark Job Dependency jar: {}", bundleJar.toURI());
        URI hConfUri = writeHConf(basicSparkContext, configuration);
        try {
            HashSet<Class<?>> classes = Sets.newHashSet();
            HashSet<URI> resources = Sets.newHashSet();
            classes.add(Spark.class);
            classes.add(SparkDatasetInputFormat.class);
            classes.add(SparkDatasetOutputFormat.class);
            classes.add(SparkProgramWrapper.class);
            classes.add(JavaSparkContext.class);
            classes.add(ScalaSparkContext.class);
            resources.add(hConfUri);
            try {
                classes.add(new HBaseTableUtilFactory().get().getClass());
            } catch (ProvisionException e) {
                // HBase support is optional; the bundle is still usable without it.
                LOG.warn("Not including HBaseTableUtil classes in submitted Job Jar since they are not available");
            }

            ClassLoader previousClassLoader = Thread.currentThread().getContextClassLoader();
            Thread.currentThread().setContextClassLoader(configuration.getClassLoader());
            try {
                bundler.createBundle(bundleJar, classes, resources);
            } finally {
                // Restore even when bundling fails so the calling thread is not left with the wrong loader.
                Thread.currentThread().setContextClassLoader(previousClassLoader);
            }
            return updateDependencyJar(bundleJar, basicSparkContext);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        } finally {
            deleteHConfDir(hConfUri);
        }
    }

    /**
     * Rewrites the bundled jar into a new jar in which entries under {@code classes/}, {@code lib/} and
     * {@code resources/} have that leading directory stripped from their names. The input jar is deleted
     * once it has been read, whether or not the rewrite succeeded.
     *
     * @param location the bundle produced by the application bundler; deleted by this method
     * @param basicSparkContext context used to derive the name of the resulting jar
     * @return the location of the rewritten jar
     * @throws IOException if reading the bundle or writing the new jar fails
     */
    private Location updateDependencyJar(Location location, BasicSparkContext basicSparkContext) throws IOException {
        String[] prefixesToStrip = {"classes/", "lib/", "resources/"};
        Id.Program programId = basicSparkContext.getProgram().getId();
        Location flattenedJar = this.locationFactory.create(String.format("%s.%s.%s.%s.%s.jar", ProgramType.SPARK.name().toLowerCase(), programId.getAccountId(), programId.getApplicationId(), programId.getId(), basicSparkContext.getRunId().getId()));
        Manifest manifest = new Manifest();
        manifest.getMainAttributes().put(ManifestFields.MANIFEST_VERSION, ManifestFields.VERSION);
        JarOutputStream jarOutput = new JarOutputStream(flattenedJar.getOutputStream(), manifest);
        try {
            JarInputStream jarInput = new JarInputStream(location.getInputStream());
            try {
                JarEntry entry;
                while ((entry = jarInput.getNextJarEntry()) != null) {
                    String entryName = entry.getName();
                    String newEntryName = entryName;
                    for (String prefix : prefixesToStrip) {
                        // The prefix directory entry itself (e.g. "lib/") keeps its name.
                        if (entryName.startsWith(prefix) && !entryName.equals(prefix)) {
                            newEntryName = entryName.substring(prefix.length());
                        }
                    }
                    jarOutput.putNextEntry(new JarEntry(newEntryName));
                    if (!entry.isDirectory()) {
                        ByteStreams.copy(jarInput, jarOutput);
                    }
                }
            } finally {
                // Close quietly so a close failure on the input cannot skip deleting the temporary bundle.
                Closeables.closeQuietly(jarInput);
                Locations.deleteQuietly(location);
            }
            return flattenedJar;
        } finally {
            jarOutput.close();
        }
    }

    /**
     * Serializes the given Hadoop configuration as XML to a per-run temporary location.
     *
     * @return the URI of the written configuration file
     * @throws RuntimeException wrapping the {@link IOException} if the file cannot be written
     */
    private URI writeHConf(BasicSparkContext basicSparkContext, Configuration configuration) {
        Location hConfLocation = this.locationFactory.create(String.format("%s%s/%s.%s/%s", ProgramType.SPARK.name().toLowerCase(), ".tmp", basicSparkContext.getProgram().getId().getId(), basicSparkContext.getRunId().getId(), SPARK_HCONF_FILENAME));
        BufferedOutputStream output = null;
        try {
            output = new BufferedOutputStream(hConfLocation.getOutputStream());
            configuration.writeXml(output);
            // Flush explicitly: closeQuietly below would swallow a failed flush and leave a truncated file.
            output.flush();
        } catch (IOException e) {
            LOG.error("Failed to write Hadoop Configuration file locally at {}", hConfLocation.toURI(), e);
            throw Throwables.propagate(e);
        } finally {
            Closeables.closeQuietly(output);
        }
        LOG.info("Hadoop Configuration stored at {} ", hConfLocation.toURI());
        return hConfLocation.toURI();
    }

    /**
     * Best-effort deletion of the directory containing the given configuration file URI. Failures are
     * logged and otherwise ignored.
     *
     * <p>NOTE(review): the URI is converted with {@code new File(URI)}, which only accepts {@code file:}
     * URIs; presumably the location factory is local here — confirm.
     */
    private void deleteHConfDir(URI uri) {
        try {
            String fileUri = uri.toString();
            String parentUri = fileUri.substring(0, fileUri.lastIndexOf("/"));
            FileUtils.deleteDirectory(new File(new URI(parentUri)));
        } catch (Exception e) {
            LOG.warn("Failed to delete the local hadoop configuration", e);
        }
    }

    /**
     * Copies the program jar to a new per-run location.
     *
     * @return the location of the copy
     * @throws IOException if the copy fails
     */
    private Location copyProgramJar(Location location, BasicSparkContext basicSparkContext) throws IOException {
        Id.Program programId = basicSparkContext.getProgram().getId();
        String copyName = String.format("%s.%s.%s.%s.%s.program.jar", ProgramType.SPARK.name().toLowerCase(), programId.getAccountId(), programId.getApplicationId(), programId.getId(), basicSparkContext.getRunId().getId());
        Location programJarCopy = this.locationFactory.create(copyName);
        ByteStreams.copy(Locations.newInputSupplier(location), Locations.newOutputSupplier(programJarCopy));
        return programJarCopy;
    }

    /** Creates a task that quietly deletes every given location when run. */
    private Runnable createCleanupTask(final Location... locationArr) {
        return new Runnable() {
            @Override
            public void run() {
                for (Location toDelete : locationArr) {
                    Locations.deleteQuietly(toDelete);
                }
            }
        };
    }
}
