package org.apache.storm.starter;

import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import org.HdrHistogram.Histogram;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.ClusterSummary;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.SpoutStats;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.metric.HttpForwardingMetricsConsumer;
import org.apache.storm.metric.HttpForwardingMetricsServer;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.metric.api.IMetricsConsumer;
import org.apache.storm.metrics.hdrhistogram.HistogramMetric;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;

/* loaded from: input_file:org/apache/storm/starter/ThroughputVsLatency.class */
public class ThroughputVsLatency {
    private static final Histogram _histo = new Histogram(3600000000000L, 3);
    private static final AtomicLong _systemCPU = new AtomicLong(0);
    private static final AtomicLong _userCPU = new AtomicLong(0);
    private static final AtomicLong _gcCount = new AtomicLong(0);
    private static final AtomicLong _gcMs = new AtomicLong(0);
    private static final ConcurrentHashMap<String, MemMeasure> _memoryBytes = new ConcurrentHashMap<>();
    private static long _prev_acked = 0;
    private static long _prev_uptime = 0;

    /* loaded from: input_file:org/apache/storm/starter/ThroughputVsLatency$C.class */
    public static class C {
        LocalCluster _local;
        Nimbus.Client _client;

        public C(Map map) {
            this._local = null;
            this._client = null;
            Map readStormConfig = Utils.readStormConfig();
            if (map != null) {
                readStormConfig.putAll(map);
            }
            Boolean bool = (Boolean) readStormConfig.get("run.local");
            if (bool == null || !bool.booleanValue()) {
                this._client = NimbusClient.getConfiguredClient(readStormConfig).getClient();
            } else {
                this._local = new LocalCluster();
            }
        }

        public ClusterSummary getClusterInfo() throws Exception {
            return this._local != null ? this._local.getClusterInfo() : this._client.getClusterInfo();
        }

        public TopologyInfo getTopologyInfo(String str) throws Exception {
            return this._local != null ? this._local.getTopologyInfo(str) : this._client.getTopologyInfo(str);
        }

        public void killTopologyWithOpts(String str, KillOptions killOptions) throws Exception {
            if (this._local != null) {
                this._local.killTopologyWithOpts(str, killOptions);
            } else {
                this._client.killTopologyWithOpts(str, killOptions);
            }
        }

        public void submitTopology(String str, Map map, StormTopology stormTopology) throws Exception {
            if (this._local != null) {
                this._local.submitTopology(str, map, stormTopology);
            } else {
                StormSubmitter.submitTopology(str, map, stormTopology);
            }
        }

        public boolean isLocal() {
            return this._local != null;
        }
    }

    /* loaded from: input_file:org/apache/storm/starter/ThroughputVsLatency$FastRandomSentenceSpout.class */
    public static class FastRandomSentenceSpout extends BaseRichSpout {
        static final String[] SENTENCES = {"the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"};
        SpoutOutputCollector _collector;
        long _periodNano;
        long _emitAmount;
        Random _rand;
        long _nextEmitTime;
        long _emitsLeft;
        HistogramMetric _histo;

        public FastRandomSentenceSpout(long j) {
            if (j > 0) {
                this._periodNano = Math.max(1L, HRegion.MAX_FLUSH_PER_CHANGES / j);
                this._emitAmount = Math.max(1L, (long) ((j / 1.0E9d) * this._periodNano));
            } else {
                this._periodNano = 9223372036854775806L;
                this._emitAmount = 1L;
            }
        }

        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this._collector = spoutOutputCollector;
            this._rand = ThreadLocalRandom.current();
            this._nextEmitTime = System.nanoTime();
            this._emitsLeft = this._emitAmount;
            this._histo = new HistogramMetric(3600000000000L, 3);
            topologyContext.registerMetric("comp-lat-histo", this._histo, 10);
        }

        public void nextTuple() {
            if (this._emitsLeft <= 0 && this._nextEmitTime <= System.nanoTime()) {
                this._emitsLeft = this._emitAmount;
                this._nextEmitTime += this._periodNano;
            }
            if (this._emitsLeft > 0) {
                String str = SENTENCES[this._rand.nextInt(SENTENCES.length)];
                this._collector.emit(new Values(new Object[]{str}), new SentWithTime(str, this._nextEmitTime - this._periodNano));
                this._emitsLeft--;
            }
        }

        public void ack(Object obj) {
            this._histo.recordValue(System.nanoTime() - ((SentWithTime) obj).time);
        }

        public void fail(Object obj) {
            this._collector.emit(new Values(new Object[]{((SentWithTime) obj).sentence}), obj);
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"sentence"}));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/starter/ThroughputVsLatency$MemMeasure.class */
    public static class MemMeasure {
        private long _mem;
        private long _time;

        private MemMeasure() {
            this._mem = 0L;
            this._time = 0L;
        }

        public synchronized void update(long j) {
            this._mem = j;
            this._time = System.currentTimeMillis();
        }

        public synchronized long get() {
            if (isExpired()) {
                return 0L;
            }
            return this._mem;
        }

        public synchronized boolean isExpired() {
            return System.currentTimeMillis() - this._time >= 20000;
        }
    }

    /* loaded from: input_file:org/apache/storm/starter/ThroughputVsLatency$SentWithTime.class */
    private static class SentWithTime {
        public final String sentence;
        public final long time;

        SentWithTime(String str, long j) {
            this.sentence = str;
            this.time = j;
        }
    }

    /* loaded from: input_file:org/apache/storm/starter/ThroughputVsLatency$SplitSentence.class */
    public static class SplitSentence extends BaseBasicBolt {
        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            for (String str : tuple.getString(0).split("\\s+")) {
                basicOutputCollector.emit(new Values(new Object[]{str, 1}));
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"word", "count"}));
        }
    }

    /* loaded from: input_file:org/apache/storm/starter/ThroughputVsLatency$WordCount.class */
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap();

        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            String string = tuple.getString(0);
            Integer num = this.counts.get(string);
            if (num == null) {
                num = 0;
            }
            Integer valueOf = Integer.valueOf(num.intValue() + 1);
            this.counts.put(string, valueOf);
            basicOutputCollector.emit(new Values(new Object[]{string, valueOf}));
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"word", "count"}));
        }
    }

    private static long readMemory() {
        long j = 0;
        Iterator<MemMeasure> it = _memoryBytes.values().iterator();
        while (it.hasNext()) {
            j += it.next().get();
        }
        return j;
    }

    public static void printMetrics(C c, String str) throws Exception {
        long valueAtPercentile;
        long valueAtPercentile2;
        long minValue;
        long maxValue;
        double mean;
        double stdDeviation;
        Long l;
        String str2 = null;
        for (TopologySummary topologySummary : c.getClusterInfo().get_topologies()) {
            if (str.equals(topologySummary.get_name())) {
                str2 = topologySummary.get_id();
            }
        }
        if (str2 == null) {
            throw new Exception("Could not find a topology named " + str);
        }
        TopologyInfo topologyInfo = c.getTopologyInfo(str2);
        int i = topologyInfo.get_uptime_secs();
        long j = 0;
        long j2 = 0;
        for (ExecutorSummary executorSummary : topologyInfo.get_executors()) {
            if ("spout".equals(executorSummary.get_component_id()) && executorSummary.get_stats() != null && executorSummary.get_stats().get_specific() != null) {
                SpoutStats spoutStats = executorSummary.get_stats().get_specific().get_spout();
                Map map = (Map) spoutStats.get_failed().get(":all-time");
                Map map2 = (Map) spoutStats.get_acked().get(":all-time");
                if (map2 != null) {
                    for (String str3 : map2.keySet()) {
                        if (map != null && (l = (Long) map.get(str3)) != null) {
                            j2 += l.longValue();
                        }
                        j += ((Long) map2.get(str3)).longValue();
                    }
                }
            }
        }
        long j3 = j - _prev_acked;
        long j4 = i - _prev_uptime;
        synchronized (_histo) {
            valueAtPercentile = _histo.getValueAtPercentile(99.0d);
            valueAtPercentile2 = _histo.getValueAtPercentile(99.9d);
            minValue = _histo.getMinValue();
            maxValue = _histo.getMaxValue();
            mean = _histo.getMean();
            stdDeviation = _histo.getStdDeviation();
            _histo.reset();
        }
        System.out.printf("uptime: %,4d acked: %,9d acked/sec: %,10.2f failed: %,8d 99%%: %,15d 99.9%%: %,15d min: %,15d max: %,15d mean: %,15.2f stddev: %,15.2f user: %,10d sys: %,10d gc: %,10d mem: %,10.2f\n", Integer.valueOf(i), Long.valueOf(j3), Double.valueOf(j3 / j4), Long.valueOf(j2), Long.valueOf(valueAtPercentile), Long.valueOf(valueAtPercentile2), Long.valueOf(minValue), Long.valueOf(maxValue), Double.valueOf(mean), Double.valueOf(stdDeviation), Long.valueOf(_userCPU.getAndSet(0L)), Long.valueOf(_systemCPU.getAndSet(0L)), Long.valueOf(_gcMs.getAndSet(0L)), Double.valueOf(readMemory() / 1048576.0d));
        _prev_uptime = i;
        _prev_acked = j;
    }

    public static void kill(C c, String str) throws Exception {
        KillOptions killOptions = new KillOptions();
        killOptions.set_wait_secs(0);
        c.killTopologyWithOpts(str, killOptions);
    }

    public static void main(String[] strArr) throws Exception {
        long j = 500;
        if (strArr != null && strArr.length > 0) {
            j = Long.valueOf(strArr[0]).longValue();
        }
        int i = 4;
        if (strArr != null && strArr.length > 1) {
            i = Integer.valueOf(strArr[1]).intValue();
        }
        int i2 = 5;
        if (strArr != null && strArr.length > 2) {
            i2 = Integer.valueOf(strArr[2]).intValue();
        }
        String str = "wc-test";
        if (strArr != null && strArr.length > 3) {
            str = strArr[3];
        }
        Map config = new Config();
        HttpForwardingMetricsServer httpForwardingMetricsServer = new HttpForwardingMetricsServer(config) { // from class: org.apache.storm.starter.ThroughputVsLatency.1
            public void handle(IMetricsConsumer.TaskInfo taskInfo, Collection<IMetricsConsumer.DataPoint> collection) {
                String str2 = taskInfo.srcWorkerHost + ":" + taskInfo.srcWorkerPort;
                for (IMetricsConsumer.DataPoint dataPoint : collection) {
                    if ("comp-lat-histo".equals(dataPoint.name) && (dataPoint.value instanceof Histogram)) {
                        synchronized (ThroughputVsLatency._histo) {
                            ThroughputVsLatency._histo.add((Histogram) dataPoint.value);
                        }
                    } else if ("CPU".equals(dataPoint.name) && (dataPoint.value instanceof Map)) {
                        Map map = (Map) dataPoint.value;
                        Object obj = map.get("sys-ms");
                        if (obj instanceof Number) {
                            ThroughputVsLatency._systemCPU.getAndAdd(((Number) obj).longValue());
                        }
                        Object obj2 = map.get("user-ms");
                        if (obj2 instanceof Number) {
                            ThroughputVsLatency._userCPU.getAndAdd(((Number) obj2).longValue());
                        }
                    } else if (dataPoint.name.startsWith("GC/") && (dataPoint.value instanceof Map)) {
                        Map map2 = (Map) dataPoint.value;
                        Object obj3 = map2.get("count");
                        if (obj3 instanceof Number) {
                            ThroughputVsLatency._gcCount.getAndAdd(((Number) obj3).longValue());
                        }
                        Object obj4 = map2.get("timeMs");
                        if (obj4 instanceof Number) {
                            ThroughputVsLatency._gcMs.getAndAdd(((Number) obj4).longValue());
                        }
                    } else if (dataPoint.name.startsWith("memory/") && (dataPoint.value instanceof Map)) {
                        Object obj5 = ((Map) dataPoint.value).get("usedBytes");
                        if (obj5 instanceof Number) {
                            MemMeasure memMeasure = (MemMeasure) ThroughputVsLatency._memoryBytes.get(str2);
                            if (memMeasure == null) {
                                MemMeasure memMeasure2 = new MemMeasure();
                                MemMeasure memMeasure3 = (MemMeasure) ThroughputVsLatency._memoryBytes.putIfAbsent(str2, memMeasure2);
                                memMeasure = memMeasure3 == null ? memMeasure2 : memMeasure3;
                            }
                            memMeasure.update(((Number) obj5).longValue());
                        }
                    }
                }
            }
        };
        httpForwardingMetricsServer.serve();
        String url = httpForwardingMetricsServer.getUrl();
        C c = new C(config);
        config.setNumWorkers(i);
        config.registerMetricsConsumer(LoggingMetricsConsumer.class);
        config.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, url, 1L);
        HashMap hashMap = new HashMap();
        if (!c.isLocal()) {
            hashMap.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
        }
        config.put("topology.worker.metrics", hashMap);
        config.put("topology.builtin.metrics.bucket.size.secs", 10);
        config.put("topology.worker.gc.childopts", "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
        config.put("topology.worker.childopts", "-Xmx2g");
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        int i3 = 4 * i;
        topologyBuilder.setSpout("spout", new FastRandomSentenceSpout(j / i3), Integer.valueOf(i3));
        topologyBuilder.setBolt("split", new SplitSentence(), Integer.valueOf(i3)).shuffleGrouping("spout");
        topologyBuilder.setBolt("count", new WordCount(), Integer.valueOf(i3)).fieldsGrouping("split", new Fields(new String[]{"word"}));
        try {
            c.submitTopology(str, config, topologyBuilder.createTopology());
            for (int i4 = 0; i4 < i2 * 2; i4++) {
                Thread.sleep(30000L);
                printMetrics(c, str);
            }
            System.exit(0);
        } finally {
            kill(c, str);
        }
    }
}
