package co.cask.cdap.internal.app.runtime.worker;

import co.cask.cdap.api.Resources;
import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.security.store.SecureStore;
import co.cask.cdap.api.security.store.SecureStoreManager;
import co.cask.cdap.api.worker.WorkerSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.stream.StreamWriterFactory;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.metadata.writer.ProgramContextAware;
import co.cask.cdap.internal.app.runtime.AbstractProgramRunnerWithPlugin;
import co.cask.cdap.internal.app.runtime.ProgramControllerServiceAdapter;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.ProgramRunners;
import co.cask.cdap.internal.app.runtime.plugin.PluginInstantiator;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.Service;
import com.google.gson.Gson;
import com.google.inject.Inject;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.api.RunId;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.ServiceListenerAdapter;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/worker/WorkerProgramRunner.class */
/**
 * Program runner for {@link ProgramType#WORKER} programs.
 *
 * <p>{@link #run(Program, ProgramOptions)} validates the program options, builds a
 * {@link WorkerSpecification} that reflects any runtime overrides (instance count and
 * resources), starts a {@link WorkerDriver} for the requested worker instance and returns a
 * {@link ProgramController} wrapping that driver.
 */
public class WorkerProgramRunner extends AbstractProgramRunnerWithPlugin {
    private static final Gson GSON = new Gson();

    private final MetricsCollectionService metricsCollectionService;
    private final DatasetFramework datasetFramework;
    private final DiscoveryServiceClient discoveryServiceClient;
    private final TransactionSystemClient txClient;
    private final StreamWriterFactory streamWriterFactory;
    private final SecureStore secureStore;
    private final SecureStoreManager secureStoreManager;

    /**
     * Controller for a running worker that, in addition to the commands handled by
     * {@link ProgramControllerServiceAdapter}, forwards instance-count changes to the
     * underlying {@link WorkerDriver}.
     */
    private static final class WorkerControllerServiceAdapter extends ProgramControllerServiceAdapter {
        private final WorkerDriver workerDriver;

        WorkerControllerServiceAdapter(WorkerDriver workerDriver, Id.Program program, RunId runId,
                                       String componentName) {
            super(workerDriver, program, runId, componentName);
            this.workerDriver = workerDriver;
        }

        /**
         * Delegates to the parent implementation first, then applies an
         * {@link ProgramOptionConstants#INSTANCES} command to the driver.
         *
         * <p>The command is ignored here unless its value is an {@link Integer}.
         * (Decompiler note: this method was declared {@code protected} in the original class.)
         *
         * @param name  the command name
         * @param value the command value
         * @throws Exception if the parent implementation fails
         */
        @Override
        public void doCommand(String name, Object value) throws Exception {
            super.doCommand(name, value);
            if (ProgramOptionConstants.INSTANCES.equals(name) && (value instanceof Integer)) {
                this.workerDriver.setInstanceCount((Integer) value);
            }
        }
    }

    /**
     * Creates the runner with the services that are handed to every worker context it builds.
     */
    @Inject
    public WorkerProgramRunner(CConfiguration cConfiguration, MetricsCollectionService metricsCollectionService, DatasetFramework datasetFramework, DiscoveryServiceClient discoveryServiceClient, TransactionSystemClient transactionSystemClient, StreamWriterFactory streamWriterFactory, SecureStore secureStore, SecureStoreManager secureStoreManager) {
        super(cConfiguration);
        this.metricsCollectionService = metricsCollectionService;
        this.datasetFramework = datasetFramework;
        this.discoveryServiceClient = discoveryServiceClient;
        this.txClient = transactionSystemClient;
        this.streamWriterFactory = streamWriterFactory;
        this.secureStore = secureStore;
        this.secureStoreManager = secureStoreManager;
    }

    /**
     * Starts one instance of the worker described by {@code program}.
     *
     * @param program        the worker program to run; its type must be {@link ProgramType#WORKER}
     * @param programOptions options carrying the instance id, the instance count and, optionally,
     *                       a JSON-encoded {@link Resources} override
     * @return a controller for the started worker instance
     * @throws NullPointerException     if the application specification or program type is missing
     * @throws IllegalArgumentException if the instance id or instance count is missing or invalid,
     *                                  the program is not a worker, or no worker specification
     *                                  exists for the program name
     */
    @Override
    public ProgramController run(Program program, ProgramOptions programOptions) {
        ApplicationSpecification appSpec = program.getApplicationSpecification();
        Preconditions.checkNotNull(appSpec, "Missing application specification.");

        int instanceId = Integer.parseInt(
            programOptions.getArguments().getOption(ProgramOptionConstants.INSTANCE_ID, "-1"));
        Preconditions.checkArgument(instanceId >= 0, "Missing instance Id");

        int instanceCount = Integer.parseInt(
            programOptions.getArguments().getOption(ProgramOptionConstants.INSTANCES, "0"));
        Preconditions.checkArgument(instanceCount > 0, "Invalid or missing instance count");

        RunId runId = ProgramRunners.getRunId(programOptions);

        ProgramType programType = program.getType();
        Preconditions.checkNotNull(programType, "Missing processor type.");
        Preconditions.checkArgument(programType == ProgramType.WORKER, "Only Worker process type is supported.");

        WorkerSpecification workerSpec = (WorkerSpecification) appSpec.getWorkers().get(program.getName());
        Preconditions.checkArgument(workerSpec != null, "Missing Worker specification for %s", program.getId());

        // Runtime options take precedence over the values recorded in the deployed specification.
        String instances = programOptions.getArguments()
            .getOption(ProgramOptionConstants.INSTANCES, String.valueOf(workerSpec.getInstances()));
        String resourcesJson = programOptions.getArguments().getOption(ProgramOptionConstants.RESOURCES, null);
        Resources resources = resourcesJson != null
            ? (Resources) GSON.fromJson(resourcesJson, Resources.class)
            : workerSpec.getResources();

        WorkerSpecification runtimeSpec = new WorkerSpecification(
            workerSpec.getClassName(), workerSpec.getName(), workerSpec.getDescription(),
            workerSpec.getProperties(), workerSpec.getDatasets(), resources, Integer.parseInt(instances));

        if (this.datasetFramework instanceof ProgramContextAware) {
            // The field is declared as DatasetFramework, so the cast is required to reach initContext.
            ((ProgramContextAware) this.datasetFramework).initContext(new Id.Run(program.getId(), runId.getId()));
        }

        final PluginInstantiator pluginInstantiator =
            createPluginInstantiator(programOptions, program.getClassLoader());
        try {
            BasicWorkerContext context = new BasicWorkerContext(
                runtimeSpec, program, programOptions, instanceId, instanceCount,
                this.metricsCollectionService, this.datasetFramework, this.txClient,
                this.discoveryServiceClient, this.streamWriterFactory, pluginInstantiator,
                this.secureStore, this.secureStoreManager);
            WorkerDriver workerDriver = new WorkerDriver(program, runtimeSpec, context);

            // Release the plugin instantiator once the driver reaches either terminal state.
            workerDriver.addListener(new ServiceListenerAdapter() {
                @Override
                public void terminated(Service.State from) {
                    Closeables.closeQuietly(pluginInstantiator);
                }

                @Override
                public void failed(Service.State from, Throwable failure) {
                    Closeables.closeQuietly(pluginInstantiator);
                }
            }, Threads.SAME_THREAD_EXECUTOR);

            ProgramController controller = new WorkerControllerServiceAdapter(
                workerDriver, program.getId(), runId, workerSpec.getName() + "-" + instanceId);
            workerDriver.start();
            return controller;
        } catch (Throwable t) {
            // Setup failed before a controller could be returned; close the instantiator here
            // because the listener above may never be invoked.
            Closeables.closeQuietly(pluginInstantiator);
            throw t;
        }
    }
}
