/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.elasticsearch.bolt;

import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.elasticsearch.bolt.EsIndexBolt;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public final class EsIndexTopology {
    private static final String SPOUT_ID = "spout";
    private static final String BOLT_ID = "bolt";
    private static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";
    private static final int PENDING_COUNT_MAX = 1000;

    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        Config config = new Config();
        config.setNumWorkers(1);
        TopologyBuilder builder = new TopologyBuilder();
        UserDataSpout spout = new UserDataSpout();
        builder.setSpout(SPOUT_ID, (IRichSpout)spout, (Number)1);
        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
        EsConfig esConfig = new EsConfig("http://localhost:9300");
        builder.setBolt(BOLT_ID, (IRichBolt)new EsIndexBolt(esConfig, tupleMapper), (Number)1).shuffleGrouping(SPOUT_ID);
        EsTestUtil.startEsNode();
        EsTestUtil.waitForSeconds(5);
        StormSubmitter.submitTopology((String)TOPOLOGY_NAME, (Map)config, (StormTopology)builder.createTopology());
    }

    private EsIndexTopology() {
    }

    public static class UserDataSpout
    extends BaseRichSpout {
        private static final long serialVersionUID = 1L;
        private ConcurrentHashMap<UUID, Values> pending;
        private SpoutOutputCollector collector;
        private String[] sources = new String[]{"{\"user\":\"user1\"}", "{\"user\":\"user2\"}", "{\"user\":\"user3\"}", "{\"user\":\"user4\"}"};
        private int index = 0;
        private int count = 0;
        private long total = 0L;
        private String indexName = "index1";
        private String typeName = "type1";

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields(new String[]{"source", "index", "type", "id"}));
        }

        public void open(Map<String, Object> config, TopologyContext context, SpoutOutputCollector collectorArg) {
            this.collector = collectorArg;
            this.pending = new ConcurrentHashMap();
        }

        public void nextTuple() {
            String source = this.sources[this.index];
            UUID msgId = UUID.randomUUID();
            Values values = new Values(new Object[]{source, this.indexName, this.typeName, msgId});
            this.pending.put(msgId, values);
            this.collector.emit((List)values, (Object)msgId);
            ++this.index;
            if (this.index >= this.sources.length) {
                this.index = 0;
            }
            ++this.count;
            ++this.total;
            if (this.count > 1000) {
                this.count = 0;
                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
            }
            Thread.yield();
        }

        public void ack(Object msgId) {
            this.pending.remove(msgId);
        }

        public void fail(Object msgId) {
            System.out.println("**** RESENDING FAILED TUPLE");
            this.collector.emit((List)this.pending.get(msgId), msgId);
        }
    }
}

