package org.apache.storm.hdfs.bolt;

import java.io.FileInputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.StormSubmitter;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
import org.apache.storm.hdfs.common.rotation.MoveFileAction;
import org.apache.storm.hdfs.spout.Configs;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.jets3t.service.utils.gatekeeper.GatekeeperMessage;
import org.yaml.snakeyaml.Yaml;

/**
 * Example topology that streams tuples from {@link SentenceSpout} into an HdfsBolt.
 *
 * <p>Decompiled from: input_file:org/apache/storm/hdfs/bolt/HdfsFileTopology.class
 */
public class HdfsFileTopology {
    static final String SENTENCE_SPOUT_ID = "sentence-spout";
    static final String BOLT_ID = "my-bolt";
    static final String TOPOLOGY_NAME = "test-topology";

    /* loaded from: input_file:org/apache/storm/hdfs/bolt/HdfsFileTopology$MyBolt.class */
    public static class MyBolt extends BaseRichBolt {
        private HashMap<String, Long> counts = null;
        private OutputCollector collector;

        public void prepare(Map map, TopologyContext topologyContext, OutputCollector outputCollector) {
            this.counts = new HashMap<>();
            this.collector = outputCollector;
        }

        public void execute(Tuple tuple) {
            this.collector.ack(tuple);
        }

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        }

        public void cleanup() {
        }
    }

    /* loaded from: input_file:org/apache/storm/hdfs/bolt/HdfsFileTopology$SentenceSpout.class */
    public static class SentenceSpout extends BaseRichSpout {
        private ConcurrentHashMap<UUID, Values> pending;
        private SpoutOutputCollector collector;
        private String[] sentences = {"my dog has fleas", "i like cold beverages", "the dog ate my homework", "don't have a cow man", "i don't think i like fleas"};
        private int index = 0;
        private int count = 0;
        private long total = 0;

        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields(new String[]{"sentence", "timestamp"}));
        }

        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.collector = spoutOutputCollector;
            this.pending = new ConcurrentHashMap<>();
        }

        public void nextTuple() {
            Values values = new Values(new Object[]{this.sentences[this.index], Long.valueOf(System.currentTimeMillis())});
            UUID randomUUID = UUID.randomUUID();
            this.pending.put(randomUUID, values);
            this.collector.emit(values, randomUUID);
            this.index++;
            if (this.index >= this.sentences.length) {
                this.index = 0;
            }
            this.count++;
            this.total++;
            if (this.count > 20000) {
                this.count = 0;
                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
            }
            Thread.yield();
        }

        public void ack(Object obj) {
            this.pending.remove(obj);
        }

        public void fail(Object obj) {
            System.out.println("**** RESENDING FAILED TUPLE");
            this.collector.emit(this.pending.get(obj), obj);
        }
    }

    /**
     * Builds a topology that feeds {@link SentenceSpout} into an {@code HdfsBolt}
     * and either runs it in a {@code LocalCluster} or submits it through
     * {@code StormSubmitter}.
     *
     * <ul>
     *   <li>{@code strArr[0]} - HDFS URL handed to {@code HdfsBolt.withFsUrl}</li>
     *   <li>{@code strArr[1]} - path of a YAML file; its parsed content is stored in the
     *       topology config under {@code Configs.DEFAULT_HDFS_CONFIG_KEY}</li>
     *   <li>{@code strArr[2]} (optional) - topology name; when present the topology is
     *       submitted with {@code StormSubmitter} instead of being run locally</li>
     * </ul>
     *
     * <p>With exactly two arguments the topology runs locally under the name
     * {@code TOPOLOGY_NAME} for 120 seconds, is killed, and the JVM exits with status 0.
     * Any other argument count prints a usage line and returns.
     *
     * @param strArr command-line arguments as described above
     * @throws Exception if the YAML file cannot be read or topology submission fails
     */
    public static void main(String[] strArr) throws Exception {
        // Validate the argument count before indexing into strArr; otherwise a short
        // argument list fails with ArrayIndexOutOfBoundsException instead of the usage text.
        if (strArr.length != 2 && strArr.length != 3) {
            System.out.println("Usage: HdfsFileTopology [hdfs url] [hdfs yaml config file] <topology name>");
            return;
        }

        Config config = new Config();
        config.setNumWorkers(1);

        SentenceSpout sentenceSpout = new SentenceSpout();
        CountSyncPolicy syncPolicy = new CountSyncPolicy(1000);
        TimedRotationPolicy rotationPolicy = new TimedRotationPolicy(1.0f, TimedRotationPolicy.TimeUnit.MINUTES);
        DefaultFileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/tmp/foo/").withExtension(".txt");
        DelimitedRecordFormat recordFormat = new DelimitedRecordFormat().withFieldDelimiter(GatekeeperMessage.DELIM);

        // NOTE(review): depending on the SnakeYAML version, a default-constructed Yaml may
        // instantiate arbitrary types named in the document; the file here is operator-supplied,
        // but consider a safe constructor if that ever changes.
        Map hdfsSettings;
        try (FileInputStream yamlStream = new FileInputStream(strArr[1])) {
            // try-with-resources closes the stream even when parsing throws.
            hdfsSettings = (Map) new Yaml().load(yamlStream);
        }
        config.put(Configs.DEFAULT_HDFS_CONFIG_KEY, hdfsSettings);

        HdfsBolt hdfsBolt = new HdfsBolt()
                .withConfigKey(Configs.DEFAULT_HDFS_CONFIG_KEY)
                .withFsUrl(strArr[0])
                .withFileNameFormat(fileNameFormat)
                .withRecordFormat(recordFormat)
                .withRotationPolicy(rotationPolicy)
                .withSyncPolicy(syncPolicy)
                .addRotationAction(new MoveFileAction().toDestination("/tmp/dest2/"));

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout(SENTENCE_SPOUT_ID, sentenceSpout, 1);
        topologyBuilder.setBolt(BOLT_ID, hdfsBolt, 4).shuffleGrouping(SENTENCE_SPOUT_ID);

        if (strArr.length == 3) {
            // A topology name was supplied: submit through StormSubmitter and return.
            StormSubmitter.submitTopology(strArr[2], config, topologyBuilder.createTopology());
            return;
        }

        // Two arguments: run in-process for a fixed time, then tear everything down.
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology());
        waitForSeconds(120);
        localCluster.killTopology(TOPOLOGY_NAME);
        localCluster.shutdown();
        System.exit(0);
    }

    /**
     * Sleeps the calling thread for {@code i} seconds.
     *
     * <p>If the thread is interrupted while sleeping, the method returns early
     * and leaves the thread's interrupt flag set so callers can observe it.
     *
     * @param i number of seconds to sleep
     * @throws IllegalArgumentException if {@code i} is negative (raised by {@link Thread#sleep(long)})
     */
    public static void waitForSeconds(int i) {
        try {
            // 1000L forces long arithmetic; int multiplication overflows once i exceeds Integer.MAX_VALUE / 1000.
            Thread.sleep(i * 1000L);
        } catch (InterruptedException e) {
            // Restore the flag instead of swallowing the interruption.
            Thread.currentThread().interrupt();
        }
    }
}
