package org.apache.storm.loadgen;

import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/loadgen/ThroughputVsLatency.class */
public class ThroughputVsLatency {
    private static final Logger LOG = LoggerFactory.getLogger(ThroughputVsLatency.class);
    private static final int TEST_EXECUTE_TIME_DEFAULT = 5;
    private static final long DEFAULT_RATE_PER_SECOND = 500;
    private static final String DEFAULT_TOPO_NAME = "wc-test";
    private static final int DEFAULT_NUM_SPOUTS = 1;
    private static final int DEFAULT_NUM_SPLITS = 1;
    private static final int DEFAULT_NUM_COUNTS = 1;

    /* loaded from: input_file:org/apache/storm/loadgen/ThroughputVsLatency$FastRandomSentenceSpout.class */
    public static class FastRandomSentenceSpout extends LoadSpout {
        static final String[] SENTENCES = {"the cow jumped over the moon", "an apple a day keeps the doctor away", "four score and seven years ago", "snow white and the seven dwarfs", "i am at two with nature"};

        public FastRandomSentenceSpout(long j) {
            super(j);
        }

        @Override // org.apache.storm.loadgen.LoadSpout
        protected Values getNextValues(OutputStreamEngine outputStreamEngine) {
            return new Values(new Object[]{SENTENCES[outputStreamEngine.nextInt(SENTENCES.length)]});
        }

        @Override // org.apache.storm.loadgen.LoadSpout
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"sentence"}));
        }
    }

    /* loaded from: input_file:org/apache/storm/loadgen/ThroughputVsLatency$SplitSentence.class */
    public static class SplitSentence extends BaseBasicBolt {
        private ExecAndProcessLatencyEngine sleep;
        private int executorIndex;

        public SplitSentence(SlowExecutorPattern slowExecutorPattern) {
            this.sleep = new ExecAndProcessLatencyEngine(slowExecutorPattern);
        }

        public void prepare(Map<String, Object> map, TopologyContext topologyContext) {
            this.executorIndex = topologyContext.getThisTaskIndex();
            this.sleep.prepare();
        }

        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            this.sleep.simulateProcessAndExecTime(this.executorIndex, Time.nanoTime(), null, () -> {
                for (String str : tuple.getString(0).split("\\s+")) {
                    basicOutputCollector.emit(new Values(new Object[]{str, 1}));
                }
            });
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"word", "count"}));
        }
    }

    /* loaded from: input_file:org/apache/storm/loadgen/ThroughputVsLatency$WordCount.class */
    public static class WordCount extends BaseBasicBolt {
        Map<String, Integer> counts = new HashMap();

        public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
            String string = tuple.getString(0);
            Integer num = this.counts.get(string);
            if (num == null) {
                num = 0;
            }
            Integer valueOf = Integer.valueOf(num.intValue() + 1);
            this.counts.put(string, valueOf);
            basicOutputCollector.emit(new Values(new Object[]{string, valueOf}));
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"word", "count"}));
        }
    }

    public static void main(String[] strArr) throws Exception {
        Options options = new Options();
        options.addOption(Option.builder("h").longOpt("help").desc("Print a help message").build());
        options.addOption(Option.builder("t").longOpt("test-time").argName("MINS").hasArg().desc("How long to run the tests for in mins (defaults to 5)").build());
        options.addOption(Option.builder().longOpt("rate").argName("SENTENCES/SEC").hasArg().desc("How many sentences per second to run. (defaults to 500)").build());
        options.addOption(Option.builder().longOpt("name").argName("TOPO_NAME").hasArg().desc("Name of the topology to run (defaults to wc-test)").build());
        options.addOption(Option.builder().longOpt("spouts").argName("NUM").hasArg().desc("Number of spouts to use (defaults to 1)").build());
        options.addOption(Option.builder().longOpt("splitters").argName("NUM").hasArg().desc("Number of splitter bolts to use (defaults to 1)").build());
        options.addOption(Option.builder().longOpt("splitter-imbalance").argName("MS(:COUNT)?").hasArg().desc("The number of ms that the first COUNT splitters will wait before processing.  This creates an imbalance that helps test load aware groupings (defaults to 0:1)").build());
        options.addOption(Option.builder().longOpt("counters").argName("NUM").hasArg().desc("Number of counter bolts to use (defaults to 1)").build());
        LoadMetricsServer.addCommandLineOptions(options);
        DefaultParser defaultParser = new DefaultParser();
        CommandLine commandLine = null;
        Exception exc = null;
        SlowExecutorPattern slowExecutorPattern = null;
        double d = 5.0d;
        double d2 = 500.0d;
        String str = DEFAULT_TOPO_NAME;
        int i = 1;
        int i2 = 1;
        int i3 = 1;
        try {
            commandLine = defaultParser.parse(options, strArr);
            if (commandLine.hasOption("t")) {
                d = Double.valueOf(commandLine.getOptionValue("t")).doubleValue();
            }
            if (commandLine.hasOption("rate")) {
                d2 = Double.parseDouble(commandLine.getOptionValue("rate"));
            }
            if (commandLine.hasOption("name")) {
                str = commandLine.getOptionValue("name");
            }
            if (commandLine.hasOption("spouts")) {
                i = Integer.parseInt(commandLine.getOptionValue("spouts"));
            }
            if (commandLine.hasOption("splitters")) {
                i2 = Integer.parseInt(commandLine.getOptionValue("splitters"));
            }
            if (commandLine.hasOption("counters")) {
                i3 = Integer.parseInt(commandLine.getOptionValue("counters"));
            }
            if (commandLine.hasOption("splitter-imbalance")) {
                slowExecutorPattern = SlowExecutorPattern.fromString(commandLine.getOptionValue("splitter-imbalance"));
            }
        } catch (NumberFormatException | ParseException e) {
            exc = e;
        }
        if (exc != null || commandLine.hasOption('h')) {
            if (exc != null) {
                System.err.println("ERROR " + exc.getMessage());
            }
            new HelpFormatter().printHelp("ThroughputVsLatency [options]", options);
            return;
        }
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        linkedHashMap.put("target_rate", Double.valueOf(d2));
        linkedHashMap.put("spout_parallel", Integer.valueOf(i));
        linkedHashMap.put("split_parallel", Integer.valueOf(i2));
        linkedHashMap.put("count_parallel", Integer.valueOf(i3));
        Config config = new Config();
        Map readStormConfig = Utils.readStormConfig();
        LoadMetricsServer loadMetricsServer = new LoadMetricsServer(readStormConfig, commandLine, linkedHashMap);
        loadMetricsServer.serve();
        String url = loadMetricsServer.getUrl();
        NimbusClient configuredClient = NimbusClient.getConfiguredClient(readStormConfig);
        config.registerMetricsConsumer(LoggingMetricsConsumer.class);
        config.registerMetricsConsumer(HttpForwardingMetricsConsumer.class, url, 1L);
        HashMap hashMap = new HashMap();
        if (!NimbusClient.isLocalOverride()) {
            hashMap.put("CPU", "org.apache.storm.metrics.sigar.CPUMetric");
        }
        config.put("topology.worker.metrics", hashMap);
        config.put("topology.builtin.metrics.bucket.size.secs", 10);
        config.put("topology.worker.gc.childopts", "-XX:+UseConcMarkSweepGC -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:NewSize=128m -XX:CMSInitiatingOccupancyFraction=70 -XX:-CMSConcurrentMTEnabled");
        config.put("topology.worker.childopts", "-Xmx2g");
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new FastRandomSentenceSpout(((long) d2) / i), Integer.valueOf(i));
        topologyBuilder.setBolt("split", new SplitSentence(slowExecutorPattern), Integer.valueOf(i2)).shuffleGrouping("spout");
        topologyBuilder.setBolt("count", new WordCount(), Integer.valueOf(i3)).fieldsGrouping("split", new Fields(new String[]{"word"}));
        try {
            try {
                ScopedTopologySet scopedTopologySet = new ScopedTopologySet(configuredClient.getClient());
                Throwable th = null;
                try {
                    StormSubmitter.submitTopology(str, config, topologyBuilder.createTopology());
                    scopedTopologySet.add(str);
                    loadMetricsServer.monitorFor(d, configuredClient.getClient(), scopedTopologySet);
                    if (scopedTopologySet != null) {
                        if (0 != 0) {
                            try {
                                scopedTopologySet.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            scopedTopologySet.close();
                        }
                    }
                    System.exit(0);
                } catch (Throwable th3) {
                    if (scopedTopologySet != null) {
                        if (0 != 0) {
                            try {
                                scopedTopologySet.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            scopedTopologySet.close();
                        }
                    }
                    throw th3;
                }
            } catch (Throwable th5) {
                System.exit(-1);
                throw th5;
            }
        } catch (Exception e2) {
            LOG.error("Error while running test", e2);
            System.exit(-1);
        }
    }
}
