package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.mapreduce.MapReduceSpecification;
import co.cask.cdap.api.schedule.SchedulableProgramType;
import co.cask.cdap.api.spark.SparkSpecification;
import co.cask.cdap.api.workflow.ScheduleProgramInfo;
import co.cask.cdap.api.workflow.WorkflowActionNode;
import co.cask.cdap.api.workflow.WorkflowNodeType;
import co.cask.cdap.api.workflow.WorkflowSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramRuntimeProvider;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.security.Impersonator;
import co.cask.cdap.internal.app.runtime.ProgramRunners;
import co.cask.cdap.internal.app.runtime.ProgramRuntimeProviderLoader;
import co.cask.cdap.internal.app.runtime.batch.distributed.MapReduceContainerHelper;
import co.cask.cdap.internal.app.runtime.distributed.AbstractDistributedProgramRunner;
import co.cask.cdap.internal.app.runtime.spark.SparkUtils;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.security.TokenSecureStoreUpdater;
import com.google.common.base.Preconditions;
import com.google.inject.Inject;
import java.io.File;
import java.util.ArrayList;
import java.util.Map;
import org.apache.hadoop.mapred.YarnClientProtocolProvider;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.TwillRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedWorkflowProgramRunner.class */
public final class DistributedWorkflowProgramRunner extends AbstractDistributedProgramRunner {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedWorkflowProgramRunner.class);
    private static final String HCONF_ATTR_CLUSTER_MODE = "cdap.spark.cluster.mode";
    private final ProgramRuntimeProviderLoader runtimeProviderLoader;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedWorkflowProgramRunner$DriverMeta.class */
    public static class DriverMeta {
        private final Resources resources;
        private final boolean hasSpark;

        DriverMeta(Resources resources, boolean z) {
            this.resources = resources;
            this.hasSpark = z;
        }
    }

    @Inject
    DistributedWorkflowProgramRunner(TwillRunner twillRunner, YarnConfiguration yarnConfiguration, CConfiguration cConfiguration, TokenSecureStoreUpdater tokenSecureStoreUpdater, ProgramRuntimeProviderLoader programRuntimeProviderLoader, Impersonator impersonator) {
        super(twillRunner, createConfiguration(yarnConfiguration), cConfiguration, tokenSecureStoreUpdater, impersonator);
        this.runtimeProviderLoader = programRuntimeProviderLoader;
    }

    @Override // co.cask.cdap.internal.app.runtime.distributed.AbstractDistributedProgramRunner
    protected ProgramController launch(Program program, ProgramOptions programOptions, Map<String, LocalizeResource> map, File file, AbstractDistributedProgramRunner.ApplicationLauncher applicationLauncher) {
        ApplicationSpecification applicationSpecification = program.getApplicationSpecification();
        Preconditions.checkNotNull(applicationSpecification, "Missing application specification.");
        ProgramType type = program.getType();
        Preconditions.checkNotNull(type, "Missing processor type.");
        Preconditions.checkArgument(type == ProgramType.WORKFLOW, "Only WORKFLOW process type is supported.");
        WorkflowSpecification workflowSpecification = (WorkflowSpecification) applicationSpecification.getWorkflows().get(program.getName());
        Preconditions.checkNotNull(workflowSpecification, "Missing WorkflowSpecification for %s", new Object[]{program.getName()});
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(YarnClientProtocolProvider.class);
        DriverMeta findDriverResources = findDriverResources(program.getApplicationSpecification().getSpark(), program.getApplicationSpecification().getMapReduce(), workflowSpecification);
        ProgramRuntimeProvider programRuntimeProvider = this.runtimeProviderLoader.get(ProgramType.SPARK);
        if (programRuntimeProvider != null) {
            try {
                arrayList.add(SparkUtils.prepareSparkResources(file, map));
                arrayList2.add(programRuntimeProvider.getClass());
            } catch (Exception e) {
                if (findDriverResources.hasSpark) {
                    throw e;
                }
                LOG.debug("Spark assembly jar is not present. It doesn't affected Workflow {} since it doesn't use Spark.", program.getId(), e);
            }
        } else if (findDriverResources.hasSpark) {
            throw new IllegalStateException("Missing Spark runtime system. Not able to run Spark program in Workflow.");
        }
        arrayList.addAll(MapReduceContainerHelper.localizeFramework(this.hConf, map));
        LOG.info("Launching distributed workflow: " + program.getName() + ":" + workflowSpecification.getName());
        return new WorkflowTwillProgramController(program.getId(), applicationLauncher.launch(new WorkflowTwillApplication(program, programOptions.getUserArguments(), workflowSpecification, map, this.eventHandler, findDriverResources.resources), arrayList, arrayList2), ProgramRunners.getRunId(programOptions)).startListen();
    }

    private static YarnConfiguration createConfiguration(YarnConfiguration yarnConfiguration) {
        YarnConfiguration yarnConfiguration2 = new YarnConfiguration(yarnConfiguration);
        yarnConfiguration2.setBoolean(HCONF_ATTR_CLUSTER_MODE, true);
        return yarnConfiguration2;
    }

    private DriverMeta findDriverResources(Map<String, SparkSpecification> map, Map<String, MapReduceSpecification> map2, WorkflowSpecification workflowSpecification) {
        ScheduleProgramInfo program;
        SchedulableProgramType programType;
        Resources driverResources;
        Resources resources = new Resources(768);
        boolean z = false;
        for (WorkflowActionNode workflowActionNode : workflowSpecification.getNodeIdMap().values()) {
            if (WorkflowNodeType.ACTION == workflowActionNode.getType() && ((programType = (program = workflowActionNode.getProgram()).getProgramType()) == SchedulableProgramType.SPARK || programType == SchedulableProgramType.MAPREDUCE)) {
                if (programType == SchedulableProgramType.SPARK) {
                    z = true;
                    driverResources = map.get(program.getProgramName()).getClientResources();
                } else {
                    driverResources = map2.get(program.getProgramName()).getDriverResources();
                }
                if (driverResources != null) {
                    resources = max(resources, driverResources);
                }
            }
        }
        return new DriverMeta(resources, z);
    }

    private Resources max(Resources resources, Resources resources2) {
        int memoryMB = resources.getMemoryMB();
        int memoryMB2 = resources2.getMemoryMB();
        int virtualCores = resources.getVirtualCores();
        int virtualCores2 = resources2.getVirtualCores();
        return (memoryMB <= memoryMB2 || virtualCores <= virtualCores2) ? (memoryMB >= memoryMB2 || virtualCores >= virtualCores2) ? new Resources(Math.max(memoryMB, memoryMB2), Math.max(virtualCores, virtualCores2)) : resources2 : resources;
    }
}
