package org.apache.crunch.kafka.inputformat;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import kafka.api.OffsetRequest;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/crunch/kafka/inputformat/KafkaRecordReader.class */
public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);
    private Consumer<K, V> consumer;
    private ConsumerRecord<K, V> record;
    private long endingOffset;
    private Iterator<ConsumerRecord<K, V>> recordIterator;
    private long consumerPollTimeout;
    private long maxNumberOfRecords;
    private long startingOffset;
    private long currentOffset;
    private int maxNumberAttempts;
    private Properties connectionProperties;
    private TopicPartition topicPartition;
    private int concurrentEmptyResponses;
    private int maxConcurrentEmptyResponses;

    public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        if (!(inputSplit instanceof KafkaInputSplit)) {
            throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type.");
        }
        this.consumer = new KafkaConsumer(KafkaInputFormat.filterConnectionProperties(KafkaUtils.getKafkaConnectionProperties(taskAttemptContext.getConfiguration())));
        KafkaInputSplit kafkaInputSplit = (KafkaInputSplit) inputSplit;
        TopicPartition topicPartition = kafkaInputSplit.getTopicPartition();
        this.consumer.assign(Collections.singletonList(topicPartition));
        this.consumer.poll(0L);
        this.startingOffset = kafkaInputSplit.getStartingOffset();
        this.consumer.seek(topicPartition, this.startingOffset);
        this.currentOffset = this.startingOffset - 1;
        this.endingOffset = kafkaInputSplit.getEndingOffset();
        this.maxNumberOfRecords = this.endingOffset - this.startingOffset;
        if (LOG.isInfoEnabled()) {
            LOG.info("Reading data from {} between {} and {}", new Object[]{topicPartition, Long.valueOf(this.startingOffset), Long.valueOf(this.endingOffset)});
        }
        Configuration configuration = taskAttemptContext.getConfiguration();
        this.consumerPollTimeout = configuration.getLong("org.apache.crunch.kafka.consumer.poll.timeout", 1000L);
        this.maxNumberAttempts = configuration.getInt(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY, KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT);
        this.maxConcurrentEmptyResponses = configuration.getInt(KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, 10);
        this.concurrentEmptyResponses = 0;
    }

    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (hasPendingData()) {
            this.recordIterator = getRecords();
            this.record = this.recordIterator.hasNext() ? this.recordIterator.next() : null;
            if (this.record != null) {
                LOG.debug("nextKeyValue: Retrieved record with offset {}", Long.valueOf(this.record.offset()));
                long j = this.currentOffset;
                this.currentOffset = this.record.offset();
                LOG.debug("Current offset will be updated to be [{}]", Long.valueOf(this.currentOffset));
                if (!LOG.isWarnEnabled() || this.currentOffset - j <= 1) {
                    return true;
                }
                LOG.warn("Offset increment was larger than expected value of one, old {} new {}", Long.valueOf(j), Long.valueOf(this.currentOffset));
                return true;
            }
            LOG.warn("nextKeyValue: Retrieved null record last offset was {} and ending offset is {}", Long.valueOf(this.currentOffset), Long.valueOf(this.endingOffset));
        }
        this.record = null;
        return false;
    }

    public K getCurrentKey() throws IOException, InterruptedException {
        if (this.record == null) {
            return null;
        }
        return (K) this.record.key();
    }

    public V getCurrentValue() throws IOException, InterruptedException {
        if (this.record == null) {
            return null;
        }
        return (V) this.record.value();
    }

    public float getProgress() throws IOException, InterruptedException {
        return ((float) ((this.currentOffset - this.startingOffset) + 1)) / ((float) this.maxNumberOfRecords);
    }

    private boolean hasPendingData() {
        boolean z = this.currentOffset < this.endingOffset - 1;
        if (this.concurrentEmptyResponses > this.maxConcurrentEmptyResponses) {
            long earliestOffset = getEarliestOffset();
            if (earliestOffset == this.endingOffset) {
                LOG.warn("Possible data loss for {} as earliest {} is equal to {} and greater than expected current {}.", new Object[]{this.topicPartition, Long.valueOf(earliestOffset), Long.valueOf(this.endingOffset), Long.valueOf(this.currentOffset)});
                return false;
            }
        }
        return z;
    }

    private Iterator<ConsumerRecord<K, V>> getRecords() {
        if (this.recordIterator != null && this.recordIterator.hasNext()) {
            return this.recordIterator;
        }
        ConsumerRecords consumerRecords = null;
        int i = 0;
        boolean z = false;
        while (!z && i < this.maxNumberAttempts) {
            try {
                consumerRecords = getConsumer().poll(this.consumerPollTimeout);
            } catch (RetriableException e) {
                i++;
                if (i >= this.maxNumberAttempts) {
                    LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", Integer.valueOf(this.maxNumberAttempts), e);
                    throw e;
                }
                LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", Integer.valueOf(i + 1), e);
            }
            if ((consumerRecords == null || consumerRecords.isEmpty()) && hasPendingData()) {
                this.concurrentEmptyResponses++;
                LOG.warn("No records retrieved but pending offsets to consume therefore polling again. Attempt {}/{}", Integer.valueOf(this.concurrentEmptyResponses), Integer.valueOf(this.maxConcurrentEmptyResponses));
            } else {
                z = true;
            }
        }
        this.concurrentEmptyResponses = 0;
        if (consumerRecords == null || consumerRecords.isEmpty()) {
            LOG.info("No records retrieved from Kafka therefore nothing to iterate over.");
        } else {
            LOG.info("Retrieved records from Kafka to iterate over.");
        }
        return consumerRecords != null ? consumerRecords.iterator() : ConsumerRecords.empty().iterator();
    }

    protected Consumer<K, V> getConsumer() {
        return this.consumer;
    }

    protected long getEarliestOffset() {
        Long l = KafkaUtils.getBrokerOffsets(this.connectionProperties, OffsetRequest.EarliestTime(), this.topicPartition.topic()).get(this.topicPartition);
        if (l == null) {
            LOG.debug("Unable to determine earliest offset for {} so returning earliest {}", this.topicPartition, Long.valueOf(OffsetRequest.EarliestTime()));
            return OffsetRequest.EarliestTime();
        }
        LOG.debug("Earliest offset for {} is {}", this.topicPartition, l);
        return l.longValue();
    }

    public void close() throws IOException {
        LOG.debug("Closing the record reader.");
        if (this.consumer != null) {
            this.consumer.close();
        }
    }
}
