package co.cask.cdap.datapipeline;

import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.plugin.PluginProperties;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.Spark;
import co.cask.cdap.api.spark.SparkClientContext;
import co.cask.cdap.api.spark.SparkMain;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.common.BasicArguments;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.spec.PluginSpec;
import co.cask.cdap.etl.spec.StageSpec;
import com.google.gson.Gson;
import java.util.HashMap;
import java.util.UUID;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/datapipeline/ExternalSparkProgram.class */
public class ExternalSparkProgram extends AbstractSpark {
    public static final String STAGE_NAME = "stage.name";
    static final String PROGRAM_ARGS = "program.args";
    private final BatchPhaseSpec phaseSpec;
    private final StageSpec stageSpec;
    private Spark delegateSpark;
    private static final Logger LOG = LoggerFactory.getLogger(ExternalSparkProgram.class);
    private static final Gson GSON = new Gson();

    public ExternalSparkProgram(BatchPhaseSpec batchPhaseSpec, StageSpec stageSpec) {
        this.phaseSpec = batchPhaseSpec;
        this.stageSpec = stageSpec;
    }

    protected void configure() {
        PluginSpec plugin = this.stageSpec.getPlugin();
        Object usePlugin = usePlugin(plugin.getType(), plugin.getName(), UUID.randomUUID().toString(), PluginProperties.builder().addAll(plugin.getProperties()).build());
        if (usePlugin == null) {
            throw new IllegalStateException(String.format("No plugin found of type %s and name %s for stage %s", plugin.getType(), plugin.getName(), STAGE_NAME));
        }
        if (Spark.class.isAssignableFrom(usePlugin.getClass())) {
            ((Spark) usePlugin).configure(getConfigurer());
        } else if (SparkMain.class.isAssignableFrom(usePlugin.getClass())) {
            setMainClass(ScalaSparkMainWrapper.class);
        } else {
            setMainClass(JavaSparkMainWrapper.class);
        }
        setName(this.phaseSpec.getPhaseName());
        HashMap hashMap = new HashMap();
        hashMap.put(STAGE_NAME, this.stageSpec.getName());
        hashMap.put(Constants.PIPELINEID, GSON.toJson(this.phaseSpec, BatchPhaseSpec.class));
        setProperties(hashMap);
    }

    protected void initialize() throws Exception {
        SparkClientContext context = getContext();
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.driver.extraJavaOptions", "-XX:MaxPermSize=256m " + sparkConf.get("spark.driver.extraJavaOptions", ""));
        sparkConf.set("spark.executor.extraJavaOptions", "-XX:MaxPermSize=256m " + sparkConf.get("spark.executor.extraJavaOptions", ""));
        context.setSparkConf(sparkConf);
        String property = context.getSpecification().getProperty(STAGE_NAME);
        if (Spark.class.isAssignableFrom(context.loadPluginClass(property))) {
            this.delegateSpark = (Spark) context.newPluginInstance(property, new DefaultMacroEvaluator(new BasicArguments(context), context.getLogicalStartTime(), context, context.getNamespace()));
            if (this.delegateSpark instanceof AbstractSpark) {
                this.delegateSpark.initialize(context);
            } else {
                this.delegateSpark.beforeSubmit(context);
            }
        }
    }

    public void destroy() {
        if (this.delegateSpark != null) {
            if (this.delegateSpark instanceof AbstractSpark) {
                this.delegateSpark.destroy();
                return;
            }
            try {
                this.delegateSpark.onFinish(getContext().getState().getStatus() == ProgramStatus.COMPLETED, getContext());
            } catch (Exception e) {
                LOG.warn("Exception on calling onFinish on {}", this.delegateSpark.getClass(), e);
            }
        }
    }
}
