package co.cask.cdap.internal.app.deploy.pipeline;

import co.cask.cdap.api.ProgramSpecification;
import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.api.flow.FlowletConnection;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.discovery.RandomEndpointStrategy;
import co.cask.cdap.common.discovery.TimeLimitEndpointStrategy;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.stream.StreamConsumerFactory;
import co.cask.cdap.internal.app.deploy.ProgramTerminator;
import co.cask.cdap.internal.app.runtime.flow.FlowUtils;
import co.cask.cdap.pipeline.AbstractStage;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.ProgramTypes;
import com.google.common.base.Throwables;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.reflect.TypeToken;
import com.ning.http.client.SimpleAsyncHttpClient;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/deploy/pipeline/DeletedProgramHandlerStage.class */
public class DeletedProgramHandlerStage extends AbstractStage<ApplicationDeployable> {
    private static final long DISCOVERY_TIMEOUT_SECONDS = 3;
    private final Store store;
    private final ProgramTerminator programTerminator;
    private final StreamConsumerFactory streamConsumerFactory;
    private final QueueAdmin queueAdmin;
    private final DiscoveryServiceClient discoveryServiceClient;
    private static final Logger LOG = LoggerFactory.getLogger(DeletedProgramHandlerStage.class);
    private static final long METRICS_SERVER_RESPONSE_TIMEOUT = TimeUnit.MILLISECONDS.convert(5, TimeUnit.MINUTES);

    public DeletedProgramHandlerStage(Store store, ProgramTerminator programTerminator, StreamConsumerFactory streamConsumerFactory, QueueAdmin queueAdmin, DiscoveryServiceClient discoveryServiceClient) {
        super(TypeToken.of(ApplicationDeployable.class));
        this.store = store;
        this.programTerminator = programTerminator;
        this.streamConsumerFactory = streamConsumerFactory;
        this.queueAdmin = queueAdmin;
        this.discoveryServiceClient = discoveryServiceClient;
    }

    @Override // co.cask.cdap.pipeline.AbstractStage
    public void process(ApplicationDeployable applicationDeployable) throws Exception {
        List<ProgramSpecification> deletedProgramSpecifications = this.store.getDeletedProgramSpecifications(applicationDeployable.getId(), applicationDeployable.getSpecification());
        ArrayList newArrayList = Lists.newArrayList();
        Iterator<ProgramSpecification> it = deletedProgramSpecifications.iterator();
        while (it.hasNext()) {
            FlowSpecification flowSpecification = (ProgramSpecification) it.next();
            ProgramType fromSpecification = ProgramTypes.fromSpecification(flowSpecification);
            Id.Program from = Id.Program.from(applicationDeployable.getId(), flowSpecification.getName());
            this.programTerminator.stop(Id.Namespace.from(applicationDeployable.getId().getNamespaceId()), from, fromSpecification);
            if (ProgramType.FLOW.equals(fromSpecification)) {
                FlowSpecification flowSpecification2 = flowSpecification;
                HashMultimap create = HashMultimap.create();
                for (FlowletConnection flowletConnection : flowSpecification2.getConnections()) {
                    if (flowletConnection.getSourceType() == FlowletConnection.Type.STREAM) {
                        create.put(flowletConnection.getSourceName(), Long.valueOf(FlowUtils.generateConsumerGroupId(from, flowletConnection.getTargetName())));
                    }
                }
                String format = String.format("%s.%s", from.getApplicationId(), from.getId());
                for (Map.Entry entry : create.asMap().entrySet()) {
                    this.streamConsumerFactory.dropAll(QueueName.fromStream((String) entry.getKey()), format, (Iterable) entry.getValue());
                }
                this.queueAdmin.dropAllForFlow(from.getApplicationId(), from.getId());
                newArrayList.add(from.getId());
            }
        }
        if (!newArrayList.isEmpty()) {
            deleteMetrics(applicationDeployable.getId().getNamespaceId(), applicationDeployable.getId().getId(), newArrayList);
        }
        emit(applicationDeployable);
    }

    private void deleteMetrics(String str, String str2, Iterable<String> iterable) throws IOException {
        Discoverable pick = new TimeLimitEndpointStrategy(new RandomEndpointStrategy(this.discoveryServiceClient.discover("metrics")), DISCOVERY_TIMEOUT_SECONDS, TimeUnit.SECONDS).pick();
        if (pick == null) {
            LOG.error("Fail to get any metrics endpoint for deleting metrics.");
            return;
        }
        LOG.debug("Deleting metrics for application {}", str2);
        Iterator<String> it = iterable.iterator();
        while (it.hasNext()) {
            SimpleAsyncHttpClient build = new SimpleAsyncHttpClient.Builder().setUrl(String.format("http://%s:%d%s/metrics/%s/apps/%s/flows/%s", pick.getSocketAddress().getHostName(), Integer.valueOf(pick.getSocketAddress().getPort()), "/v2", "ignored", str2, it.next())).setRequestTimeoutInMs((int) METRICS_SERVER_RESPONSE_TIMEOUT).build();
            try {
                try {
                    build.delete().get(METRICS_SERVER_RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS);
                    build.close();
                } catch (Exception e) {
                    LOG.error("exception making metrics delete call", e);
                    Throwables.propagate(e);
                    build.close();
                }
            } catch (Throwable th) {
                build.close();
                throw th;
            }
        }
    }
}
