package co.cask.cdap.internal.app.runtime.workflow;

import co.cask.cdap.api.workflow.WorkflowAction;
import co.cask.cdap.api.workflow.WorkflowActionSpecification;
import co.cask.cdap.api.workflow.WorkflowSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.app.runtime.workflow.WorkflowStatus;
import co.cask.cdap.common.lang.InstantiatorFactory;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.batch.MapReduceProgramRunner;
import co.cask.http.NettyHttpService;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Iterator;
import java.util.Map;
import org.apache.twill.api.RunId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/workflow/WorkflowDriver.class */
public final class WorkflowDriver extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(WorkflowDriver.class);
    private final Program program;
    private final RunId runId;
    private final InetAddress hostname;
    private final Map<String, String> runtimeArgs;
    private final WorkflowSpecification workflowSpec;
    private final long logicalStartTime;
    private final MapReduceRunnerFactory runnerFactory;
    private NettyHttpService httpService;
    private volatile boolean running;
    private volatile WorkflowStatus workflowStatus;

    /* JADX INFO: Access modifiers changed from: package-private */
    public WorkflowDriver(Program program, RunId runId, ProgramOptions programOptions, InetAddress inetAddress, WorkflowSpecification workflowSpecification, MapReduceProgramRunner mapReduceProgramRunner) {
        this.program = program;
        this.runId = runId;
        this.hostname = inetAddress;
        this.runtimeArgs = createRuntimeArgs(programOptions.getUserArguments());
        this.workflowSpec = workflowSpecification;
        this.logicalStartTime = programOptions.getArguments().hasOption(ProgramOptionConstants.LOGICAL_START_TIME) ? Long.parseLong(programOptions.getArguments().getOption(ProgramOptionConstants.LOGICAL_START_TIME)) : System.currentTimeMillis();
        this.runnerFactory = new WorkflowMapReduceRunnerFactory(workflowSpecification, mapReduceProgramRunner, program, runId, programOptions.getUserArguments(), this.logicalStartTime);
    }

    protected void startUp() throws Exception {
        LOG.info("Starting Workflow {}", this.workflowSpec);
        this.httpService = NettyHttpService.builder().setWorkerThreadPoolSize(2).setExecThreadPoolSize(4).setHost(this.hostname.getHostName()).addHttpHandlers(ImmutableList.of(new WorkflowServiceHandler(createStatusSupplier()))).build();
        this.httpService.startAndWait();
        this.running = true;
    }

    protected void shutDown() throws Exception {
        this.httpService.stopAndWait();
    }

    protected void run() throws Exception {
        LOG.info("Start workflow execution for {}", this.workflowSpec);
        InstantiatorFactory instantiatorFactory = new InstantiatorFactory(false);
        ClassLoader classLoader = this.program.getClassLoader();
        Iterator it = this.workflowSpec.getActions().iterator();
        int i = 0;
        while (this.running && it.hasNext()) {
            WorkflowActionSpecification workflowActionSpecification = (WorkflowActionSpecification) it.next();
            int i2 = i;
            i++;
            this.workflowStatus = new WorkflowStatus(state(), workflowActionSpecification, i2);
            WorkflowAction initialize = initialize(workflowActionSpecification, classLoader, instantiatorFactory);
            try {
                try {
                    initialize.run();
                    destroy(workflowActionSpecification, initialize);
                } catch (Throwable th) {
                    LOG.warn("Exception on WorkflowAction.run(), aborting Workflow. {}", workflowActionSpecification);
                    Throwables.propagateIfPossible(th, Exception.class);
                    destroy(workflowActionSpecification, initialize);
                }
            } catch (Throwable th2) {
                destroy(workflowActionSpecification, initialize);
                throw th2;
            }
        }
        if (it.hasNext()) {
            LOG.warn("Workflow explicitly stopped. Treated as abort on error. {} {}", this.workflowSpec);
            throw new IllegalStateException("Workflow stopped without executing all tasks: " + this.workflowSpec);
        }
        LOG.info("Workflow execution succeeded for {}", this.workflowSpec);
        this.running = false;
    }

    protected void triggerShutdown() {
        this.running = false;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public InetSocketAddress getServiceEndpoint() {
        Preconditions.checkState(this.httpService != null && this.httpService.isRunning(), "Workflow service is not started.");
        return this.httpService.getBindAddress();
    }

    private WorkflowAction initialize(WorkflowActionSpecification workflowActionSpecification, ClassLoader classLoader, InstantiatorFactory instantiatorFactory) throws Exception {
        Class<?> cls = Class.forName(workflowActionSpecification.getClassName(), true, classLoader);
        Preconditions.checkArgument(WorkflowAction.class.isAssignableFrom(cls), "%s is not a WorkflowAction.", new Object[]{cls});
        WorkflowAction workflowAction = (WorkflowAction) instantiatorFactory.get(TypeToken.of(cls)).create();
        try {
            workflowAction.initialize(new BasicWorkflowContext(this.workflowSpec, workflowActionSpecification, this.logicalStartTime, this.runnerFactory, this.runtimeArgs));
        } catch (Throwable th) {
            LOG.warn("Exception on WorkflowAction.initialize(), abort Workflow. {}", workflowActionSpecification, th);
            Throwables.propagateIfPossible(th, Exception.class);
        }
        return workflowAction;
    }

    private void destroy(WorkflowActionSpecification workflowActionSpecification, WorkflowAction workflowAction) {
        try {
            workflowAction.destroy();
        } catch (Throwable th) {
            LOG.warn("Exception on WorkflowAction.destroy(): {}", workflowActionSpecification, th);
        }
    }

    private Map<String, String> createRuntimeArgs(Arguments arguments) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Iterator<Map.Entry<String, String>> it = arguments.iterator();
        while (it.hasNext()) {
            builder.put(it.next());
        }
        return builder.build();
    }

    private Supplier<WorkflowStatus> createStatusSupplier() {
        return new Supplier<WorkflowStatus>() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowDriver.1
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public WorkflowStatus m141get() {
                return WorkflowDriver.this.workflowStatus;
            }
        };
    }
}
