package org.apache.storm.elasticsearch.bolt;

import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/* loaded from: input_file:org/apache/storm/elasticsearch/bolt/EsIndexTopology.class */
/**
 * Sample topology that wires a {@link UserDataSpout} to an {@code EsIndexBolt}
 * and runs it on an in-process {@link LocalCluster} for a fixed amount of time.
 */
public class EsIndexTopology {
    static final String SPOUT_ID = "spout";
    static final String BOLT_ID = "bolt";
    static final String TOPOLOGY_NAME = "elasticsearch-test-topology1";

    /**
     * Spout that endlessly cycles through a small fixed set of JSON user
     * documents, emitting each one together with a target index name, type
     * name and a freshly generated {@link UUID}.
     *
     * <p>The UUID is used both as the document id field and as the message id
     * of the emitted tuple. Every emitted tuple is remembered in
     * {@code pending} until {@link #ack(Object)} removes it, so that
     * {@link #fail(Object)} can emit it again.
     */
    public static class UserDataSpout extends BaseRichSpout {
        /** Number of emitted tuples after which a progress line is printed. */
        private static final int LOG_INTERVAL = 1000;

        /** Emitted tuples that have not been acked yet, keyed by message id. */
        private ConcurrentHashMap<UUID, Values> pending;
        private SpoutOutputCollector collector;
        /** JSON documents emitted in round-robin order. */
        private final String[] sources = {
            "{\"user\":\"user1\"}",
            "{\"user\":\"user2\"}",
            "{\"user\":\"user3\"}",
            "{\"user\":\"user4\"}"
        };
        /** Position in {@code sources} of the next document to emit. */
        private int index = 0;
        /** Tuples emitted since the last progress line. */
        private int count = 0;
        /** Tuples emitted since the spout was created. */
        private long total = 0;
        private final String indexName = "index1";
        private final String typeName = "type1";

        /**
         * Declares the four output fields: {@code source}, {@code index},
         * {@code type} and {@code id}.
         *
         * @param outputFieldsDeclarer declarer the fields are registered with
         */
        public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
            outputFieldsDeclarer.declare(new Fields("source", "index", "type", "id"));
        }

        /**
         * Stores the collector and creates an empty pending-tuple map.
         *
         * @param map topology configuration (unused)
         * @param topologyContext topology context (unused)
         * @param spoutOutputCollector collector used for all subsequent emits
         */
        public void open(Map map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
            this.collector = spoutOutputCollector;
            this.pending = new ConcurrentHashMap<>();
        }

        /**
         * Emits the next document in round-robin order with a new random UUID
         * as message id, records it as pending, and prints a progress line
         * once more than {@code LOG_INTERVAL} tuples were emitted since the
         * previous one.
         */
        public void nextTuple() {
            String source = this.sources[this.index];
            UUID msgId = UUID.randomUUID();
            Values values = new Values(source, this.indexName, this.typeName, msgId);
            // Register before emitting so the tuple is already tracked when
            // ack/fail is invoked for this message id.
            this.pending.put(msgId, values);
            this.collector.emit(values, msgId);

            // Advance and wrap around to the first document.
            this.index = (this.index + 1) % this.sources.length;

            this.count++;
            this.total++;
            if (this.count > LOG_INTERVAL) {
                this.count = 0;
                System.out.println("Pending count: " + this.pending.size() + ", total: " + this.total);
            }
            Thread.yield();
        }

        /**
         * Forgets the tuple registered under the given message id.
         *
         * @param obj message id that was passed to {@code emit}
         */
        public void ack(Object obj) {
            this.pending.remove(obj);
        }

        /**
         * Emits the tuple registered under the given message id again, reusing
         * the same message id.
         *
         * <p>If no tuple is pending for the id, nothing is emitted; passing a
         * {@code null} tuple on to the collector would otherwise fail inside
         * the emit call.
         *
         * @param obj message id that was passed to {@code emit}
         */
        public void fail(Object obj) {
            Values failed = this.pending.get(obj);
            if (failed == null) {
                System.out.println("**** FAILED TUPLE NOT PENDING, SKIPPING RESEND: " + obj);
                return;
            }
            System.out.println("**** RESENDING FAILED TUPLE");
            this.collector.emit(failed, obj);
        }
    }

    /**
     * Builds the spout-to-bolt topology, starts an Elasticsearch node via
     * {@code EsTestUtil}, runs the topology on a local cluster for 20 seconds,
     * then kills it, shuts the cluster down and exits the JVM.
     *
     * @param strArr command-line arguments (unused)
     * @throws Exception if starting the node or running the topology fails
     */
    public static void main(String[] strArr) throws Exception {
        Config config = new Config();
        config.setNumWorkers(1);

        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout(SPOUT_ID, new UserDataSpout(), 1);
        // NOTE(review): 9300 is conventionally Elasticsearch's transport port,
        // not its HTTP port - confirm this URL matches what
        // EsTestUtil.startEsNode() actually exposes.
        EsConfig esConfig = new EsConfig("http://localhost:9300");
        topologyBuilder
            .setBolt(BOLT_ID, new EsIndexBolt(esConfig, EsTestUtil.generateDefaultTupleMapper()), 1)
            .shuffleGrouping(SPOUT_ID);

        EsTestUtil.startEsNode();
        EsTestUtil.waitForSeconds(5);

        LocalCluster localCluster = new LocalCluster();
        try {
            localCluster.submitTopology(TOPOLOGY_NAME, config, topologyBuilder.createTopology());
            EsTestUtil.waitForSeconds(20);
            localCluster.killTopology(TOPOLOGY_NAME);
        } finally {
            // Always stop the cluster, even when submitting, waiting or
            // killing throws, so it is not left running.
            System.out.println("cluster begin to shutdown");
            localCluster.shutdown();
            System.out.println("cluster shutdown");
        }
        System.exit(0);
    }
}
