package co.cask.cdap.data.tools;

import co.cask.cdap.api.app.ApplicationSpecification;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.api.flow.FlowletConnection;
import co.cask.cdap.app.guice.AppFabricServiceRuntimeModule;
import co.cask.cdap.app.guice.ProgramRunnerRuntimeModule;
import co.cask.cdap.app.guice.ServiceStoreModules;
import co.cask.cdap.app.queue.QueueSpecification;
import co.cask.cdap.app.queue.QueueSpecificationGenerator;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.NotFoundException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.IOModule;
import co.cask.cdap.common.guice.KafkaClientModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.TwillModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.namespace.NamespaceAdmin;
import co.cask.cdap.common.namespace.NamespacedLocationFactory;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.common.utils.ImmutablePair;
import co.cask.cdap.data.runtime.DataFabricDistributedModule;
import co.cask.cdap.data.runtime.DataSetsModules;
import co.cask.cdap.data.runtime.SystemDatasetRuntimeModule;
import co.cask.cdap.data.stream.StreamAdminModules;
import co.cask.cdap.data.view.ViewAdminModules;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.data2.queue.ConsumerConfig;
import co.cask.cdap.data2.queue.ConsumerGroupConfig;
import co.cask.cdap.data2.queue.DequeueStrategy;
import co.cask.cdap.data2.queue.QueueClientFactory;
import co.cask.cdap.data2.transaction.Transactions;
import co.cask.cdap.data2.transaction.queue.ConsumerEntryState;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.queue.QueueConstants;
import co.cask.cdap.data2.transaction.queue.QueueEntryRow;
import co.cask.cdap.data2.transaction.queue.QueueScanner;
import co.cask.cdap.data2.transaction.queue.hbase.HBaseConsumerStateStore;
import co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueAdmin;
import co.cask.cdap.data2.transaction.queue.hbase.HBaseQueueClientFactory;
import co.cask.cdap.data2.transaction.queue.hbase.QueueBarrier;
import co.cask.cdap.data2.transaction.queue.hbase.ShardedHBaseQueueStrategy;
import co.cask.cdap.data2.util.hbase.HBaseTableUtil;
import co.cask.cdap.data2.util.hbase.HBaseTableUtilFactory;
import co.cask.cdap.data2.util.hbase.ScanBuilder;
import co.cask.cdap.explore.guice.ExploreClientModule;
import co.cask.cdap.internal.app.queue.SimpleQueueSpecificationGenerator;
import co.cask.cdap.internal.app.runtime.flow.FlowUtils;
import co.cask.cdap.internal.app.store.DefaultStore;
import co.cask.cdap.metrics.guice.MetricsClientRuntimeModule;
import co.cask.cdap.notifications.feeds.client.NotificationFeedClientModule;
import co.cask.cdap.notifications.guice.NotificationServiceRuntimeModule;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.NamespaceMeta;
import co.cask.cdap.proto.ProgramType;
import co.cask.tephra.Transaction;
import co.cask.tephra.TransactionExecutor;
import co.cask.tephra.TransactionExecutorFactory;
import co.cask.tephra.TransactionFailureException;
import co.cask.tephra.TransactionNotInProgressException;
import co.cask.tephra.TransactionSystemClient;
import com.google.common.base.Optional;
import com.google.common.base.Predicates;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Multimap;
import com.google.common.collect.Multimaps;
import com.google.common.collect.PeekingIterator;
import com.google.common.collect.Table;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.google.inject.name.Named;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.twill.filesystem.LocationFactory;
import org.apache.twill.zookeeper.ZKClientService;

/* loaded from: input_file:co/cask/cdap/data/tools/HBaseQueueDebugger.class */
public class HBaseQueueDebugger extends AbstractIdleService {
    /** System property; when "true", reports contain only the minimum transaction timestamp. */
    public static final String PROP_SHOW_TX_TIMESTAMP_ONLY = "show.tx.timestamp.only";
    /** System property; when "true", progress messages are printed to stdout while scanning. */
    public static final String PROP_SHOW_PROGRESS = "show.progress";
    /** System property holding the number of rows handed to the queue scanner (parsed as an int, default "100000"). */
    public static final String PROP_ROWS_CACHE = "rows.cache";
    // Builds the HBase scans and is handed to the sharded queue strategy.
    private final HBaseTableUtil tableUtil;
    // Source of the consumer state store (and therefore the barriers) of a queue.
    private final HBaseQueueAdmin queueAdmin;
    // Started in startUp() and stopped in shutDown().
    private final ZKClientService zkClientService;
    // Creates the HTable for a queue and resolves its data table id and distributor buckets.
    private final HBaseQueueClientFactory queueClientFactory;
    // Used to create the transaction executor that runs every scan.
    private final TransactionExecutorFactory txExecutorFactory;
    // Lists the namespaces visited by scanAllQueues().
    private final NamespaceAdmin namespaceAdmin;
    // Lists the applications of a namespace in scanAllQueues().
    private final Store store;

    /* loaded from: input_file:co/cask/cdap/data/tools/HBaseQueueDebugger$QueueStatistics.class */
    /**
     * Mutable accumulator for the result of a queue scan: three entry counters plus the smallest
     * write pointer that has been recorded. Instances can be merged with {@link #add(QueueStatistics)}.
     */
    public static final class QueueStatistics {
        /** Smallest write pointer recorded so far; absent until one has been recorded. */
        private Optional<Long> minWritePointer = Optional.absent();
        private long unprocessed;
        private long processedAndVisible;
        private long processedAndNotVisible;

        /** Only the enclosing class creates instances. */
        private QueueStatistics() {
        }

        /**
         * Remembers {@code j} if it is smaller than every write pointer recorded before
         * (or if none has been recorded yet).
         */
        public void recordMinWritePointer(long j) {
            long smallest = this.minWritePointer.isPresent() ? Math.min(this.minWritePointer.get(), j) : j;
            this.minWritePointer = Optional.of(smallest);
        }

        /** Adds {@code j} to the unprocessed counter. */
        public void countUnprocessed(long j) {
            this.unprocessed = this.unprocessed + j;
        }

        /** Adds {@code j} to the processed-and-visible counter. */
        public void countProcessedAndVisible(long j) {
            this.processedAndVisible = this.processedAndVisible + j;
        }

        /** Adds {@code j} to the processed-and-not-visible counter. */
        public void countProcessedAndNotVisible(long j) {
            this.processedAndNotVisible = this.processedAndNotVisible + j;
        }

        public long getUnprocessed() {
            return this.unprocessed;
        }

        public long getProcessedAndVisible() {
            return this.processedAndVisible;
        }

        public long getProcessedAndNotVisible() {
            return this.processedAndNotVisible;
        }

        /** @return the sum of the three counters */
        public long getTotal() {
            long processed = this.processedAndVisible + this.processedAndNotVisible;
            return this.unprocessed + processed;
        }

        public Optional<Long> getMinWritePointer() {
            return this.minWritePointer;
        }

        /** @return the minimum write pointer in decimal, or "n/a" when none was recorded */
        public String getMinWritePointerString() {
            if (!this.minWritePointer.isPresent()) {
                return "n/a";
            }
            return Long.toString(this.minWritePointer.get());
        }

        /**
         * @return the minimum write pointer divided by 1000000, or "n/a" when none was recorded
         */
        public String getMinWritePointerTimestampString() {
            if (!this.minWritePointer.isPresent()) {
                return "n/a";
            }
            // NOTE(review): 1000000 is presumably the number of write pointers per millisecond
            // used by the transaction system - confirm against the Tephra constants.
            return Long.toString(this.minWritePointer.get() / 1000000);
        }

        private String getTxTimestampReport() {
            return String.format("min tx timestamp: %s", getMinWritePointerTimestampString());
        }

        private String getDetailedReport() {
            return String.format(
                "min write pointer: %s; unprocessed: %d; processed and visible: %d; processed and not visible: %d; total: %d",
                getMinWritePointerString(),
                getUnprocessed(),
                getProcessedAndVisible(),
                getProcessedAndNotVisible(),
                getTotal());
        }

        /**
         * Renders the statistics as a single line.
         *
         * @param z {@code true} for the short "min tx timestamp" form, {@code false} for all counters
         */
        public String getReport(boolean z) {
            if (z) {
                return getTxTimestampReport();
            }
            return getDetailedReport();
        }

        /**
         * Merges another set of statistics into this one: the counters are summed and the
         * minimum write pointer becomes the smaller of the two, if the other one has any.
         */
        public void add(QueueStatistics queueStatistics) {
            Optional<Long> otherMin = queueStatistics.getMinWritePointer();
            if (otherMin.isPresent()) {
                recordMinWritePointer(otherMin.get());
            }
            countUnprocessed(queueStatistics.getUnprocessed());
            countProcessedAndVisible(queueStatistics.getProcessedAndVisible());
            countProcessedAndNotVisible(queueStatistics.getProcessedAndNotVisible());
        }
    }

    /**
     * Stores the collaborators the debugger works with. The constructor is annotated with
     * {@code @Inject}; {@code createDebugger()} obtains the instance from a Guice injector.
     */
    @Inject
    public HBaseQueueDebugger(HBaseTableUtil hBaseTableUtil, HBaseQueueAdmin hBaseQueueAdmin, HBaseQueueClientFactory hBaseQueueClientFactory, ZKClientService zKClientService, TransactionExecutorFactory transactionExecutorFactory, NamespaceAdmin namespaceAdmin, Store store) {
        // Application metadata access.
        this.namespaceAdmin = namespaceAdmin;
        this.store = store;
        // Queue and HBase access.
        this.tableUtil = hBaseTableUtil;
        this.queueAdmin = hBaseQueueAdmin;
        this.queueClientFactory = hBaseQueueClientFactory;
        // Infrastructure services.
        this.zkClientService = zKClientService;
        this.txExecutorFactory = transactionExecutorFactory;
    }

    /**
     * Service start hook of {@link AbstractIdleService}: starts the ZooKeeper client service and
     * waits until it is running.
     */
    @Override
    protected void startUp() throws Exception {
        this.zkClientService.startAndWait();
    }

    /**
     * Service stop hook of {@link AbstractIdleService}: stops the ZooKeeper client service and
     * waits until it has terminated.
     */
    @Override
    protected void shutDown() throws Exception {
        this.zkClientService.stopAndWait();
    }

    /**
     * Scans every flowlet queue of every flow, in every application of every namespace, and prints
     * the statistics summed over all of them. Each individual queue is scanned with
     * {@link #scanQueue(QueueName, Long)} without a consumer group filter.
     *
     * @throws Exception if listing namespaces or applications, or scanning any queue, fails
     */
    public void scanAllQueues() throws Exception {
        QueueStatistics totals = new QueueStatistics();
        for (NamespaceMeta namespaceMeta : this.namespaceAdmin.list()) {
            Id.Namespace namespaceId = Id.Namespace.from(namespaceMeta.getName());
            for (ApplicationSpecification appSpec : this.store.getAllApplications(namespaceId)) {
                Id.Application appId = Id.Application.from(namespaceId, appSpec.getName());
                for (FlowSpecification flowSpec : appSpec.getFlows().values()) {
                    Id.Flow flowId = Id.Flow.from(appId, flowSpec.getName());
                    SimpleQueueSpecificationGenerator generator = new SimpleQueueSpecificationGenerator(flowId.getApplication());
                    Table<QueueSpecificationGenerator.Node, String, Set<QueueSpecification>> queueSpecs = generator.create(flowSpec);
                    for (Table.Cell<QueueSpecificationGenerator.Node, String, Set<QueueSpecification>> cell : queueSpecs.cellSet()) {
                        // Only cells whose row node is a flowlet are scanned.
                        if (cell.getRowKey().getType() != FlowletConnection.Type.FLOWLET) {
                            continue;
                        }
                        for (QueueSpecification queueSpec : cell.getValue()) {
                            totals.add(scanQueue(queueSpec.getQueueName(), null));
                        }
                    }
                }
            }
        }
        System.out.printf("Total results for all queues: %s\n", totals.getReport(showTxTimestampOnly()));
    }

    /**
     * Scans one queue, section by section as delimited by its consumer barriers, and prints and
     * returns the accumulated statistics.
     *
     * @param queueName the queue to scan
     * @param l consumer group id to restrict the scan to, or {@code null} to scan all groups
     * @return the statistics accumulated over all scanned sections
     * @throws NotFoundException if looking up the consumer state store of the queue fails with an
     *     {@link IllegalStateException}
     * @throws Exception if reading the barriers or scanning a section fails
     */
    public QueueStatistics scanQueue(QueueName queueName, @Nullable Long l) throws Exception {
        HBaseConsumerStateStore consumerStateStore;
        // Only the state store lookup is translated into "not found"; an IllegalStateException
        // raised later during the scan must surface as-is instead of being misreported.
        try {
            consumerStateStore = this.queueAdmin.getConsumerStateStore(queueName);
        } catch (IllegalStateException e) {
            NotFoundException notFound = new NotFoundException(queueName);
            // Keep the original failure in the stack trace; the constructor used here takes no cause.
            notFound.addSuppressed(e);
            throw notFound;
        }

        TransactionExecutor txExecutor = Transactions.createTransactionExecutor(this.txExecutorFactory, consumerStateStore);
        Multimap<Long, QueueBarrier> barriers = txExecutor.execute(new TransactionExecutor.Function<HBaseConsumerStateStore, Multimap<Long, QueueBarrier>>() {
            @Override
            public Multimap<Long, QueueBarrier> apply(HBaseConsumerStateStore hBaseConsumerStateStore) throws Exception {
                return hBaseConsumerStateStore.getAllBarriers();
            }
        }, consumerStateStore);
        printProgress("Got %d barriers\n", barriers.size());

        QueueStatistics queueStatistics = new QueueStatistics();
        if (l != null) {
            barriers = Multimaps.filterKeys(barriers, Predicates.equalTo(l));
        }

        for (Map.Entry<Long, Collection<QueueBarrier>> entry : barriers.asMap().entrySet()) {
            long groupId = entry.getKey();
            Collection<QueueBarrier> groupBarriers = entry.getValue();
            printProgress("Scanning barriers for group %d\n", groupId);

            int section = 1;
            PeekingIterator<QueueBarrier> barrierIterator = Iterators.peekingIterator(groupBarriers.iterator());
            while (barrierIterator.hasNext()) {
                QueueBarrier start = barrierIterator.next();
                // The following barrier, if any, is where the current section ends.
                QueueBarrier end = barrierIterator.hasNext() ? barrierIterator.peek() : null;
                printProgress("Scanning section %d/%d...\n", section, groupBarriers.size());
                scanQueue(txExecutor, consumerStateStore, queueName, start, end, queueStatistics);
                printProgress("Current results: %s\n", queueStatistics.getReport(showTxTimestampOnly()));
                section++;
            }
            // Terminate the line so the next message does not run into this one.
            printProgress("Scanning complete\n");
        }

        System.out.printf("Results for queue %s: %s\n", queueName.toString(), queueStatistics.getReport(showTxTimestampOnly()));
        return queueStatistics;
    }

    /**
     * Prints a formatted progress message to stdout; does nothing unless progress output is enabled.
     *
     * @param str {@code printf}-style format string
     * @param objArr arguments referenced by the format string
     */
    private void printProgress(String str, Object... objArr) {
        if (!showProgress()) {
            return;
        }
        System.out.printf(str, objArr);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @return {@code true} when the {@code show.tx.timestamp.only} system property is set to
     *     "true" (ignoring case), {@code false} otherwise
     */
    public boolean showTxTimestampOnly() {
        return Boolean.getBoolean(PROP_SHOW_TX_TIMESTAMP_ONLY);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * @return {@code true} when the {@code show.progress} system property is set to "true"
     *     (ignoring case), {@code false} otherwise
     */
    public boolean showProgress() {
        return Boolean.getBoolean(PROP_SHOW_PROGRESS);
    }

    /**
     * Scans one section of a queue - the rows from {@code queueBarrier} up to {@code queueBarrier2},
     * or to the end of the queue when there is no next barrier - and adds every visited row to
     * {@code queueStatistics}.
     *
     * <p>The section is scanned once per consumer instance of the barrier's group; for the FIFO
     * dequeue strategy only instance 0 is scanned. The HBase table and every scanner opened here
     * are closed before the method returns, whether or not the scan succeeds.
     *
     * @param transactionExecutor executor that runs each scan inside a transaction
     * @param hBaseConsumerStateStore state store handed to the executor; its transaction is used by {@code visitRow}
     * @param queueName the queue being scanned
     * @param queueBarrier barrier providing the start row and the consumer group configuration
     * @param queueBarrier2 barrier providing the stop row, or {@code null} to scan to the end of the queue
     * @param queueStatistics accumulator updated for every visited row
     * @throws Exception if the table cannot be opened, the scan fails, or the transaction fails for
     *     a reason other than {@link TransactionNotInProgressException}
     */
    private void scanQueue(TransactionExecutor transactionExecutor, HBaseConsumerStateStore hBaseConsumerStateStore, QueueName queueName, QueueBarrier queueBarrier, @Nullable QueueBarrier queueBarrier2, final QueueStatistics queueStatistics) throws Exception {
        final byte[] queueRowPrefix = QueueEntryRow.getQueueRowPrefix(queueName);
        ConsumerGroupConfig groupConfig = queueBarrier.getGroupConfig();
        printProgress("Got consumer group config: %s\n", groupConfig);
        HTable hTable = this.queueClientFactory.createHTable(this.queueClientFactory.getQueueAdmin().getDataTableId(queueName, QueueConstants.QueueType.SHARDED_QUEUE));
        try {
            printProgress("Looking at HBase table: %s\n", Bytes.toString(hTable.getTableName()));
            // State column of this consumer group: the state prefix followed by the group id.
            final byte[] stateColumn = Bytes.add(QueueEntryRow.STATE_COLUMN_PREFIX, Bytes.toBytes(groupConfig.getGroupId()));
            ShardedHBaseQueueStrategy queueStrategy = new ShardedHBaseQueueStrategy(this.tableUtil, this.queueClientFactory.getDistributorBuckets(hTable.getTableDescriptor()));

            ScanBuilder scan = this.tableUtil.buildScan();
            scan.setStartRow(queueBarrier.getStartRow());
            if (queueBarrier2 != null) {
                scan.setStopRow(queueBarrier2.getStartRow());
            } else {
                scan.setStopRow(QueueEntryRow.getQueueEntryRowKey(queueName, Long.MAX_VALUE, Integer.MAX_VALUE));
            }
            scan.addColumn(QueueEntryRow.COLUMN_FAMILY, QueueEntryRow.META_COLUMN);
            scan.addColumn(QueueEntryRow.COLUMN_FAMILY, stateColumn);
            scan.setCacheBlocks(false);
            scan.setMaxVersions(1);
            printProgress("Scanning section with scan: %s\n", scan.toString());

            Collection<Integer> instanceIds = Lists.newArrayList();
            if (groupConfig.getDequeueStrategy() == DequeueStrategy.FIFO) {
                instanceIds.add(0);
            } else {
                for (int i = 0; i < groupConfig.getGroupSize(); i++) {
                    instanceIds.add(i);
                }
            }

            final int rowsCache = Integer.parseInt(System.getProperty(PROP_ROWS_CACHE, "100000"));
            for (final int instanceId : instanceIds) {
                // No newline: the line is rewritten in place with '\r' while the scan progresses.
                printProgress("Processing instance %d", instanceId);
                final QueueScanner scanner = queueStrategy.createScanner(new ConsumerConfig(groupConfig, instanceId), hTable, scan.build(), rowsCache);
                try {
                    transactionExecutor.execute(new TransactionExecutor.Procedure<HBaseConsumerStateStore>() {
                        @Override
                        public void apply(HBaseConsumerStateStore hBaseConsumerStateStore2) throws Exception {
                            ImmutablePair<byte[], Map<byte[], byte[]>> row;
                            while ((row = scanner.next()) != null) {
                                HBaseQueueDebugger.this.visitRow(queueStatistics, hBaseConsumerStateStore2.getTransaction(), row.getFirst(), row.getSecond().get(stateColumn), queueRowPrefix.length);
                                // rowsCache > 0 guards the modulo against -Drows.cache=0.
                                if (HBaseQueueDebugger.this.showProgress() && rowsCache > 0 && queueStatistics.getTotal() % rowsCache == 0) {
                                    System.out.printf("\rProcessing instance %d: %s", instanceId, queueStatistics.getReport(HBaseQueueDebugger.this.showTxTimestampOnly()));
                                }
                            }
                        }
                    }, hBaseConsumerStateStore);
                } catch (TransactionFailureException e) {
                    // A failure rooted in TransactionNotInProgressException is deliberately ignored.
                    // NOTE(review): presumably the transaction timed out during a long scan - confirm.
                    if (!(Throwables.getRootCause(e) instanceof TransactionNotInProgressException)) {
                        throw Throwables.propagate(e);
                    }
                } finally {
                    // Release the scanner even when the scan failed.
                    scanner.close();
                }
                printProgress("\rProcessing instance %d: %s\n", instanceId, queueStatistics.getReport(showTxTimestampOnly()));
            }
        } finally {
            // The table is opened per section, so it has to be closed per section as well.
            hTable.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Classifies one queue row and updates the statistics accordingly.
     *
     * <ul>
     *   <li>no state value: counted as unprocessed;</li>
     *   <li>state {@code PROCESSED}: the row's write pointer is recorded as a minimum candidate and
     *       the row is counted as visible or not visible depending on the transaction;</li>
     *   <li>any other state: the row is not counted.</li>
     * </ul>
     *
     * @param queueStatistics accumulator to update
     * @param transaction transaction used for the visibility check
     * @param bArr row key of the queue entry
     * @param bArr2 value of the consumer state column for this row, or {@code null} if there is none
     * @param i offset passed to {@code QueueEntryRow.getWritePointer}; the caller passes the length
     *     of the queue row prefix
     */
    public void visitRow(QueueStatistics queueStatistics, Transaction transaction, byte[] bArr, byte[] bArr2, int i) {
        if (bArr2 == null) {
            queueStatistics.countUnprocessed(1L);
            return;
        }
        if (QueueEntryRow.getState(bArr2) != ConsumerEntryState.PROCESSED) {
            return;
        }
        long rowWritePointer = QueueEntryRow.getWritePointer(bArr, i);
        queueStatistics.recordMinWritePointer(rowWritePointer);
        boolean visible = transaction.isVisible(rowWritePointer);
        if (visible) {
            queueStatistics.countProcessedAndVisible(1L);
        } else {
            queueStatistics.countProcessedAndNotVisible(1L);
        }
    }

    /**
     * Builds a Guice injector from the modules listed below and returns the
     * {@link HBaseQueueDebugger} instance it provides.
     *
     * @return a debugger that has not been started yet
     */
    public static HBaseQueueDebugger createDebugger() {
        Module[] modules = {
            new ConfigModule(CConfiguration.create(), HBaseConfiguration.create()),
            new IOModule(),
            new ZKClientModule(),
            new LocationRuntimeModule().getDistributedModules(),
            new DiscoveryRuntimeModule().getDistributedModules(),
            new ViewAdminModules().getDistributedModules(),
            new StreamAdminModules().getDistributedModules(),
            new NotificationFeedClientModule(),
            new TwillModule(),
            new ExploreClientModule(),
            new DataFabricDistributedModule(),
            new ServiceStoreModules().getDistributedModules(),
            new DataSetsModules().getDistributedModules(),
            new AppFabricServiceRuntimeModule().getDistributedModules(),
            new ProgramRunnerRuntimeModule().getDistributedModules(),
            new SystemDatasetRuntimeModule().getDistributedModules(),
            new NotificationServiceRuntimeModule().getDistributedModules(),
            new MetricsClientRuntimeModule().getDistributedModules(),
            new KafkaClientModule(),
            // Bindings specific to this tool.
            new AbstractModule() {
                @Override
                protected void configure() {
                    bind(QueueClientFactory.class).to(HBaseQueueClientFactory.class).in(Singleton.class);
                    bind(QueueAdmin.class).to(HBaseQueueAdmin.class).in(Singleton.class);
                    bind(HBaseTableUtil.class).toProvider(HBaseTableUtilFactory.class);
                }

                /** Provides the store bound under the name "defaultStore". */
                @Named("defaultStore")
                @Singleton
                @Provides
                public Store getStore(CConfiguration cConfiguration, LocationFactory locationFactory, NamespacedLocationFactory namespacedLocationFactory, TransactionExecutorFactory transactionExecutorFactory, DatasetFramework datasetFramework, TransactionSystemClient transactionSystemClient) {
                    return new DefaultStore(cConfiguration, locationFactory, namespacedLocationFactory, transactionExecutorFactory, datasetFramework, transactionSystemClient);
                }

                /** Exposes the unnamed dataset framework under the name "datasetMDS" as well. */
                @Named("datasetMDS")
                @Singleton
                @Provides
                public DatasetFramework getInDsFramework(DatasetFramework datasetFramework) {
                    return datasetFramework;
                }
            }
        };
        return Guice.createInjector(modules).getInstance(HBaseQueueDebugger.class);
    }

    /**
     * Command line entry point.
     *
     * <p>With no arguments every queue is scanned; with a queue URI only that queue is scanned; a
     * second argument names the consumer flowlet whose consumer group the scan is restricted to.
     * A first argument of {@code help} prints the usage and exits with status 1.
     *
     * @param strArr {@code [<queue-uri> [consumer-flowlet]]}, or {@code help}
     * @throws Exception if the debugger cannot be started or a scan fails
     */
    public static void main(String[] strArr) throws Exception {
        if (strArr.length >= 1 && "help".equals(strArr[0])) {
            System.out.println("Arguments: [<queue-uri> [consumer-flowlet]]");
            System.out.println("queue-uri: queue:///<namespace>/<app>/<flow>/<flowlet>/<queue>");
            System.out.println("consumer-flowlet: <flowlet>");
            System.out.println("If queue-uri is not provided, scan all queues");
            System.out.println("Example: queue:///default/PurchaseHistory/PurchaseFlow/reader/queue collector");
            System.out.println();
            System.out.println("System properties:");
            System.out.println("-Dshow.progress=true         Show progress while scanning the queue table");
            System.out.println("-Drows.cache=[num_of_rows]   Number of rows to pass to HBase Scan.setCaching() method");
            System.out.println("-Dshow.tx.timestamp.only=true Only report the minimum transaction timestamp");
            System.exit(1);
        }

        QueueName queueName = strArr.length >= 1 ? QueueName.from(URI.create(strArr[0])) : null;
        Long consumerGroupId = null;
        if (strArr.length >= 2) {
            // queueName is non-null here: a second argument implies a first one.
            Id.Program flowId = Id.Program.from(queueName.getFirstComponent(), queueName.getSecondComponent(), ProgramType.FLOW, queueName.getThirdComponent());
            consumerGroupId = FlowUtils.generateConsumerGroupId(flowId, strArr[1]);
        }

        HBaseQueueDebugger debugger = createDebugger();
        debugger.startAndWait();
        try {
            if (queueName != null) {
                debugger.scanQueue(queueName, consumerGroupId);
            } else {
                debugger.scanAllQueues();
            }
        } finally {
            // Stop the service even when the scan fails, so its ZooKeeper client is shut down.
            debugger.stopAndWait();
        }
    }
}
