package co.cask.cdap.internal.app.runtime.workflow;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.security.store.SecureStore;
import co.cask.cdap.api.security.store.SecureStoreManager;
import co.cask.cdap.api.workflow.WorkflowSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.ProgramRunnerFactory;
import co.cask.cdap.app.store.RuntimeStore;
import co.cask.cdap.common.app.RunIds;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.metadata.writer.ProgramContextAware;
import co.cask.cdap.internal.app.runtime.AbstractListener;
import co.cask.cdap.internal.app.runtime.AbstractProgramRunnerWithPlugin;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.ProgramRunners;
import co.cask.cdap.internal.app.runtime.plugin.PluginInstantiator;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.proto.BasicThrowable;
import co.cask.cdap.proto.ProgramType;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.io.Closeables;
import com.google.inject.Inject;
import com.google.inject.name.Named;
import java.io.Closeable;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.tephra.TransactionSystemClient;
import org.apache.twill.api.RunId;
import org.apache.twill.api.ServiceAnnouncer;
import org.apache.twill.common.Threads;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/workflow/WorkflowProgramRunner.class */
public class WorkflowProgramRunner extends AbstractProgramRunnerWithPlugin {
    private static final Logger LOG = LoggerFactory.getLogger(WorkflowProgramRunner.class);
    private final ProgramRunnerFactory programRunnerFactory;
    private final ServiceAnnouncer serviceAnnouncer;
    private final InetAddress hostname;
    private final MetricsCollectionService metricsCollectionService;
    private final DatasetFramework datasetFramework;
    private final DiscoveryServiceClient discoveryServiceClient;
    private final TransactionSystemClient txClient;
    private final RuntimeStore runtimeStore;
    private final SecureStore secureStore;
    private final SecureStoreManager secureStoreManager;
    private final MessagingService messagingService;
    private final CConfiguration cConf;

    @Inject
    public WorkflowProgramRunner(ProgramRunnerFactory programRunnerFactory, ServiceAnnouncer serviceAnnouncer, @Named("master.services.bind.address") InetAddress inetAddress, MetricsCollectionService metricsCollectionService, DatasetFramework datasetFramework, DiscoveryServiceClient discoveryServiceClient, TransactionSystemClient transactionSystemClient, RuntimeStore runtimeStore, CConfiguration cConfiguration, SecureStore secureStore, SecureStoreManager secureStoreManager, MessagingService messagingService) {
        super(cConfiguration);
        this.programRunnerFactory = programRunnerFactory;
        this.serviceAnnouncer = serviceAnnouncer;
        this.hostname = inetAddress;
        this.metricsCollectionService = metricsCollectionService;
        this.datasetFramework = datasetFramework;
        this.discoveryServiceClient = discoveryServiceClient;
        this.txClient = transactionSystemClient;
        this.runtimeStore = runtimeStore;
        this.secureStore = secureStore;
        this.secureStoreManager = secureStoreManager;
        this.messagingService = messagingService;
        this.cConf = cConfiguration;
    }

    @Override // co.cask.cdap.app.runtime.ProgramRunner
    public ProgramController run(final Program program, final ProgramOptions programOptions) {
        ApplicationSpecification applicationSpecification = program.getApplicationSpecification();
        Preconditions.checkNotNull(applicationSpecification, "Missing application specification.");
        ProgramType type = program.getType();
        Preconditions.checkNotNull(type, "Missing processor type.");
        Preconditions.checkArgument(type == ProgramType.WORKFLOW, "Only WORKFLOW process type is supported.");
        WorkflowSpecification workflowSpecification = (WorkflowSpecification) applicationSpecification.getWorkflows().get(program.getName());
        Preconditions.checkNotNull(workflowSpecification, "Missing WorkflowSpecification for %s", new Object[]{program.getName()});
        final RunId runId = ProgramRunners.getRunId(programOptions);
        if (this.datasetFramework instanceof ProgramContextAware) {
            this.datasetFramework.initContext(program.getId().run(runId));
        }
        final ArrayList arrayList = new ArrayList();
        try {
            PluginInstantiator createPluginInstantiator = createPluginInstantiator(programOptions, program.getClassLoader());
            if (createPluginInstantiator != null) {
                arrayList.add(createPluginInstantiator);
            }
            WorkflowDriver workflowDriver = new WorkflowDriver(program, programOptions, this.hostname, workflowSpecification, this.programRunnerFactory, this.metricsCollectionService, this.datasetFramework, this.discoveryServiceClient, this.txClient, this.runtimeStore, this.cConf, createPluginInstantiator, this.secureStore, this.secureStoreManager, this.messagingService);
            final WorkflowProgramController workflowProgramController = new WorkflowProgramController(program, workflowDriver, this.serviceAnnouncer, runId);
            final String option = programOptions.getArguments().getOption(ProgramOptionConstants.TWILL_RUN_ID);
            workflowProgramController.addListener(new AbstractListener() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramRunner.1
                @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
                public void init(ProgramController.State state, @Nullable Throwable th) {
                    long time = RunIds.getTime(workflowProgramController.getRunId(), TimeUnit.SECONDS);
                    if (time == -1) {
                        time = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis());
                    }
                    final long j = time;
                    Retries.supplyWithRetries(new Supplier<Void>() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramRunner.1.1
                        /* renamed from: get, reason: merged with bridge method [inline-methods] */
                        public Void m238get() {
                            WorkflowProgramRunner.this.runtimeStore.setStart(program.getId(), runId.getId(), j, option, programOptions.getUserArguments().asMap(), programOptions.getArguments().asMap());
                            return null;
                        }
                    }, RetryStrategies.fixDelay(5L, TimeUnit.SECONDS));
                    if (state == ProgramController.State.COMPLETED) {
                        completed();
                    }
                    if (state == ProgramController.State.ERROR) {
                        error(workflowProgramController.getFailureCause());
                    }
                }

                @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
                public void completed() {
                    WorkflowProgramRunner.LOG.debug("Program {} with run id {} completed successfully.", program.getId(), runId.getId());
                    Retries.supplyWithRetries(new Supplier<Void>() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramRunner.1.2
                        /* renamed from: get, reason: merged with bridge method [inline-methods] */
                        public Void m239get() {
                            WorkflowProgramRunner.this.runtimeStore.setStop(program.getId(), runId.getId(), TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), ProgramController.State.COMPLETED.getRunStatus());
                            return null;
                        }
                    }, RetryStrategies.fixDelay(5L, TimeUnit.SECONDS));
                }

                @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
                public void killed() {
                    WorkflowProgramRunner.LOG.debug("Program {} with run id {} killed.", program.getId(), runId.getId());
                    Retries.supplyWithRetries(new Supplier<Void>() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramRunner.1.3
                        /* renamed from: get, reason: merged with bridge method [inline-methods] */
                        public Void m240get() {
                            WorkflowProgramRunner.this.runtimeStore.setStop(program.getId(), runId.getId(), TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), ProgramController.State.KILLED.getRunStatus());
                            return null;
                        }
                    }, RetryStrategies.fixDelay(5L, TimeUnit.SECONDS));
                }

                @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
                public void suspended() {
                    WorkflowProgramRunner.LOG.debug("Suspending Program {} with run id {}.", program.getId(), runId.getId());
                    Retries.supplyWithRetries(new Supplier<Void>() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramRunner.1.4
                        /* renamed from: get, reason: merged with bridge method [inline-methods] */
                        public Void m241get() {
                            WorkflowProgramRunner.this.runtimeStore.setSuspend(program.getId(), runId.getId());
                            return null;
                        }
                    }, RetryStrategies.fixDelay(5L, TimeUnit.SECONDS));
                }

                @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
                public void resuming() {
                    WorkflowProgramRunner.LOG.debug("Resuming Program {} {}.", program.getId(), runId.getId());
                    Retries.supplyWithRetries(new Supplier<Void>() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramRunner.1.5
                        /* renamed from: get, reason: merged with bridge method [inline-methods] */
                        public Void m242get() {
                            WorkflowProgramRunner.this.runtimeStore.setResume(program.getId(), runId.getId());
                            return null;
                        }
                    }, RetryStrategies.fixDelay(5L, TimeUnit.SECONDS));
                }

                @Override // co.cask.cdap.internal.app.runtime.AbstractListener, co.cask.cdap.app.runtime.ProgramController.Listener
                public void error(final Throwable th) {
                    WorkflowProgramRunner.LOG.info("Program {} with run id {} stopped because of error {}.", new Object[]{program.getId(), runId.getId(), th});
                    WorkflowProgramRunner.this.closeAllQuietly(arrayList);
                    Retries.supplyWithRetries(new Supplier<Void>() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowProgramRunner.1.6
                        /* renamed from: get, reason: merged with bridge method [inline-methods] */
                        public Void m243get() {
                            WorkflowProgramRunner.this.runtimeStore.setStop(program.getId(), runId.getId(), TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis()), ProgramController.State.ERROR.getRunStatus(), new BasicThrowable(th));
                            return null;
                        }
                    }, RetryStrategies.fixDelay(5L, TimeUnit.SECONDS));
                }
            }, Threads.SAME_THREAD_EXECUTOR);
            workflowDriver.start();
            return workflowProgramController;
        } catch (Exception e) {
            closeAllQuietly(arrayList);
            throw Throwables.propagate(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void closeAllQuietly(Iterable<Closeable> iterable) {
        Iterator<Closeable> it = iterable.iterator();
        while (it.hasNext()) {
            Closeables.closeQuietly(it.next());
        }
    }
}
