package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.common.RuntimeArguments;
import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.api.flow.FlowletDefinition;
import co.cask.cdap.api.flow.flowlet.FlowletSpecification;
import co.cask.cdap.app.guice.ClusterMode;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.queue.QueueSpecification;
import co.cask.cdap.app.queue.QueueSpecificationGenerator;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.ApplicationNotFoundException;
import co.cask.cdap.common.ProgramNotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.conf.Constants;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.internal.app.queue.SimpleQueueSpecificationGenerator;
import co.cask.cdap.internal.app.runtime.SystemArguments;
import co.cask.cdap.internal.app.runtime.flow.FlowUtils;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.security.impersonation.Impersonator;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Table;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tephra.TransactionExecutorFactory;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedFlowProgramRunner.class */
/**
 * A {@link DistributedProgramRunner} for programs of type {@link ProgramType#FLOW}.
 *
 * <p>For every flowlet declared in the flow specification it registers one
 * {@link FlowletTwillRunnable} in the launch configuration, configures the flow's queues
 * before launch, and creates {@link FlowTwillProgramController} instances for running flows.
 */
public final class DistributedFlowProgramRunner extends DistributedProgramRunner implements LongRunningDistributedProgramRunner {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedFlowProgramRunner.class);
    private final QueueAdmin queueAdmin;
    private final StreamAdmin streamAdmin;
    private final Store store;
    private final TransactionExecutorFactory txExecutorFactory;
    private final Impersonator impersonator;

    /**
     * Creates the runner; all collaborators are supplied through injection.
     */
    @Inject
    DistributedFlowProgramRunner(CConfiguration cConfiguration, YarnConfiguration yarnConfiguration, QueueAdmin queueAdmin, Store store, StreamAdmin streamAdmin, TransactionExecutorFactory transactionExecutorFactory, Impersonator impersonator, ClusterMode clusterMode, @Constants.AppFabric.ProgramRunner TwillRunner twillRunner) {
        super(cConfiguration, yarnConfiguration, impersonator, clusterMode, twillRunner);
        this.queueAdmin = queueAdmin;
        this.streamAdmin = streamAdmin;
        this.store = store;
        this.txExecutorFactory = transactionExecutorFactory;
        this.impersonator = impersonator;
    }

    /**
     * Runs the superclass validation and then validates the transaction timeout argument for
     * every flowlet of the flow, using the flowlet scope and the flowlet name.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was changed
     * from {@code protected}; the visibility is left as found.
     *
     * @param program the flow program being validated
     * @param programOptions options carrying the user arguments to validate
     * @throws NullPointerException if the application or flow specification is missing
     * @throws IllegalArgumentException if the program is not of type {@link ProgramType#FLOW}
     */
    @Override
    public void validateOptions(Program program, ProgramOptions programOptions) {
        super.validateOptions(program, programOptions);
        // Shared lookup helper: fails with a descriptive message instead of a bare NPE.
        FlowSpecification flowSpec = getFlowSpecification(program);
        // The user arguments do not change between flowlets, so resolve them once.
        Map<String, String> userArgs = programOptions.getUserArguments().asMap();
        for (String flowletName : flowSpec.getFlowlets().keySet()) {
            SystemArguments.validateTransactionTimeout(userArgs, this.cConf, FlowUtils.FLOWLET_SCOPE, flowletName);
        }
    }

    /**
     * Creates a controller for a flow described by the given descriptor. The controller is
     * given a {@link DistributedFlowletInstanceUpdater} built from the queues of each flowlet,
     * and {@code startListen()} is invoked on it before it is returned.
     *
     * @param twillController controller of the underlying Twill application
     * @param programDescriptor descriptor whose specification must be a {@link FlowSpecification}
     * @param runId run id of the program run
     * @return the result of {@code startListen()} on the newly created controller
     */
    @Override
    public ProgramController createProgramController(TwillController twillController, ProgramDescriptor programDescriptor, RunId runId) {
        ProgramId programId = programDescriptor.getProgramId();
        Multimap<String, QueueName> flowletQueues =
            getFlowletQueues(programId.getParent(), (FlowSpecification) programDescriptor.getSpecification());
        DistributedFlowletInstanceUpdater instanceUpdater = new DistributedFlowletInstanceUpdater(
            programId, twillController, this.queueAdmin, this.streamAdmin, flowletQueues,
            this.txExecutorFactory, this.impersonator);
        return new FlowTwillProgramController(programId, twillController, instanceUpdater, runId).startListen();
    }

    /**
     * Creates a controller for the given program id by loading its descriptor from the store
     * and delegating to
     * {@link #createProgramController(TwillController, ProgramDescriptor, RunId)}.
     *
     * @param twillController controller of the underlying Twill application
     * @param programId id of the program to load from the store
     * @param runId run id of the program run
     * @return the created controller
     * @throws RuntimeException wrapping the failure if the program cannot be loaded
     */
    @Override
    public ProgramController createProgramController(TwillController twillController, ProgramId programId, RunId runId) {
        try {
            return createProgramController(twillController, this.store.loadProgram(programId), runId);
        } catch (IOException | ApplicationNotFoundException | ProgramNotFoundException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Adds one {@link FlowletTwillRunnable} per flowlet to the launch configuration, using the
     * flowlet's instance count, its resources, and the user arguments extracted for the
     * flowlet scope and flowlet name.
     *
     * <p>NOTE(review): the decompiler reported that this method's access modifier was changed
     * from {@code protected}; the visibility is left as found.
     *
     * @param programLaunchConfig launch configuration to populate
     * @param program the flow program being launched
     * @param programOptions options carrying the user arguments
     * @param cConfiguration not used by this implementation
     * @param configuration not used by this implementation
     * @param file not used by this implementation
     */
    @Override
    public void setupLaunchConfig(ProgramLaunchConfig programLaunchConfig, Program program, ProgramOptions programOptions, CConfiguration cConfiguration, Configuration configuration, File file) {
        Map<String, String> userArgs = programOptions.getUserArguments().asMap();
        for (Map.Entry<String, FlowletDefinition> entry : getFlowSpecification(program).getFlowlets().entrySet()) {
            String flowletName = entry.getKey();
            FlowletDefinition flowletDefinition = entry.getValue();
            FlowletSpecification flowletSpec = flowletDefinition.getFlowletSpec();
            programLaunchConfig.addRunnable(
                flowletName,
                new FlowletTwillRunnable(flowletName),
                flowletDefinition.getInstances(),
                RuntimeArguments.extractScope(FlowUtils.FLOWLET_SCOPE, flowletName, userArgs),
                flowletSpec.getResources());
        }
    }

    /**
     * Configures the queues of the flow before the program is launched.
     *
     * @param program the flow program about to be launched
     * @param programOptions not used by this implementation
     */
    @Override
    protected void beforeLaunch(Program program, ProgramOptions programOptions) {
        LOG.info("Configuring flowlets queues");
        FlowUtils.configureQueue(program, getFlowSpecification(program), this.streamAdmin, this.queueAdmin, this.txExecutorFactory);
    }

    /**
     * Looks up the {@link FlowSpecification} of the given program by name in its application
     * specification.
     *
     * @param program the program to resolve
     * @return the flow specification, never {@code null}
     * @throws NullPointerException if the application specification, the program type, or the
     *     flow specification is missing
     * @throws IllegalArgumentException if the program type is not {@link ProgramType#FLOW}
     */
    private FlowSpecification getFlowSpecification(Program program) {
        ApplicationSpecification applicationSpecification = program.getApplicationSpecification();
        Preconditions.checkNotNull(applicationSpecification, "Missing application specification.");
        ProgramType type = program.getType();
        Preconditions.checkNotNull(type, "Missing processor type.");
        Preconditions.checkArgument(type == ProgramType.FLOW, "Only FLOW process type is supported.");
        FlowSpecification flowSpecification = applicationSpecification.getFlows().get(program.getName());
        Preconditions.checkNotNull(flowSpecification, "Missing FlowSpecification for %s", program.getName());
        return flowSpecification;
    }

    /**
     * Builds a multimap from flowlet name to the names of the queues found in that flowlet's
     * column of the generated queue specification table.
     *
     * @param applicationId id of the application that contains the flow
     * @param flowSpecification specification of the flow
     * @return an immutable multimap of flowlet name to queue names
     */
    private Multimap<String, QueueName> getFlowletQueues(ApplicationId applicationId, FlowSpecification flowSpecification) {
        Table<QueueSpecificationGenerator.Node, String, Set<QueueSpecification>> queueSpecs =
            new SimpleQueueSpecificationGenerator(applicationId).create(flowSpecification);
        ImmutableSetMultimap.Builder<String, QueueName> flowletQueues = ImmutableSetMultimap.builder();
        for (String flowletName : flowSpecification.getFlowlets().keySet()) {
            // The column keyed by the flowlet name holds one set of queue specs per row node.
            for (QueueSpecification queueSpec : Iterables.concat(queueSpecs.column(flowletName).values())) {
                flowletQueues.put(flowletName, queueSpec.getQueueName());
            }
        }
        return flowletQueues.build();
    }
}
