package co.cask.cdap.etl.batch;

import co.cask.cdap.api.annotation.TransactionControl;
import co.cask.cdap.api.annotation.TransactionPolicy;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.workflow.AbstractWorkflow;
import co.cask.cdap.api.workflow.WorkflowContext;
import co.cask.cdap.etl.api.Engine;
import co.cask.cdap.etl.api.batch.PostAction;
import co.cask.cdap.etl.batch.mapreduce.ETLMapReduce;
import co.cask.cdap.etl.common.BasicArguments;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.common.PipelineRuntime;
import co.cask.cdap.etl.spark.batch.ETLSpark;
import co.cask.cdap.etl.spec.StageSpec;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/cdap-etl-batch-4.3.4.jar:co/cask/cdap/etl/batch/ETLWorkflow.class */
/**
 * Workflow that runs a batch ETL pipeline on the configured {@link Engine} and, when the
 * workflow is destroyed, runs the pipeline's ending actions ({@link PostAction} plugins).
 *
 * <p>The pipeline specification is serialized to JSON at configure time and stored as a
 * workflow property under {@link Constants#PIPELINE_SPEC_KEY}; it is read back from the
 * workflow specification in {@link #initialize(WorkflowContext)}.
 */
public class ETLWorkflow extends AbstractWorkflow {
    public static final String NAME = "ETLWorkflow";
    public static final String DESCRIPTION = "Workflow for ETL Batch MapReduce Driver";
    private static final Logger LOG = LoggerFactory.getLogger(ETLWorkflow.class);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    private final Engine engine;
    private final BatchPipelineSpec spec;
    // Ending-action plugin instances keyed by action name; populated in initialize().
    private Map<String, PostAction> postActions;
    // Stage specs for the ending actions, keyed by the same action names as postActions.
    private Map<String, StageSpec> postActionSpecs;
    // NOTE(review): never assigned in this class; presumably injected by the framework - confirm.
    private Metrics workflowMetrics;

    /**
     * Creates the workflow.
     *
     * @param batchPipelineSpec specification of the pipeline this workflow runs
     * @param engine execution engine that selects which program the workflow adds
     */
    public ETLWorkflow(BatchPipelineSpec batchPipelineSpec, Engine engine) {
        this.engine = engine;
        this.spec = batchPipelineSpec;
    }

    /**
     * Sets the workflow name and description, adds the program matching the engine
     * (the {@link ETLMapReduce} program for {@code MAPREDUCE}, the {@link ETLSpark} program
     * for {@code SPARK}), and stores the JSON-serialized pipeline spec as a workflow property.
     * Any other engine value adds no program.
     */
    protected void configure() {
        setName(NAME);
        setDescription(DESCRIPTION);
        switch (this.engine) {
            case MAPREDUCE:
                addMapReduce(ETLMapReduce.NAME);
                break;
            case SPARK:
                addSpark(ETLSpark.class.getSimpleName());
                break;
        }
        Map<String, String> properties = new HashMap<>();
        properties.put(Constants.PIPELINE_SPEC_KEY, GSON.toJson(this.spec));
        setProperties(properties);
    }

    /**
     * Reads the pipeline spec back from the workflow specification and, for every ending
     * action it declares, instantiates the plugin and builds its {@link StageSpec}.
     *
     * @param workflowContext context of the current workflow run
     * @throws Exception if the superclass initialization or plugin instantiation fails
     */
    @TransactionPolicy(TransactionControl.EXPLICIT)
    public void initialize(WorkflowContext workflowContext) throws Exception {
        super.initialize(workflowContext);
        // Insertion-ordered so that destroy() runs the actions in the order the spec lists them.
        this.postActions = new LinkedHashMap<>();

        String specJson = workflowContext.getWorkflowSpecification().getProperty(Constants.PIPELINE_SPEC_KEY);
        BatchPipelineSpec batchPipelineSpec = GSON.fromJson(specJson, BatchPipelineSpec.class);

        BasicArguments arguments = new BasicArguments(workflowContext.getToken(), workflowContext.getRuntimeArguments());
        DefaultMacroEvaluator defaultMacroEvaluator = new DefaultMacroEvaluator(
            arguments, workflowContext.getLogicalStartTime(), workflowContext, workflowContext.getNamespace());

        this.postActionSpecs = new HashMap<>();
        for (ActionSpec actionSpec : batchPipelineSpec.getEndingActions()) {
            String name = actionSpec.getName();
            PostAction postAction = (PostAction) workflowContext.newPluginInstance(name, defaultMacroEvaluator);
            this.postActions.put(name, postAction);

            StageSpec stageSpec = StageSpec.builder(name, actionSpec.getPluginSpec())
                .setProcessTimingEnabled(batchPipelineSpec.isProcessTimingEnabled())
                .setStageLoggingEnabled(batchPipelineSpec.isStageLoggingEnabled())
                .build();
            this.postActionSpecs.put(name, stageSpec);
        }
    }

    /**
     * Runs every ending action registered in {@link #initialize(WorkflowContext)}, in
     * registration order. A failing action is logged and does not stop the remaining ones.
     * No action is run when the data tracer for {@link PostAction#PLUGIN_TYPE} is enabled.
     */
    @TransactionPolicy(TransactionControl.EXPLICIT)
    public void destroy() {
        // postActions is only assigned once initialize() gets past super.initialize(); if that
        // never happened there is nothing to run, and dereferencing the field would throw.
        // NOTE(review): confirm whether the framework calls destroy() after a failed initialize().
        if (this.postActions == null) {
            return;
        }
        WorkflowContext context = getContext();
        PipelineRuntime pipelineRuntime = new PipelineRuntime(context, this.workflowMetrics);
        if (context.getDataTracer(PostAction.PLUGIN_TYPE).isEnabled()) {
            return;
        }
        for (Map.Entry<String, PostAction> entry : this.postActions.entrySet()) {
            String key = entry.getKey();
            try {
                StageSpec stageSpec = this.postActionSpecs.get(key);
                entry.getValue().run(new WorkflowBackedActionContext(context, pipelineRuntime, stageSpec));
            } catch (Throwable th) {
                // Deliberately broad: one failing ending action must not prevent the others from running.
                LOG.error("Error while running ending action {}.", key, th);
            }
        }
    }
}
