package co.cask.cdap.cli.command;

import co.cask.cdap.api.annotation.Beta;
import co.cask.cdap.api.data.format.Formats;
import co.cask.cdap.api.data.schema.Schema;
import co.cask.cdap.cli.ArgumentName;
import co.cask.cdap.cli.CLIConfig;
import co.cask.cdap.cli.ElementType;
import co.cask.cdap.cli.english.Article;
import co.cask.cdap.cli.english.Fragment;
import co.cask.cdap.cli.util.AbstractCommand;
import co.cask.cdap.client.QueryClient;
import co.cask.cdap.client.StreamClient;
import co.cask.cdap.explore.client.ExploreExecutionResult;
import co.cask.cdap.proto.ColumnDesc;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.QueryResult;
import co.cask.cdap.proto.StreamProperties;
import co.cask.common.cli.Arguments;
import com.google.common.base.Strings;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Sets;
import com.google.inject.Inject;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.slf4j.Marker;

@Beta
/* loaded from: input_file:co/cask/cdap/cli/command/GetStreamStatsCommand.class */
public class GetStreamStatsCommand extends AbstractCommand {
    private static final int DEFAULT_LIMIT = 100;
    private static final int MAX_LIMIT = 100000;
    private final StreamClient streamClient;
    private final QueryClient queryClient;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/cli/command/GetStreamStatsCommand$CountUniqueProcessor.class */
    /**
     * Statistic that counts how many distinct non-null values were seen for a column.
     */
    public static final class CountUniqueProcessor implements StatsProcessor {
        /** Every distinct non-null value processed so far. */
        private final Set<Object> elements = Sets.newHashSet();

        private CountUniqueProcessor() {
        }

        /**
         * Remembers the value if it is non-null; null values are ignored.
         *
         * @param obj the column value to record, may be null
         */
        @Override
        public void process(Object obj) {
            if (obj == null) {
                return;
            }
            this.elements.add(obj);
        }

        /**
         * Writes a single line with the number of distinct values recorded.
         *
         * @param printStream the stream the report line is written to
         */
        @Override
        public void printReport(PrintStream printStream) {
            int uniqueCount = this.elements.size();
            printStream.println("Unique elements: " + uniqueCount);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/cli/command/GetStreamStatsCommand$HistogramProcessor.class */
    /**
     * Statistic that groups numeric column values into fixed-width buckets and prints them as a
     * horizontal bar chart, one line per non-empty bucket.
     */
    public static final class HistogramProcessor implements StatsProcessor {
        /** Smallest width, in characters, the bar area is allowed to shrink to. */
        private static final int MIN_BAR_WIDTH = 5;
        /** Width of the value range covered by a single bucket. */
        private static final int BUCKET_SIZE = 100;
        /** Text repeated to draw the body of a bar after the leading "|". */
        private static final String BAR_FILL = "+";

        /** Number of processed values per bucket index. */
        private final Multiset<Long> buckets = HashMultiset.create();
        private final CLIConfig cliConfig;

        /**
         * Immutable view of one histogram bucket: its index and how many values fell into it.
         */
        public static final class Bucket {
            private final long index;
            private final int count;

            private Bucket(long index, int count) {
                this.index = index;
                this.count = count;
            }

            public long getIndex() {
                return this.index;
            }

            public int getCount() {
                return this.count;
            }

            /**
             * @return the smallest value that belongs to this bucket
             */
            public long getStartInclusive() {
                return this.index * BUCKET_SIZE;
            }

            /**
             * @return the largest value that belongs to this bucket
             */
            public long getEndInclusive() {
                return getStartInclusive() + BUCKET_SIZE - 1;
            }

            /**
             * @return the label printed in front of the bar: the bucket range followed by its count
             */
            public String getPrefix() {
                return String.format("  [%d, %d]: %d  ", getStartInclusive(), getEndInclusive(), this.count);
            }
        }

        /**
         * @param cLIConfig CLI configuration; its line width bounds the width of the printed bars
         */
        public HistogramProcessor(CLIConfig cLIConfig) {
            this.cliConfig = cLIConfig;
        }

        /**
         * Adds the value to its bucket. Nulls and values that are not a {@link Number} are ignored.
         *
         * @param obj the column value to record, may be null
         */
        @Override
        public void process(Object obj) {
            // instanceof is false for null, so this also skips missing values.
            if (!(obj instanceof Number)) {
                return;
            }
            this.buckets.add(bucketIndexOf((Number) obj));
        }

        /**
         * Prints the heading "Histogram:" followed by one line per non-empty bucket in ascending
         * order. Prints nothing when no numeric value was processed.
         *
         * @param printStream the stream the report is written to
         */
        @Override
        public void printReport(PrintStream printStream) {
            if (this.buckets.isEmpty()) {
                return;
            }
            printStream.println("Histogram:");
            List<Long> sortedIndices = Lists.newArrayList(this.buckets.elementSet());
            Collections.sort(sortedIndices);
            int biggestCount = getBiggestBucket().getCount();
            int prefixWidth = getLongestBucketPrefix();
            int maxBarWidth = Math.max(MIN_BAR_WIDTH, this.cliConfig.getLineWidth() - prefixWidth);
            for (Long index : sortedIndices) {
                Bucket bucket = new Bucket(index, this.buckets.count(index));
                printStream.print(padRight(bucket.getPrefix(), prefixWidth));
                // Scale the bar relative to the fullest bucket, which gets the whole bar area.
                int barWidth = (int) (((bucket.getCount() * 1.0d) / biggestCount) * maxBarWidth);
                // Every bucket shows at least the "|" so that tiny buckets remain visible.
                printStream.print("|");
                if (barWidth > 1) {
                    printStream.print(Strings.repeat(BAR_FILL, barWidth - 1));
                }
                printStream.println();
            }
        }

        /**
         * Maps a value to the index of the bucket that contains it, rounding toward negative
         * infinity so that negative values are labelled with a range that really contains them.
         * Values whose magnitude is too large for a long bucket bound cannot be labelled correctly.
         */
        private static long bucketIndexOf(Number value) {
            if (value instanceof Double || value instanceof Float) {
                // Keep the fractional part until after the division so that e.g. -0.5 maps to bucket -1.
                return (long) Math.floor(value.doubleValue() / BUCKET_SIZE);
            }
            long integral = value.longValue();
            long quotient = integral / BUCKET_SIZE;
            // Java division truncates toward zero; step down once for negative, non-exact quotients.
            if (integral % BUCKET_SIZE < 0) {
                quotient--;
            }
            return quotient;
        }

        /**
         * @return the bucket holding the most values, or null when there are no buckets
         */
        private Bucket getBiggestBucket() {
            Bucket biggest = null;
            for (Long index : this.buckets.elementSet()) {
                int count = this.buckets.count(index);
                if (biggest == null || count > biggest.getCount()) {
                    biggest = new Bucket(index, count);
                }
            }
            return biggest;
        }

        /** Left-aligns the text in a field of the given width by appending spaces. */
        private String padRight(String str, int i) {
            return String.format("%1$-" + i + "s", str);
        }

        /**
         * @return the length of the longest bucket label, measured on the actual label text so that
         *         all bars start in the same column; 0 when there are no buckets
         */
        private int getLongestBucketPrefix() {
            int longest = 0;
            for (Long index : this.buckets.elementSet()) {
                int length = new Bucket(index, this.buckets.count(index)).getPrefix().length();
                longest = Math.max(longest, length);
            }
            return longest;
        }
    }

    /* loaded from: input_file:co/cask/cdap/cli/command/GetStreamStatsCommand$StatsProcessor.class */
    /**
     * A statistic computed over the values of a single result column: values are fed in one at a
     * time and a textual report is printed once all rows have been processed.
     */
    private interface StatsProcessor {
        /**
         * Feeds one column value into the statistic.
         *
         * @param obj the value of the column in one result row; may be null
         */
        void process(Object obj);

        /**
         * Writes the statistic gathered so far.
         *
         * @param printStream the stream the report is written to
         */
        void printReport(PrintStream printStream);
    }

    /**
     * Creates the command with the clients it uses to look up the stream configuration and to
     * run the query over the stream's events.
     *
     * @param streamClient client used to read the stream configuration
     * @param queryClient client used to execute the query
     * @param cLIConfig CLI configuration, handed to the parent command
     */
    @Inject
    public GetStreamStatsCommand(StreamClient streamClient, QueryClient queryClient, CLIConfig cLIConfig) {
        super(cLIConfig);
        this.queryClient = queryClient;
        this.streamClient = streamClient;
    }

    /**
     * Queries the events of a stream within a time range and prints, for every user column,
     * the statistics that apply to the column's schema type.
     *
     * <p>The number of analyzed events is the "limit" argument clamped to
     * {@code [1, MAX_LIMIT]}, defaulting to {@code DEFAULT_LIMIT}. The start and end times
     * default to "min" and "max". Streams whose format is {@code Formats.TEXT} are reported
     * as having no schema and are not queried.
     *
     * @param arguments the parsed command arguments (stream, limit, start time, end time)
     * @param printStream the stream the report is written to
     * @throws Exception if the stream configuration cannot be read, or the query fails or does
     *         not complete within one minute
     */
    @Override // co.cask.cdap.cli.util.AbstractAuthCommand
    public void perform(Arguments arguments, PrintStream printStream) throws Exception {
        long now = System.currentTimeMillis();
        Id.Stream streamId = Id.Stream.from(this.cliConfig.getCurrentNamespace(), arguments.get(ArgumentName.STREAM.toString()));
        int requestedLimit = arguments.getInt(ArgumentName.LIMIT.toString(), DEFAULT_LIMIT);
        int limit = Math.max(1, Math.min(MAX_LIMIT, requestedLimit));
        // NOTE(review): both bounds are resolved against the same "now", while getDescription()
        // states that the end time is relative to the start time - confirm which is intended.
        long startTime = getTimestamp(arguments.get(ArgumentName.START_TIME.toString(), "min"), now);
        long endTime = getTimestamp(arguments.get(ArgumentName.END_TIME.toString(), "max"), now);

        StreamProperties config = this.streamClient.getConfig(streamId);
        boolean isTextFormat = config.getFormat().getName().equals(Formats.TEXT);
        Schema streamSchema = isTextFormat ? null : config.getFormat().getSchema();
        // NOTE(review): the null check is defensive - confirm whether a non-text format can
        // come without a schema.
        if (streamSchema == null) {
            printStream.printf("No schema found for stream '%s'", streamId.getId());
            printStream.println();
            return;
        }

        // Decide per column, keyed by its Hive column name, which statistics apply to its type.
        Map<String, Set<StatsProcessor>> processorsByColumn = Maps.newHashMap();
        for (Schema.Field field : streamSchema.getFields()) {
            Schema fieldSchema = field.getSchema();
            processorsByColumn.put(
                cdapSchemaColumName2HiveColumnName(streamId, field.getName()),
                getProcessorsForType(fieldSchema.getType(), fieldSchema.getUnionSchemas()));
        }

        String query = "SELECT * FROM " + getHiveTableName(streamId)
            + " WHERE " + getTimestampHiveColumn(streamId) + " BETWEEN " + startTime + " AND " + endTime
            + " LIMIT " + limit;
        ExploreExecutionResult results = this.queryClient.execute(streamId.getNamespace(), query).get(1L, TimeUnit.MINUTES);

        List<ColumnDesc> resultSchema;
        int rowCount = 0;
        try {
            resultSchema = results.getResultSchema();
            while (results.hasNext()) {
                rowCount++;
                List<?> columns = results.next().getColumns();
                for (int i = 0; i < columns.size(); i++) {
                    String columnName = resultSchema.get(i).getName();
                    if (!isUserHiveColumn(streamId, columnName)) {
                        continue;
                    }
                    Set<StatsProcessor> processors = processorsByColumn.get(columnName);
                    if (processors == null) {
                        continue;
                    }
                    Object value = columns.get(i);
                    for (StatsProcessor processor : processors) {
                        processor.process(value);
                    }
                }
            }
        } finally {
            // Release the query results even when reading them fails part-way.
            results.close();
        }

        for (ColumnDesc columnDesc : resultSchema) {
            String columnName = columnDesc.getName();
            if (!isUserHiveColumn(streamId, columnName)) {
                continue;
            }
            printStream.printf("column: %s, type: %s", getTruncatedColumnName(streamId, columnName), columnDesc.getType());
            printStream.println();
            Set<StatsProcessor> processors = processorsByColumn.get(columnName);
            if (processors == null || processors.isEmpty()) {
                printStream.println("No statistics available");
            } else {
                for (StatsProcessor processor : processors) {
                    processor.printReport(printStream);
                }
            }
            printStream.println();
        }

        printStream.printf("Analyzed %d Stream events in the time range [%d, %d]...", rowCount, startTime, endTime);
        printStream.println();
        printStream.println();
    }

    /**
     * Strips the leading "&lt;hive table name&gt;." qualifier from a column name.
     *
     * @param stream the stream whose Hive table name forms the qualifier
     * @param str the column name as reported by the query result
     * @return the column name without the qualifier, or {@code str} unchanged when it does not
     *         start with the qualifier
     */
    private String getTruncatedColumnName(Id.Stream stream, String str) {
        String tablePrefix = getHiveTableName(stream) + ".";
        if (!str.startsWith(tablePrefix)) {
            return str;
        }
        return str.substring(tablePrefix.length());
    }

    /**
     * @param stream the stream being queried
     * @return the Hive column name built for the stream's "ts" field
     */
    private String getTimestampHiveColumn(Id.Stream stream) {
        final String timestampField = "ts";
        return cdapSchemaColumName2HiveColumnName(stream, timestampField);
    }

    /**
     * Tells whether a result column should be reported, i.e. it is neither the stream's "ts"
     * column nor its "headers" column.
     *
     * @param stream the stream being queried
     * @param str the column name as reported by the query result
     * @return false for the "ts" and "headers" columns, true for every other column
     */
    private boolean isUserHiveColumn(Id.Stream stream, String str) {
        if (getTimestampHiveColumn(stream).equals(str)) {
            return false;
        }
        String headersColumn = cdapSchemaColumName2HiveColumnName(stream, "headers");
        return !headersColumn.equals(str);
    }

    /**
     * @param stream the stream being queried
     * @return the table name used in the query: "stream_" followed by the stream id
     */
    private String getHiveTableName(Id.Stream stream) {
        return "stream_" + stream.getId();
    }

    /**
     * Builds the qualified, lower-cased column name "&lt;hive table name&gt;.&lt;field&gt;" for a
     * schema field.
     *
     * @param stream the stream whose Hive table name qualifies the column
     * @param str the field name from the stream schema
     * @return the qualified column name in lower case
     */
    private String cdapSchemaColumName2HiveColumnName(Id.Stream stream, String str) {
        String qualifiedName = getHiveTableName(stream) + "." + str;
        // Lower-case independently of the JVM's default locale: under e.g. a Turkish locale
        // "I" would otherwise become a dotless "ı" and no longer match the result column names.
        return qualifiedName.toLowerCase(Locale.ROOT);
    }

    /**
     * Selects the statistics to compute for a column of the given schema type.
     *
     * <p>INT, LONG, FLOAT, DOUBLE and STRING columns get a unique-value count; the four numeric
     * types additionally get a histogram. A union qualifies when any of its member schemas has
     * one of these types.
     *
     * @param type the schema type of the column
     * @param list the member schemas when the column is a union
     * @return the processors to run for the column; empty when no statistic applies
     */
    private Set<StatsProcessor> getProcessorsForType(Schema.Type type, List<Schema> list) {
        boolean isInt = isTypeOrInUnion(Schema.Type.INT, type, list);
        boolean isLong = isTypeOrInUnion(Schema.Type.LONG, type, list);
        boolean isFloat = isTypeOrInUnion(Schema.Type.FLOAT, type, list);
        boolean isDouble = isTypeOrInUnion(Schema.Type.DOUBLE, type, list);
        boolean isString = isTypeOrInUnion(Schema.Type.STRING, type, list);
        boolean isNumeric = isInt || isLong || isFloat || isDouble;

        // NOTE(review): columns of any other type (e.g. BOOLEAN, BYTES) get no processors and
        // are reported as "No statistics available" - confirm that is intended.
        ImmutableSet.Builder<StatsProcessor> processors = ImmutableSet.builder();
        if (isNumeric || isString) {
            processors.add(new CountUniqueProcessor());
        }
        if (isNumeric) {
            processors.add(new HistogramProcessor(this.cliConfig));
        }
        return processors.build();
    }

    /**
     * Tells whether a column's schema is of the wanted type, either directly or as one of the
     * member schemas of a union.
     *
     * @param type the type being looked for
     * @param type2 the schema type of the column
     * @param list the member schemas of the column's union; a null list is treated as having
     *        no members
     * @return true when {@code type2} is the wanted type or any member schema has that type
     */
    private boolean isTypeOrInUnion(Schema.Type type, Schema.Type type2, List<Schema> list) {
        if (type == type2) {
            return true;
        }
        // NOTE(review): presumably non-union schemas report no member list at all - confirm;
        // guarding here keeps such columns from failing with a NullPointerException.
        if (list == null) {
            return false;
        }
        for (Schema member : list) {
            if (type == member.getType()) {
                return true;
            }
        }
        return false;
    }

    /**
     * @return the command syntax: the stream argument followed by the optional limit, start
     *         and end arguments
     */
    @Override // co.cask.common.cli.Command
    public String getPattern() {
        String template = "get stream-stats <%s> [limit <%s>] [start <%s>] [end <%s>]";
        return String.format(
            template,
            ArgumentName.STREAM,
            ArgumentName.LIMIT,
            ArgumentName.START_TIME,
            ArgumentName.END_TIME);
    }

    /**
     * @return the help text for the command, describing the limit argument with its default
     *         and the accepted formats of the start and end time arguments
     */
    @Override // co.cask.common.cli.Command
    public String getDescription() {
        String template = "Gets statistics for %s. "
            + "The '<%s>' limits how many stream events to analyze; default is %s. "
            + "The time format for '<%s>' and '<%s>' can be a timestamp in milliseconds "
            + "or a relative time in the form of '[+|-][0-9][d|h|m|s]'. "
            + "'<%s>' is relative to current time; '<%s>' is relative to '<%s>'. "
            + "Special constants 'min' and 'max' can be used to represent '0' and 'max timestamp' respectively.";
        return String.format(
            template,
            Fragment.of(Article.A, ElementType.STREAM.getName()),
            ArgumentName.LIMIT,
            DEFAULT_LIMIT,
            ArgumentName.START_TIME,
            ArgumentName.END_TIME,
            ArgumentName.START_TIME,
            ArgumentName.END_TIME,
            ArgumentName.START_TIME);
    }
}
