package co.cask.cdap.etl.batch.spark;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.dataset.lib.TimePartitionedFileSet;
import co.cask.cdap.api.plugin.PluginContext;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.SparkContext;
import co.cask.cdap.etl.api.batch.BatchConfigurable;
import co.cask.cdap.etl.api.batch.BatchContext;
import co.cask.cdap.etl.batch.config.ETLBatchConfig;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DatasetContextLookupProvider;
import co.cask.cdap.etl.common.Pipeline;
import co.cask.cdap.etl.common.PipelineRegisterer;
import co.cask.cdap.etl.common.SinkInfo;
import com.google.gson.Gson;
import java.io.File;
import java.io.FileOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/etl/batch/spark/ETLSpark.class */
public class ETLSpark extends AbstractSpark {
    private static final Logger LOG = LoggerFactory.getLogger(ETLSpark.class);
    private static final Gson GSON = new Gson();
    private final ETLBatchConfig config;
    private List<Finisher> finishers;
    private List<File> cleanupFiles;

    /* loaded from: input_file:co/cask/cdap/etl/batch/spark/ETLSpark$Finisher.class */
    private interface Finisher {
        void onFinish(boolean z);
    }

    public ETLSpark(ETLBatchConfig eTLBatchConfig) {
        this.config = eTLBatchConfig;
    }

    protected void configure() {
        setDescription("Spark Driver for ETL Batch Applications");
        setMainClass(ETLSparkProgram.class);
        Pipeline registerPlugins = new PipelineRegisterer(getConfigurer(), "batch").registerPlugins(this.config, TimePartitionedFileSet.class, FileSetProperties.builder().setInputFormat(AvroKeyInputFormat.class).setOutputFormat(AvroKeyOutputFormat.class).setEnableExploreOnCreate(true).setSerDe("org.apache.hadoop.hive.serde2.avro.AvroSerDe").setExploreInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat").setExploreOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat").setTableProperty("avro.schema.literal", Constants.ERROR_SCHEMA.toString()).build(), true);
        Resources resources = this.config.getResources();
        if (resources != null) {
            setExecutorResources(resources);
        }
        Resources driverResources = this.config.getDriverResources();
        if (driverResources != null) {
            setDriverResources(driverResources);
        }
        HashMap hashMap = new HashMap();
        hashMap.put(Constants.PIPELINEID, GSON.toJson(registerPlugins));
        setProperties(hashMap);
    }

    public void beforeSubmit(SparkContext sparkContext) throws Exception {
        this.cleanupFiles = new ArrayList();
        this.finishers = new ArrayList();
        Pipeline pipeline = (Pipeline) GSON.fromJson((String) sparkContext.getSpecification().getProperties().get(Constants.PIPELINEID), Pipeline.class);
        String source = pipeline.getSource();
        PluginContext pluginContext = sparkContext.getPluginContext();
        BatchConfigurable batchConfigurable = (BatchConfigurable) pluginContext.newPluginInstance(source);
        SparkBatchSourceContext sparkBatchSourceContext = new SparkBatchSourceContext(sparkContext, new DatasetContextLookupProvider(sparkContext), source);
        batchConfigurable.prepareRun(sparkBatchSourceContext);
        SparkBatchSourceFactory sourceFactory = sparkBatchSourceContext.getSourceFactory();
        if (sourceFactory == null) {
            throw new IllegalArgumentException("No input was set. Please make sure the source plugin calls setInput when preparing the run.");
        }
        addFinisher(batchConfigurable, sparkBatchSourceContext, this.finishers);
        SparkBatchSinkFactory sparkBatchSinkFactory = new SparkBatchSinkFactory();
        for (SinkInfo sinkInfo : pipeline.getSinks()) {
            BatchConfigurable batchConfigurable2 = (BatchConfigurable) pluginContext.newPluginInstance(sinkInfo.getSinkId());
            SparkBatchSinkContext sparkBatchSinkContext = new SparkBatchSinkContext(sparkBatchSinkFactory, sparkContext, null, sinkInfo.getSinkId());
            batchConfigurable2.prepareRun(sparkBatchSinkContext);
            addFinisher(batchConfigurable2, sparkBatchSinkContext, this.finishers);
        }
        File createTempFile = File.createTempFile("ETLSpark", ".config");
        this.cleanupFiles.add(createTempFile);
        FileOutputStream fileOutputStream = new FileOutputStream(createTempFile);
        Throwable th = null;
        try {
            try {
                sourceFactory.serialize(fileOutputStream);
                sparkBatchSinkFactory.serialize(fileOutputStream);
                if (fileOutputStream != null) {
                    if (0 != 0) {
                        try {
                            fileOutputStream.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        fileOutputStream.close();
                    }
                }
                sparkContext.localize("ETLSpark.config", createTempFile.toURI());
            } finally {
            }
        } catch (Throwable th3) {
            if (fileOutputStream != null) {
                if (th != null) {
                    try {
                        fileOutputStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fileOutputStream.close();
                }
            }
            throw th3;
        }
    }

    public void onFinish(boolean z, SparkContext sparkContext) throws Exception {
        Iterator<Finisher> it = this.finishers.iterator();
        while (it.hasNext()) {
            it.next().onFinish(z);
        }
        for (File file : this.cleanupFiles) {
            if (!file.delete()) {
                LOG.warn("Failed to clean up resource {} ", file);
            }
        }
    }

    private <T extends BatchContext> void addFinisher(final BatchConfigurable<T> batchConfigurable, final T t, List<Finisher> list) {
        list.add(new Finisher() { // from class: co.cask.cdap.etl.batch.spark.ETLSpark.1
            @Override // co.cask.cdap.etl.batch.spark.ETLSpark.Finisher
            public void onFinish(boolean z) {
                batchConfigurable.onRunFinish(z, t);
            }
        });
    }
}
