package co.cask.cdap.internal.app.runtime.workflow;

import co.cask.cdap.api.spark.SparkContext;
import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.api.workflow.WorkflowSpecification;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.internal.app.runtime.ProgramRunnerFactory;
import co.cask.cdap.internal.app.runtime.spark.SparkProgramController;
import com.google.common.base.Preconditions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/workflow/SparkProgramWorkflowRunner.class */
/**
 * Workflow runner for Spark programs.
 *
 * <p>{@link #create(String)} looks up a Spark specification by name in the workflow program's
 * application specification and wraps it in a runnable. {@link #runAndWait(Program, ProgramOptions)}
 * starts the program through the {@code SPARK} program runner and afterwards merges the workflow
 * token carried by the Spark context into this runner's own token.
 *
 * <p>Decompiler note: the class and its constructor were originally package-private.
 */
public final class SparkProgramWorkflowRunner extends AbstractProgramWorkflowRunner {

    /**
     * Builds the runner by handing every argument to the superclass constructor. Note that the
     * superclass takes them in a different order than this constructor does.
     */
    public SparkProgramWorkflowRunner(WorkflowSpecification workflowSpecification, ProgramRunnerFactory programRunnerFactory, Program program, ProgramOptions programOptions, WorkflowToken workflowToken, String str) {
        super(program, programOptions, programRunnerFactory, workflowSpecification, workflowToken, str);
    }

    /**
     * Produces the runnable for the Spark program registered under the given name.
     *
     * @param str name of the Spark program to look up in the application specification
     * @return the runnable obtained from {@code getProgramRunnable} for that Spark program
     * @throws IllegalArgumentException if the application specification has no Spark program
     *     with that name
     */
    @Override
    public Runnable create(String str) {
        SparkSpecification spec = this.workflowProgram.getApplicationSpecification().getSpark().get(str);
        Preconditions.checkArgument(
            spec != null,
            "No Spark with name %s found in Workflow %s",
            str,
            this.workflowSpec.getName());
        WorkflowSparkProgram sparkProgram = new WorkflowSparkProgram(this.workflowProgram, spec);
        return getProgramRunnable(str, sparkProgram);
    }

    /**
     * Starts the given program with the {@code SPARK} program runner, passes the resulting
     * controller and its Spark context to {@code executeProgram}, and then merges the Spark
     * context's workflow token into this runner's token.
     *
     * @param program the program to run
     * @param programOptions options passed to the program runner
     * @throws IllegalStateException if the controller returned by the runner is not a
     *     {@link SparkProgramController}, or if the Spark context carries no workflow token
     * @throws Exception whatever the underlying run or {@code executeProgram} throws
     */
    @Override
    public void runAndWait(Program program, ProgramOptions programOptions) throws Exception {
        ProgramController controller =
            this.programRunnerFactory.create(ProgramRunnerFactory.Type.SPARK).run(program, programOptions);
        if (controller instanceof SparkProgramController) {
            SparkContext sparkContext = ((SparkProgramController) controller).getContext();
            executeProgram(controller, sparkContext);
            updateWorkflowToken(sparkContext);
            return;
        }
        throw new IllegalStateException("Failed to run program. The controller is not an instance of SparkProgramController");
    }

    /**
     * Merges the workflow token held by the Spark context into this runner's token.
     *
     * @param sparkContext context of the Spark program that was run
     * @throws IllegalStateException if the context has no workflow token
     */
    private void updateWorkflowToken(SparkContext sparkContext) {
        WorkflowToken sparkToken = sparkContext.getWorkflowToken();
        if (sparkToken == null) {
            throw new IllegalStateException("WorkflowToken cannot be null when the Spark program is started by Workflow.");
        }
        // Both tokens are cast to the concrete implementation, as mergeToken is invoked on it.
        BasicWorkflowToken target = (BasicWorkflowToken) this.token;
        BasicWorkflowToken source = (BasicWorkflowToken) sparkToken;
        target.mergeToken(source);
    }
}
