package org.apache.storm.hdfs.spout;

import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.ExecutorSummary;
import org.apache.storm.generated.KillOptions;
import org.apache.storm.generated.Nimbus;
import org.apache.storm.generated.SpoutStats;
import org.apache.storm.generated.TopologyInfo;
import org.apache.storm.generated.TopologySummary;
import org.apache.storm.metric.LoggingMetricsConsumer;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.NimbusClient;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/hdfs/spout/HdfsSpoutTopology.class */
public class HdfsSpoutTopology {
    public static final String SPOUT_ID = "hdfsspout";
    public static final String BOLT_ID = "constbolt";

    /* loaded from: input_file:org/apache/storm/hdfs/spout/HdfsSpoutTopology$ConstBolt.class */
    public static class ConstBolt extends BaseRichBolt {
        private static final long serialVersionUID = -5313598399155365865L;
        public static final String FIELDS = "message";
        private OutputCollector collector;
        private static final Logger log = LoggerFactory.getLogger((Class<?>) ConstBolt.class);
        int count = 0;

        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.collector = outputCollector;
        }

        public void execute(Tuple tuple) {
            log.info("Received tuple : {}", tuple.getValue(0));
            this.count++;
            if (this.count == 3) {
                this.collector.fail(tuple);
            } else {
                this.collector.ack(tuple);
            }
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"message"}));
        }
    }

    public static void main(String[] strArr) throws Exception {
        if (strArr.length < 7) {
            System.err.println("Please check command line arguments.");
            System.err.println("Usage :");
            System.err.println(HdfsSpoutTopology.class.toString() + " topologyName hdfsUri fileFormat sourceDir sourceArchiveDir badDir destinationDir.");
            System.err.println(" topologyName - topology name.");
            System.err.println(" hdfsUri - hdfs name node URI");
            System.err.println(" fileFormat -  Set to 'TEXT' for reading text files or 'SEQ' for sequence files.");
            System.err.println(" sourceDir  - read files from this HDFS dir using HdfsSpout.");
            System.err.println(" archiveDir - after a file in sourceDir is read completely, it is moved to this HDFS location.");
            System.err.println(" badDir - files that cannot be read properly will be moved to this HDFS location.");
            System.err.println(" spoutCount - Num of spout instances.");
            System.err.println();
            System.exit(-1);
        }
        String str = strArr[0];
        String str2 = strArr[1];
        String str3 = strArr[2];
        String str4 = strArr[3];
        String str5 = strArr[4];
        String str6 = strArr[5];
        int parseInt = Integer.parseInt(strArr[6]);
        ConstBolt constBolt = new ConstBolt();
        HdfsSpout badFilesDir = new HdfsSpout().withOutputFields(TextFileReader.defaultFields).setReaderType(str3).setHdfsUri(str2).setSourceDir(str4).setArchiveDir(str5).setBadFilesDir(str6);
        Config config = new Config();
        config.setNumWorkers(1);
        config.setNumAckers(1);
        config.setMaxTaskParallelism(1);
        config.setDebug(true);
        config.registerMetricsConsumer(LoggingMetricsConsumer.class);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout(SPOUT_ID, badFilesDir, Integer.valueOf(parseInt));
        topologyBuilder.setBolt(BOLT_ID, constBolt, 1).shuffleGrouping(SPOUT_ID);
        Map readStormConfig = Utils.readStormConfig();
        StormSubmitter.submitTopologyWithProgressBar(str, config, topologyBuilder.createTopology());
        Nimbus.Client client = NimbusClient.getConfiguredClient(readStormConfig).getClient();
        for (int i = 0; i < 40; i++) {
            Thread.sleep(30000L);
            printMetrics(client, str);
        }
        kill(client, str);
    }

    private static void kill(Nimbus.Client client, String str) throws Exception {
        KillOptions killOptions = new KillOptions();
        killOptions.set_wait_secs(0);
        client.killTopologyWithOpts(str, killOptions);
    }

    static void printMetrics(Nimbus.Client client, String str) throws Exception {
        Long l;
        String str2 = null;
        for (TopologySummary topologySummary : client.getClusterInfo().get_topologies()) {
            if (str.equals(topologySummary.get_name())) {
                str2 = topologySummary.get_id();
            }
        }
        if (str2 == null) {
            throw new Exception("Could not find a topology named " + str);
        }
        TopologyInfo topologyInfo = client.getTopologyInfo(str2);
        int i = topologyInfo.get_uptime_secs();
        long j = 0;
        long j2 = 0;
        double d = 0.0d;
        for (ExecutorSummary executorSummary : topologyInfo.get_executors()) {
            if ("spout".equals(executorSummary.get_component_id())) {
                SpoutStats spoutStats = executorSummary.get_stats().get_specific().get_spout();
                Map map = (Map) spoutStats.get_failed().get(":all-time");
                Map map2 = (Map) spoutStats.get_acked().get(":all-time");
                Map map3 = (Map) spoutStats.get_complete_ms_avg().get(":all-time");
                for (String str3 : map2.keySet()) {
                    if (map != null && (l = (Long) map.get(str3)) != null) {
                        j2 += l.longValue();
                    }
                    long longValue = ((Long) map2.get(str3)).longValue();
                    j += longValue;
                    d += ((Double) map3.get(str3)).doubleValue() * longValue;
                }
            }
        }
        System.out.println("uptime: " + i + " acked: " + j + " avgLatency: " + (d / j) + " acked/sec: " + (j / i) + " failed: " + j2);
    }
}
