/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.elasticsearch.bolt;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.elasticsearch.bolt.EsIndexBolt;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class EsIndexTopology {
    static final String SPOUT_ID = "spout";
    static final String BOLT_ID = "bolt";
    static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";

    /**
     * Builds a one-spout / one-bolt topology that feeds {@link UserDataSpout} tuples into an
     * {@link EsIndexBolt}, runs it on an in-process {@link LocalCluster} for a fixed period and
     * then terminates the JVM.
     *
     * @param args command-line arguments; not used
     * @throws Exception if any of the setup, submission or shutdown steps fails
     */
    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setNumWorkers(1);

        // Wire the spout to the indexing bolt with a parallelism of 1 each.
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout(SPOUT_ID, new UserDataSpout(), 1);
        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
        // NOTE(review): 9300 is conventionally Elasticsearch's transport port (HTTP defaults to
        // 9200) - confirm this URL matches the node started by EsTestUtil.startEsNode().
        EsConfig esConfig = new EsConfig("http://localhost:9300");
        builder.setBolt(BOLT_ID, new EsIndexBolt(esConfig, tupleMapper), 1).shuffleGrouping(SPOUT_ID);

        // Presumably boots a local Elasticsearch node and gives it time to come up - see EsTestUtil.
        EsTestUtil.startEsNode();
        EsTestUtil.waitForSeconds(5);

        LocalCluster cluster = new LocalCluster();
        try {
            cluster.submitTopology(TOPOLOGY_NAME, config, builder.createTopology());
            // Let the topology run for a while before tearing it down.
            EsTestUtil.waitForSeconds(20);
            cluster.killTopology(TOPOLOGY_NAME);
        } finally {
            // Shut the cluster down even when submitting, waiting or killing throws, so a failed
            // run does not leave the local cluster running.
            System.out.println("cluster begin to shutdown");
            cluster.shutdown();
            System.out.println("cluster shutdown");
        }
        System.exit(0);
    }

    public static class UserDataSpout
    extends BaseRichSpout {
        private ConcurrentHashMap<UUID, Values> pending;
        private SpoutOutputCollector collector;
        private String[] sources = new String[]{"{\"user\":\"user1\"}", "{\"user\":\"user2\"}", "{\"user\":\"user3\"}", "{\"user\":\"user4\"}"};
        private int index = 0;
        private int count = 0;
        private long total = 0L;
        private String indexName = "index1";
        private String typeName = "type1";

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(new String[]{"source", "index", "type", "id"}));
        }

        public void open(Map config, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            this.pending = new ConcurrentHashMap();
        }

        public void nextTuple() {
            String source = this.sources[this.index];
            UUID msgId = UUID.randomUUID();
            Values values = new Values(new Object[]{source, this.indexName, this.typeName, msgId});
            this.pending.put(msgId, values);
            this.collector.emit((List)values, (Object)msgId);
            ++this.index;
            if (this.index >= this.sources.length) {
                this.index = 0;
            }
            ++this.count;
            ++this.total;
            if (this.count > 1000) {
                this.count = 0;
                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
            }
            Thread.yield();
        }

        public void ack(Object msgId) {
            this.pending.remove(msgId);
        }

        public void fail(Object msgId) {
            System.out.println("**** RESENDING FAILED TUPLE");
            this.collector.emit((List)this.pending.get(msgId), msgId);
        }
    }
}

