package org.apache.pinot.plugin.stream.kafka20;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.shaded.org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.pinot.shaded.org.apache.kafka.common.utils.Bytes;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.MessageBatch;
import org.apache.pinot.spi.stream.PartitionLevelConsumer;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamMessage;
import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pinot/plugin/stream/kafka20/KafkaPartitionLevelConsumer.class */
public class KafkaPartitionLevelConsumer extends KafkaPartitionLevelConnectionHandler implements PartitionLevelConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger((Class<?>) KafkaPartitionLevelConsumer.class);
    private long _lastFetchedOffset;

    public KafkaPartitionLevelConsumer(String str, StreamConfig streamConfig, int i) {
        super(str, streamConfig, i);
        this._lastFetchedOffset = -1L;
    }

    public MessageBatch<StreamMessage<byte[]>> fetchMessages(StreamPartitionMsgOffset streamPartitionMsgOffset, StreamPartitionMsgOffset streamPartitionMsgOffset2, int i) {
        return fetchMessages(((LongMsgOffset) streamPartitionMsgOffset).getOffset(), streamPartitionMsgOffset2 == null ? Long.MAX_VALUE : ((LongMsgOffset) streamPartitionMsgOffset2).getOffset(), i);
    }

    public synchronized MessageBatch<StreamMessage<byte[]>> fetchMessages(long j, long j2, int i) {
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Polling partition: {}, startOffset: {}, endOffset: {} timeout: {}ms", this._topicPartition, Long.valueOf(j), Long.valueOf(j2), Integer.valueOf(i));
        }
        if (this._lastFetchedOffset < 0 || this._lastFetchedOffset != j - 1) {
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Seeking to offset: {}", Long.valueOf(j));
            }
            this._consumer.seek(this._topicPartition, j);
        }
        List<ConsumerRecord<String, Bytes>> records = this._consumer.poll(Duration.ofMillis(i)).records(this._topicPartition);
        ArrayList arrayList = new ArrayList(records.size());
        long j3 = j;
        StreamMessageMetadata streamMessageMetadata = null;
        for (ConsumerRecord<String, Bytes> consumerRecord : records) {
            long offset = consumerRecord.offset();
            this._lastFetchedOffset = offset;
            if (offset >= j && (j2 > offset || j2 < 0)) {
                Bytes value = consumerRecord.value();
                streamMessageMetadata = (StreamMessageMetadata) this._kafkaMetadataExtractor.extract(consumerRecord);
                if (value != null) {
                    String key = consumerRecord.key();
                    arrayList.add(new KafkaStreamMessage(key != null ? key.getBytes(StandardCharsets.UTF_8) : null, value.get(), streamMessageMetadata));
                } else if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("Tombstone message at offset: {}", Long.valueOf(offset));
                }
                j3 = offset;
            } else if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("Ignoring message at offset: {} (outside of offset range [{}, {}))", Long.valueOf(offset), Long.valueOf(j), Long.valueOf(j2));
            }
        }
        return new KafkaMessageBatch(records.size(), j3, arrayList, streamMessageMetadata);
    }
}
