package org.apache.crunch.kafka.record;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/crunch/kafka/record/KafkaRecordReader.class */
public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);
    private Consumer<K, V> consumer;
    private ConsumerRecord<K, V> record;
    private long endingOffset;
    private Iterator<ConsumerRecord<K, V>> recordIterator;
    private long consumerPollTimeout;
    private long maxNumberOfRecords;
    private long startingOffset;
    private long currentOffset;
    private int maxNumberAttempts;
    private Properties kafkaConnectionProperties;
    private TopicPartition topicPartition;

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        if (!(inputSplit instanceof KafkaInputSplit)) {
            throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type.");
        }
        this.kafkaConnectionProperties = KafkaInputFormat.filterConnectionProperties(KafkaUtils.getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
        this.consumer = buildConsumer(this.kafkaConnectionProperties);
        KafkaInputSplit kafkaInputSplit = (KafkaInputSplit) inputSplit;
        this.topicPartition = kafkaInputSplit.getTopicPartition();
        this.consumer.assign(Collections.singletonList(this.topicPartition));
        this.consumer.poll(0L);
        this.startingOffset = kafkaInputSplit.getStartingOffset();
        this.consumer.seek(this.topicPartition, this.startingOffset);
        this.currentOffset = this.startingOffset - 1;
        this.endingOffset = kafkaInputSplit.getEndingOffset();
        this.maxNumberOfRecords = this.endingOffset - this.startingOffset;
        if (LOG.isInfoEnabled()) {
            LOG.info("Reading data from {} between {} and {}", new Object[]{this.topicPartition, Long.valueOf(this.startingOffset), Long.valueOf(this.endingOffset)});
        }
        Configuration configuration = taskAttemptContext.getConfiguration();
        this.consumerPollTimeout = configuration.getLong("org.apache.crunch.kafka.consumer.poll.timeout", 1000L);
        this.maxNumberAttempts = configuration.getInt(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY, KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT);
    }

    protected KafkaConsumer<K, V> buildConsumer(Properties properties) {
        return new KafkaConsumer<>(properties);
    }

    protected long getCurrentOffset() {
        return this.currentOffset;
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (hasPendingData()) {
            loadRecords();
            this.record = this.recordIterator.hasNext() ? this.recordIterator.next() : null;
            if (this.record != null) {
                LOG.debug("nextKeyValue: Retrieved record with offset {}", Long.valueOf(this.record.offset()));
                long j = this.currentOffset;
                this.currentOffset = this.record.offset();
                LOG.debug("Current offset will be updated to be [{}]", Long.valueOf(this.currentOffset));
                if (LOG.isWarnEnabled() && this.currentOffset - j > 1) {
                    LOG.warn("Possible data loss in partition {} as offset {} was larger than expected {}", new Object[]{this.topicPartition, Long.valueOf(this.currentOffset), Long.valueOf(j + 1)});
                }
                if (this.currentOffset < this.endingOffset) {
                    return true;
                }
                if (LOG.isWarnEnabled()) {
                    LOG.warn("Record offset {} is beyond our end offset {}. This could indicate data loss in partition {}", new Object[]{Long.valueOf(this.currentOffset), Long.valueOf(this.endingOffset), this.topicPartition});
                }
                this.record = null;
                return false;
            }
            if (!isPartitionEmpty()) {
                throw new IOException("Unable to read additional data from Kafka. See logs for details. Partition " + this.topicPartition + " Current Offset: " + this.currentOffset + " End Offset: " + this.endingOffset);
            }
            if (LOG.isWarnEnabled()) {
                LOG.warn("The partition {} is empty though we expected to read from {} to {}. This could indicate data loss", new Object[]{this.topicPartition, Long.valueOf(this.currentOffset), Long.valueOf(this.endingOffset)});
            }
        }
        this.record = null;
        return false;
    }

    private boolean isPartitionEmpty() {
        return getEarliestOffset() == this.endingOffset;
    }

    /* renamed from: getCurrentKey, reason: merged with bridge method [inline-methods] */
    public ConsumerRecord<K, V> m12getCurrentKey() throws IOException, InterruptedException {
        return this.record;
    }

    /* renamed from: getCurrentValue, reason: merged with bridge method [inline-methods] */
    public Void m11getCurrentValue() throws IOException, InterruptedException {
        return null;
    }

    public float getProgress() throws IOException, InterruptedException {
        return ((float) ((this.currentOffset - this.startingOffset) + 1)) / ((float) this.maxNumberOfRecords);
    }

    private boolean hasPendingData() {
        return this.currentOffset < this.endingOffset - 1;
    }

    protected Iterator<ConsumerRecord<K, V>> getRecordIterator() {
        return this.recordIterator;
    }

    protected void loadRecords() {
        if (this.recordIterator == null || !this.recordIterator.hasNext()) {
            ConsumerRecords consumerRecords = null;
            int i = 0;
            boolean z = false;
            while (!z && i < this.maxNumberAttempts) {
                i++;
                try {
                    consumerRecords = getConsumer().poll(this.consumerPollTimeout);
                } catch (RetriableException e) {
                    LOG.warn("Error pulling messages from Kafka", e);
                }
                if (consumerRecords == null || !consumerRecords.isEmpty()) {
                    if (consumerRecords != null && !consumerRecords.isEmpty()) {
                        z = true;
                    }
                } else if (LOG.isWarnEnabled()) {
                    LOG.warn("No records retrieved from partition {} with poll timeout {} but pending offsets to consume. Current Offset: {}, End Offset: {}", new Object[]{this.topicPartition, Long.valueOf(this.consumerPollTimeout), Long.valueOf(this.currentOffset), Long.valueOf(this.endingOffset)});
                }
                if (!z && i < this.maxNumberAttempts) {
                    LOG.info("Record fetch attempt {} / {} failed, retrying", Integer.valueOf(i), Integer.valueOf(this.maxNumberAttempts));
                } else if (!z && LOG.isWarnEnabled()) {
                    LOG.warn("Record fetch attempt {} / {} failed. No more attempts left for partition {}", new Object[]{Integer.valueOf(i), Integer.valueOf(this.maxNumberAttempts), this.topicPartition});
                }
            }
            if (consumerRecords == null || consumerRecords.isEmpty()) {
                LOG.info("No records retrieved from Kafka partition {} therefore nothing to iterate over", this.topicPartition);
            } else {
                LOG.info("Retrieved {} records from Kafka partition {} to iterate over starting from offset {}", new Object[]{Integer.valueOf(consumerRecords.count()), this.topicPartition, Long.valueOf(((ConsumerRecord) consumerRecords.iterator().next()).offset())});
            }
            this.recordIterator = consumerRecords != null ? consumerRecords.iterator() : ConsumerRecords.empty().iterator();
        }
    }

    protected Consumer<K, V> getConsumer() {
        return this.consumer;
    }

    protected long getEarliestOffset() {
        Long l = (Long) this.consumer.beginningOffsets(Collections.singletonList(this.topicPartition)).get(this.topicPartition);
        if (l == null) {
            LOG.debug("Unable to determine earliest offset for {} so returning 0", this.topicPartition);
            return 0L;
        }
        LOG.debug("Earliest offset for {} is {}", this.topicPartition, l);
        return l.longValue();
    }

    public void close() throws IOException {
        LOG.debug("Closing the record reader.");
        if (this.consumer != null) {
            this.consumer.close();
        }
    }
}
