package org.apache.pinot.plugin.stream.kafka20;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.class */
public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler implements PartitionLevelConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaPartitionLevelConsumer.class);

    public KafkaPartitionLevelConsumer(String str, StreamConfig streamConfig, int i) {
        super(str, streamConfig, i);
    }

    public MessageBatch<byte[]> fetchMessages(StreamPartitionMsgOffset streamPartitionMsgOffset, StreamPartitionMsgOffset streamPartitionMsgOffset2, int i) {
        return fetchMessages(((LongMsgOffset) streamPartitionMsgOffset).getOffset(), streamPartitionMsgOffset2 == null ? Long.MAX_VALUE : ((LongMsgOffset) streamPartitionMsgOffset2).getOffset(), i);
    }

    public MessageBatch<byte[]> fetchMessages(long j, long j2, int i) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("poll consumer: {}, startOffset: {}, endOffset:{} timeout: {}ms", new Object[]{this._topicPartition, Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i)});
        }
        this._consumer.seek(this._topicPartition, j);
        List<ConsumerRecord<?, ?>> records = this._consumer.poll(Duration.ofMillis(i)).records(this._topicPartition);
        ArrayList arrayList = new ArrayList(records.size());
        long j3 = j;
        for (ConsumerRecord<?, ?> consumerRecord : records) {
            Bytes bytes = (Bytes) consumerRecord.value();
            long offset = consumerRecord.offset();
            if ((offset >= j) && ((j2 > offset) || (j2 == -1))) {
                if (bytes != null) {
                    arrayList.add(new MessageAndOffsetAndMetadata(bytes.get(), offset, this._rowMetadataExtractor.extract(consumerRecord)));
                } else if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("tombstone message at offset {}", Long.valueOf(offset));
                }
                j3 = offset;
            } else if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("filter message at offset {} (outside of offset range {} {})", new Object[]{Long.valueOf(offset), Long.valueOf(j), Long.valueOf(j2)});
            }
        }
        return new KafkaMessageBatch(records.size(), j3, arrayList);
    }
}
