package co.cask.cdap.internal.app.runtime.batch;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.mapreduce.MapReduce;
import co.cask.cdap.api.mapreduce.MapReduceSpecification;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.security.store.SecureStore;
import co.cask.cdap.api.security.store.SecureStoreManager;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.lang.InstantiatorFactory;
import co.cask.cdap.common.lang.PropertyFieldSetter;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.data.ProgramContextAware;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.internal.app.runtime.AbstractProgramRunnerWithPlugin;
import co.cask.cdap.internal.app.runtime.BasicProgramContext;
import co.cask.cdap.internal.app.runtime.DataSetFieldSetter;
import co.cask.cdap.internal.app.runtime.MetricsFieldSetter;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.ProgramRunners;
import co.cask.cdap.internal.app.runtime.plugin.PluginInstantiator;
import co.cask.cdap.internal.app.runtime.workflow.NameMappedDatasetFramework;
import co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramInfo;
import co.cask.cdap.internal.lang.Reflections;
import co.cask.cdap.internal.lang.Visitor;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.security.spi.authentication.AuthenticationContext;
import co.cask.cdap.security.spi.authorization.AuthorizationEnforcer;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
import com.google.common.reflect.TypeToken;
import com.google.inject.Inject;
import com.google.inject.Injector;
import java.io.File;
import java.util.ArrayList;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/batch/MapReduceProgramRunner.class */
public class MapReduceProgramRunner extends AbstractProgramRunnerWithPlugin {
    private static final Logger LOG = LoggerFactory.getLogger(MapReduceProgramRunner.class);
    private final Injector injector;
    private final StreamAdmin streamAdmin;
    private final CConfiguration cConf;
    private final Configuration hConf;
    private final NamespacedLocationFactory locationFactory;
    private final MetricsCollectionService metricsCollectionService;
    private final DatasetFramework datasetFramework;
    private final TransactionSystemClient txSystemClient;
    private final DiscoveryServiceClient discoveryServiceClient;
    private final SecureStore secureStore;
    private final SecureStoreManager secureStoreManager;
    private final AuthorizationEnforcer authorizationEnforcer;
    private final AuthenticationContext authenticationContext;
    private final MessagingService messagingService;

    @Inject
    public MapReduceProgramRunner(Injector injector, CConfiguration cConfiguration, Configuration configuration, NamespacedLocationFactory namespacedLocationFactory, StreamAdmin streamAdmin, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient, MetricsCollectionService metricsCollectionService, DiscoveryServiceClient discoveryServiceClient, SecureStore secureStore, SecureStoreManager secureStoreManager, AuthorizationEnforcer authorizationEnforcer, AuthenticationContext authenticationContext, MessagingService messagingService) {
        super(cConfiguration);
        this.injector = injector;
        this.cConf = cConfiguration;
        this.hConf = configuration;
        this.locationFactory = namespacedLocationFactory;
        this.streamAdmin = streamAdmin;
        this.metricsCollectionService = metricsCollectionService;
        this.datasetFramework = datasetFramework;
        this.txSystemClient = transactionSystemClient;
        this.discoveryServiceClient = discoveryServiceClient;
        this.secureStore = secureStore;
        this.secureStoreManager = secureStoreManager;
        this.authorizationEnforcer = authorizationEnforcer;
        this.authenticationContext = authenticationContext;
        this.messagingService = messagingService;
    }

    @Override // co.cask.cdap.app.runtime.ProgramRunner
    public ProgramController run(Program program, ProgramOptions programOptions) {
        ApplicationSpecification applicationSpecification = program.getApplicationSpecification();
        Preconditions.checkNotNull(applicationSpecification, "Missing application specification.");
        ProgramType type = program.getType();
        Preconditions.checkNotNull(type, "Missing processor type.");
        Preconditions.checkArgument(type == ProgramType.MAPREDUCE, "Only MAPREDUCE process type is supported.");
        MapReduceSpecification mapReduceSpecification = (MapReduceSpecification) applicationSpecification.getMapReduce().get(program.getName());
        Preconditions.checkNotNull(mapReduceSpecification, "Missing MapReduceSpecification for %s", new Object[]{program.getName()});
        Arguments arguments = programOptions.getArguments();
        RunId runId = ProgramRunners.getRunId(programOptions);
        WorkflowProgramInfo create = WorkflowProgramInfo.create(arguments);
        DatasetFramework createFromWorkflowProgramInfo = create == null ? this.datasetFramework : NameMappedDatasetFramework.createFromWorkflowProgramInfo(this.datasetFramework, create, applicationSpecification);
        if (createFromWorkflowProgramInfo instanceof ProgramContextAware) {
            ((ProgramContextAware) createFromWorkflowProgramInfo).setContext(new BasicProgramContext(program.getId().run(runId)));
        }
        try {
            MapReduce mapReduce = (MapReduce) new InstantiatorFactory(false).get(TypeToken.of(program.getMainClass())).create();
            ArrayList arrayList = new ArrayList();
            try {
                PluginInstantiator createPluginInstantiator = createPluginInstantiator(programOptions, program.getClassLoader());
                if (createPluginInstantiator != null) {
                    arrayList.add(createPluginInstantiator);
                }
                BasicMapReduceContext basicMapReduceContext = new BasicMapReduceContext(program, programOptions, this.cConf, mapReduceSpecification, create, this.discoveryServiceClient, this.metricsCollectionService, this.txSystemClient, createFromWorkflowProgramInfo, this.streamAdmin, getPluginArchive(programOptions), createPluginInstantiator, this.secureStore, this.secureStoreManager, this.messagingService);
                arrayList.add(basicMapReduceContext);
                Reflections.visit(mapReduce, mapReduce.getClass(), new PropertyFieldSetter(basicMapReduceContext.getSpecification().getProperties()), new Visitor[]{new MetricsFieldSetter(basicMapReduceContext.getMetrics()), new DataSetFieldSetter(basicMapReduceContext)});
                LoggingContextAccessor.setLoggingContext(basicMapReduceContext.getLoggingContext());
                Configuration configuration = new Configuration(this.hConf);
                String option = programOptions.getArguments().getOption("apps.scheduler.queue");
                if (option != null && !option.isEmpty()) {
                    configuration.set("mapreduce.job.queuename", option);
                }
                MapReduceRuntimeService mapReduceRuntimeService = new MapReduceRuntimeService(this.injector, this.cConf, configuration, mapReduce, mapReduceSpecification, basicMapReduceContext, program.getJarLocation(), this.locationFactory, this.streamAdmin, this.authorizationEnforcer, this.authenticationContext);
                mapReduceRuntimeService.addListener(createRuntimeServiceListener(arrayList), Threads.SAME_THREAD_EXECUTOR);
                MapReduceProgramController mapReduceProgramController = new MapReduceProgramController(mapReduceRuntimeService, basicMapReduceContext);
                LOG.debug("Starting MapReduce Job: {}", basicMapReduceContext);
                if (MapReduceTaskContextProvider.isLocal(configuration) || UserGroupInformation.isSecurityEnabled()) {
                    mapReduceRuntimeService.start();
                } else {
                    ProgramRunners.startAsUser(this.cConf.get("hdfs.user"), mapReduceRuntimeService);
                }
                return mapReduceProgramController;
            } catch (Exception e) {
                closeAllQuietly(arrayList);
                throw Throwables.propagate(e);
            }
        } catch (Exception e2) {
            LOG.error("Failed to instantiate MapReduce class for {}", mapReduceSpecification.getClassName(), e2);
            throw Throwables.propagate(e2);
        }
    }

    @Nullable
    private File getPluginArchive(ProgramOptions programOptions) {
        if (programOptions.getArguments().hasOption(ProgramOptionConstants.PLUGIN_ARCHIVE)) {
            return new File(programOptions.getArguments().getOption(ProgramOptionConstants.PLUGIN_ARCHIVE));
        }
        return null;
    }
}
