package com.datatorrent.contrib.kafka;

import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import kafka.message.MessageAndMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.class */
public class HighlevelKafkaConsumer extends KafkaConsumer {
    private static final Logger logger = LoggerFactory.getLogger(HighlevelKafkaConsumer.class);
    private Properties consumerConfig;
    private transient Map<String, ConsumerConnector> standardConsumer;
    private transient ExecutorService consumerThreadExecutor;
    private Map<String, Integer> numStream;

    public HighlevelKafkaConsumer() {
        this.consumerConfig = null;
        this.standardConsumer = null;
        this.consumerThreadExecutor = null;
        this.numStream = new HashMap();
    }

    public HighlevelKafkaConsumer(Properties properties) {
        this.consumerConfig = null;
        this.standardConsumer = null;
        this.consumerThreadExecutor = null;
        this.consumerConfig = properties;
    }

    @Override // com.datatorrent.contrib.kafka.KafkaConsumer
    public void create() {
        super.create();
        if (this.standardConsumer == null) {
            this.standardConsumer = new HashMap();
        }
        this.consumerConfig.put("consumer.id", "consumer" + System.currentTimeMillis());
        if (this.initialOffset.equalsIgnoreCase("earliest")) {
            this.consumerConfig.put("auto.offset.reset", "smallest");
        } else {
            this.consumerConfig.put("auto.offset.reset", "largest");
        }
    }

    @Override // com.datatorrent.contrib.kafka.KafkaConsumer
    public void start() {
        super.start();
        for (String str : this.zookeeperMap.keySet()) {
            Properties properties = new Properties();
            properties.putAll(this.consumerConfig);
            properties.setProperty("zookeeper.connect", (String) this.zookeeperMap.get(str).iterator().next());
            this.standardConsumer.put(str, Consumer.createJavaConsumerConnector(new ConsumerConfig(properties)));
        }
        HashMap hashMap = new HashMap();
        if (this.numStream == null || this.numStream.size() == 0) {
            if (this.numStream == null) {
                this.numStream = new HashMap();
            }
            for (Map.Entry<String, List<PartitionMetadata>> entry : KafkaMetadataUtil.getPartitionsForTopic(this.brokers, this.topic).entrySet()) {
                this.numStream.put(entry.getKey(), Integer.valueOf(entry.getValue().size()));
            }
        }
        int i = 0;
        Iterator<Integer> it = this.numStream.values().iterator();
        while (it.hasNext()) {
            i += it.next().intValue();
        }
        if (i <= 0) {
            logger.warn("No more job needed to consume data ");
            return;
        }
        this.consumerThreadExecutor = Executors.newFixedThreadPool(i);
        for (final Map.Entry<String, Integer> entry2 : this.numStream.entrySet()) {
            hashMap.put(this.topic, new Integer(entry2.getValue().intValue()));
            for (final KafkaStream kafkaStream : (List) this.standardConsumer.get(entry2.getKey()).createMessageStreams(hashMap).get(this.topic)) {
                this.consumerThreadExecutor.submit(new Runnable() { // from class: com.datatorrent.contrib.kafka.HighlevelKafkaConsumer.1
                    KafkaPartition kp;

                    {
                        this.kp = new KafkaPartition((String) entry2.getKey(), HighlevelKafkaConsumer.this.topic, -1);
                    }

                    @Override // java.lang.Runnable
                    public void run() {
                        ConsumerIterator it2 = kafkaStream.iterator();
                        HighlevelKafkaConsumer.logger.debug("Thread {} starts consuming message...", Thread.currentThread().getName());
                        while (it2.hasNext() && HighlevelKafkaConsumer.this.isAlive) {
                            MessageAndMetadata next = it2.next();
                            try {
                                this.kp.setPartitionId(next.partition());
                                HighlevelKafkaConsumer.this.putMessage(this.kp, new Message((byte[]) next.message()), next.offset());
                            } catch (InterruptedException e) {
                                HighlevelKafkaConsumer.logger.error("Message Enqueue has been interrupted", e);
                            }
                        }
                        HighlevelKafkaConsumer.logger.debug("Thread {} stops consuming message...", Thread.currentThread().getName());
                    }
                });
            }
        }
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.standardConsumer != null && this.standardConsumer.values() != null) {
            Iterator<ConsumerConnector> it = this.standardConsumer.values().iterator();
            while (it.hasNext()) {
                it.next().shutdown();
            }
        }
        if (this.consumerThreadExecutor != null) {
            this.consumerThreadExecutor.shutdown();
        }
    }

    public void setConsumerConfig(Properties properties) {
        this.consumerConfig = properties;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.contrib.kafka.KafkaConsumer
    public void resetPartitionsAndOffset(Set<KafkaPartition> set, Map<KafkaPartition, Long> map) {
        this.numStream = new HashMap();
        for (KafkaPartition kafkaPartition : set) {
            if (this.numStream.get(kafkaPartition.getClusterId()) == null) {
                this.numStream.put(kafkaPartition.getClusterId(), 0);
            }
            this.numStream.put(kafkaPartition.getClusterId(), Integer.valueOf(this.numStream.get(kafkaPartition.getClusterId()).intValue() + 1));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.contrib.kafka.KafkaConsumer
    public void commitOffset() {
        if (this.standardConsumer == null || this.standardConsumer.values() == null) {
            return;
        }
        Iterator<ConsumerConnector> it = this.standardConsumer.values().iterator();
        while (it.hasNext()) {
            it.next().commitOffsets();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.datatorrent.contrib.kafka.KafkaConsumer
    public Map<KafkaPartition, Long> getCurrentOffsets() {
        throw new UnsupportedOperationException("Offset request is currently not supported for high-level consumer");
    }
}
