package com.datatorrent.contrib.kafka;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/contrib/kafka/KafkaTestConsumer.class */
public class KafkaTestConsumer implements Runnable {
    private static final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
    private final transient ConsumerConnector consumer;
    protected static final int BUFFER_SIZE_DEFAULT = 1048576;
    private final int bufferSize = 1048576;
    public transient ArrayBlockingQueue<Message> holdingBuffer;
    private final String topic;
    private String zkaddress;
    private boolean isAlive;
    private int receiveCount;
    private CountDownLatch latch;

    public int getReceiveCount() {
        return this.receiveCount;
    }

    public void setReceiveCount(int i) {
        this.receiveCount = i;
    }

    public void setIsAlive(boolean z) {
        this.isAlive = z;
    }

    public KafkaTestConsumer(String str) {
        this.bufferSize = BUFFER_SIZE_DEFAULT;
        this.holdingBuffer = new ArrayBlockingQueue<>(BUFFER_SIZE_DEFAULT);
        this.zkaddress = "localhost:2182";
        this.isAlive = true;
        this.receiveCount = 0;
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
        this.topic = str;
    }

    public KafkaTestConsumer(String str, String str2) {
        this.bufferSize = BUFFER_SIZE_DEFAULT;
        this.holdingBuffer = new ArrayBlockingQueue<>(BUFFER_SIZE_DEFAULT);
        this.zkaddress = "localhost:2182";
        this.isAlive = true;
        this.receiveCount = 0;
        this.zkaddress = str2;
        this.topic = str;
        this.consumer = Consumer.createJavaConsumerConnector(createConsumerConfig());
    }

    private ConsumerConfig createConsumerConfig() {
        Properties properties = new Properties();
        properties.setProperty("zookeeper.connect", this.zkaddress);
        properties.setProperty("group.id", "group1");
        properties.put("auto.offset.reset", "smallest");
        return new ConsumerConfig(properties);
    }

    public String getMessage(Message message) {
        ByteBuffer payload = message.payload();
        byte[] bArr = new byte[payload.remaining()];
        payload.get(bArr);
        return new String(bArr);
    }

    @Override // java.lang.Runnable
    public void run() {
        HashMap hashMap = new HashMap();
        hashMap.put(this.topic, new Integer(1));
        ConsumerIterator it = ((KafkaStream) ((List) this.consumer.createMessageStreams(hashMap).get(this.topic)).get(0)).iterator();
        logger.debug("Inside consumer::run receiveCount= {}", Integer.valueOf(this.receiveCount));
        while (it.hasNext() & this.isAlive) {
            Message message = new Message((byte[]) it.next().message());
            if (this.latch != null) {
                this.latch.countDown();
            }
            if (getMessage(message).equals("END_TUPLE")) {
                break;
            }
            this.holdingBuffer.add(message);
            this.receiveCount++;
            logger.debug("Consuming {}, receiveCount= {}", getMessage(message), Integer.valueOf(this.receiveCount));
            try {
                Thread.sleep(50L);
            } catch (InterruptedException e) {
            }
        }
        logger.debug("DONE consuming");
    }

    public void close() {
        this.holdingBuffer.clear();
        this.consumer.shutdown();
    }

    public void setLatch(CountDownLatch countDownLatch) {
        this.latch = countDownLatch;
    }
}
