package co.cask.cdap.internal.app.runtime.workflow;

import co.cask.cdap.api.Predicate;
import co.cask.cdap.api.schedule.SchedulableProgramType;
import co.cask.cdap.api.workflow.ScheduleProgramInfo;
import co.cask.cdap.api.workflow.WorkflowAction;
import co.cask.cdap.api.workflow.WorkflowActionNode;
import co.cask.cdap.api.workflow.WorkflowActionSpecification;
import co.cask.cdap.api.workflow.WorkflowConditionNode;
import co.cask.cdap.api.workflow.WorkflowForkNode;
import co.cask.cdap.api.workflow.WorkflowNode;
import co.cask.cdap.api.workflow.WorkflowNodeType;
import co.cask.cdap.api.workflow.WorkflowSpecification;
import co.cask.cdap.api.workflow.WorkflowToken;
import co.cask.cdap.app.ApplicationSpecification;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.runtime.Arguments;
import co.cask.cdap.app.runtime.ProgramOptions;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.lang.InstantiatorFactory;
import co.cask.cdap.common.logging.LoggingContext;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.ProgramRunnerFactory;
import co.cask.cdap.internal.app.runtime.spark.metrics.SparkMetricsSink;
import co.cask.cdap.internal.workflow.DefaultWorkflowActionSpecification;
import co.cask.cdap.internal.workflow.ProgramWorkflowAction;
import co.cask.cdap.logging.context.WorkflowLoggingContext;
import co.cask.cdap.templates.AdapterDefinition;
import co.cask.http.NettyHttpService;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.AbstractExecutionThreadService;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.gson.Gson;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/workflow/WorkflowDriver.class */
public final class WorkflowDriver extends AbstractExecutionThreadService {
    private static final Logger LOG = LoggerFactory.getLogger(WorkflowDriver.class);
    private static final Gson GSON = new Gson();
    private final Program program;
    private final InetAddress hostname;
    private final Map<String, String> runtimeArgs;
    private final WorkflowSpecification workflowSpec;
    private final long logicalStartTime;
    private final ProgramWorkflowRunnerFactory workflowProgramRunnerFactory;
    private final Map<String, WorkflowActionNode> status = new ConcurrentHashMap();
    private final LoggingContext loggingContext;
    private NettyHttpService httpService;
    private volatile Thread runningThread;
    private boolean suspended;
    private Lock lock;
    private Condition condition;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: co.cask.cdap.internal.app.runtime.workflow.WorkflowDriver$4, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/workflow/WorkflowDriver$4.class */
    public static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$api$schedule$SchedulableProgramType;
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$api$workflow$WorkflowNodeType = new int[WorkflowNodeType.values().length];

        static {
            try {
                $SwitchMap$co$cask$cdap$api$workflow$WorkflowNodeType[WorkflowNodeType.ACTION.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$workflow$WorkflowNodeType[WorkflowNodeType.FORK.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$workflow$WorkflowNodeType[WorkflowNodeType.CONDITION.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            $SwitchMap$co$cask$cdap$api$schedule$SchedulableProgramType = new int[SchedulableProgramType.values().length];
            try {
                $SwitchMap$co$cask$cdap$api$schedule$SchedulableProgramType[SchedulableProgramType.MAPREDUCE.ordinal()] = 1;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$schedule$SchedulableProgramType[SchedulableProgramType.SPARK.ordinal()] = 2;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$co$cask$cdap$api$schedule$SchedulableProgramType[SchedulableProgramType.CUSTOM_ACTION.ordinal()] = 3;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public WorkflowDriver(Program program, ProgramOptions programOptions, InetAddress inetAddress, WorkflowSpecification workflowSpecification, ProgramRunnerFactory programRunnerFactory) {
        this.program = program;
        this.hostname = inetAddress;
        this.runtimeArgs = createRuntimeArgs(programOptions.getUserArguments());
        this.workflowSpec = workflowSpecification;
        Arguments arguments = programOptions.getArguments();
        this.logicalStartTime = arguments.hasOption(ProgramOptionConstants.LOGICAL_START_TIME) ? Long.parseLong(arguments.getOption(ProgramOptionConstants.LOGICAL_START_TIME)) : System.currentTimeMillis();
        this.workflowProgramRunnerFactory = new ProgramWorkflowRunnerFactory(workflowSpecification, programRunnerFactory, program, programOptions);
        this.lock = new ReentrantLock();
        this.condition = this.lock.newCondition();
        String option = arguments.getOption(ProgramOptionConstants.ADAPTER_SPEC);
        this.loggingContext = new WorkflowLoggingContext(program.getNamespaceId(), program.getApplicationId(), program.getName(), arguments.getOption(ProgramOptionConstants.RUN_ID), option != null ? ((AdapterDefinition) GSON.fromJson(option, AdapterDefinition.class)).getName() : null);
    }

    protected void startUp() throws Exception {
        LoggingContextAccessor.setLoggingContext(this.loggingContext);
        LOG.info("Starting Workflow {}", this.workflowSpec);
        this.httpService = NettyHttpService.builder().setWorkerThreadPoolSize(2).setExecThreadPoolSize(4).setHost(this.hostname.getHostName()).addHttpHandlers(ImmutableList.of(new WorkflowServiceHandler(createStatusSupplier()))).build();
        this.httpService.startAndWait();
        this.runningThread = Thread.currentThread();
    }

    private void blockIfSuspended() {
        this.lock.lock();
        while (this.suspended) {
            try {
                try {
                    this.condition.await();
                } catch (InterruptedException e) {
                    LOG.warn("Wait on the Condition is interrupted.");
                    Thread.currentThread().interrupt();
                    this.lock.unlock();
                    return;
                }
            } finally {
                this.lock.unlock();
            }
        }
    }

    public void suspend() throws Exception {
        LOG.info("Suspending the Workflow");
        this.lock.lock();
        try {
            this.suspended = true;
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    public void resume() throws Exception {
        LOG.info("Resuming the Workflow");
        this.lock.lock();
        try {
            this.suspended = false;
            this.condition.signalAll();
            this.lock.unlock();
        } catch (Throwable th) {
            this.lock.unlock();
            throw th;
        }
    }

    protected void shutDown() throws Exception {
        this.httpService.stopAndWait();
    }

    private void executeAction(ApplicationSpecification applicationSpecification, WorkflowActionNode workflowActionNode, InstantiatorFactory instantiatorFactory, ClassLoader classLoader, WorkflowToken workflowToken) throws Exception {
        DefaultWorkflowActionSpecification actionSpecification;
        RuntimeException propagate;
        ScheduleProgramInfo program = workflowActionNode.getProgram();
        switch (AnonymousClass4.$SwitchMap$co$cask$cdap$api$schedule$SchedulableProgramType[program.getProgramType().ordinal()]) {
            case SparkMetricsSink.CONSOLE_DEFAULT_PERIOD /* 1 */:
                String name = applicationSpecification.getMapReduce().get(program.getProgramName()).getName();
                actionSpecification = new DefaultWorkflowActionSpecification(new ProgramWorkflowAction(name, name, SchedulableProgramType.MAPREDUCE));
                break;
            case 2:
                String name2 = applicationSpecification.getSpark().get(program.getProgramName()).getName();
                actionSpecification = new DefaultWorkflowActionSpecification(new ProgramWorkflowAction(name2, name2, SchedulableProgramType.SPARK));
                break;
            case 3:
                actionSpecification = workflowActionNode.getActionSpecification();
                break;
            default:
                LOG.error("Unknown Program Type '{}', Program '{}' in the Workflow.", program.getProgramType(), program.getProgramName());
                throw new IllegalStateException("Workflow stopped without executing all tasks");
        }
        this.status.put(workflowActionNode.getNodeId(), workflowActionNode);
        final WorkflowAction initialize = initialize(actionSpecification, classLoader, instantiatorFactory, workflowToken, workflowActionNode.getNodeId());
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("workflow-executor-%d").build());
        try {
            try {
                final DefaultWorkflowActionSpecification defaultWorkflowActionSpecification = actionSpecification;
                newSingleThreadExecutor.submit(new Runnable() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowDriver.1
                    @Override // java.lang.Runnable
                    public void run() {
                        ClassLoaders.setContextClassLoader(initialize.getClass().getClassLoader());
                        try {
                            initialize.run();
                            WorkflowDriver.this.destroy(defaultWorkflowActionSpecification, initialize);
                        } catch (Throwable th) {
                            WorkflowDriver.this.destroy(defaultWorkflowActionSpecification, initialize);
                            throw th;
                        }
                    }
                }).get();
                newSingleThreadExecutor.shutdownNow();
                this.status.remove(workflowActionNode.getNodeId());
            } finally {
            }
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdownNow();
            this.status.remove(workflowActionNode.getNodeId());
            throw th;
        }
    }

    private void executeFork(final ApplicationSpecification applicationSpecification, WorkflowForkNode workflowForkNode, final InstantiatorFactory instantiatorFactory, final ClassLoader classLoader, final WorkflowToken workflowToken) throws Exception {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(workflowForkNode.getBranches().size(), new ThreadFactoryBuilder().setNameFormat("workflow-fork-executor-%d").build());
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(newFixedThreadPool);
        try {
            for (final List list : workflowForkNode.getBranches()) {
                executorCompletionService.submit(new Callable<Map.Entry<String, WorkflowToken>>() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowDriver.2
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Map.Entry<String, WorkflowToken> call() throws Exception {
                        WorkflowToken deepCopy = ((BasicWorkflowToken) workflowToken).deepCopy();
                        WorkflowDriver.this.executeAll(list.iterator(), applicationSpecification, instantiatorFactory, classLoader, deepCopy);
                        return Maps.immutableEntry(list.toString(), deepCopy);
                    }
                });
            }
            boolean z = false;
            for (int i = 0; i < workflowForkNode.getBranches().size(); i++) {
                try {
                    Map.Entry entry = (Map.Entry) executorCompletionService.take().get();
                    String str = (String) entry.getKey();
                    WorkflowToken workflowToken2 = (WorkflowToken) entry.getValue();
                    if (!z && workflowToken2.getMapReduceCounters() != null) {
                        ((BasicWorkflowToken) workflowToken).setMapReduceCounters(workflowToken2.getMapReduceCounters());
                        z = true;
                    }
                    LOG.info("Execution of branch {} for fork {} completed", str, workflowForkNode);
                } catch (Throwable th) {
                    Throwable rootCause = Throwables.getRootCause(th);
                    if (rootCause instanceof ExecutionException) {
                        LOG.error("Exception occurred in the execution of the fork node {}", workflowForkNode);
                        throw ((ExecutionException) th);
                    }
                    if (!(rootCause instanceof InterruptedException)) {
                        Throwables.propagateIfPossible(th, Exception.class);
                        throw Throwables.propagate(th);
                    }
                    LOG.error("Workflow execution aborted.");
                }
            }
        } finally {
            newFixedThreadPool.shutdownNow();
            newFixedThreadPool.awaitTermination(2147483647L, TimeUnit.NANOSECONDS);
        }
    }

    private void executeNode(ApplicationSpecification applicationSpecification, WorkflowNode workflowNode, InstantiatorFactory instantiatorFactory, ClassLoader classLoader, WorkflowToken workflowToken) throws Exception {
        switch (AnonymousClass4.$SwitchMap$co$cask$cdap$api$workflow$WorkflowNodeType[workflowNode.getType().ordinal()]) {
            case SparkMetricsSink.CONSOLE_DEFAULT_PERIOD /* 1 */:
                executeAction(applicationSpecification, (WorkflowActionNode) workflowNode, instantiatorFactory, classLoader, workflowToken);
                return;
            case 2:
                executeFork(applicationSpecification, (WorkflowForkNode) workflowNode, instantiatorFactory, classLoader, workflowToken);
                return;
            case 3:
                executeCondition(applicationSpecification, (WorkflowConditionNode) workflowNode, instantiatorFactory, classLoader, workflowToken);
                return;
            default:
                return;
        }
    }

    private void executeCondition(ApplicationSpecification applicationSpecification, WorkflowConditionNode workflowConditionNode, InstantiatorFactory instantiatorFactory, ClassLoader classLoader, WorkflowToken workflowToken) throws Exception {
        executeAll(((Predicate) instantiatorFactory.get(TypeToken.of(classLoader.loadClass(workflowConditionNode.getPredicateClassName()))).create()).apply(new BasicWorkflowContext(this.workflowSpec, null, this.logicalStartTime, null, this.runtimeArgs, workflowToken)) ? workflowConditionNode.getIfBranch().iterator() : workflowConditionNode.getElseBranch().iterator(), applicationSpecification, instantiatorFactory, classLoader, workflowToken);
    }

    protected void run() throws Exception {
        LOG.info("Start workflow execution for {}", this.workflowSpec);
        executeAll(this.workflowSpec.getNodes().iterator(), this.program.getApplicationSpecification(), new InstantiatorFactory(false), this.program.getClassLoader(), new BasicWorkflowToken());
        LOG.info("Workflow execution succeeded for {}", this.workflowSpec);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void executeAll(Iterator<WorkflowNode> it, ApplicationSpecification applicationSpecification, InstantiatorFactory instantiatorFactory, ClassLoader classLoader, WorkflowToken workflowToken) {
        while (it.hasNext() && this.runningThread != null) {
            try {
                blockIfSuspended();
                executeNode(applicationSpecification, it.next(), instantiatorFactory, classLoader, workflowToken);
            } catch (Throwable th) {
                if (Throwables.getRootCause(th) instanceof InterruptedException) {
                    LOG.error("Workflow execution aborted.");
                    return;
                }
                Throwables.propagate(th);
            }
        }
    }

    protected void triggerShutdown() {
        Thread thread = this.runningThread;
        this.runningThread = null;
        thread.interrupt();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public InetSocketAddress getServiceEndpoint() {
        Preconditions.checkState(this.httpService != null && this.httpService.isRunning(), "Workflow service is not started.");
        return this.httpService.getBindAddress();
    }

    private WorkflowAction initialize(WorkflowActionSpecification workflowActionSpecification, ClassLoader classLoader, InstantiatorFactory instantiatorFactory, WorkflowToken workflowToken, String str) throws Exception {
        Class<?> cls = Class.forName(workflowActionSpecification.getClassName(), true, classLoader);
        Preconditions.checkArgument(WorkflowAction.class.isAssignableFrom(cls), "%s is not a WorkflowAction.", new Object[]{cls});
        WorkflowAction workflowAction = (WorkflowAction) instantiatorFactory.get(TypeToken.of(cls)).create();
        ClassLoader contextClassLoader = ClassLoaders.setContextClassLoader(workflowAction.getClass().getClassLoader());
        try {
            try {
                workflowAction.initialize(new BasicWorkflowContext(this.workflowSpec, workflowActionSpecification, this.logicalStartTime, this.workflowProgramRunnerFactory.getProgramWorkflowRunner(workflowActionSpecification, workflowToken, str), this.runtimeArgs, workflowToken));
                ClassLoaders.setContextClassLoader(contextClassLoader);
            } catch (Throwable th) {
                LOG.warn("Exception on WorkflowAction.initialize(), abort Workflow. {}", workflowActionSpecification, th);
                Throwables.propagateIfPossible(th, Exception.class);
                ClassLoaders.setContextClassLoader(contextClassLoader);
            }
            return workflowAction;
        } catch (Throwable th2) {
            ClassLoaders.setContextClassLoader(contextClassLoader);
            throw th2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void destroy(WorkflowActionSpecification workflowActionSpecification, WorkflowAction workflowAction) {
        ClassLoader contextClassLoader = ClassLoaders.setContextClassLoader(workflowAction.getClass().getClassLoader());
        try {
            try {
                workflowAction.destroy();
                ClassLoaders.setContextClassLoader(contextClassLoader);
            } catch (Throwable th) {
                LOG.warn("Exception on WorkflowAction.destroy(): {}", workflowActionSpecification, th);
                ClassLoaders.setContextClassLoader(contextClassLoader);
            }
        } catch (Throwable th2) {
            ClassLoaders.setContextClassLoader(contextClassLoader);
            throw th2;
        }
    }

    private Map<String, String> createRuntimeArgs(Arguments arguments) {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Iterator<Map.Entry<String, String>> it = arguments.iterator();
        while (it.hasNext()) {
            builder.put(it.next());
        }
        return builder.build();
    }

    private Supplier<List<WorkflowActionNode>> createStatusSupplier() {
        return new Supplier<List<WorkflowActionNode>>() { // from class: co.cask.cdap.internal.app.runtime.workflow.WorkflowDriver.3
            /* renamed from: get, reason: merged with bridge method [inline-methods] */
            public List<WorkflowActionNode> m168get() {
                ArrayList newArrayList = Lists.newArrayList();
                Iterator it = WorkflowDriver.this.status.entrySet().iterator();
                while (it.hasNext()) {
                    newArrayList.add(((Map.Entry) it.next()).getValue());
                }
                return newArrayList;
            }
        };
    }
}
