package co.cask.cdap.datapipeline;

import co.cask.cdap.api.ProgramStatus;
import co.cask.cdap.api.app.ApplicationConfigurer;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.api.dataset.lib.AbstractCloseableIterator;
import co.cask.cdap.api.dataset.lib.PartitionFilter;
import co.cask.cdap.api.metrics.Metrics;
import co.cask.cdap.api.schedule.ProgramStatusTriggerInfo;
import co.cask.cdap.api.schedule.TriggeringScheduleInfo;
import co.cask.cdap.api.workflow.AbstractWorkflow;
import co.cask.cdap.api.workflow.WorkflowContext;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.etl.api.AlertPublisher;
import co.cask.cdap.etl.api.AlertPublisherContext;
import co.cask.cdap.etl.api.Engine;
import co.cask.cdap.etl.api.SplitterTransform;
import co.cask.cdap.etl.api.action.Action;
import co.cask.cdap.etl.api.batch.BatchAggregator;
import co.cask.cdap.etl.api.batch.BatchJoiner;
import co.cask.cdap.etl.api.batch.PostAction;
import co.cask.cdap.etl.api.batch.SparkCompute;
import co.cask.cdap.etl.api.batch.SparkSink;
import co.cask.cdap.etl.api.condition.Condition;
import co.cask.cdap.etl.batch.ActionSpec;
import co.cask.cdap.etl.batch.BatchPhaseSpec;
import co.cask.cdap.etl.batch.BatchPipelineSpec;
import co.cask.cdap.etl.batch.WorkflowBackedActionContext;
import co.cask.cdap.etl.batch.condition.PipelineCondition;
import co.cask.cdap.etl.batch.connector.AlertPublisherSink;
import co.cask.cdap.etl.batch.connector.AlertReader;
import co.cask.cdap.etl.batch.connector.MultiConnectorSource;
import co.cask.cdap.etl.batch.customaction.PipelineAction;
import co.cask.cdap.etl.batch.mapreduce.ETLMapReduce;
import co.cask.cdap.etl.common.BasicArguments;
import co.cask.cdap.etl.common.Constants;
import co.cask.cdap.etl.common.DefaultAlertPublisherContext;
import co.cask.cdap.etl.common.DefaultMacroEvaluator;
import co.cask.cdap.etl.common.DefaultStageMetrics;
import co.cask.cdap.etl.common.LocationAwareMDCWrapperLogger;
import co.cask.cdap.etl.common.PipelinePhase;
import co.cask.cdap.etl.common.PipelineRuntime;
import co.cask.cdap.etl.common.TrackedIterator;
import co.cask.cdap.etl.common.plugin.PipelinePluginContext;
import co.cask.cdap.etl.planner.ConditionBranches;
import co.cask.cdap.etl.planner.ControlDag;
import co.cask.cdap.etl.planner.Dag;
import co.cask.cdap.etl.planner.DisjointConnectionsException;
import co.cask.cdap.etl.planner.PipelinePlan;
import co.cask.cdap.etl.planner.PipelinePlanner;
import co.cask.cdap.etl.proto.Connection;
import co.cask.cdap.etl.proto.v2.ArgumentMapping;
import co.cask.cdap.etl.proto.v2.PluginPropertyMapping;
import co.cask.cdap.etl.proto.v2.TriggeringPropertyMapping;
import co.cask.cdap.etl.spark.batch.ETLSpark;
import co.cask.cdap.etl.spec.StageSpec;
import co.cask.cdap.internal.io.SchemaTypeAdapter;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/datapipeline/SmartWorkflow.class */
public class SmartWorkflow extends AbstractWorkflow {
    public static final String NAME = "DataPipelineWorkflow";
    public static final String DESCRIPTION = "Data Pipeline Workflow";
    public static final String TRIGGERING_PROPERTIES_MAPPING = "triggering.properties.mapping";
    private static final String RESOLVED_PLUGIN_PROPERTIES_MAP = "resolved.plugin.properties.map";
    private static final Logger LOG = LoggerFactory.getLogger(SmartWorkflow.class);
    private static final Logger WRAPPERLOGGER = new LocationAwareMDCWrapperLogger(LOG, Constants.EVENT_TYPE_TAG, Constants.PIPELINE_LIFECYCLE_TAG_VALUE);
    private static final Gson GSON = new GsonBuilder().registerTypeAdapter(Schema.class, new SchemaTypeAdapter()).create();
    private static final Type STAGE_PROPERTIES_MAP = new TypeToken<Map<String, Map<String, String>>>() { // from class: co.cask.cdap.datapipeline.SmartWorkflow.1
    }.getType();
    private final ApplicationConfigurer applicationConfigurer;
    private final Set<String> supportedPluginTypes;
    private final Engine engine;
    private boolean useSpark;
    private PipelinePlan plan;
    private ControlDag dag;
    private Map<String, PostAction> postActions;
    private Map<String, AlertPublisher> alertPublishers;
    private Map<String, StageSpec> stageSpecs;
    private Metrics workflowMetrics;
    private BatchPipelineSpec spec;
    private int connectorNum = 0;
    private int phaseNum = 1;
    private final Map<String, String> connectorDatasets = new HashMap();

    public SmartWorkflow(BatchPipelineSpec batchPipelineSpec, Set<String> set, ApplicationConfigurer applicationConfigurer, Engine engine) {
        this.spec = batchPipelineSpec;
        this.supportedPluginTypes = set;
        this.applicationConfigurer = applicationConfigurer;
        this.engine = engine;
    }

    protected void configure() {
        Dag dag;
        setName(NAME);
        setDescription(DESCRIPTION);
        HashMap hashMap = new HashMap();
        hashMap.put(Constants.PIPELINE_SPEC_KEY, GSON.toJson(this.spec));
        setProperties(hashMap);
        this.stageSpecs = new HashMap();
        this.useSpark = this.engine == Engine.SPARK;
        for (StageSpec stageSpec : this.spec.getStages()) {
            this.stageSpecs.put(stageSpec.getName(), stageSpec);
            String type = stageSpec.getPlugin().getType();
            if (SparkCompute.PLUGIN_TYPE.equals(type) || SparkSink.PLUGIN_TYPE.equals(type)) {
                this.useSpark = true;
            }
        }
        ImmutableSet of = ImmutableSet.of(Action.PLUGIN_TYPE, Constants.SPARK_PROGRAM_PLUGIN_TYPE);
        ImmutableSet of2 = ImmutableSet.of(SplitterTransform.PLUGIN_TYPE);
        this.plan = (this.useSpark ? new PipelinePlanner(this.supportedPluginTypes, ImmutableSet.of(), ImmutableSet.of(), of, of2) : new PipelinePlanner(this.supportedPluginTypes, ImmutableSet.of(BatchAggregator.PLUGIN_TYPE, BatchJoiner.PLUGIN_TYPE), ImmutableSet.of(SparkCompute.PLUGIN_TYPE, SparkSink.PLUGIN_TYPE), of, of2)).plan(this.spec);
        TrunkProgramAdder trunkProgramAdder = new TrunkProgramAdder(getConfigurer());
        if (this.plan.getPhases().size() == 1) {
            addProgram(this.plan.getPhases().keySet().iterator().next(), trunkProgramAdder);
            return;
        }
        if (this.plan.getPhaseConnections().isEmpty()) {
            WorkflowProgramAdder fork = trunkProgramAdder.fork();
            Iterator<String> it = this.plan.getPhases().keySet().iterator();
            while (it.hasNext()) {
                addProgram(it.next(), fork);
            }
            fork.join();
            return;
        }
        this.dag = new ControlDag(this.plan.getPhaseConnections());
        boolean z = false;
        Map<String, ConditionBranches> conditionPhaseBranches = this.plan.getConditionPhaseBranches();
        if (conditionPhaseBranches.isEmpty()) {
            this.dag.flatten();
        } else if (!conditionPhaseBranches.keySet().containsAll(this.dag.getSources())) {
            Set<String> keySet = conditionPhaseBranches.keySet();
            Set<String> accessibleFrom = this.dag.accessibleFrom(this.dag.getSources(), Sets.union(this.dag.getSinks(), keySet));
            Set<String> difference = Sets.difference(accessibleFrom, keySet);
            HashSet hashSet = new HashSet();
            LinkedList linkedList = new LinkedList();
            HashSet hashSet2 = new HashSet();
            if (difference.size() > 1) {
                try {
                    dag = this.dag.createSubDag(difference);
                } catch (DisjointConnectionsException | IllegalArgumentException e) {
                    z = true;
                    HashSet hashSet3 = new HashSet();
                    Iterator<String> it2 = this.dag.getSources().iterator();
                    while (it2.hasNext()) {
                        hashSet3.add(new Connection("dummy", it2.next()));
                    }
                    LinkedList linkedList2 = new LinkedList();
                    linkedList2.addAll(this.dag.getSources());
                    while (linkedList2.peek() != null) {
                        String str = (String) linkedList2.poll();
                        for (String str2 : this.dag.getNodeOutputs(str)) {
                            if (difference.contains(str2)) {
                                hashSet3.add(new Connection(str, str2));
                                linkedList2.add(str2);
                            }
                        }
                    }
                    dag = new Dag(hashSet3);
                }
                ControlDag controlDag = new ControlDag(dag);
                controlDag.flatten();
                linkedList.addAll(controlDag.getSources());
                while (linkedList.peek() != null) {
                    String str3 = (String) linkedList.poll();
                    for (String str4 : controlDag.getNodeOutputs(str3)) {
                        hashSet.add(new Connection(str3, str4));
                        linkedList.add(str4);
                    }
                }
                hashSet2.addAll(controlDag.getSinks());
            } else {
                hashSet2.addAll(difference);
            }
            Iterator<E> it3 = Sets.intersection(accessibleFrom, keySet).iterator();
            while (it3.hasNext()) {
                hashSet.add(new Connection((String) hashSet2.iterator().next(), (String) it3.next()));
            }
            linkedList.addAll(Sets.intersection(accessibleFrom, keySet));
            while (linkedList.peek() != null) {
                String str5 = (String) linkedList.poll();
                ConditionBranches conditionBranches = conditionPhaseBranches.get(str5);
                if (conditionBranches == null) {
                    for (String str6 : this.dag.getNodeOutputs(str5)) {
                        hashSet.add(new Connection(str5, str6));
                        linkedList.add(str6);
                    }
                } else {
                    for (Boolean bool : Arrays.asList(true, false)) {
                        String trueOutput = bool.booleanValue() ? conditionBranches.getTrueOutput() : conditionBranches.getFalseOutput();
                        if (trueOutput != null) {
                            hashSet.add(new Connection(str5, trueOutput, bool));
                            linkedList.add(trueOutput);
                        }
                    }
                }
            }
            this.dag = new ControlDag(hashSet);
        }
        if (!z) {
            addPrograms(this.dag.getSources().iterator().next(), trunkProgramAdder);
            return;
        }
        WorkflowProgramAdder fork2 = trunkProgramAdder.fork();
        Iterator<String> it4 = this.dag.getNodeOutputs(this.dag.getSources().iterator().next()).iterator();
        while (it4.hasNext()) {
            if (!addBranchPrograms(it4.next(), fork2)) {
                fork2 = fork2.also();
            }
        }
    }

    private void updateTokenWithTriggeringProperties(TriggeringScheduleInfo triggeringScheduleInfo, TriggeringPropertyMapping triggeringPropertyMapping, WorkflowToken workflowToken) {
        ArrayList arrayList = new ArrayList();
        for (ProgramStatusTriggerInfo programStatusTriggerInfo : triggeringScheduleInfo.getTriggerInfos()) {
            if (programStatusTriggerInfo instanceof ProgramStatusTriggerInfo) {
                arrayList.add(programStatusTriggerInfo);
            }
        }
        if (arrayList.isEmpty()) {
            return;
        }
        ProgramStatusTriggerInfo programStatusTriggerInfo2 = (ProgramStatusTriggerInfo) arrayList.get(0);
        BasicArguments basicArguments = new BasicArguments(programStatusTriggerInfo2.getWorkflowToken(), programStatusTriggerInfo2.getRuntimeArguments());
        for (ArgumentMapping argumentMapping : triggeringPropertyMapping.getArguments()) {
            String source = argumentMapping.getSource();
            if (source == null) {
                LOG.warn("The name of argument from the triggering pipeline cannot be null, skip this argument mapping: '{}'.", argumentMapping);
            } else {
                String str = basicArguments.get(source);
                if (str == null) {
                    LOG.warn("Runtime argument '{}' is not found in run '{}' of the triggering pipeline '{}' in namespace '{}' ", source, programStatusTriggerInfo2.getRunId(), programStatusTriggerInfo2.getApplicationSpecification().getName(), programStatusTriggerInfo2.getNamespace());
                } else {
                    workflowToken.put(argumentMapping.getTarget() == null ? source : argumentMapping.getTarget(), str);
                }
            }
        }
        Map map = (Map) GSON.fromJson(basicArguments.get(RESOLVED_PLUGIN_PROPERTIES_MAP), STAGE_PROPERTIES_MAP);
        for (PluginPropertyMapping pluginPropertyMapping : triggeringPropertyMapping.getPluginProperties()) {
            String stageName = pluginPropertyMapping.getStageName();
            if (stageName == null) {
                LOG.warn("The name of the stage cannot be null in plugin property mapping, skip this mapping: '{}'.", pluginPropertyMapping);
            } else {
                Map map2 = (Map) map.get(stageName);
                if (map2 == null) {
                    LOG.warn("No plugin properties can be found with stage name '{}' in triggering pipeline '{}' in namespace '{}' ", pluginPropertyMapping.getStageName(), programStatusTriggerInfo2.getApplicationSpecification().getName(), programStatusTriggerInfo2.getNamespace());
                } else {
                    String source2 = pluginPropertyMapping.getSource();
                    if (source2 == null) {
                        LOG.warn("The name of argument from the triggering pipeline cannot be null, skip this argument mapping: '{}'.", pluginPropertyMapping);
                    } else {
                        String str2 = (String) map2.get(source2);
                        if (str2 == null) {
                            LOG.warn("No property with name '{}' can be found in plugin '{}' of the triggering pipeline '{}' in namespace '{}' ", source2, stageName, programStatusTriggerInfo2.getApplicationSpecification().getName(), programStatusTriggerInfo2.getNamespace());
                        } else {
                            workflowToken.put(pluginPropertyMapping.getTarget() == null ? source2 : pluginPropertyMapping.getTarget(), str2);
                        }
                    }
                }
            }
        }
    }

    public void initialize(WorkflowContext workflowContext) throws Exception {
        String str;
        super.initialize(workflowContext);
        TriggeringScheduleInfo triggeringScheduleInfo = workflowContext.getTriggeringScheduleInfo();
        if (triggeringScheduleInfo != null && (str = (String) triggeringScheduleInfo.getProperties().get(TRIGGERING_PROPERTIES_MAPPING)) != null) {
            updateTokenWithTriggeringProperties(triggeringScheduleInfo, (TriggeringPropertyMapping) GSON.fromJson(str, TriggeringPropertyMapping.class), workflowContext.getToken());
        }
        PipelineRuntime pipelineRuntime = new PipelineRuntime(workflowContext, this.workflowMetrics);
        WRAPPERLOGGER.info("Pipeline '{}' is started by user '{}' with arguments {}", workflowContext.getApplicationSpecification().getName(), UserGroupInformation.getCurrentUser().getShortUserName(), pipelineRuntime.getArguments().asMap());
        this.alertPublishers = new HashMap();
        this.postActions = new LinkedHashMap();
        this.spec = (BatchPipelineSpec) GSON.fromJson(workflowContext.getWorkflowSpecification().getProperty(Constants.PIPELINE_SPEC_KEY), BatchPipelineSpec.class);
        this.stageSpecs = new HashMap();
        DefaultMacroEvaluator defaultMacroEvaluator = new DefaultMacroEvaluator(pipelineRuntime.getArguments(), workflowContext.getLogicalStartTime(), workflowContext, workflowContext.getNamespace());
        PipelinePluginContext pipelinePluginContext = new PipelinePluginContext(workflowContext, this.workflowMetrics, this.spec.isStageLoggingEnabled(), this.spec.isProcessTimingEnabled());
        for (ActionSpec actionSpec : this.spec.getEndingActions()) {
            String name = actionSpec.getName();
            this.postActions.put(name, (PostAction) pipelinePluginContext.newPluginInstance(name, defaultMacroEvaluator));
            this.stageSpecs.put(name, StageSpec.builder(name, actionSpec.getPluginSpec()).setStageLoggingEnabled(this.spec.isStageLoggingEnabled()).setProcessTimingEnabled(this.spec.isProcessTimingEnabled()).build());
        }
        for (StageSpec stageSpec : this.spec.getStages()) {
            String name2 = stageSpec.getName();
            this.stageSpecs.put(name2, stageSpec);
            if (AlertPublisher.PLUGIN_TYPE.equals(stageSpec.getPluginType())) {
                this.alertPublishers.put(name2, (AlertPublisher) workflowContext.newPluginInstance(name2, defaultMacroEvaluator));
            }
        }
        WRAPPERLOGGER.info("Pipeline '{}' running", workflowContext.getApplicationSpecification().getName());
    }

    public void destroy() {
        WorkflowContext context = getContext();
        PipelineRuntime pipelineRuntime = new PipelineRuntime(context, this.workflowMetrics);
        if (!context.getDataTracer(PostAction.PLUGIN_TYPE).isEnabled()) {
            for (Map.Entry<String, PostAction> entry : this.postActions.entrySet()) {
                String key = entry.getKey();
                try {
                    entry.getValue().run(new WorkflowBackedActionContext(context, pipelineRuntime, this.stageSpecs.get(key)));
                } catch (Throwable th) {
                    LOG.error("Error while running post action {}.", key, th);
                }
            }
        }
        for (Map.Entry<String, AlertPublisher> entry2 : this.alertPublishers.entrySet()) {
            String key2 = entry2.getKey();
            AlertPublisher value = entry2.getValue();
            try {
                try {
                    AbstractCloseableIterator alertReader = new AlertReader(context.getDataset(key2).getPartitions(PartitionFilter.ALWAYS_MATCH));
                    Throwable th2 = null;
                    try {
                        try {
                            if (alertReader.hasNext()) {
                                DefaultStageMetrics defaultStageMetrics = new DefaultStageMetrics(this.workflowMetrics, key2);
                                value.initialize((AlertPublisherContext) new DefaultAlertPublisherContext(pipelineRuntime, this.stageSpecs.get(key2), context, context.getAdmin()));
                                value.publish(new TrackedIterator(alertReader, defaultStageMetrics, Constants.Metrics.RECORDS_IN));
                                if (alertReader != null) {
                                    if (0 != 0) {
                                        try {
                                            alertReader.close();
                                        } catch (Throwable th3) {
                                            th2.addSuppressed(th3);
                                        }
                                    } else {
                                        alertReader.close();
                                    }
                                }
                                try {
                                    value.destroy();
                                } catch (Exception e) {
                                    LOG.warn("Error destroying alert publisher for stage {}", key2, e);
                                }
                            } else {
                                if (alertReader != null) {
                                    if (0 != 0) {
                                        try {
                                            alertReader.close();
                                        } catch (Throwable th4) {
                                            th2.addSuppressed(th4);
                                        }
                                    } else {
                                        alertReader.close();
                                    }
                                }
                                try {
                                    value.destroy();
                                } catch (Exception e2) {
                                    LOG.warn("Error destroying alert publisher for stage {}", key2, e2);
                                }
                            }
                        } catch (Throwable th5) {
                            th2 = th5;
                            throw th5;
                            break;
                        }
                    } catch (Throwable th6) {
                        if (alertReader != null) {
                            if (th2 != null) {
                                try {
                                    alertReader.close();
                                } catch (Throwable th7) {
                                    th2.addSuppressed(th7);
                                }
                            } else {
                                alertReader.close();
                            }
                        }
                        throw th6;
                        break;
                    }
                } catch (Exception e3) {
                    LOG.warn("Stage {} had errors publishing alerts. Alerts may not have been published.", key2, e3);
                    try {
                        value.destroy();
                    } catch (Exception e4) {
                        LOG.warn("Error destroying alert publisher for stage {}", key2, e4);
                    }
                }
            } catch (Throwable th8) {
                try {
                    value.destroy();
                } catch (Exception e5) {
                    LOG.warn("Error destroying alert publisher for stage {}", key2, e5);
                }
                throw th8;
            }
        }
        ProgramStatus status = getContext().getState().getStatus();
        if (status == ProgramStatus.FAILED) {
            WRAPPERLOGGER.error("Pipeline '{}' failed.", getContext().getApplicationSpecification().getName());
        } else {
            WRAPPERLOGGER.info("Pipeline '{}' {}.", getContext().getApplicationSpecification().getName(), status == ProgramStatus.COMPLETED ? "succeeded" : status.name().toLowerCase());
        }
        DefaultMacroEvaluator defaultMacroEvaluator = new DefaultMacroEvaluator(pipelineRuntime.getArguments(), context.getLogicalStartTime(), context, context.getNamespace());
        HashMap hashMap = new HashMap();
        Iterator<StageSpec> it = this.stageSpecs.values().iterator();
        while (it.hasNext()) {
            String name = it.next().getName();
            hashMap.put(name, context.getPluginProperties(name, defaultMacroEvaluator).getProperties());
        }
        context.getToken().put(RESOLVED_PLUGIN_PROPERTIES_MAP, GSON.toJson(hashMap));
    }

    private void addPrograms(String str, WorkflowProgramAdder workflowProgramAdder) {
        WorkflowProgramAdder addProgram = addProgram(str, workflowProgramAdder);
        Set<String> nodeOutputs = this.dag.getNodeOutputs(str);
        if (nodeOutputs.isEmpty()) {
            return;
        }
        ConditionBranches conditionBranches = this.plan.getConditionPhaseBranches().get(str);
        if (conditionBranches != null) {
            addCondition(addProgram, conditionBranches);
            return;
        }
        if (nodeOutputs.size() <= 1) {
            addPrograms(nodeOutputs.iterator().next(), addProgram);
            return;
        }
        WorkflowProgramAdder fork = addProgram.fork();
        Iterator<String> it = nodeOutputs.iterator();
        while (it.hasNext()) {
            if (!addBranchPrograms(it.next(), fork)) {
                fork = fork.also();
            }
        }
    }

    private boolean addBranchPrograms(String str, WorkflowProgramAdder workflowProgramAdder) {
        Set<String> nodeInputs = this.dag.getNodeInputs(str);
        if (this.dag.getNodeInputs(str).size() <= 1) {
            addProgram(str, workflowProgramAdder);
            return addBranchPrograms(this.dag.getNodeOutputs(str).iterator().next(), workflowProgramAdder);
        }
        if (this.dag.visit(str) != nodeInputs.size()) {
            return false;
        }
        addPrograms(str, workflowProgramAdder.join());
        return true;
    }

    private BatchPhaseSpec getPhaseSpec(String str, PipelinePhase pipelinePhase) {
        Iterator<StageSpec> it = pipelinePhase.getStagesOfType(Constants.Connector.PLUGIN_TYPE).iterator();
        while (it.hasNext()) {
            String name = it.next().getName();
            if (this.connectorDatasets.get(name) == null) {
                StringBuilder append = new StringBuilder().append("conn-");
                int i = this.connectorNum;
                this.connectorNum = i + 1;
                String sb = append.append(i).toString();
                this.connectorDatasets.put(name, sb);
                new MultiConnectorSource(sb, null).configure(getConfigurer());
            }
        }
        Iterator<StageSpec> it2 = pipelinePhase.getStagesOfType(AlertPublisher.PLUGIN_TYPE).iterator();
        while (it2.hasNext()) {
            new AlertPublisherSink(it2.next().getName(), null).configure(getConfigurer());
        }
        HashMap hashMap = new HashMap();
        for (StageSpec stageSpec : pipelinePhase.getStagesOfType(Constants.Connector.PLUGIN_TYPE)) {
            hashMap.put(stageSpec.getName(), this.connectorDatasets.get(stageSpec.getName()));
        }
        return new BatchPhaseSpec(str, pipelinePhase, this.spec.getResources(), this.spec.getDriverResources(), this.spec.getClientResources(), this.spec.isStageLoggingEnabled(), this.spec.isProcessTimingEnabled(), hashMap, this.spec.getNumOfRecordsPreview(), this.spec.getProperties(), !this.plan.getConditionPhaseBranches().isEmpty());
    }

    private WorkflowProgramAdder addProgram(String str, WorkflowProgramAdder workflowProgramAdder) {
        PipelinePhase phase = this.plan.getPhase(str);
        if (phase == null) {
            return workflowProgramAdder;
        }
        String str2 = "phase-" + this.phaseNum;
        this.phaseNum++;
        BatchPhaseSpec phaseSpec = getPhaseSpec(str2, phase);
        Set<String> pluginTypes = phaseSpec.getPhase().getPluginTypes();
        if (pluginTypes.contains(Action.PLUGIN_TYPE)) {
            workflowProgramAdder.addAction(new PipelineAction(phaseSpec));
        } else if (pluginTypes.contains(Condition.PLUGIN_TYPE)) {
            workflowProgramAdder = workflowProgramAdder.condition(new PipelineCondition(phaseSpec));
        } else if (pluginTypes.contains(Constants.SPARK_PROGRAM_PLUGIN_TYPE)) {
            this.applicationConfigurer.addSpark(new ExternalSparkProgram(phaseSpec, this.stageSpecs.get(phase.getStagesOfType(Constants.SPARK_PROGRAM_PLUGIN_TYPE).iterator().next().getName())));
            workflowProgramAdder.addSpark(str2);
        } else if (this.useSpark) {
            this.applicationConfigurer.addSpark(new ETLSpark(phaseSpec));
            workflowProgramAdder.addSpark(str2);
        } else {
            this.applicationConfigurer.addMapReduce(new ETLMapReduce(phaseSpec, new HashSet(this.connectorDatasets.values())));
            workflowProgramAdder.addMapReduce(str2);
        }
        return workflowProgramAdder;
    }

    private void addCondition(WorkflowProgramAdder workflowProgramAdder, ConditionBranches conditionBranches) {
        String trueOutput = conditionBranches.getTrueOutput();
        if (trueOutput != null) {
            addPrograms(trueOutput, workflowProgramAdder);
        }
        String falseOutput = conditionBranches.getFalseOutput();
        if (falseOutput != null) {
            workflowProgramAdder.otherwise();
            addPrograms(falseOutput, workflowProgramAdder);
        }
        workflowProgramAdder.end();
    }
}
