package co.cask.cdap.etl.spark.batch;

import co.cask.cdap.api.Admin;
import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.annotation.TransactionControl;
import co.cask.cdap.api.annotation.TransactionPolicy;
import co.cask.cdap.api.data.DatasetContext;
import co.cask.cdap.api.data.batch.InputFormatProvider;
import co.cask.cdap.api.data.batch.OutputFormatProvider;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.spark.AbstractSpark;
import co.cask.cdap.api.spark.SparkClientContext;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.etl.api.StageSubmitterContext;
import co.cask.cdap.etl.api.Transform;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchConfigurable;
import co.cask.cdap.etl.api.batch.BatchJoiner;
import co.cask.cdap.etl.api.batch.BatchSink;
import co.cask.cdap.etl.api.batch.BatchSinkContext;
import co.cask.cdap.etl.api.batch.BatchSource;
import co.cask.cdap.etl.api.batch.BatchSourceContext;
import co.cask.cdap.etl.api.batch.SparkPluginContext;
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.lineage.field.FieldOperation;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.batch.DefaultAggregatorContext;
import co.cask.cdap.etl.batch.DefaultJoinerContext;
import co.cask.cdap.etl.batch.PipelinePluginInstantiator;
import co.cask.cdap.etl.batch.connector.SingleConnectorFactory;
import co.cask.cdap.etl.common.BasicArguments;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.common.FieldOperationTypeAdapter;
import co.cask.cdap.etl.common.PipelinePhase;
import co.cask.cdap.etl.common.PipelineRuntime;
import co.cask.cdap.etl.common.SetMultimapCodec;
import co.cask.cdap.etl.common.submit.AggregatorContextProvider;
import co.cask.cdap.etl.common.submit.CompositeFinisher;
import co.cask.cdap.etl.common.submit.ContextProvider;
import co.cask.cdap.etl.common.submit.Finisher;
import co.cask.cdap.etl.common.submit.JoinerContextProvider;
import co.cask.cdap.etl.common.submit.SubmitterPlugin;
import co.cask.cdap.etl.spark.plugin.SparkPipelinePluginContext;
import co.cask.cdap.etl.spec.StageSpec;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.collect.SetMultimap;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.BufferedWriter;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.SparkConf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX WARN: Classes with same name are omitted:
  input_file:lib/cdap-etl-batch-5.1.1.jar:lib/hydrator-spark-core-5.1.1.jar:co/cask/cdap/etl/spark/batch/ETLSpark.class
 */
/* loaded from: input_file:lib/hydrator-spark-core-5.1.1.jar:co/cask/cdap/etl/spark/batch/ETLSpark.class */
public class ETLSpark extends AbstractSpark {
    private static final Logger LOG = LoggerFactory.getLogger(ETLSpark.class);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).registerTypeAdapter(SetMultimap.class, new SetMultimapCodec()).registerTypeAdapter(DatasetInfo.class, new DatasetInfoTypeAdapter()).registerTypeAdapter(OutputFormatProvider.class, new OutputFormatProviderTypeAdapter()).registerTypeAdapter(InputFormatProvider.class, new InputFormatProviderTypeAdapter()).registerTypeAdapter(FieldOperation.class, new FieldOperationTypeAdapter()).create();
    private final BatchPhaseSpec phaseSpec;
    private Finisher finisher;
    private List<File> cleanupFiles;

    public ETLSpark(BatchPhaseSpec batchPhaseSpec) {
        this.phaseSpec = batchPhaseSpec;
    }

    protected void configure() {
        setName(this.phaseSpec.getPhaseName());
        setDescription(this.phaseSpec.getDescription());
        this.phaseSpec.getPhase().registerPlugins(getConfigurer());
        setMainClass(BatchSparkPipelineDriver.class);
        setExecutorResources(this.phaseSpec.getResources());
        setDriverResources(this.phaseSpec.getDriverResources());
        setClientResources(this.phaseSpec.getClientResources());
        HashMap hashMap = new HashMap();
        hashMap.put(Constants.PIPELINEID, GSON.toJson(this.phaseSpec, BatchPhaseSpec.class));
        setProperties(hashMap);
    }

    @TransactionPolicy(TransactionControl.EXPLICIT)
    public void initialize() throws Exception {
        final SparkClientContext context = getContext();
        this.cleanupFiles = new ArrayList();
        ArrayList arrayList = new ArrayList();
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.speculation", "false");
        context.setSparkConf(sparkConf);
        BatchPhaseSpec batchPhaseSpec = (BatchPhaseSpec) GSON.fromJson((String) context.getSpecification().getProperties().get(Constants.PIPELINEID), BatchPhaseSpec.class);
        for (Map.Entry<String, String> entry : batchPhaseSpec.getPipelineProperties().entrySet()) {
            sparkConf.set(entry.getKey(), entry.getValue());
        }
        DefaultMacroEvaluator defaultMacroEvaluator = new DefaultMacroEvaluator(new BasicArguments(context), context.getLogicalStartTime(), context, context.getNamespace());
        final SparkBatchSourceFactory sparkBatchSourceFactory = new SparkBatchSourceFactory();
        final SparkBatchSinkFactory sparkBatchSinkFactory = new SparkBatchSinkFactory();
        final HashMap hashMap = new HashMap();
        PipelinePluginInstantiator pipelinePluginInstantiator = new PipelinePluginInstantiator(new SparkPipelinePluginContext(context, context.getMetrics(), batchPhaseSpec.isStageLoggingEnabled(), batchPhaseSpec.isProcessTimingEnabled()), context.getMetrics(), batchPhaseSpec, new SingleConnectorFactory());
        final PipelineRuntime pipelineRuntime = new PipelineRuntime(context);
        final Admin admin = context.getAdmin();
        PipelinePhase phase = batchPhaseSpec.getPhase();
        final HashMap hashMap2 = new HashMap();
        for (final String str : phase.getDag().getTopologicalOrder()) {
            final StageSpec stage = phase.getStage(str);
            String pluginType = stage.getPluginType();
            boolean z = Constants.Connector.PLUGIN_TYPE.equals(pluginType) && phase.getSources().contains(str);
            boolean z2 = Constants.Connector.PLUGIN_TYPE.equals(pluginType) && phase.getSinks().contains(str);
            SubmitterPlugin submitterPlugin = null;
            if (BatchSource.PLUGIN_TYPE.equals(pluginType) || z) {
                submitterPlugin = new SubmitterPlugin(str, context, (BatchConfigurable) pipelinePluginInstantiator.newPluginInstance(str, defaultMacroEvaluator), new ContextProvider<BatchSourceContext>() { // from class: co.cask.cdap.etl.spark.batch.ETLSpark.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // co.cask.cdap.etl.common.submit.ContextProvider
                    public BatchSourceContext getContext(DatasetContext datasetContext) {
                        return new SparkBatchSourceContext(sparkBatchSourceFactory, context, pipelineRuntime, datasetContext, stage);
                    }
                }, new SubmitterPlugin.PrepareAction<SparkBatchSourceContext>() { // from class: co.cask.cdap.etl.spark.batch.ETLSpark.2
                    @Override // co.cask.cdap.etl.common.submit.SubmitterPlugin.PrepareAction
                    public void act(SparkBatchSourceContext sparkBatchSourceContext) {
                        hashMap2.put(str, sparkBatchSourceContext.getFieldOperations());
                    }
                });
            } else if (Transform.PLUGIN_TYPE.equals(pluginType)) {
                submitterPlugin = new SubmitterPlugin(str, context, (Transform) pipelinePluginInstantiator.newPluginInstance(str, defaultMacroEvaluator), new ContextProvider<StageSubmitterContext>() { // from class: co.cask.cdap.etl.spark.batch.ETLSpark.3
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // co.cask.cdap.etl.common.submit.ContextProvider
                    public StageSubmitterContext getContext(DatasetContext datasetContext) {
                        return new SparkBatchSourceContext(sparkBatchSourceFactory, context, pipelineRuntime, datasetContext, stage);
                    }
                }, new SubmitterPlugin.PrepareAction<SparkBatchSourceContext>() { // from class: co.cask.cdap.etl.spark.batch.ETLSpark.4
                    @Override // co.cask.cdap.etl.common.submit.SubmitterPlugin.PrepareAction
                    public void act(SparkBatchSourceContext sparkBatchSourceContext) {
                        hashMap2.put(str, sparkBatchSourceContext.getFieldOperations());
                    }
                });
            } else if (BatchSink.PLUGIN_TYPE.equals(pluginType) || z2) {
                submitterPlugin = new SubmitterPlugin(str, context, (BatchConfigurable) pipelinePluginInstantiator.newPluginInstance(str, defaultMacroEvaluator), new ContextProvider<BatchSinkContext>() { // from class: co.cask.cdap.etl.spark.batch.ETLSpark.5
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // co.cask.cdap.etl.common.submit.ContextProvider
                    public BatchSinkContext getContext(DatasetContext datasetContext) {
                        return new SparkBatchSinkContext(sparkBatchSinkFactory, context, pipelineRuntime, datasetContext, stage);
                    }
                }, new SubmitterPlugin.PrepareAction<SparkBatchSinkContext>() { // from class: co.cask.cdap.etl.spark.batch.ETLSpark.6
                    @Override // co.cask.cdap.etl.common.submit.SubmitterPlugin.PrepareAction
                    public void act(SparkBatchSinkContext sparkBatchSinkContext) {
                        hashMap2.put(str, sparkBatchSinkContext.getFieldOperations());
                    }
                });
            } else if (SparkSink.PLUGIN_TYPE.equals(pluginType)) {
                submitterPlugin = new SubmitterPlugin(str, context, (BatchConfigurable) pipelinePluginInstantiator.newPluginInstance(str, defaultMacroEvaluator), new ContextProvider<SparkPluginContext>() { // from class: co.cask.cdap.etl.spark.batch.ETLSpark.7
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // co.cask.cdap.etl.common.submit.ContextProvider
                    public SparkPluginContext getContext(DatasetContext datasetContext) {
                        return new BasicSparkPluginContext(context, pipelineRuntime, stage, datasetContext, admin);
                    }
                });
            } else if (BatchAggregator.PLUGIN_TYPE.equals(pluginType)) {
                submitterPlugin = new SubmitterPlugin(str, context, (BatchAggregator) pipelinePluginInstantiator.newPluginInstance(str, defaultMacroEvaluator), new AggregatorContextProvider(pipelineRuntime, stage, admin), new SubmitterPlugin.PrepareAction<DefaultAggregatorContext>() { // from class: co.cask.cdap.etl.spark.batch.ETLSpark.8
                    @Override // co.cask.cdap.etl.common.submit.SubmitterPlugin.PrepareAction
                    public void act(DefaultAggregatorContext defaultAggregatorContext) {
                        hashMap2.put(str, defaultAggregatorContext.getFieldOperations());
                    }
                });
            } else if (BatchJoiner.PLUGIN_TYPE.equals(pluginType)) {
                submitterPlugin = new SubmitterPlugin(str, context, (BatchJoiner) pipelinePluginInstantiator.newPluginInstance(str, defaultMacroEvaluator), new JoinerContextProvider(pipelineRuntime, stage, admin), new SubmitterPlugin.PrepareAction<DefaultJoinerContext>() { // from class: co.cask.cdap.etl.spark.batch.ETLSpark.9
                    @Override // co.cask.cdap.etl.common.submit.SubmitterPlugin.PrepareAction
                    public void act(DefaultJoinerContext defaultJoinerContext) {
                        hashMap.put(str, defaultJoinerContext.getNumPartitions());
                        hashMap2.put(str, defaultJoinerContext.getFieldOperations());
                    }
                });
            }
            if (submitterPlugin != null) {
                submitterPlugin.prepareRun();
                arrayList.add(submitterPlugin);
            }
        }
        File createTempFile = File.createTempFile("HydratorSpark", ".config");
        this.cleanupFiles.add(createTempFile);
        BufferedWriter newBufferedWriter = Files.newBufferedWriter(createTempFile.toPath(), StandardCharsets.UTF_8, new OpenOption[0]);
        Throwable th = null;
        try {
            newBufferedWriter.write(GSON.toJson(new SparkBatchSourceSinkFactoryInfo(sparkBatchSourceFactory, sparkBatchSinkFactory, hashMap)));
            if (newBufferedWriter != null) {
                if (0 != 0) {
                    try {
                        newBufferedWriter.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    newBufferedWriter.close();
                }
            }
            this.finisher = new CompositeFinisher(arrayList);
            context.localize("HydratorSpark.config", createTempFile.toURI());
            WorkflowToken workflowToken = context.getWorkflowToken();
            if (workflowToken != null) {
                for (Map.Entry<String, String> entry2 : pipelineRuntime.getArguments().getAddedArguments().entrySet()) {
                    workflowToken.put(entry2.getKey(), entry2.getValue());
                }
                workflowToken.put(Constants.FIELD_OPERATION_KEY_IN_WORKFLOW_TOKEN, GSON.toJson(hashMap2));
            }
        } catch (Throwable th3) {
            if (newBufferedWriter != null) {
                if (0 != 0) {
                    try {
                        newBufferedWriter.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    newBufferedWriter.close();
                }
            }
            throw th3;
        }
    }

    @TransactionPolicy(TransactionControl.EXPLICIT)
    public void destroy() {
        if (this.finisher != null) {
            this.finisher.onFinish(getContext().getState().getStatus() == ProgramStatus.COMPLETED);
        }
        for (File file : this.cleanupFiles) {
            if (!file.delete()) {
                LOG.warn("Failed to clean up resource {} ", file);
            }
        }
    }
}
