package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.schedule.SchedulableProgramType;
import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.api.workflow.ScheduleProgramInfo;
import co.cask.cdap.api.workflow.WorkflowActionNode;
import co.cask.cdap.api.workflow.WorkflowConditionNode;
import co.cask.cdap.api.workflow.WorkflowForkNode;
import co.cask.cdap.api.workflow.WorkflowNode;
import co.cask.cdap.api.workflow.WorkflowNodeType;
import co.cask.cdap.api.workflow.WorkflowSpecification;
import co.cask.cdap.app.ApplicationSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.batch.distributed.MapReduceContainerHelper;
import co.cask.cdap.internal.app.runtime.distributed.AbstractDistributedProgramRunner;
import co.cask.cdap.internal.app.runtime.spark.SparkContextConfig;
import co.cask.cdap.internal.app.runtime.spark.SparkUtils;
import co.cask.cdap.proto.ProgramType;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.inject.Inject;
import java.io.File;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedWorkflowProgramRunner.class */
public final class DistributedWorkflowProgramRunner extends AbstractDistributedProgramRunner {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedWorkflowProgramRunner.class);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: co.cask.cdap.internal.app.runtime.distributed.DistributedWorkflowProgramRunner$2, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedWorkflowProgramRunner$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$api$workflow$WorkflowNodeType = new int[WorkflowNodeType.values().length];

        static {
            try {
                $SwitchMap$co$cask$cdap$api$workflow$WorkflowNodeType[WorkflowNodeType.ACTION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$workflow$WorkflowNodeType[WorkflowNodeType.FORK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$workflow$WorkflowNodeType[WorkflowNodeType.CONDITION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @Inject
    public DistributedWorkflowProgramRunner(TwillRunner twillRunner, Configuration configuration, CConfiguration cConfiguration) {
        super(twillRunner, createConfiguration(configuration), cConfiguration);
    }

    @Override // co.cask.cdap.internal.app.runtime.distributed.AbstractDistributedProgramRunner
    protected ProgramController launch(Program program, ProgramOptions programOptions, Map<String, LocalizeResource> map, AbstractDistributedProgramRunner.ApplicationLauncher applicationLauncher) {
        ApplicationSpecification applicationSpecification = program.getApplicationSpecification();
        Preconditions.checkNotNull(applicationSpecification, "Missing application specification.");
        ProgramType type = program.getType();
        Preconditions.checkNotNull(type, "Missing processor type.");
        Preconditions.checkArgument(type == ProgramType.WORKFLOW, "Only WORKFLOW process type is supported.");
        WorkflowSpecification workflowSpecification = applicationSpecification.getWorkflows().get(program.getName());
        Preconditions.checkNotNull(workflowSpecification, "Missing WorkflowSpecification for %s", new Object[]{program.getName()});
        ArrayList arrayList = new ArrayList();
        Resources findSparkDriverResources = findSparkDriverResources(program.getApplicationSpecification().getSpark(), workflowSpecification);
        if (findSparkDriverResources != null) {
            File locateSparkAssemblyJar = SparkUtils.locateSparkAssemblyJar();
            map.put(locateSparkAssemblyJar.getName(), new LocalizeResource(locateSparkAssemblyJar));
            arrayList.add(locateSparkAssemblyJar.getName());
        } else {
            findSparkDriverResources = new Resources();
        }
        arrayList.addAll(MapReduceContainerHelper.localizeFramework(this.hConf, map));
        File file = null;
        if (MapReduceContainerHelper.getFrameworkURI(this.hConf) != null) {
            File absoluteFile = new File(this.cConf.get("local.data.dir"), this.cConf.get("app.temp.dir")).getAbsoluteFile();
            absoluteFile.mkdirs();
            try {
                file = File.createTempFile("launcher", ".jar", absoluteFile);
                MapReduceContainerHelper.saveLauncher(this.hConf, file, arrayList);
                map.put("launcher.jar", new LocalizeResource(file));
            } catch (Exception e) {
                LOG.warn("Failed to create twill container launcher.jar for TWILL-144 hack. Still proceed, but the run will likely fail", e);
            }
        }
        LOG.info("Launching distributed workflow: " + program.getName() + ":" + workflowSpecification.getName());
        TwillController launch = applicationLauncher.launch(new WorkflowTwillApplication(program, workflowSpecification, map, this.eventHandler, findSparkDriverResources), arrayList);
        final File file2 = file;
        Runnable runnable = new Runnable() { // from class: co.cask.cdap.internal.app.runtime.distributed.DistributedWorkflowProgramRunner.1
            @Override // java.lang.Runnable
            public void run() {
                if (file2 != null) {
                    file2.delete();
                }
            }
        };
        launch.onRunning(runnable, Threads.SAME_THREAD_EXECUTOR);
        launch.onTerminated(runnable, Threads.SAME_THREAD_EXECUTOR);
        return new WorkflowTwillProgramController(program.getName(), launch, RunIds.fromString(programOptions.getArguments().getOption(ProgramOptionConstants.RUN_ID))).startListen();
    }

    private static Configuration createConfiguration(Configuration configuration) {
        Configuration configuration2 = new Configuration(configuration);
        configuration2.set(SparkContextConfig.HCONF_ATTR_EXECUTION_MODE, SparkContextConfig.YARN_EXECUTION_MODE);
        return configuration2;
    }

    @Nullable
    private Resources findSparkDriverResources(Map<String, SparkSpecification> map, WorkflowSpecification workflowSpecification) {
        Resources resources = new Resources();
        boolean z = false;
        LinkedList linkedList = new LinkedList(workflowSpecification.getNodes());
        while (!linkedList.isEmpty()) {
            WorkflowForkNode workflowForkNode = (WorkflowNode) linkedList.poll();
            switch (AnonymousClass2.$SwitchMap$co$cask$cdap$api$workflow$WorkflowNodeType[workflowForkNode.getType().ordinal()]) {
                case 1:
                    ScheduleProgramInfo program = ((WorkflowActionNode) workflowForkNode).getProgram();
                    if (program.getProgramType() != SchedulableProgramType.SPARK) {
                        break;
                    } else {
                        z = true;
                        Resources driverResources = map.get(program.getProgramName()).getDriverResources();
                        if (driverResources == null) {
                            break;
                        } else {
                            resources = max(resources, driverResources);
                            break;
                        }
                    }
                case 2:
                    Iterables.addAll(linkedList, Iterables.concat(workflowForkNode.getBranches()));
                    break;
                case 3:
                    WorkflowConditionNode workflowConditionNode = (WorkflowConditionNode) workflowForkNode;
                    linkedList.addAll(workflowConditionNode.getIfBranch());
                    linkedList.addAll(workflowConditionNode.getElseBranch());
                    break;
                default:
                    LOG.warn("Unknown workflow node type {}", workflowForkNode.getType());
                    break;
            }
        }
        if (z) {
            return resources;
        }
        return null;
    }

    private Resources max(Resources resources, Resources resources2) {
        int memoryMB = resources.getMemoryMB();
        int memoryMB2 = resources2.getMemoryMB();
        int virtualCores = resources.getVirtualCores();
        int virtualCores2 = resources2.getVirtualCores();
        return (memoryMB <= memoryMB2 || virtualCores <= virtualCores2) ? (memoryMB >= memoryMB2 || virtualCores >= virtualCores2) ? new Resources(Math.max(memoryMB, memoryMB2), Math.max(virtualCores, virtualCores2)) : resources2 : resources;
    }
}
