package co.cask.cdap.datapipeline;

import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.app.ApplicationConfigurer;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.workflow.AbstractWorkflow;
import co.cask.cdap.api.workflow.WorkflowConfigurer;
import co.cask.cdap.api.workflow.WorkflowContext;
import co.cask.cdap.api.workflow.WorkflowForkConfigurer;
import co.cask.cdap.etl.api.action.Action;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchJoiner;
import co.cask.cdap.etl.api.batch.PostAction;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.batch.ActionSpec;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.batch.BatchPipelineSpec;
import co.cask.cdap.etl.batch.WorkflowBackedActionContext;
import co.cask.cdap.etl.batch.connector.ConnectorSource;
import co.cask.cdap.etl.batch.customaction.PipelineAction;
import co.cask.cdap.etl.batch.mapreduce.ETLMapReduce;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DatasetContextLookupProvider;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.common.LocationAwareMDCWrapperLogger;
import co.cask.cdap.etl.common.PipelinePhase;
import co.cask.cdap.etl.planner.ControlDag;
import co.cask.cdap.etl.planner.PipelinePlan;
import co.cask.cdap.etl.planner.PipelinePlanner;
import co.cask.cdap.etl.planner.StageInfo;
import co.cask.cdap.etl.proto.Engine;
import co.cask.cdap.etl.spark.batch.ETLSpark;
import co.cask.cdap.etl.spec.StageSpec;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableSet;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/datapipeline/SmartWorkflow.class */
/**
 * Workflow that plans a {@link BatchPipelineSpec} into phases and registers one program per phase.
 *
 * <p>At configure time the pipeline is split into phases by a {@link PipelinePlanner}. Each phase is
 * added to the workflow as a custom action, an external Spark program, an {@link ETLSpark} program or
 * an {@link ETLMapReduce} program (see {@link #addProgram}). At run time, {@link #initialize}
 * instantiates the pipeline's ending actions and {@link #destroy} runs them.
 */
public class SmartWorkflow extends AbstractWorkflow {
    public static final String NAME = "DataPipelineWorkflow";
    public static final String DESCRIPTION = "Data Pipeline Workflow";
    private static final Logger LOG = LoggerFactory.getLogger(SmartWorkflow.class);
    // Wraps LOG with the pipeline-lifecycle event tag; used for the started/running/finished messages.
    private static final Logger WRAPPERLOGGER = new LocationAwareMDCWrapperLogger(LOG, Constants.EVENT_TYPE_TAG, Constants.PIPELINE_LIFECYCLE_TAG_VALUE);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    private final BatchPipelineSpec spec;
    private final ApplicationConfigurer applicationConfigurer;
    private final Set<String> supportedPluginTypes;
    private final Engine engine;
    // True when the engine is SPARK or the pipeline contains a SparkCompute/SparkSink stage.
    private boolean useSpark;
    private PipelinePlan plan;
    private ControlDag dag;
    // Ending actions keyed by name, in the order the spec lists them; populated in initialize().
    private Map<String, PostAction> postActions;
    // Every stage of the pipeline keyed by stage name; populated in configure().
    private Map<String, StageSpec> stageSpecs;
    // Never assigned in this class. NOTE(review): presumably injected by the framework - confirm.
    private Metrics workflowMetrics;
    // Counter used to generate "conn-N" dataset names.
    private int connectorNum = 0;
    // Counter used to generate "phase-N" program names.
    private int phaseNum = 1;
    // Connector stage name -> generated connector dataset name, shared across phases.
    private final Map<String, String> connectorDatasets = new HashMap<>();

    /**
     * @param spec the pipeline specification to plan and run
     * @param supportedPluginTypes plugin types handed to the {@link PipelinePlanner}
     * @param applicationConfigurer configurer used to register the Spark/MapReduce programs of each phase
     * @param engine the requested execution engine
     */
    public SmartWorkflow(BatchPipelineSpec spec, Set<String> supportedPluginTypes, ApplicationConfigurer applicationConfigurer, Engine engine) {
        this.spec = spec;
        this.supportedPluginTypes = supportedPluginTypes;
        this.applicationConfigurer = applicationConfigurer;
        this.engine = engine;
    }

    /**
     * Plans the pipeline and adds a program for every phase to the workflow.
     *
     * <p>A single-phase plan is added directly. A plan with phase connections is flattened into a
     * {@link ControlDag} and walked from its first source. A multi-phase plan without connections has
     * all of its phases added to one fork branch.
     */
    protected void configure() {
        setName(NAME);
        setDescription(DESCRIPTION);

        // Store the serialized spec as a workflow property; initialize() reads it back from there.
        Map<String, String> properties = new HashMap<>();
        properties.put(Constants.PIPELINE_SPEC_KEY, GSON.toJson(this.spec));
        setProperties(properties);

        // Index every stage unconditionally: addProgram() looks stages up by name for external Spark
        // programs, so the map must be complete even when the engine is already SPARK or a Spark
        // stage is found early in the iteration.
        this.stageSpecs = new HashMap<>();
        this.useSpark = this.engine == Engine.SPARK;
        for (StageSpec stageSpec : this.spec.getStages()) {
            this.stageSpecs.put(stageSpec.getName(), stageSpec);
            String pluginType = stageSpec.getPlugin().getType();
            if (SparkCompute.PLUGIN_TYPE.equals(pluginType) || SparkSink.PLUGIN_TYPE.equals(pluginType)) {
                this.useSpark = true;
            }
        }

        Set<String> actionTypes = ImmutableSet.of(Action.PLUGIN_TYPE, Constants.SPARK_PROGRAM_PLUGIN_TYPE);
        PipelinePlanner planner;
        if (this.useSpark) {
            planner = new PipelinePlanner(this.supportedPluginTypes, ImmutableSet.<String>of(), ImmutableSet.<String>of(), actionTypes);
        } else {
            // Without Spark the planner is additionally told about the aggregator/joiner types and the
            // Spark compute/sink types; how it uses them is defined by PipelinePlanner.
            planner = new PipelinePlanner(
                this.supportedPluginTypes,
                ImmutableSet.of(BatchAggregator.PLUGIN_TYPE, BatchJoiner.PLUGIN_TYPE),
                ImmutableSet.of(SparkCompute.PLUGIN_TYPE, SparkSink.PLUGIN_TYPE),
                actionTypes);
        }
        this.plan = planner.plan(this.spec);

        if (this.plan.getPhases().size() == 1) {
            addProgram(this.plan.getPhases().keySet().iterator().next(), new TrunkProgramAdder(getConfigurer()));
            return;
        }

        if (!this.plan.getPhaseConnections().isEmpty()) {
            this.dag = new ControlDag(this.plan.getPhaseConnections());
            this.dag.flatten();
            addPrograms(this.dag.getSources().iterator().next(), getConfigurer());
            return;
        }

        // Several phases with no connections between them: all are added to the same fork branch.
        WorkflowForkConfigurer<? extends WorkflowConfigurer> fork = getConfigurer().fork();
        WorkflowProgramAdder branchAdder = new BranchProgramAdder(fork);
        for (String phaseName : this.plan.getPhases().keySet()) {
            addProgram(phaseName, branchAdder);
        }
        if (fork != null) {
            fork.join();
        }
    }

    /**
     * Logs the pipeline start and instantiates the ending actions declared in the pipeline spec.
     *
     * @param workflowContext the runtime context of this workflow run
     * @throws Exception if the superclass initialization, the user lookup or plugin instantiation fails
     */
    public void initialize(WorkflowContext workflowContext) throws Exception {
        super.initialize(workflowContext);
        String pipelineName = workflowContext.getApplicationSpecification().getName();
        WRAPPERLOGGER.info("Pipeline '{}' is started by user '{}' with arguments {}", pipelineName, UserGroupInformation.getCurrentUser().getShortUserName(), Joiner.on(", ").withKeyValueSeparator("=").join(workflowContext.getRuntimeArguments()));

        // LinkedHashMap so destroy() runs the actions in the order the spec lists them.
        this.postActions = new LinkedHashMap<>();
        BatchPipelineSpec runtimeSpec = GSON.fromJson(workflowContext.getWorkflowSpecification().getProperty(Constants.PIPELINE_SPEC_KEY), BatchPipelineSpec.class);
        DefaultMacroEvaluator macroEvaluator = new DefaultMacroEvaluator(workflowContext.getToken(), workflowContext.getRuntimeArguments(), workflowContext.getLogicalStartTime(), workflowContext, workflowContext.getNamespace());
        for (ActionSpec actionSpec : runtimeSpec.getEndingActions()) {
            this.postActions.put(actionSpec.getName(), (PostAction) workflowContext.newPluginInstance(actionSpec.getName(), macroEvaluator));
        }
        WRAPPERLOGGER.info("Pipeline '{}' running", workflowContext.getApplicationSpecification().getName());
    }

    /**
     * Runs the ending actions and logs the final pipeline status.
     *
     * <p>Ending actions are skipped when the data tracer for the post-action plugin type is enabled.
     * A failure in one action is logged and does not prevent the remaining actions from running.
     */
    public void destroy() {
        WorkflowContext context = getContext();
        if (!context.getDataTracer(PostAction.PLUGIN_TYPE).isEnabled()) {
            DatasetContextLookupProvider lookupProvider = new DatasetContextLookupProvider(context);
            Map<String, String> runtimeArguments = context.getRuntimeArguments();
            long logicalStartTime = context.getLogicalStartTime();
            for (Map.Entry<String, PostAction> entry : this.postActions.entrySet()) {
                String actionName = entry.getKey();
                try {
                    StageInfo stageInfo = StageInfo.builder(actionName, PostAction.PLUGIN_TYPE).build();
                    entry.getValue().run(new WorkflowBackedActionContext(context, this.workflowMetrics, lookupProvider, logicalStartTime, runtimeArguments, stageInfo));
                } catch (Throwable t) {
                    // Deliberately best-effort: log and continue with the next ending action.
                    LOG.error("Error while running post action {}.", actionName, t);
                }
            }
        }
        ProgramStatus status = context.getState().getStatus();
        // Locale.ROOT keeps the logged status independent of the JVM's default locale.
        String outcome = status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase(Locale.ROOT);
        WRAPPERLOGGER.info("Pipeline '{}' {}", context.getApplicationSpecification().getName(), outcome);
    }

    /**
     * Adds the program for {@code node} to the trunk of {@code workflowConfigurer}, then continues
     * with the node's outputs in the control dag: none ends the walk, one is followed directly,
     * several open a fork with one branch per output.
     */
    private void addPrograms(String node, WorkflowConfigurer workflowConfigurer) {
        addProgram(node, new TrunkProgramAdder(workflowConfigurer));

        Set<String> outputs = this.dag.getNodeOutputs(node);
        if (outputs.isEmpty()) {
            return;
        }
        if (outputs.size() == 1) {
            addPrograms(outputs.iterator().next(), workflowConfigurer);
            return;
        }

        WorkflowForkConfigurer<? extends WorkflowConfigurer> fork = workflowConfigurer.fork();
        for (String output : outputs) {
            // A branch that did not close the fork stops at a node with several inputs that still has
            // unvisited inputs; start the next branch.
            if (!addBranchPrograms(output, fork)) {
                fork = fork.also();
            }
        }
    }

    /**
     * Adds {@code node} and its successors to the current fork branch until a node with more than
     * one input is reached.
     *
     * @return {@code true} if this branch reached such a node with all of its inputs visited, in
     *     which case the fork was joined and the walk continued from that node; {@code false} if
     *     other inputs of that node have not been visited yet
     */
    private boolean addBranchPrograms(String node, WorkflowForkConfigurer<? extends WorkflowConfigurer> forkConfigurer) {
        Set<String> inputs = this.dag.getNodeInputs(node);
        if (inputs.size() > 1) {
            // visit() is compared against the input count to detect the last branch to arrive here.
            if (this.dag.visit(node) != inputs.size()) {
                return false;
            }
            addPrograms(node, forkConfigurer.join());
            return true;
        }

        addProgram(node, new BranchProgramAdder(forkConfigurer));
        // NOTE(review): relies on such a node having an output - presumably guaranteed by
        // ControlDag.flatten(); confirm.
        return addBranchPrograms(this.dag.getNodeOutputs(node).iterator().next(), forkConfigurer);
    }

    /**
     * Registers the program for one phase and adds it to the workflow through {@code programAdder}.
     *
     * <p>Connector stages of the phase are given a "conn-N" dataset, created once and reused by any
     * other phase that contains the same connector. The phase then becomes a custom action, an
     * external Spark program, an {@link ETLSpark} program or an {@link ETLMapReduce} program,
     * checked in that order.
     */
    private void addProgram(String phaseName, WorkflowProgramAdder programAdder) {
        PipelinePhase phase = this.plan.getPhase(phaseName);
        if (phase == null) {
            // NOTE(review): presumably a node that only exists in the flattened control dag and has
            // no phase in the plan - confirm.
            return;
        }

        String programName = "phase-" + this.phaseNum;
        this.phaseNum++;

        // Resolve (creating on first sight) the dataset backing each connector stage of this phase.
        Map<String, String> phaseConnectorDatasets = new HashMap<>();
        for (StageInfo connector : phase.getStagesOfType(Constants.CONNECTOR_TYPE)) {
            String connectorName = connector.getName();
            String datasetName = this.connectorDatasets.get(connectorName);
            if (datasetName == null) {
                datasetName = "conn-" + this.connectorNum;
                this.connectorNum++;
                this.connectorDatasets.put(connectorName, datasetName);
                new ConnectorSource(datasetName, null).configure(getConfigurer());
            }
            phaseConnectorDatasets.put(connectorName, datasetName);
        }

        BatchPhaseSpec batchPhaseSpec = new BatchPhaseSpec(programName, phase, this.spec.getResources(), this.spec.getDriverResources(), this.spec.getClientResources(), this.spec.isStageLoggingEnabled(), phaseConnectorDatasets, this.spec.getNumOfRecordsPreview());
        Set<String> pluginTypes = batchPhaseSpec.getPhase().getPluginTypes();

        if (pluginTypes.contains(Action.PLUGIN_TYPE)) {
            programAdder.addAction(new PipelineAction(batchPhaseSpec));
            return;
        }

        if (pluginTypes.contains(Constants.SPARK_PROGRAM_PLUGIN_TYPE)) {
            String stageName = phase.getStagesOfType(Constants.SPARK_PROGRAM_PLUGIN_TYPE).iterator().next().getName();
            this.applicationConfigurer.addSpark(new ExternalSparkProgram(programName, this.stageSpecs.get(stageName)));
            programAdder.addSpark(programName);
            return;
        }

        if (this.useSpark) {
            this.applicationConfigurer.addSpark(new ETLSpark(batchPhaseSpec));
            programAdder.addSpark(programName);
        } else {
            this.applicationConfigurer.addMapReduce(new ETLMapReduce(batchPhaseSpec));
            programAdder.addMapReduce(programName);
        }
    }
}
