package org.apache.kylin.stream.source.kafka.consumer;

import com.google.common.collect.Lists;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kylin.stream.core.consumer.ConsumerStartMode;
import org.apache.kylin.stream.core.consumer.IStreamingConnector;
import org.apache.kylin.stream.core.model.StreamingMessage;
import org.apache.kylin.stream.core.source.IStreamingMessageParser;
import org.apache.kylin.stream.core.source.IStreamingSource;
import org.apache.kylin.stream.core.source.Partition;
import org.apache.kylin.stream.source.kafka.KafkaSource;

/* loaded from: input_file:WEB-INF/lib/kylin-stream-source-kafka-3.0.0.jar:org/apache/kylin/stream/source/kafka/consumer/KafkaConnector.class */
public class KafkaConnector implements IStreamingConnector {
    private final KafkaConsumer<byte[], byte[]> kafkaConsumer;
    private final String topic;
    private final IStreamingMessageParser parser;
    private ConsumerStartMode startMode = ConsumerStartMode.EARLIEST;
    private List<ConsumerRecord<byte[], byte[]>> buffer = Lists.newLinkedList();
    private List<Partition> partitions;
    private Map<Integer, Long> partitionOffsets;
    private KafkaSource kafkaSource;

    public KafkaConnector(Map<String, Object> map, String str, IStreamingMessageParser iStreamingMessageParser, KafkaSource kafkaSource) {
        this.kafkaConsumer = new KafkaConsumer<>(map);
        this.topic = str;
        this.parser = iStreamingMessageParser;
        this.kafkaSource = kafkaSource;
    }

    public void setStartPartition(List<Partition> list, ConsumerStartMode consumerStartMode, Map<Integer, Long> map) {
        this.partitions = list;
        this.startMode = consumerStartMode;
        this.partitionOffsets = map;
    }

    @Override // org.apache.kylin.stream.core.consumer.IStreamingConnector
    public List<Partition> getConsumePartitions() {
        return this.partitions;
    }

    @Override // org.apache.kylin.stream.core.consumer.IStreamingConnector
    public void open() {
        if (this.partitions == null || this.partitions.size() <= 0) {
            throw new IllegalStateException("not assign partitions");
        }
        ArrayList<TopicPartition> newArrayList = Lists.newArrayList();
        Iterator<Partition> it = this.partitions.iterator();
        while (it.hasNext()) {
            newArrayList.add(new TopicPartition(this.topic, it.next().getPartitionId()));
        }
        this.kafkaConsumer.assign(newArrayList);
        if (this.startMode == ConsumerStartMode.EARLIEST) {
            this.kafkaConsumer.seekToBeginning(newArrayList);
            return;
        }
        if (this.startMode == ConsumerStartMode.LATEST) {
            this.kafkaConsumer.seekToEnd(newArrayList);
            return;
        }
        for (TopicPartition topicPartition : newArrayList) {
            this.kafkaConsumer.seek(topicPartition, this.partitionOffsets.get(Integer.valueOf(topicPartition.partition())).longValue());
        }
    }

    @Override // org.apache.kylin.stream.core.consumer.IStreamingConnector
    public void close() {
        this.kafkaConsumer.close();
    }

    @Override // org.apache.kylin.stream.core.consumer.IStreamingConnector
    public void wakeup() {
        this.kafkaConsumer.wakeup();
    }

    @Override // org.apache.kylin.stream.core.consumer.IStreamingConnector
    public StreamingMessage nextEvent() {
        if (this.buffer.isEmpty()) {
            fillBuffer();
        }
        if (this.buffer.isEmpty()) {
            return null;
        }
        return this.parser.parse(this.buffer.remove(0));
    }

    private void fillBuffer() {
        ConsumerRecords<byte[], byte[]> poll = this.kafkaConsumer.poll(100L);
        LinkedList newLinkedList = Lists.newLinkedList();
        Iterator<TopicPartition> it = poll.partitions().iterator();
        while (it.hasNext()) {
            newLinkedList.addAll(poll.records(it.next()));
        }
        this.buffer = newLinkedList;
    }

    @Override // org.apache.kylin.stream.core.consumer.IStreamingConnector
    public IStreamingSource getSource() {
        return this.kafkaSource;
    }
}
