package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.common.RuntimeArguments;
import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.queue.QueueSpecification;
import co.cask.cdap.app.queue.QueueSpecificationGenerator;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.common.security.Impersonator;
import co.cask.cdap.common.twill.AbortOnTimeoutEventHandler;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.internal.app.queue.SimpleQueueSpecificationGenerator;
import co.cask.cdap.internal.app.runtime.ProgramRunners;
import co.cask.cdap.internal.app.runtime.SystemArguments;
import co.cask.cdap.internal.app.runtime.distributed.AbstractDistributedProgramRunner;
import co.cask.cdap.internal.app.runtime.flow.FlowUtils;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ApplicationId;
import co.cask.cdap.proto.id.ProgramId;
import co.cask.cdap.security.TokenSecureStoreUpdater;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Multimap;
import com.google.common.collect.Table;
import com.google.inject.Inject;
import java.io.File;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tephra.TransactionExecutorFactory;
import org.apache.twill.api.EventHandler;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillPreparer;
import org.apache.twill.api.TwillRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedFlowProgramRunner.class */
/**
 * Program runner that launches a flow as a Twill application and wraps the resulting
 * {@link TwillController} in a {@link ProgramController}.
 *
 * <p>Every controller created here is given a {@link DistributedFlowletInstanceUpdater} built
 * from the queue/stream admins, the transaction executor factory and the impersonator injected
 * into this runner, together with the per-flowlet queue names of the flow.
 */
public final class DistributedFlowProgramRunner extends AbstractDistributedProgramRunner {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedFlowProgramRunner.class);
    private final QueueAdmin queueAdmin;
    private final StreamAdmin streamAdmin;
    private final TransactionExecutorFactory txExecutorFactory;
    private final Impersonator impersonator;

    /**
     * Creates the runner. The first three arguments, the token updater and the impersonator are
     * forwarded to the superclass; the remaining collaborators are kept for queue configuration
     * and for building flowlet instance updaters.
     */
    @Inject
    DistributedFlowProgramRunner(TwillRunner twillRunner, YarnConfiguration yarnConfiguration, CConfiguration cConfiguration, QueueAdmin queueAdmin, StreamAdmin streamAdmin, TransactionExecutorFactory transactionExecutorFactory, TokenSecureStoreUpdater tokenSecureStoreUpdater, Impersonator impersonator) {
        super(twillRunner, yarnConfiguration, cConfiguration, tokenSecureStoreUpdater, impersonator);
        this.queueAdmin = queueAdmin;
        this.streamAdmin = streamAdmin;
        this.txExecutorFactory = transactionExecutorFactory;
        this.impersonator = impersonator;
    }

    /**
     * Runs the superclass validation and then validates the transaction timeout of every flowlet
     * of the flow named after the program, using the user arguments in the flowlet scope.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally declared
     * {@code protected}; it is kept {@code public} here so the visible interface does not change.
     *
     * @param program the flow program being validated
     * @param programOptions options whose user arguments are checked
     * @throws NullPointerException if the application specification has no flow with the program's name
     */
    @Override
    public void validateOptions(Program program, ProgramOptions programOptions) {
        super.validateOptions(program, programOptions);
        FlowSpecification flowSpec = getFlowSpecification(program.getApplicationSpecification(), program.getName());
        for (String flowletName : flowSpec.getFlowlets().keySet()) {
            SystemArguments.validateTransactionTimeout(programOptions.getUserArguments().asMap(), this.cConf, FlowUtils.FLOWLET_SCOPE, flowletName);
        }
    }

    /**
     * Creates a controller for a flow that is already represented by the given Twill controller.
     * The flowlet queue names are derived from the flow specification carried by the descriptor.
     *
     * @param twillController controller of the Twill application
     * @param programDescriptor descriptor supplying the program id and the flow specification
     * @param runId run id of the program run
     * @return a controller that has already been asked to start listening
     */
    @Override
    public ProgramController createProgramController(TwillController twillController, ProgramDescriptor programDescriptor, RunId runId) {
        ProgramId programId = programDescriptor.getProgramId();
        FlowSpecification flowSpec = (FlowSpecification) programDescriptor.getSpecification();
        Multimap<String, QueueName> flowletQueues = getFlowletQueues(programId.getParent(), flowSpec);
        DistributedFlowletInstanceUpdater instanceUpdater = new DistributedFlowletInstanceUpdater(programId, twillController, this.queueAdmin, this.streamAdmin, flowletQueues, this.txExecutorFactory, this.impersonator);
        return createProgramController(twillController, programId, runId, instanceUpdater);
    }

    /** Wraps the Twill controller in a {@link FlowTwillProgramController} and starts listening on it. */
    private ProgramController createProgramController(TwillController twillController, ProgramId programId, RunId runId, DistributedFlowletInstanceUpdater distributedFlowletInstanceUpdater) {
        return new FlowTwillProgramController(programId, twillController, distributedFlowletInstanceUpdater, runId).startListen();
    }

    /**
     * Configures the flow's queues, launches the flow as a {@link FlowTwillApplication} and returns
     * a controller for the launched application.
     *
     * @param program the flow program; its type must be {@link ProgramType#FLOW}
     * @param programOptions options supplying the run id and the user arguments
     * @param map resources passed through to the Twill application
     * @param file not used by this implementation
     * @param applicationLauncher launcher used to start the Twill application
     * @return controller of the launched flow
     * @throws NullPointerException if the application specification, the program type or the flow
     *     specification is missing
     * @throws IllegalArgumentException if the program is not a flow
     */
    @Override
    protected ProgramController launch(Program program, ProgramOptions programOptions, Map<String, LocalizeResource> map, File file, AbstractDistributedProgramRunner.ApplicationLauncher applicationLauncher) {
        ApplicationSpecification applicationSpecification = program.getApplicationSpecification();
        Preconditions.checkNotNull(applicationSpecification, "Missing application specification.");
        ProgramType type = program.getType();
        Preconditions.checkNotNull(type, "Missing processor type.");
        Preconditions.checkArgument(type == ProgramType.FLOW, "Only FLOW process type is supported.");
        try {
            FlowSpecification flowSpecification = getFlowSpecification(applicationSpecification, program.getName());
            LOG.info("Configuring flowlets queues");
            Multimap<String, QueueName> flowletQueues = FlowUtils.configureQueue(program, flowSpecification, this.streamAdmin, this.queueAdmin, this.txExecutorFactory);
            RunId runId = ProgramRunners.getRunId(programOptions);
            LOG.info("Launching distributed flow: {}", program.getId().run(runId));
            TwillController controller = applicationLauncher.launch(new FlowTwillApplication(program, programOptions.getUserArguments(), flowSpecification, map, this.eventHandler));
            DistributedFlowletInstanceUpdater instanceUpdater = new DistributedFlowletInstanceUpdater(program.getId(), controller, this.queueAdmin, this.streamAdmin, flowletQueues, this.txExecutorFactory, this.impersonator);
            return createProgramController(controller, program.getId(), runId, instanceUpdater);
        } catch (Exception e) {
            // Anything thrown while configuring or launching is rethrown through Throwables.propagate.
            throw Throwables.propagate(e);
        }
    }

    /**
     * Creates an {@link AbortOnTimeoutEventHandler} whose timeout is read from the
     * {@code twill.no.container.timeout} setting, falling back to {@link Long#MAX_VALUE} when unset.
     */
    @Override
    protected EventHandler createEventHandler(CConfiguration cConfiguration) {
        long timeout = cConfiguration.getLong("twill.no.container.timeout", Long.MAX_VALUE);
        // NOTE(review): the meaning of the boolean flag is defined by AbortOnTimeoutEventHandler - confirm there.
        return new AbortOnTimeoutEventHandler(timeout, true);
    }

    /**
     * Applies per-flowlet log levels to the preparer. For each flowlet the user arguments are
     * narrowed to that flowlet's scope; flowlets without any log-level settings are skipped.
     *
     * @return the same preparer instance that was passed in
     * @throws NullPointerException if the application specification has no flow with the program's name
     */
    @Override
    protected TwillPreparer setLogLevels(TwillPreparer twillPreparer, Program program, ProgramOptions programOptions) {
        FlowSpecification flowSpec = getFlowSpecification(program.getApplicationSpecification(), program.getName());
        for (String flowletName : flowSpec.getFlowlets().keySet()) {
            Map<String, String> logLevels = SystemArguments.getLogLevels(RuntimeArguments.extractScope(FlowUtils.FLOWLET_SCOPE, flowletName, programOptions.getUserArguments().asMap()));
            if (!logLevels.isEmpty()) {
                twillPreparer.setLogLevels(flowletName, transformLogLevels(logLevels));
            }
        }
        return twillPreparer;
    }

    /**
     * Looks up the flow with the given name and fails with a descriptive message when it is absent,
     * so every caller reports a missing flow the same way.
     *
     * @throws NullPointerException if no flow with that name exists in the application specification
     */
    private static FlowSpecification getFlowSpecification(ApplicationSpecification applicationSpecification, String flowName) {
        FlowSpecification flowSpecification = applicationSpecification.getFlows().get(flowName);
        Preconditions.checkNotNull(flowSpecification, "Missing FlowSpecification for %s", flowName);
        return flowSpecification;
    }

    /**
     * Builds an immutable flowlet-name to queue-name multimap from the queue specifications
     * generated for the flow by {@link SimpleQueueSpecificationGenerator}.
     */
    private Multimap<String, QueueName> getFlowletQueues(ApplicationId applicationId, FlowSpecification flowSpecification) {
        Table<QueueSpecificationGenerator.Node, String, Set<QueueSpecification>> queueSpecs = new SimpleQueueSpecificationGenerator(applicationId).create(flowSpecification);
        ImmutableSetMultimap.Builder<String, QueueName> flowletQueues = ImmutableSetMultimap.builder();
        for (String flowletName : flowSpecification.getFlowlets().keySet()) {
            // The flowlet's table column holds one set of queue specs per node; flatten them all.
            for (QueueSpecification queueSpec : Iterables.concat(queueSpecs.column(flowletName).values())) {
                flowletQueues.put(flowletName, queueSpec.getQueueName());
            }
        }
        return flowletQueues.build();
    }
}
