package org.apache.samoa.streams.kafka;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:org/apache/samoa/streams/kafka/KafkaConsumerThread.class */
/**
 * Background thread that polls Kafka and hands the raw message payloads over to
 * other threads through an internal buffer.
 *
 * <p>The {@link KafkaConsumer} is created, used and closed exclusively on this thread
 * (inside {@link #run()}); other threads interact with this class only through
 * {@link #close()} and {@link #getKafkaMessages()}, which are safe to call concurrently.
 */
class KafkaConsumerThread extends Thread {
    private static final Logger LOG = Logger.getLogger(KafkaConsumerThread.class.getName());

    /** Created lazily in {@link #run()} and only ever touched from this thread. */
    private transient KafkaConsumer<String, byte[]> consumer;
    private final Properties consumerProperties;
    private final Collection<String> topics;
    private final long consumerTimeout;

    /**
     * Set once a shutdown was requested or the polling loop has ended. Volatile because it is
     * written by the caller of {@link #close()} and read by the polling loop on this thread.
     * It is a one-way flag, so a {@code close()} issued before the thread has started is
     * still honoured.
     */
    private volatile boolean closed = false;

    /** Payloads fetched from Kafka and not yet handed out; guarded by {@link #lock}. */
    private final List<byte[]> buffer = new ArrayList<>();
    private final Object lock = new Object();

    /**
     * Creates the thread; no Kafka connection is made until the thread is started.
     *
     * @param consumerProperties configuration passed to the {@link KafkaConsumer} constructor
     * @param topics topics the consumer subscribes to
     * @param consumerTimeout timeout handed to {@code KafkaConsumer.poll(long)} on every poll
     */
    public KafkaConsumerThread(Properties consumerProperties, Collection<String> topics, long consumerTimeout) {
        this.consumerProperties = consumerProperties;
        this.topics = topics;
        this.consumerTimeout = consumerTimeout;
    }

    /**
     * Creates the consumer, polls until {@link #close()} is called, then releases the consumer.
     * The consumer is released and blocked readers are woken up even if initialization or
     * polling terminates abnormally.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            initializeConsumer();
            while (!this.closed) {
                fetchDataFromKafka();
            }
        } finally {
            try {
                cleanUp();
            } finally {
                // No more data will ever arrive: release anybody waiting in getKafkaMessages().
                markClosedAndNotifyWaits();
            }
        }
    }

    /** Performs one poll and publishes whatever was received; failures are logged, not propagated. */
    private void fetchDataFromKafka() {
        if (this.consumer == null || this.consumer.subscription().isEmpty()) {
            return;
        }
        try {
            fillBufferAndNotifyWaits(getMessagesBytes(this.consumer.poll(this.consumerTimeout)));
        } catch (Throwable th) {
            // Best effort: a failed poll must not terminate the polling loop.
            LOG.log(Level.SEVERE, (String) null, th);
        }
    }

    /** Appends {@code list} to the buffer and wakes up waiting readers if there is data to read. */
    private void fillBufferAndNotifyWaits(List<byte[]> list) {
        synchronized (this.lock) {
            this.buffer.addAll(list);
            if (!this.buffer.isEmpty()) {
                this.lock.notifyAll();
            }
        }
    }

    /** Marks this thread as closed and wakes up every reader blocked in {@link #getKafkaMessages()}. */
    private void markClosedAndNotifyWaits() {
        synchronized (this.lock) {
            this.closed = true;
            this.lock.notifyAll();
        }
    }

    /** Unsubscribes and closes the consumer, if one was created. */
    private void cleanUp() {
        if (this.consumer == null) {
            return;
        }
        try {
            this.consumer.unsubscribe();
        } finally {
            // Close even when unsubscribe() fails so the consumer is not leaked.
            this.consumer.close();
        }
    }

    /** Creates the consumer (if not created yet) and subscribes it to the configured topics. */
    private void initializeConsumer() {
        LOG.log(Level.INFO, "Instantiating Kafka consumer");
        if (this.consumer == null) {
            this.consumer = new KafkaConsumer<>(this.consumerProperties);
        }
        this.consumer.subscribe(this.topics);
    }

    /** Extracts the value of every record in {@code consumerRecords}, in iteration order. */
    private List<byte[]> getMessagesBytes(ConsumerRecords<String, byte[]> consumerRecords) {
        List<byte[]> values = new ArrayList<>();
        for (ConsumerRecord<String, byte[]> record : consumerRecords) {
            values.add(record.value());
        }
        return values;
    }

    /**
     * Requests the polling loop to stop and wakes up readers blocked in
     * {@link #getKafkaMessages()}. The consumer itself is closed by this thread once the
     * current poll returns. Safe to call from any thread and more than once.
     */
    public void close() {
        markClosedAndNotifyWaits();
    }

    /**
     * Returns all buffered messages and empties the buffer, blocking while the buffer is empty.
     *
     * @return the buffered payloads; empty if the calling thread was interrupted while waiting
     *         (its interrupt status is restored) or if this consumer thread has been closed and
     *         nothing is left in the buffer
     */
    public List<byte[]> getKafkaMessages() {
        synchronized (this.lock) {
            // Loop rather than a single check: Object.wait() may wake up spuriously.
            while (this.buffer.isEmpty() && !this.closed) {
                try {
                    this.lock.wait();
                } catch (InterruptedException e) {
                    LOG.log(Level.SEVERE, (String) null, (Throwable) e);
                    // Preserve the interrupt for the caller and stop waiting.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            List<byte[]> messages = new ArrayList<>(this.buffer);
            this.buffer.clear();
            return messages;
        }
    }
}
