package co.cask.cdap.internal.app.runtime.flow;

import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.internal.app.runtime.AbstractProgramController;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import com.google.common.base.Preconditions;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.Service;
import java.util.Collection;
import java.util.Iterator;
import org.apache.twill.common.ServiceListenerAdapter;
import org.apache.twill.common.Threads;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/flow/FlowletProgramController.class */
public final class FlowletProgramController extends AbstractProgramController {
    private static final Logger LOG = LoggerFactory.getLogger(FlowletProgramController.class);
    private final BasicFlowletContext flowletContext;
    private final FlowletProcessDriver driver;
    private final Collection<ConsumerSupplier<?>> consumerSuppliers;

    /* JADX INFO: Access modifiers changed from: package-private */
    public FlowletProgramController(String str, String str2, BasicFlowletContext basicFlowletContext, FlowletProcessDriver flowletProcessDriver, Collection<ConsumerSupplier<?>> collection) {
        super(str + ":" + str2, basicFlowletContext.getRunId());
        this.flowletContext = basicFlowletContext;
        this.driver = flowletProcessDriver;
        this.consumerSuppliers = collection;
        listenDriveState(flowletProcessDriver);
    }

    @Override // co.cask.cdap.internal.app.runtime.AbstractProgramController
    protected void doSuspend() throws Exception {
        LOG.info("Suspending flowlet: " + this.flowletContext);
        this.driver.suspend();
        Iterator<ConsumerSupplier<?>> it = this.consumerSuppliers.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        LOG.info("Flowlet suspended: " + this.flowletContext);
    }

    @Override // co.cask.cdap.internal.app.runtime.AbstractProgramController
    protected void doResume() throws Exception {
        LOG.info("Resuming flowlet: " + this.flowletContext);
        Iterator<ConsumerSupplier<?>> it = this.consumerSuppliers.iterator();
        while (it.hasNext()) {
            it.next().open(this.flowletContext.getInstanceCount());
        }
        this.driver.resume();
        LOG.info("Flowlet resumed: " + this.flowletContext);
    }

    @Override // co.cask.cdap.internal.app.runtime.AbstractProgramController
    protected void doStop() throws Exception {
        LOG.info("Stopping flowlet: " + this.flowletContext);
        try {
            this.driver.stopAndWait();
            Iterator<ConsumerSupplier<?>> it = this.consumerSuppliers.iterator();
            while (it.hasNext()) {
                Closeables.closeQuietly(it.next());
            }
            this.flowletContext.close();
            LOG.info("Flowlet stopped: " + this.flowletContext);
        } catch (Throwable th) {
            Iterator<ConsumerSupplier<?>> it2 = this.consumerSuppliers.iterator();
            while (it2.hasNext()) {
                Closeables.closeQuietly(it2.next());
            }
            this.flowletContext.close();
            throw th;
        }
    }

    @Override // co.cask.cdap.internal.app.runtime.AbstractProgramController
    protected void doCommand(String str, Object obj) throws Exception {
        Preconditions.checkState(getState() == ProgramController.State.SUSPENDED, "Cannot change instance count when flowlet is running.");
        if (ProgramOptionConstants.INSTANCES.equals(str) && (obj instanceof Integer)) {
            int intValue = ((Integer) obj).intValue();
            LOG.info("Change flowlet instance count: " + this.flowletContext + ", new count is " + intValue);
            changeInstanceCount(this.flowletContext, intValue);
            LOG.info("Flowlet instance count changed: " + this.flowletContext + ", new count is " + intValue);
        }
    }

    private void changeInstanceCount(BasicFlowletContext basicFlowletContext, int i) {
        Preconditions.checkState(getState() == ProgramController.State.SUSPENDED, "Cannot change instance count of a flowlet without suspension.");
        basicFlowletContext.setInstanceCount(i);
    }

    private void listenDriveState(FlowletProcessDriver flowletProcessDriver) {
        flowletProcessDriver.addListener(new ServiceListenerAdapter() { // from class: co.cask.cdap.internal.app.runtime.flow.FlowletProgramController.1
            public void running() {
                FlowletProgramController.this.started();
            }

            public void failed(Service.State state, Throwable th) {
                FlowletProgramController.LOG.error("Flowlet terminated with exception", th);
                FlowletProgramController.this.error(th);
            }

            public void terminated(Service.State state) {
                if (FlowletProgramController.this.getState() != ProgramController.State.STOPPING) {
                    FlowletProgramController.LOG.warn("Flowlet terminated by itself");
                    Iterator it = FlowletProgramController.this.consumerSuppliers.iterator();
                    while (it.hasNext()) {
                        Closeables.closeQuietly((ConsumerSupplier) it.next());
                    }
                }
            }
        }, Threads.SAME_THREAD_EXECUTOR);
    }
}
