package org.apache.samoa.streams.kafka;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.samoa.core.ContentEvent;
import org.apache.samoa.core.EntranceProcessor;
import org.apache.samoa.core.Processor;

/* loaded from: input_file:org/apache/samoa/streams/kafka/KafkaEntranceProcessor.class */
/**
 * Entrance processor that pulls raw messages from a single Kafka topic and
 * turns them into {@link ContentEvent}s using a {@link KafkaDeserializer}.
 *
 * <p>Messages are fetched in batches through {@link KafkaUtils} and kept in an
 * in-memory buffer; {@link #hasNext()} refills the buffer when it is empty and
 * {@link #nextEvent()} consumes the oldest buffered message.
 *
 * <p>{@link #onCreate(int)} must be called before {@link #hasNext()} or
 * {@link #nextEvent()}, because the buffer is allocated there.
 */
public class KafkaEntranceProcessor implements EntranceProcessor {
    /** Initial capacity of the message buffer. */
    private static final int INITIAL_BUFFER_CAPACITY = 100;

    // NOTE(review): declared transient, so it would be null on an instance restored by
    // Java serialization -- confirm how the runtime ships processors to workers.
    private final transient KafkaUtils kafkaUtils;
    /** Raw messages fetched from Kafka and not yet handed out; allocated in onCreate. */
    private List<byte[]> buffer;
    private final KafkaDeserializer deserializer;
    private final String topic;

    /**
     * Creates a processor reading from the given topic.
     *
     * @param properties consumer properties, passed as the first argument to {@link KafkaUtils}
     * @param str name of the Kafka topic to subscribe to
     * @param i integer setting forwarded unchanged to {@link KafkaUtils}
     *     (presumably a poll timeout -- TODO confirm against KafkaUtils)
     * @param kafkaDeserializer converts raw message bytes into content events
     */
    public KafkaEntranceProcessor(Properties properties, String str, int i, KafkaDeserializer kafkaDeserializer) {
        this.kafkaUtils = new KafkaUtils(properties, null, i);
        this.deserializer = kafkaDeserializer;
        this.topic = str;
    }

    /** Internal constructor used by {@link #newProcessor(Processor)} to build a copy. */
    private KafkaEntranceProcessor(KafkaUtils kafkaUtils, KafkaDeserializer kafkaDeserializer, String str) {
        this.kafkaUtils = kafkaUtils;
        this.deserializer = kafkaDeserializer;
        this.topic = str;
    }

    /**
     * Allocates the message buffer and initializes the Kafka consumer for the
     * configured topic.
     *
     * @param i processor id supplied by the caller; not used by this implementation
     */
    @Override
    public void onCreate(int i) {
        this.buffer = new ArrayList<>(INITIAL_BUFFER_CAPACITY);
        this.kafkaUtils.initializeConsumer(Arrays.asList(this.topic));
    }

    /**
     * Always reports that the source is not finished.
     *
     * @return {@code false}, unconditionally
     */
    @Override
    public boolean isFinished() {
        return false;
    }

    /**
     * Tells whether at least one message is buffered, fetching a new batch from
     * Kafka first when the buffer is empty.
     *
     * <p>A failure while fetching is logged and treated as "no messages
     * available" so that the caller can simply retry later.
     *
     * @return {@code true} if {@link #nextEvent()} can be called
     */
    @Override
    public boolean hasNext() {
        if (this.buffer.isEmpty()) {
            try {
                this.buffer.addAll(this.kafkaUtils.getKafkaMessages());
            } catch (Exception e) {
                // Best effort: keep the stream alive, but record which topic failed.
                Logger.getLogger(KafkaEntranceProcessor.class.getName())
                        .log(Level.SEVERE, "Failed to fetch messages from Kafka topic " + this.topic, e);
            }
        }
        return !this.buffer.isEmpty();
    }

    /**
     * Removes the oldest buffered message and deserializes it.
     *
     * <p>Callers are expected to check {@link #hasNext()} first; the underlying
     * list removal fails if the buffer is empty.
     *
     * @return the event produced by the deserializer for the removed message
     */
    @Override
    public ContentEvent nextEvent() {
        byte[] message = this.buffer.remove(0);
        return this.deserializer.deserialize(message);
    }

    /**
     * Does nothing with the incoming event.
     *
     * @param contentEvent ignored
     * @return {@code false}, unconditionally
     */
    @Override
    public boolean process(ContentEvent contentEvent) {
        return false;
    }

    /**
     * Builds a new processor from the given one, with a copy-constructed
     * {@link KafkaUtils} and the same deserializer and topic.
     *
     * @param processor the processor to copy; must be a {@code KafkaEntranceProcessor}
     * @return the new processor
     * @throws ClassCastException if {@code processor} is of another type
     */
    @Override
    public Processor newProcessor(Processor processor) {
        KafkaEntranceProcessor source = (KafkaEntranceProcessor) processor;
        return new KafkaEntranceProcessor(new KafkaUtils(source.kafkaUtils), source.deserializer, source.topic);
    }

    /**
     * Closes the Kafka consumer when the instance is garbage collected.
     *
     * <p>{@code super.finalize()} runs even if closing the consumer throws, and
     * the close is skipped when the transient {@code kafkaUtils} is absent.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            if (this.kafkaUtils != null) {
                this.kafkaUtils.closeConsumer();
            }
        } finally {
            super.finalize();
        }
    }
}
