package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.api.flow.FlowletConnection;
import co.cask.cdap.api.flow.FlowletDefinition;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.internal.app.runtime.flow.FlowUtils;
import co.cask.cdap.proto.id.ProgramId;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.tephra.TransactionExecutorFactory;
import org.apache.twill.api.TwillController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@NotThreadSafe
/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedFlowletInstanceUpdater.class */
final class DistributedFlowletInstanceUpdater {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedFlowletInstanceUpdater.class);
    private static final int MAX_WAIT_SECONDS = 30;
    private static final int SECONDS_PER_WAIT = 1;
    private final ProgramId programId;
    private final TwillController twillController;
    private final QueueAdmin queueAdmin;
    private final StreamAdmin streamAdmin;
    private final Multimap<String, QueueName> consumerQueues;
    private final TransactionExecutorFactory txExecutorFactory;

    /* JADX INFO: Access modifiers changed from: package-private */
    public DistributedFlowletInstanceUpdater(ProgramId programId, TwillController twillController, QueueAdmin queueAdmin, StreamAdmin streamAdmin, Multimap<String, QueueName> multimap, TransactionExecutorFactory transactionExecutorFactory) {
        this.programId = programId;
        this.twillController = twillController;
        this.queueAdmin = queueAdmin;
        this.streamAdmin = streamAdmin;
        this.consumerQueues = multimap;
        this.txExecutorFactory = transactionExecutorFactory;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void update(String str, int i, FlowSpecification flowSpecification) throws Exception {
        Set<String> set = (Set) getUpstreamFlowlets(flowSpecification, str, Sets.newHashSet());
        set.add(str);
        for (String str2 : set) {
            waitForInstances(str2, getInstances(flowSpecification, str2));
            this.twillController.sendCommand(str2, ProgramCommands.SUSPEND).get();
        }
        FlowUtils.reconfigure(this.consumerQueues.get(str), FlowUtils.generateConsumerGroupId(this.programId, str), i, this.streamAdmin, this.queueAdmin, this.txExecutorFactory);
        this.twillController.changeInstances(str, i).get();
        Iterator it = set.iterator();
        while (it.hasNext()) {
            this.twillController.sendCommand((String) it.next(), ProgramCommands.RESUME).get();
        }
    }

    private void waitForInstances(String str, int i) throws InterruptedException, TimeoutException {
        int numberOfProvisionedInstances = getNumberOfProvisionedInstances(str);
        int i2 = 0;
        while (numberOfProvisionedInstances != i) {
            LOG.debug("waiting for {} instances of {} before suspending flowlets", Integer.valueOf(i), str);
            TimeUnit.SECONDS.sleep(1L);
            i2 += SECONDS_PER_WAIT;
            if (i2 > MAX_WAIT_SECONDS) {
                String format = String.format("waited %d seconds for instances of %s to reach expected count of %d, but %d are running", Integer.valueOf(i2), str, Integer.valueOf(i), Integer.valueOf(numberOfProvisionedInstances));
                LOG.error(format);
                throw new TimeoutException(format);
            }
            numberOfProvisionedInstances = getNumberOfProvisionedInstances(str);
        }
    }

    private int getNumberOfProvisionedInstances(String str) {
        return this.twillController.getResourceReport().getRunnableResources(str).size();
    }

    private <T extends Collection<String>> T getUpstreamFlowlets(FlowSpecification flowSpecification, String str, T t) {
        for (FlowletConnection flowletConnection : flowSpecification.getConnections()) {
            if (flowletConnection.getTargetName().equals(str) && flowletConnection.getSourceType() == FlowletConnection.Type.FLOWLET) {
                t.add(flowletConnection.getSourceName());
            }
        }
        return t;
    }

    private int getInstances(FlowSpecification flowSpecification, String str) {
        return ((FlowletDefinition) flowSpecification.getFlowlets().get(str)).getInstances();
    }
}
