package org.apache.crunch.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;
import java.util.Set;
import org.apache.crunch.Pair;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/crunch/kafka/KafkaRecordsIterable.class */
public class KafkaRecordsIterable<K, V> implements Iterable<Pair<K, V>> {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordsIterable.class);
    private final Consumer<K, V> consumer;
    private final Map<TopicPartition, Pair<Long, Long>> offsets;
    private final boolean isEmpty;
    private final long scanPollTime;
    private final int maxRetryAttempts;

    /* loaded from: input_file:org/apache/crunch/kafka/KafkaRecordsIterable$RecordsIterator.class */
    private static class RecordsIterator<K, V> implements Iterator<Pair<K, V>> {
        private final Consumer<K, V> consumer;
        private final Map<TopicPartition, Pair<Long, Long>> offsets;
        private final long pollTime;
        private final int maxNumAttempts;
        private ConsumerRecords<K, V> records;
        private Iterator<ConsumerRecord<K, V>> currentIterator;
        private final Set<TopicPartition> remainingPartitions;
        private Pair<K, V> next;

        public RecordsIterator(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, Long>> map, long j, int i) {
            this.consumer = consumer;
            this.remainingPartitions = new HashSet(map.keySet());
            this.offsets = map;
            this.pollTime = j;
            this.maxNumAttempts = i;
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            if (this.next != null) {
                return true;
            }
            if (this.remainingPartitions.size() > 0) {
                this.next = getNext();
            }
            return this.next != null;
        }

        @Override // java.util.Iterator
        public Pair<K, V> next() {
            if (this.next == null) {
                this.next = getNext();
            }
            if (this.next == null) {
                throw new NoSuchElementException("No more elements.");
            }
            Pair<K, V> pair = this.next;
            this.next = getNext();
            return pair;
        }

        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException("remove is not supported.");
        }

        private Iterator<ConsumerRecord<K, V>> getIterator() {
            if (this.remainingPartitions.isEmpty()) {
                KafkaRecordsIterable.LOG.debug("No more partitions to consume therefore not retrieving any more records.");
                return null;
            }
            if (this.currentIterator != null && this.currentIterator.hasNext()) {
                return this.currentIterator;
            }
            KafkaRecordsIterable.LOG.debug("Retrieving next set of records.");
            int i = 0;
            boolean z = false;
            while (!z && i < this.maxNumAttempts) {
                try {
                    this.records = this.consumer.poll(this.pollTime);
                    z = true;
                } catch (RetriableException e) {
                    i++;
                    if (i >= this.maxNumAttempts) {
                        KafkaRecordsIterable.LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", Integer.valueOf(this.maxNumAttempts), e);
                        throw e;
                    }
                    KafkaRecordsIterable.LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", Integer.valueOf(i), e);
                }
            }
            if (this.records != null && !this.records.isEmpty()) {
                this.currentIterator = this.records.iterator();
                return this.currentIterator;
            }
            KafkaRecordsIterable.LOG.debug("Retrieved empty records.");
            this.currentIterator = null;
            return null;
        }

        private Pair<K, V> getNext() {
            while (!this.remainingPartitions.isEmpty()) {
                Iterator<ConsumerRecord<K, V>> iterator = getIterator();
                while (iterator != null && iterator.hasNext()) {
                    ConsumerRecord<K, V> next = iterator.next();
                    TopicPartition topicPartition = new TopicPartition(next.topic(), next.partition());
                    long offset = next.offset();
                    if (withinRange(topicPartition, offset)) {
                        KafkaRecordsIterable.LOG.debug("Retrieving value for {} with offset {}.", topicPartition, Long.valueOf(offset));
                        return Pair.of(next.key(), next.value());
                    }
                    KafkaRecordsIterable.LOG.debug("Value for {} with offset {} is outside of range skipping.", topicPartition, Long.valueOf(offset));
                }
            }
            KafkaRecordsIterable.LOG.debug("Closing the consumer because there are no more remaining partitions.");
            this.consumer.close();
            KafkaRecordsIterable.LOG.debug("Consumed data from all partitions.");
            return null;
        }

        private boolean withinRange(TopicPartition topicPartition, long j) {
            long longValue = ((Long) this.offsets.get(topicPartition).second()).longValue();
            boolean z = j < longValue;
            if (j >= longValue - 1) {
                if (KafkaRecordsIterable.LOG.isDebugEnabled()) {
                    KafkaRecordsIterable.LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.", new Object[]{topicPartition, Long.valueOf(j), Long.valueOf(longValue)});
                }
                this.remainingPartitions.remove(topicPartition);
                this.consumer.pause(Collections.singleton(topicPartition));
            }
            KafkaRecordsIterable.LOG.debug("Value for partition {} and offset {} is within range.", topicPartition, Long.valueOf(j));
            return z;
        }
    }

    public KafkaRecordsIterable(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, Long>> map, Properties properties) {
        if (consumer == null) {
            throw new IllegalArgumentException("The 'consumer' cannot be 'null'.");
        }
        this.consumer = consumer;
        if (properties == null) {
            throw new IllegalArgumentException("The 'properties' cannot be 'null'.");
        }
        this.maxRetryAttempts = Integer.parseInt(properties.getProperty(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY, KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING));
        if (map == null || map.isEmpty()) {
            throw new IllegalArgumentException("The 'offsets' cannot 'null' or empty.");
        }
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : map.entrySet()) {
            Pair<Long, Long> value = entry.getValue();
            if (((Long) value.first()).longValue() < ((Long) value.second()).longValue()) {
                hashMap.put(entry.getKey(), value);
            } else {
                LOG.debug("Removing offsets for {} because start is not less than the end offset.", entry.getKey());
            }
        }
        this.isEmpty = hashMap.isEmpty();
        if (this.isEmpty) {
            LOG.warn("Iterable for Kafka for is empty because offsets are empty.");
        }
        this.offsets = hashMap;
        this.scanPollTime = Long.parseLong(properties.getProperty("org.apache.crunch.kafka.consumer.poll.timeout", Long.toString(1000L)));
    }

    @Override // java.lang.Iterable
    public Iterator<Pair<K, V>> iterator() {
        if (this.isEmpty) {
            LOG.debug("Returning empty iterator since offsets align.");
            return Collections.emptyIterator();
        }
        LOG.debug("Assigning topics and partitions and seeking to start offsets.");
        this.consumer.assign(new LinkedList(this.offsets.keySet()));
        this.consumer.poll(0L);
        for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : this.offsets.entrySet()) {
            this.consumer.seek(entry.getKey(), ((Long) entry.getValue().first()).longValue());
        }
        return new RecordsIterator(this.consumer, this.offsets, this.scanPollTime, this.maxRetryAttempts);
    }
}
