/*
 * Decompiled with CFR 0.152.
 */
package org.apache.storm.elasticsearch.trident;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.elasticsearch.common.EsConfig;
import org.apache.storm.elasticsearch.common.EsTestUtil;
import org.apache.storm.elasticsearch.common.EsTupleMapper;
import org.apache.storm.elasticsearch.trident.EsStateFactory;
import org.apache.storm.elasticsearch.trident.EsUpdater;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.AuthorizationException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.trident.Stream;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.trident.state.StateFactory;
import org.apache.storm.trident.state.StateUpdater;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

/**
 * Example Trident topology that repeatedly emits a small fixed set of JSON
 * documents from {@link FixedBatchSpout} and persists them through an
 * {@link EsStateFactory}-backed Trident state using {@link EsUpdater}.
 *
 * <p>This is a runnable sample; the class is a non-instantiable holder for
 * {@link #main(String[])} and the nested spout.
 */
public final class TridentEsTopology {
    /** Number of tuples the spout packs into every Trident batch. */
    private static final int BATCH_SIZE_DEFAULT = 100;
    /** Name under which the topology is submitted. */
    private static final String TOPOLOGY_NAME = "elasticsearch-test-topology2";

    /**
     * Builds the topology (spout -> partitionPersist into the ES state) and
     * submits it via {@link StormSubmitter}.
     *
     * @param args command-line arguments; not used
     * @throws AlreadyAliveException declared by {@code StormSubmitter.submitTopology}
     * @throws InvalidTopologyException declared by {@code StormSubmitter.submitTopology}
     * @throws AuthorizationException declared by {@code StormSubmitter.submitTopology}
     */
    public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
        FixedBatchSpout spout = new FixedBatchSpout(BATCH_SIZE_DEFAULT);
        spout.cycle = true;

        TridentTopology topology = new TridentTopology();
        Stream stream = topology.newStream("spout", spout);

        EsConfig esConfig = new EsConfig("http://localhost:9300");
        // NOTE(review): the spout also emits an "id" field that is not forwarded
        // to the state updater here - confirm the tuple mapper does not need it.
        Fields esFields = new Fields("index", "type", "source");
        EsTupleMapper tupleMapper = EsTestUtil.generateDefaultTupleMapper();
        StateFactory factory = new EsStateFactory(esConfig, tupleMapper);
        // The returned TridentState is not needed; the call only wires the sink.
        stream.partitionPersist(factory, esFields, new EsUpdater(), new Fields());

        // Presumably boots a local Elasticsearch node and gives it time to come
        // up before the topology starts writing - verify against EsTestUtil.
        EsTestUtil.startEsNode();
        EsTestUtil.waitForSeconds(5);
        StormSubmitter.submitTopology(TOPOLOGY_NAME, new Config(), topology.build());
    }

    /** Not instantiable: this class only hosts {@link #main(String[])}. */
    private TridentEsTopology() {
    }

    /**
     * Batch spout that emits tuples of {@code (source, index, type, id)} taken
     * round-robin from a fixed list of four sample documents.
     *
     * <p>The document ids are random UUIDs generated once, when the spout
     * instance is constructed, so every pass over the list re-emits the same
     * four ids. Batches are remembered by batch id until acknowledged so that
     * a re-emitted batch id yields exactly the same tuples.
     */
    public static class FixedBatchSpout
    implements IBatchSpout {
        private static final long serialVersionUID = 1L;

        /** Number of tuples placed in each newly built batch. */
        private final int maxBatchSize;
        /** Batches emitted but not yet acknowledged, keyed by batch id. */
        private final Map<Long, List<List<Object>>> batches = new HashMap<>();
        /** Sample tuples, in {@code (source, index, type, id)} order. */
        private final Values[] outputs = {
            new Values("{\"user\":\"user1\"}", "index1", "type1", UUID.randomUUID().toString()),
            new Values("{\"user\":\"user2\"}", "index1", "type2", UUID.randomUUID().toString()),
            new Values("{\"user\":\"user3\"}", "index2", "type1", UUID.randomUUID().toString()),
            new Values("{\"user\":\"user4\"}", "index2", "type2", UUID.randomUUID().toString())
        };
        /** Position in {@link #outputs} of the next tuple to hand out. */
        private int index = 0;
        /**
         * Cycle flag, set by {@code main}. It only guards the reset performed
         * before a batch is built; the wrap-around inside the fill loop of
         * {@link #emitBatch} happens regardless, so the spout keeps cycling
         * through {@link #outputs} whether or not this flag is set.
         */
        private boolean cycle = false;

        /**
         * Creates a spout producing batches of the given size.
         *
         * @param maxBatchSizeArg number of tuples per batch; a value of zero or
         *                        less results in empty batches
         */
        public FixedBatchSpout(int maxBatchSizeArg) {
            this.maxBatchSize = maxBatchSizeArg;
        }

        /**
         * @return the emitted field names: {@code source}, {@code index},
         *         {@code type}, {@code id}
         */
        public Fields getOutputFields() {
            return new Fields("source", "index", "type", "id");
        }

        /**
         * Rewinds the spout to the first sample tuple.
         *
         * @param conf    topology configuration; not used
         * @param context topology context; not used
         */
        public void open(Map<String, Object> conf, TopologyContext context) {
            this.index = 0;
        }

        /**
         * Emits the batch for {@code batchId}. On first sight of a batch id a
         * new batch of {@code maxBatchSize} tuples is built by walking
         * {@link #outputs} round-robin and is cached; a batch id that is still
         * cached is replayed verbatim.
         *
         * @param batchId   id of the batch to emit
         * @param collector collector receiving every tuple of the batch
         */
        public void emitBatch(long batchId, TridentCollector collector) {
            List<List<Object>> batch = this.batches.get(batchId);
            if (batch == null) {
                batch = new ArrayList<List<Object>>();
                if (this.index >= this.outputs.length && this.cycle) {
                    this.index = 0;
                }
                for (int i = 0; i < this.maxBatchSize; ++i) {
                    // Wraps unconditionally, i.e. independent of the cycle flag.
                    if (this.index == this.outputs.length) {
                        this.index = 0;
                    }
                    batch.add(this.outputs[this.index]);
                    ++this.index;
                }
                this.batches.put(batchId, batch);
            }
            for (List<Object> tuple : batch) {
                collector.emit(tuple);
            }
        }

        /**
         * Forgets the cached batch for an acknowledged batch id.
         *
         * @param batchId id of the batch that was acknowledged
         */
        public void ack(long batchId) {
            this.batches.remove(batchId);
        }

        /** No resources to release. */
        public void close() {
        }

        /**
         * @return a component configuration with max task parallelism set to 1
         */
        public Map<String, Object> getComponentConfiguration() {
            Config conf = new Config();
            conf.setMaxTaskParallelism(1);
            return conf;
        }
    }
}

