/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.kafka;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.serializer.Decoder;
import kafka.serializer.StringDecoder;
import kafka.utils.VerifiableProperties;
import org.apache.iotdb.kafka.KafkaConsumerThread;

/**
 * Stand-alone entry point that reads the {@code Kafka-Test} topic through the
 * ZooKeeper-based high-level consumer API and hands each resulting
 * {@link KafkaStream} to its own {@link KafkaConsumerThread} running on a
 * fixed-size thread pool.
 */
public class KafkaConsumer {
    /** Topic that is consumed. */
    private static final String TOPIC = "Kafka-Test";

    /**
     * Number of streams requested for {@link #TOPIC}; also the size of the
     * worker pool, so that every stream gets a dedicated thread.
     */
    private static final int CONSUMER_THREAD_COUNT = 5;

    private final ConsumerConnector consumer;

    /**
     * Builds the consumer connector from a hard-coded configuration
     * (local ZooKeeper at 127.0.0.1:2181, group {@code consumeGroup}).
     */
    private KafkaConsumer() {
        Properties props = new Properties();
        props.put("zookeeper.connect", "127.0.0.1:2181");
        props.put("group.id", "consumeGroup");
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("rebalance.max.retries", "5");
        props.put("rebalance.backoff.ms", "1200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");
        // NOTE(review): "serializer.class" looks like a producer-side setting and is
        // presumably ignored by the consumer -- kept as-is, confirm before removing.
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        ConsumerConfig config = new ConsumerConfig(props);
        this.consumer = Consumer.createJavaConsumerConnector(config);
    }

    /**
     * Creates a consumer and starts consuming.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        new KafkaConsumer().consume();
    }

    /**
     * Requests {@link #CONSUMER_THREAD_COUNT} message streams for {@link #TOPIC},
     * decoding keys and values as strings, and submits one
     * {@link KafkaConsumerThread} per stream to a fixed thread pool.
     */
    private void consume() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(TOPIC, CONSUMER_THREAD_COUNT);

        StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
        StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());

        Map<String, List<KafkaStream<String, String>>> consumerMap =
                this.consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
        List<KafkaStream<String, String>> streams = consumerMap.get(TOPIC);

        ExecutorService executor = Executors.newFixedThreadPool(CONSUMER_THREAD_COUNT);
        for (KafkaStream<String, String> stream : streams) {
            executor.submit(new KafkaConsumerThread(stream));
        }
        // No further tasks will be submitted. Already-submitted workers keep
        // running; the pool threads are released once those workers return.
        executor.shutdown();

        // Close the connector when the JVM exits instead of abandoning it.
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            @Override
            public void run() {
                KafkaConsumer.this.consumer.shutdown();
            }
        }));
    }
}

