package co.cask.cdap.internal.app.runtime.flow;

import co.cask.cdap.api.flow.flowlet.Callback;
import co.cask.cdap.api.flow.flowlet.Flowlet;
import co.cask.cdap.common.lang.ClassLoaders;
import co.cask.cdap.common.logging.LoggingContextAccessor;
import co.cask.cdap.internal.app.runtime.DataFabricFacade;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionFailureException;
import com.google.common.base.Throwables;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Service;
import java.util.Collection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/FlowletRuntimeService.class */
final class FlowletRuntimeService extends AbstractIdleService {
    private static final Logger LOG = LoggerFactory.getLogger(FlowletRuntimeService.class);
    private final Flowlet flowlet;
    private final BasicFlowletContext flowletContext;
    private final Collection<? extends ProcessSpecification<?>> processSpecs;
    private final Callback txCallback;
    private final DataFabricFacade dataFabricFacade;
    private final Service serviceHook;
    private FlowletProcessDriver flowletProcessDriver;

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlowletRuntimeService(Flowlet flowlet, BasicFlowletContext basicFlowletContext, Collection<? extends ProcessSpecification<?>> collection, Callback callback, DataFabricFacade dataFabricFacade, Service service) {
        this.flowlet = flowlet;
        this.flowletContext = basicFlowletContext;
        this.processSpecs = collection;
        this.txCallback = callback;
        this.dataFabricFacade = dataFabricFacade;
        this.serviceHook = service;
    }

    protected void startUp() throws Exception {
        LoggingContextAccessor.setLoggingContext(this.flowletContext.getLoggingContext());
        this.flowletContext.getProgramMetrics().increment("process.instance", 1L);
        this.flowletProcessDriver = new FlowletProcessDriver(this.flowletContext, this.dataFabricFacade, this.txCallback, this.processSpecs);
        this.serviceHook.startAndWait();
        initFlowlet();
        this.flowletProcessDriver.startAndWait();
    }

    protected void shutDown() throws Exception {
        LoggingContextAccessor.setLoggingContext(this.flowletContext.getLoggingContext());
        if (this.flowletProcessDriver != null) {
            stopService(this.flowletProcessDriver);
        }
        destroyFlowlet();
        stopService(this.serviceHook);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void suspend() {
        this.flowletProcessDriver.stopAndWait();
        this.flowletProcessDriver = new FlowletProcessDriver(this.flowletProcessDriver);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void resume() {
        this.flowletProcessDriver.startAndWait();
    }

    private void initFlowlet() throws InterruptedException {
        try {
            this.dataFabricFacade.createTransactionExecutor().execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletRuntimeService.1
                public void apply() throws Exception {
                    FlowletRuntimeService.LOG.info("Initializing flowlet: " + FlowletRuntimeService.this.flowletContext);
                    ClassLoader contextClassLoader = ClassLoaders.setContextClassLoader(FlowletRuntimeService.this.flowletContext.getProgram().getClassLoader());
                    try {
                        FlowletRuntimeService.this.flowlet.initialize(FlowletRuntimeService.this.flowletContext);
                        ClassLoaders.setContextClassLoader(contextClassLoader);
                        FlowletRuntimeService.LOG.info("Flowlet initialized: " + FlowletRuntimeService.this.flowletContext);
                    } catch (Throwable th) {
                        ClassLoaders.setContextClassLoader(contextClassLoader);
                        throw th;
                    }
                }
            });
        } catch (TransactionFailureException e) {
            Throwable cause = e.getCause() == null ? e : e.getCause();
            LOG.error("Flowlet throws exception during flowlet initialize: " + this.flowletContext, cause);
            throw Throwables.propagate(cause);
        }
    }

    private void destroyFlowlet() {
        try {
            this.dataFabricFacade.createTransactionExecutor().execute(new TransactionExecutor.Subroutine() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletRuntimeService.2
                public void apply() throws Exception {
                    FlowletRuntimeService.LOG.info("Destroying flowlet: " + FlowletRuntimeService.this.flowletContext);
                    ClassLoader contextClassLoader = ClassLoaders.setContextClassLoader(FlowletRuntimeService.this.flowletContext.getProgram().getClassLoader());
                    try {
                        FlowletRuntimeService.this.flowlet.destroy();
                        ClassLoaders.setContextClassLoader(contextClassLoader);
                        FlowletRuntimeService.LOG.info("Flowlet destroyed: " + FlowletRuntimeService.this.flowletContext);
                    } catch (Throwable th) {
                        ClassLoaders.setContextClassLoader(contextClassLoader);
                        throw th;
                    }
                }
            });
        } catch (TransactionFailureException e) {
            LOG.error("Flowlet throws exception during flowlet destroy: " + this.flowletContext, e.getCause() == null ? e : e.getCause());
        } catch (InterruptedException e2) {
        }
    }

    private void stopService(Service service) {
        try {
            service.stopAndWait();
        } catch (Throwable th) {
            LOG.warn("Exception when stopping service {}", service);
        }
    }
}
