package co.cask.cdap.data.tools.flow;

import co.cask.cdap.api.dataset.lib.cube.AggregationFunction;
import co.cask.cdap.api.dataset.lib.cube.Interpolator;
import co.cask.cdap.api.dataset.lib.cube.TimeValue;
import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.api.flow.FlowletConnection;
import co.cask.cdap.api.metrics.MetricDataQuery;
import co.cask.cdap.api.metrics.MetricStore;
import co.cask.cdap.api.metrics.MetricTimeSeries;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.app.ApplicationSpecification;
import co.cask.cdap.app.guice.AppFabricServiceRuntimeModule;
import co.cask.cdap.app.guice.ProgramRunnerRuntimeModule;
import co.cask.cdap.app.guice.ServiceStoreModules;
import co.cask.cdap.app.queue.QueueSpecification;
import co.cask.cdap.app.queue.QueueSpecificationGenerator;
import co.cask.cdap.app.runtime.ProgramRuntimeService;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.exception.NotFoundException;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.IOModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.TwillModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data.runtime.DataFabricDistributedModule;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data.runtime.SystemDatasetRuntimeModule;
import co.cask.cdap.data.stream.StreamAdminModules;
import co.cask.cdap.data.tools.HBaseQueueDebugger;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueAdmin;
import co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueClientFactory;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.HBaseTableUtilFactory;
import co.cask.cdap.explore.guice.ExploreClientModule;
import co.cask.cdap.gateway.auth.AuthModule;
import co.cask.cdap.internal.app.namespace.NamespaceAdmin;
import co.cask.cdap.internal.app.queue.SimpleQueueSpecificationGenerator;
import co.cask.cdap.internal.app.runtime.flow.FlowUtils;
import co.cask.cdap.internal.app.store.DefaultStore;
import co.cask.cdap.metrics.guice.MetricsClientRuntimeModule;
import co.cask.cdap.notifications.feeds.client.NotificationFeedClientModule;
import co.cask.cdap.notifications.guice.NotificationServiceRuntimeModule;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.tephra.TransactionExecutorFactory;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.gson.Gson;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.twill.api.TwillRunnerService;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.kafka.client.KafkaClientService;
import org.apache.twill.zookeeper.ZKClientService;

/* loaded from: input_file:co/cask/cdap/data/tools/flow/FlowQueuePendingCorrector.class */
public class FlowQueuePendingCorrector extends AbstractIdleService {
    private static final Gson GSON = new Gson();
    private final MetricsCollectionService metricsCollectionService;
    private final MetricStore metricStore;
    private final KafkaClientService kafkaClientService;
    private final HBaseQueueDebugger queueDebugger;
    private final ZKClientService zkClientService;
    private final Store store;
    private final ProgramRuntimeService programRuntimeService;
    private final TwillRunnerService twillRunnerService;
    private final NamespaceAdmin namespaceAdmin;

    @Inject
    public FlowQueuePendingCorrector(HBaseQueueDebugger hBaseQueueDebugger, ZKClientService zKClientService, MetricsCollectionService metricsCollectionService, MetricStore metricStore, KafkaClientService kafkaClientService, Store store, ProgramRuntimeService programRuntimeService, TwillRunnerService twillRunnerService, NamespaceAdmin namespaceAdmin) {
        this.queueDebugger = hBaseQueueDebugger;
        this.zkClientService = zKClientService;
        this.metricsCollectionService = metricsCollectionService;
        this.metricStore = metricStore;
        this.kafkaClientService = kafkaClientService;
        this.store = store;
        this.programRuntimeService = programRuntimeService;
        this.twillRunnerService = twillRunnerService;
        this.namespaceAdmin = namespaceAdmin;
    }

    public void run() throws Exception {
        System.out.println("Running queue.pending correction");
        Iterator it = this.namespaceAdmin.listNamespaces().iterator();
        while (it.hasNext()) {
            run(Id.Namespace.from(((NamespaceMeta) it.next()).getName()));
        }
    }

    public void run(Id.Namespace namespace) throws Exception {
        System.out.println("Running queue.pending correction on namespace " + namespace);
        for (ApplicationSpecification applicationSpecification : this.store.getAllApplications(namespace)) {
            run(Id.Application.from(namespace, applicationSpecification.getName()), applicationSpecification);
        }
    }

    public void run(Id.Application application) throws Exception {
        run(application, this.store.getApplication(application));
    }

    public void run(Id.Application application, ApplicationSpecification applicationSpecification) throws Exception {
        System.out.println("Running queue.pending correction on app " + application);
        Preconditions.checkArgument(applicationSpecification.getName().equals(application.getId()), String.format("Expected appSpec name '%s' to be equal to appId name '%s'", applicationSpecification.getName(), application.getId()));
        Iterator it = applicationSpecification.getFlows().values().iterator();
        while (it.hasNext()) {
            run(Id.Flow.from(application, ((FlowSpecification) it.next()).getName()));
        }
    }

    public void run(Id.Flow flow) throws Exception {
        ApplicationSpecification application = this.store.getApplication(flow.getApplication());
        Preconditions.checkArgument(application != null);
        Preconditions.checkArgument(application.getFlows().containsKey(flow.getId()));
        run(flow, (FlowSpecification) application.getFlows().get(flow.getId()));
    }

    public void run(Id.Flow flow, FlowSpecification flowSpecification) throws Exception {
        System.out.println("Running queue.pending correction on flow " + flow);
        for (Table.Cell cell : new SimpleQueueSpecificationGenerator(flow.getApplication()).create(flowSpecification).cellSet()) {
            if (((QueueSpecificationGenerator.Node) cell.getRowKey()).getType() == FlowletConnection.Type.FLOWLET) {
                String name = ((QueueSpecificationGenerator.Node) cell.getRowKey()).getName();
                String str = (String) cell.getColumnKey();
                Iterator it = ((Set) cell.getValue()).iterator();
                while (it.hasNext()) {
                    run(flow, name, str, ((QueueSpecification) it.next()).getQueueName().getSimpleName());
                }
            }
        }
    }

    public void run(Id.Flow flow, String str, String str2, String str3) throws Exception {
        ApplicationSpecification application = this.store.getApplication(flow.getApplication());
        Preconditions.checkArgument(application != null, flow.getApplication() + " not found");
        Preconditions.checkArgument(application.getFlows().containsKey(flow.getId()), flow + " not found");
        run(flow, str, str2, str3, (FlowSpecification) application.getFlows().get(flow.getId()));
    }

    public void run(Id.Flow flow, String str, String str2, String str3, FlowSpecification flowSpecification) throws Exception {
        long j;
        long value;
        System.out.println("Running queue.pending correction on flow '" + flow + "' producerFlowlet '" + str + "' consumerFlowlet '" + str2 + "' flowletQueue '" + str3 + "'");
        Preconditions.checkState(this.programRuntimeService.list(flow).isEmpty(), "Cannot run tool when flow " + flow + " is still running");
        Table create = new SimpleQueueSpecificationGenerator(flow.getApplication()).create(flowSpecification);
        Preconditions.checkArgument(create.contains(QueueSpecificationGenerator.Node.flowlet(str), str2), "Flowlet " + str + " is not emitting to " + str2);
        boolean z = false;
        Iterator it = ((Set) create.get(QueueSpecificationGenerator.Node.flowlet(str), str2)).iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            } else if (((QueueSpecification) it.next()).getQueueName().getSimpleName().equals(str3)) {
                z = true;
                break;
            }
        }
        Preconditions.checkArgument(z, "Queue " + str3 + " does not exist for the given flowlets");
        try {
            HBaseQueueDebugger.QueueStatistics scanQueue = this.queueDebugger.scanQueue(QueueName.fromFlowlet(flow, str, str3), Long.valueOf(FlowUtils.generateConsumerGroupId(flow, str2)));
            j = scanQueue.getUnprocessed() + scanQueue.getProcessedAndNotVisible();
        } catch (NotFoundException e) {
            j = 0;
        }
        ImmutableMap build = ImmutableMap.builder().put("ns", flow.getNamespaceId()).put("app", flow.getApplicationId()).put("fl", flow.getId()).put("co", str2).put("pr", str).put("flq", str3).build();
        Collection query = this.metricStore.query(new MetricDataQuery(0L, 0L, Integer.MAX_VALUE, 1, ImmutableMap.of("system.queue.pending", AggregationFunction.SUM), build, ImmutableList.of(), (Interpolator) null));
        if (query.isEmpty()) {
            value = 0;
        } else {
            System.out.println("Got results: " + GSON.toJson(query));
            Preconditions.checkState(query.size() == 1);
            List timeValues = ((MetricTimeSeries) query.iterator().next()).getTimeValues();
            Preconditions.checkState(timeValues.size() == 1);
            value = ((TimeValue) timeValues.get(0)).getValue();
        }
        this.metricsCollectionService.startAndWait();
        this.metricsCollectionService.getCollector(build).gauge("queue.pending", j);
        System.out.printf("Adjusted system.queue.pending metric from %d to %d (tags %s)\n", Long.valueOf(value), Long.valueOf(j), GSON.toJson(build));
        this.metricsCollectionService.stopAndWait();
    }

    protected void startUp() throws Exception {
        this.kafkaClientService.startAndWait();
        this.zkClientService.startAndWait();
        this.twillRunnerService.startAndWait();
        this.programRuntimeService.startAndWait();
        this.queueDebugger.startAndWait();
    }

    protected void shutDown() throws Exception {
        this.queueDebugger.stopAndWait();
        this.programRuntimeService.startAndWait();
        this.twillRunnerService.startAndWait();
        this.zkClientService.stopAndWait();
        this.kafkaClientService.stopAndWait();
    }

    public static FlowQueuePendingCorrector createCorrector() {
        return (FlowQueuePendingCorrector) Guice.createInjector(new Module[]{new ConfigModule(CConfiguration.create(), HBaseConfiguration.create()), new IOModule(), new ZKClientModule(), new LocationRuntimeModule().getDistributedModules(), new DiscoveryRuntimeModule().getDistributedModules(), new StreamAdminModules().getDistributedModules(), new NotificationFeedClientModule(), new TwillModule(), new AuthModule(), new ExploreClientModule(), new DataFabricDistributedModule(), new ServiceStoreModules().getDistributedModules(), new DataSetsModules().getDistributedModules(), new AppFabricServiceRuntimeModule().getDistributedModules(), new ProgramRunnerRuntimeModule().getDistributedModules(), new SystemDatasetRuntimeModule().getDistributedModules(), new NotificationServiceRuntimeModule().getDistributedModules(), new MetricsClientRuntimeModule().getDistributedModules(), new KafkaClientModule(), new AbstractModule() { // from class: co.cask.cdap.data.tools.flow.FlowQueuePendingCorrector.1
            protected void configure() {
                bind(QueueClientFactory.class).to(HBaseQueueClientFactory.class).in(Singleton.class);
                bind(QueueAdmin.class).to(HBaseQueueAdmin.class).in(Singleton.class);
                bind(HBaseTableUtil.class).toProvider(HBaseTableUtilFactory.class);
            }

            @Named("defaultStore")
            @Singleton
            @Provides
            public Store getStore(DatasetFramework datasetFramework, CConfiguration cConfiguration, LocationFactory locationFactory, NamespacedLocationFactory namespacedLocationFactory, TransactionExecutorFactory transactionExecutorFactory) {
                return new DefaultStore(cConfiguration, locationFactory, namespacedLocationFactory, transactionExecutorFactory, datasetFramework);
            }

            @Named("datasetMDS")
            @Singleton
            @Provides
            public DatasetFramework getInDsFramework(DatasetFramework datasetFramework) {
                return datasetFramework;
            }
        }}).getInstance(FlowQueuePendingCorrector.class);
    }

    public static void main(String[] strArr) throws Exception {
        CommandLine parseArgs = parseArgs(strArr);
        FlowQueuePendingCorrector createCorrector = createCorrector();
        createCorrector.startAndWait();
        try {
            String optionValue = parseArgs.getOptionValue("namespace");
            String optionValue2 = parseArgs.getOptionValue("app");
            String optionValue3 = parseArgs.getOptionValue("flow");
            if (!parseArgs.hasOption("namespace")) {
                createCorrector.run();
            } else if (!parseArgs.hasOption("app")) {
                createCorrector.run(Id.Namespace.from(parseArgs.getOptionValue("namespace")));
            } else if (!parseArgs.hasOption("flow")) {
                Preconditions.checkArgument(parseArgs.hasOption("namespace"));
                createCorrector.run(Id.Application.from(parseArgs.getOptionValue("namespace"), parseArgs.getOptionValue("app")));
            } else if (parseArgs.hasOption("producer-flowlet") || parseArgs.hasOption("consumer-flowlet")) {
                Preconditions.checkArgument(parseArgs.hasOption("producer-flowlet"), "Missing producer-flowlet option");
                Preconditions.checkArgument(parseArgs.hasOption("consumer-flowlet"), "Missing consumer-flowlet option");
                createCorrector.run(Id.Flow.from(optionValue, optionValue2, optionValue3), parseArgs.getOptionValue("producer-flowlet"), parseArgs.getOptionValue("consumer-flowlet"), parseArgs.getOptionValue("queue", "queue"));
            } else {
                createCorrector.run(Id.Flow.from(parseArgs.getOptionValue("namespace"), parseArgs.getOptionValue("app"), parseArgs.getOptionValue("flow")));
            }
        } finally {
            createCorrector.stopAndWait();
        }
    }

    private static CommandLine parseArgs(String[] strArr) {
        Options options = new Options();
        options.addOption(createOption("n", "namespace", true, "namespace (optional, leave empty to correct all flowlets)", false));
        options.addOption(createOption("a", "app", true, "app (optional, leave empty to correct all apps)", false));
        options.addOption(createOption("f", "flow", true, "flow (optional, leave empty to correct all flows)", false));
        options.addOption(createOption("p", "producer-flowlet", true, "producer flowlet (optional, leave empty to correct entire flow)", false));
        options.addOption(createOption("c", "consumer-flowlet", true, "consumer flowlet (optional, leave empty to correct entire flow)", false));
        options.addOption(createOption("q", "queue", true, "flowlet queue (optional, defaults to \"queue\")", false));
        try {
            return new BasicParser().parse(options, strArr);
        } catch (ParseException e) {
            System.out.println(e.getMessage());
            new HelpFormatter().printHelp("[--namespace <namespace> [--app <app> [--flow <flow> [[--producer-flowlet <flowlet> --consumer-flowlet <flowlet> [--queue <queue>]]]]]]", options);
            System.exit(0);
            return null;
        }
    }

    private static Option createOption(String str, String str2, boolean z, String str3, boolean z2) {
        Option option = new Option(str, str2, z, str3);
        if (z2) {
            option.setRequired(true);
        }
        return option;
    }
}
