package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher.class */
public class Fetcher<K, V> implements SubscriptionState.Listener {
    /** Class-wide SLF4J logger. */
    private static final Logger log = LoggerFactory.getLogger(Fetcher.class);
    /** Network client used to send fetch/metadata requests and to poll their futures. */
    private final ConsumerNetworkClient client;
    /** Clock used for timeout bookkeeping and for sleeping between retries. */
    private final Time time;
    // Fetch tuning values handed to the constructor. Their consumers (the fetch request
    // builder) are outside this excerpt, so the exact semantics are not asserted here.
    private final int minBytes;
    private final int maxBytes;
    private final int maxWaitMs;
    private final int fetchSize;
    /** Upper bound on the pause between retries in the metadata and offset lookup loops. */
    private final long retryBackoffMs;
    /** Maximum number of records a single {@code fetchedRecords()} call hands back. */
    private final int maxPollRecords;
    // NOTE(review): presumably toggles CRC validation while parsing records; the parsing
    // code is not visible in this excerpt, so confirm there.
    private final boolean checkCrcs;
    /** Cluster metadata; consulted to find the partition leader when listing offsets. */
    private final Metadata metadata;
    /** Holder of all fetch-related sensors. */
    private final FetchManagerMetrics sensors;
    /** Assignment, position and reset state of the partitions being consumed. */
    private final SubscriptionState subscriptions;
    // Key/value deserializers supplied by the caller; they are used outside this excerpt.
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    /** Parsed records of the fetch currently being drained by {@code fetchedRecords()}, or null. */
    private PartitionRecords<K, V> nextInLineRecords = null;
    /** Parse failure deferred by {@code fetchedRecords()} so it can be thrown on the next call, or null. */
    private ExceptionMetadata nextInLineExceptionMetadata = null;
    /** Per-partition fetch results, added by the response listener in {@code sendFetches()} and polled by {@code fetchedRecords()}. */
    private final ConcurrentLinkedQueue<CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$CompletedFetch.class */
    /**
     * Immutable holder for one partition's share of a fetch response. Instances are queued
     * by the response listener in {@code sendFetches()} and consumed later by
     * {@code fetchedRecords()}.
     */
    public static class CompletedFetch {
        /** Partition the data belongs to. */
        private final TopicPartition partition;
        /** Offset that was requested for this partition in the originating fetch. */
        private final long fetchedOffset;
        /** Raw per-partition payload of the fetch response. */
        private final FetchResponse.PartitionData partitionData;
        /** Aggregator shared by all partitions of the same response. */
        private final FetchResponseMetricAggregator metricAggregator;
        /** Version of the request that produced the response. */
        private final short responseVersion;

        private CompletedFetch(TopicPartition topicPartition, long j, FetchResponse.PartitionData partitionData, FetchResponseMetricAggregator fetchResponseMetricAggregator, short s) {
            partition = topicPartition;
            fetchedOffset = j;
            responseVersion = s;
            metricAggregator = fetchResponseMetricAggregator;
            // Parameter shadows the field, so the qualifier is required here.
            this.partitionData = partitionData;
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$ExceptionMetadata.class */
    /**
     * Remembers a failure raised while parsing a completed fetch, together with the
     * partition and fetch offset it relates to, so {@code fetchedRecords()} can throw it on a
     * later call.
     */
    private static class ExceptionMetadata {
        /** Partition whose fetch could not be parsed. */
        private final TopicPartition partition;
        /** Fetch offset of the failed fetch. */
        private final long fetchedOffset;
        /** The deferred failure itself. */
        private final KafkaException exception;

        private ExceptionMetadata(TopicPartition topicPartition, long j, KafkaException kafkaException) {
            exception = kafkaException;
            fetchedOffset = j;
            partition = topicPartition;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$FetchManagerMetrics.class */
    /**
     * Registers and updates the fetch-related sensors: response-level byte/record counts,
     * fetch latency, throttle time, record lag, plus lazily created per-topic and
     * per-partition sensors.
     */
    public static class FetchManagerMetrics {
        private final Metrics metrics;
        private final String metricGrpName;
        private final Sensor bytesFetched;
        private final Sensor recordsFetched;
        private final Sensor fetchLatency;
        private final Sensor recordsFetchLag;
        private final Sensor fetchThrottleTimeSensor;
        /** Partitions seen by the most recent {@link #updatePartitionLagSensors(Set)} call; null before the first call. */
        private Set<TopicPartition> assignedPartitions;

        /**
         * Creates the fixed set of sensors.
         *
         * @param metrics registry the sensors are created in
         * @param str     prefix of the metric group; "-fetch-manager-metrics" is appended
         */
        private FetchManagerMetrics(Metrics metrics, String str) {
            final String group = str + "-fetch-manager-metrics";
            this.metrics = metrics;
            this.metricGrpName = group;

            final Sensor bytes = metrics.sensor("bytes-fetched");
            bytes.add(metrics.metricName("fetch-size-avg", group, "The average number of bytes fetched per request"), new Avg());
            bytes.add(metrics.metricName("fetch-size-max", group, "The maximum number of bytes fetched per request"), new Max());
            bytes.add(metrics.metricName("bytes-consumed-rate", group, "The average number of bytes consumed per second"), new Rate());
            this.bytesFetched = bytes;

            final Sensor records = metrics.sensor("records-fetched");
            records.add(metrics.metricName("records-per-request-avg", group, "The average number of records in each request"), new Avg());
            records.add(metrics.metricName("records-consumed-rate", group, "The average number of records consumed per second"), new Rate());
            this.recordsFetched = records;

            final Sensor latency = metrics.sensor("fetch-latency");
            latency.add(metrics.metricName("fetch-latency-avg", group, "The average time taken for a fetch request."), new Avg());
            latency.add(metrics.metricName("fetch-latency-max", group, "The max time taken for any fetch request."), new Max());
            latency.add(metrics.metricName("fetch-rate", group, "The number of fetch requests per second."), new Rate(new Count()));
            this.fetchLatency = latency;

            final Sensor lag = metrics.sensor("records-lag");
            lag.add(metrics.metricName("records-lag-max", group, "The maximum lag in terms of number of records for any partition in this window"), new Max());
            this.recordsFetchLag = lag;

            final Sensor throttle = metrics.sensor("fetch-throttle-time");
            throttle.add(metrics.metricName("fetch-throttle-time-avg", group, "The average throttle time in ms"), new Avg());
            throttle.add(metrics.metricName("fetch-throttle-time-max", group, "The maximum throttle time in ms"), new Max());
            this.fetchThrottleTimeSensor = throttle;
        }

        /**
         * Records the byte and record counts fetched for one topic, creating the topic's
         * sensors on first use.
         *
         * @param str topic name
         * @param i   bytes fetched for the topic
         * @param i2  records fetched for the topic
         */
        public void recordTopicFetchMetrics(String str, int i, int i2) {
            topicBytesSensor(str).record(i);
            topicRecordsSensor(str).record(i2);
        }

        /** Returns the per-topic bytes sensor, registering it together with its metrics if absent. */
        private Sensor topicBytesSensor(String topic) {
            final String sensorName = "topic." + topic + ".bytes-fetched";
            Sensor existing = this.metrics.getSensor(sensorName);
            if (existing != null) {
                return existing;
            }
            // Dots in the topic name are replaced by underscores in the tag value.
            Map<String, String> tags = Collections.singletonMap(ConsumerProtocol.TOPIC_KEY_NAME, topic.replace('.', '_'));
            Sensor created = this.metrics.sensor(sensorName);
            created.add(this.metrics.metricName("fetch-size-avg", this.metricGrpName, "The average number of bytes fetched per request for topic " + topic, tags), new Avg());
            created.add(this.metrics.metricName("fetch-size-max", this.metricGrpName, "The maximum number of bytes fetched per request for topic " + topic, tags), new Max());
            created.add(this.metrics.metricName("bytes-consumed-rate", this.metricGrpName, "The average number of bytes consumed per second for topic " + topic, tags), new Rate());
            return created;
        }

        /** Returns the per-topic records sensor, registering it together with its metrics if absent. */
        private Sensor topicRecordsSensor(String topic) {
            final String sensorName = "topic." + topic + ".records-fetched";
            Sensor existing = this.metrics.getSensor(sensorName);
            if (existing != null) {
                return existing;
            }
            Map<String, String> tags = new HashMap<>(1);
            tags.put(ConsumerProtocol.TOPIC_KEY_NAME, topic.replace('.', '_'));
            Sensor created = this.metrics.sensor(sensorName);
            created.add(this.metrics.metricName("records-per-request-avg", this.metricGrpName, "The average number of records in each request for topic " + topic, tags), new Avg());
            created.add(this.metrics.metricName("records-consumed-rate", this.metricGrpName, "The average number of records consumed per second for topic " + topic, tags), new Rate());
            return created;
        }

        /**
         * Removes the lag sensor of every previously known partition that is missing from
         * {@code set}, then remembers {@code set} as the current assignment.
         *
         * @param set the partitions now assigned
         */
        public void updatePartitionLagSensors(Set<TopicPartition> set) {
            final Set<TopicPartition> previous = this.assignedPartitions;
            if (previous != null) {
                for (TopicPartition old : previous) {
                    if (set.contains(old)) {
                        continue;
                    }
                    this.metrics.removeSensor(partitionLagMetricName(old));
                }
            }
            this.assignedPartitions = set;
        }

        /**
         * Records a lag sample on the global lag sensor and on the partition's own sensor,
         * creating the latter (value, max and avg metrics) on first use.
         *
         * @param topicPartition partition the lag belongs to
         * @param j              lag sample to record
         */
        public void recordPartitionLag(TopicPartition topicPartition, long j) {
            this.recordsFetchLag.record(j);

            final String name = partitionLagMetricName(topicPartition);
            Sensor partitionSensor = this.metrics.getSensor(name);
            if (partitionSensor == null) {
                partitionSensor = this.metrics.sensor(name);
                partitionSensor.add(this.metrics.metricName(name, this.metricGrpName, "The latest lag of the partition"), new Value());
                partitionSensor.add(this.metrics.metricName(name + "-max", this.metricGrpName, "The max lag of the partition"), new Max());
                partitionSensor.add(this.metrics.metricName(name + "-avg", this.metricGrpName, "The average lag of the partition"), new Avg());
            }
            partitionSensor.record(j);
        }

        /** Builds the sensor/metric base name for a partition's lag: the partition's string form plus ".records-lag". */
        private static String partitionLagMetricName(TopicPartition topicPartition) {
            return topicPartition + ".records-lag";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$FetchResponseMetricAggregator.class */
    /**
     * Collects the byte and record counts of every partition in one fetch response and
     * publishes them to the sensors once the last expected partition has reported in.
     * Counts are kept both for the response as a whole and per topic.
     */
    public static class FetchResponseMetricAggregator {
        private final FetchManagerMetrics sensors;
        /** Partitions of the response that have not yet called {@link #record}. */
        private final Set<TopicPartition> unrecordedPartitions;
        /** Running totals over all partitions of the response. */
        private final FetchMetrics fetchMetrics;
        /** Running totals keyed by topic name. */
        private final Map<String, FetchMetrics> topicFetchMetrics;

        /** Mutable pair of byte and record counters. */
        public static class FetchMetrics {
            private int fetchBytes;
            private int fetchRecords;

            private FetchMetrics() {
            }

            /**
             * Adds to both counters.
             *
             * @param i  bytes to add
             * @param i2 records to add
             */
            protected void increment(int i, int i2) {
                this.fetchBytes += i;
                this.fetchRecords += i2;
            }
        }

        /**
         * @param fetchManagerMetrics sensors the totals are eventually published to
         * @param set                 partitions expected to report; this set is mutated by
         *                            {@link #record}
         */
        private FetchResponseMetricAggregator(FetchManagerMetrics fetchManagerMetrics, Set<TopicPartition> set) {
            this.fetchMetrics = new FetchMetrics();
            this.topicFetchMetrics = new HashMap<>();
            this.sensors = fetchManagerMetrics;
            this.unrecordedPartitions = set;
        }

        /**
         * Adds one partition's counts to the totals. When this was the last outstanding
         * partition, the response-wide totals and every per-topic total are recorded.
         *
         * @param topicPartition partition reporting in
         * @param i              bytes parsed for the partition
         * @param i2             records parsed for the partition
         */
        public void record(TopicPartition topicPartition, int i, int i2) {
            this.unrecordedPartitions.remove(topicPartition);
            this.fetchMetrics.increment(i, i2);

            String topic = topicPartition.topic();
            FetchMetrics perTopic = this.topicFetchMetrics.get(topic);
            if (perTopic == null) {
                perTopic = new FetchMetrics();
                this.topicFetchMetrics.put(topic, perTopic);
            }
            perTopic.increment(i, i2);

            if (!this.unrecordedPartitions.isEmpty()) {
                return;
            }
            // The response-level sensors must see the totals over ALL partitions. A local
            // per-topic counter used to shadow the field here, so only the last partition's
            // topic was reported.
            this.sensors.bytesFetched.record(this.fetchMetrics.fetchBytes);
            this.sensors.recordsFetched.record(this.fetchMetrics.fetchRecords);
            for (Map.Entry<String, FetchMetrics> entry : this.topicFetchMetrics.entrySet()) {
                FetchMetrics totals = entry.getValue();
                this.sensors.recordTopicFetchMetrics(entry.getKey(), totals.fetchBytes, totals.fetchRecords);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$OffsetData.class */
    /**
     * Result of an offset lookup for a single partition: the offset itself and an optional
     * timestamp.
     */
    public static class OffsetData {
        /** The offset that was looked up. */
        final long offset;
        /** Timestamp paired with the offset; boxed, so presumably may be null — verify against the producer of this object. */
        final Long timestamp;

        OffsetData(long j, Long l) {
            timestamp = l;
            offset = j;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$PartitionRecords.class */
    /**
     * Parsed records of one partition together with a cursor, so the records can be handed
     * out in several bounded slices.
     */
    public static class PartitionRecords<K, V> {
        /** Offset of the next record to hand out; advanced as slices are drained. */
        private long fetchOffset;
        private TopicPartition partition;
        /** Remaining backing list, or null once this instance has been drained. */
        private List<ConsumerRecord<K, V>> records;
        /** Index into {@link #records} of the next record to hand out. */
        private int position;

        private PartitionRecords(long j, TopicPartition topicPartition, List<ConsumerRecord<K, V>> list) {
            records = list;
            partition = topicPartition;
            fetchOffset = j;
            position = 0;
        }

        /** @return true once the backing list has been discarded */
        public boolean isDrained() {
            return null == this.records;
        }

        /** Discards the backing list, marking this instance as drained. */
        public void drain() {
            this.records = null;
        }

        /**
         * Hands out up to {@code i} records starting at the cursor and advances the cursor.
         * When nothing is left, the instance is marked drained and an empty list is returned.
         *
         * @param i maximum number of records to return
         * @return an unmodifiable view of the next slice, possibly empty
         */
        public List<ConsumerRecord<K, V>> drainRecords(int i) {
            if (!isDrained() && this.position < this.records.size()) {
                final int start = this.position;
                final int end = Math.min(this.records.size(), start + i);
                List<ConsumerRecord<K, V>> slice = Collections.unmodifiableList(this.records.subList(start, end));
                this.position = end;
                // Keep fetchOffset pointing at the first record that was not handed out.
                if (end < this.records.size()) {
                    this.fetchOffset = this.records.get(end).offset();
                }
                return slice;
            }
            drain();
            return Collections.emptyList();
        }
    }

    /**
     * Creates a fetcher and registers it as a listener on the subscription state.
     *
     * @param consumerNetworkClient client used for all requests
     * @param i                     stored as {@code minBytes}
     * @param i2                    stored as {@code maxBytes}
     * @param i3                    stored as {@code maxWaitMs}
     * @param i4                    stored as {@code fetchSize}
     * @param i5                    stored as {@code maxPollRecords}
     * @param z                     stored as {@code checkCrcs}
     * @param deserializer          key deserializer
     * @param deserializer2         value deserializer
     * @param metadata              cluster metadata
     * @param subscriptionState     subscription state this fetcher listens to
     * @param metrics               registry the fetch sensors are created in
     * @param str                   metric group prefix handed to {@link FetchManagerMetrics}
     * @param time                  clock
     * @param j                     stored as {@code retryBackoffMs}
     */
    public Fetcher(ConsumerNetworkClient consumerNetworkClient, int i, int i2, int i3, int i4, int i5, boolean z, Deserializer<K> deserializer, Deserializer<V> deserializer2, Metadata metadata, SubscriptionState subscriptionState, Metrics metrics, String str, Time time, long j) {
        // Collaborators.
        this.client = consumerNetworkClient;
        this.time = time;
        this.metadata = metadata;
        this.subscriptions = subscriptionState;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;

        // Tuning values.
        this.minBytes = i;
        this.maxBytes = i2;
        this.maxWaitMs = i3;
        this.fetchSize = i4;
        this.maxPollRecords = i5;
        this.checkCrcs = z;
        this.retryBackoffMs = j;

        this.sensors = new FetchManagerMetrics(metrics, str);

        // Registered last, after every field has been assigned.
        subscriptionState.addListener(this);
    }

    /**
     * @return true if at least one completed fetch is still waiting in the queue
     */
    public boolean hasCompletedFetches() {
        boolean nothingQueued = this.completedFetches.isEmpty();
        return !nothingQueued;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Checks that a fetch response covers exactly the partitions that were requested.
     *
     * @param builder       the request that was sent
     * @param fetchResponse the response that came back
     * @return true if both partition key sets are equal
     */
    public boolean matchesRequestedPartitions(FetchRequest.Builder builder, FetchResponse fetchResponse) {
        Set<TopicPartition> returned = fetchResponse.responseData().keySet();
        Set<TopicPartition> requested = builder.fetchData().keySet();
        return returned.equals(requested);
    }

    /**
     * Sends one fetch request per node returned by {@code createFetchRequests()}. On success
     * each partition of the response is queued as a {@link CompletedFetch}; responses whose
     * partition set differs from the request are ignored with a warning. Failures are only
     * logged at debug level.
     *
     * @return the number of fetch requests that were created
     */
    public int sendFetches() {
        Map<Node, FetchRequest.Builder> requestsByNode = createFetchRequests();
        for (Map.Entry<Node, FetchRequest.Builder> nodeAndRequest : requestsByNode.entrySet()) {
            final FetchRequest.Builder request = nodeAndRequest.getValue();
            final Node target = nodeAndRequest.getKey();
            log.debug("Sending fetch for partitions {} to broker {}", request.fetchData().keySet(), target);

            RequestFuture<ClientResponse> pending = this.client.send(target, request);
            pending.addListener(new RequestFutureListener<ClientResponse>() {
                @Override
                public void onSuccess(ClientResponse clientResponse) {
                    FetchResponse response = (FetchResponse) clientResponse.responseBody();
                    if (!Fetcher.this.matchesRequestedPartitions(request, response)) {
                        Fetcher.log.warn("Ignoring fetch response containing partitions {} since it does not match the requested partitions {}", response.responseData().keySet(), request.fetchData().keySet());
                        return;
                    }

                    // One aggregator is shared by all partitions of this response; it works on
                    // its own copy of the partition set.
                    Set<TopicPartition> awaited = new HashSet<>(response.responseData().keySet());
                    FetchResponseMetricAggregator aggregator = new FetchResponseMetricAggregator(Fetcher.this.sensors, awaited);

                    for (Map.Entry<TopicPartition, FetchResponse.PartitionData> partitionEntry : response.responseData().entrySet()) {
                        TopicPartition tp = partitionEntry.getKey();
                        long requestedOffset = request.fetchData().get(tp).offset;
                        FetchResponse.PartitionData payload = partitionEntry.getValue();
                        Fetcher.this.completedFetches.add(new CompletedFetch(tp, requestedOffset, payload, aggregator, request.version()));
                    }

                    Fetcher.this.sensors.fetchLatency.record(clientResponse.requestLatencyMs());
                    Fetcher.this.sensors.fetchThrottleTimeSensor.record(response.getThrottleTime());
                }

                @Override
                public void onFailure(RuntimeException runtimeException) {
                    // The trailing exception argument is passed alongside the two placeholders' values.
                    Fetcher.log.debug("Fetch request to {} for partitions {} failed", target, request.fetchData().keySet(), runtimeException);
                }
            });
        }
        return requestsByNode.size();
    }

    /**
     * Resets the offset of every given partition that is both assigned and flagged as
     * needing an offset reset.
     *
     * @param set partitions to examine
     */
    public void resetOffsetsIfNeeded(Set<TopicPartition> set) {
        for (TopicPartition candidate : set) {
            if (!this.subscriptions.isAssigned(candidate) || !this.subscriptions.isOffsetResetNeeded(candidate)) {
                continue;
            }
            resetOffset(candidate);
        }
    }

    /**
     * Gives every assigned partition without a valid position a starting point: a pending
     * reset is executed; with no committed offset a reset is requested and executed;
     * otherwise the position is set to the committed offset.
     *
     * @param set partitions to examine
     */
    public void updateFetchPositions(Set<TopicPartition> set) {
        for (TopicPartition tp : set) {
            // Only assigned partitions that still lack a valid position need work.
            if (!this.subscriptions.isAssigned(tp) || this.subscriptions.hasValidPosition(tp)) {
                continue;
            }
            if (this.subscriptions.isOffsetResetNeeded(tp)) {
                resetOffset(tp);
                continue;
            }
            if (this.subscriptions.committed(tp) == null) {
                // Nothing committed: flag the partition for reset, then perform it.
                this.subscriptions.needOffsetReset(tp);
                resetOffset(tp);
                continue;
            }
            long committedOffset = this.subscriptions.committed(tp).offset();
            log.debug("Resetting offset for partition {} to the committed offset {}", tp, committedOffset);
            this.subscriptions.seek(tp, committedOffset);
        }
    }

    /**
     * Fetches metadata using an "all topics" request.
     *
     * @param j timeout in milliseconds, passed through to {@link #getTopicMetadata}
     * @return partition info per topic
     */
    public Map<String, List<PartitionInfo>> getAllTopicMetadata(long j) {
        MetadataRequest.Builder everyTopic = MetadataRequest.Builder.allTopics();
        return getTopicMetadata(everyTopic, j);
    }

    /**
     * Requests topic metadata, retrying until it succeeds or the timeout runs out.
     *
     * @param builder the metadata request to send
     * @param j       total time budget in milliseconds
     * @return available partitions per topic; an empty map if the request names no topics
     * @throws TopicAuthorizationException if the response lists unauthorized topics
     * @throws InvalidTopicException       if a topic is reported as invalid
     * @throws KafkaException              if a topic reports a non-retriable error
     * @throws TimeoutException            if the time budget is exhausted first
     */
    public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest.Builder builder, long j) {
        // A request that names no topics (and is not "all topics") needs no round trip.
        if (!builder.isAllTopics() && builder.topics().isEmpty()) {
            return Collections.emptyMap();
        }

        final long startMs = this.time.milliseconds();
        long remainingMs = j;
        do {
            RequestFuture<ClientResponse> future = sendMetadataRequest(builder);
            this.client.poll(future, remainingMs);

            if (future.failed() && !future.isRetriable()) {
                throw future.exception();
            }

            if (future.succeeded()) {
                MetadataResponse response = (MetadataResponse) future.value().responseBody();
                Cluster cluster = response.cluster();

                Set<String> unauthorized = cluster.unauthorizedTopics();
                if (!unauthorized.isEmpty()) {
                    throw new TopicAuthorizationException(unauthorized);
                }

                boolean retryNeeded = false;
                Map<String, Errors> topicErrors = response.errors();
                if (!topicErrors.isEmpty()) {
                    log.debug("Topic metadata fetch included errors: {}", topicErrors);
                    for (Map.Entry<String, Errors> topicError : topicErrors.entrySet()) {
                        String topic = topicError.getKey();
                        Errors error = topicError.getValue();
                        if (error == Errors.INVALID_TOPIC_EXCEPTION) {
                            throw new InvalidTopicException("Topic '" + topic + "' is invalid");
                        }
                        if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                            // Unknown topics are tolerated and do not trigger a retry.
                            continue;
                        }
                        if (error.exception() instanceof RetriableException) {
                            retryNeeded = true;
                        } else {
                            throw new KafkaException("Unexpected error fetching metadata for topic " + topic, error.exception());
                        }
                    }
                }

                if (!retryNeeded) {
                    Map<String, List<PartitionInfo>> partitionsByTopic = new HashMap<>();
                    for (String topic : cluster.topics()) {
                        partitionsByTopic.put(topic, cluster.availablePartitionsForTopic(topic));
                    }
                    return partitionsByTopic;
                }
            }

            // Recompute the budget, then back off for at most retryBackoffMs before retrying.
            long elapsedMs = this.time.milliseconds() - startMs;
            remainingMs = j - elapsedMs;
            if (remainingMs > 0) {
                long backoffMs = Math.min(remainingMs, this.retryBackoffMs);
                this.time.sleep(backoffMs);
                remainingMs -= backoffMs;
            }
        } while (remainingMs > 0);

        throw new TimeoutException("Timeout expired while fetching topic metadata");
    }

    /**
     * Sends the metadata request to the least loaded node.
     *
     * @param builder the request to send
     * @return the request's future, or a "no brokers available" future if the client offers no node
     */
    private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builder builder) {
        Node target = this.client.leastLoadedNode();
        if (target == null) {
            return RequestFuture.noBrokersAvailable();
        }
        return this.client.send(target, builder);
    }

    /**
     * Resets a partition's position according to its reset strategy: the strategy is mapped
     * to a sentinel timestamp (-2 for EARLIEST, -1 for LATEST), the matching offset is looked
     * up, and the partition is moved there if it is still assigned.
     *
     * @param topicPartition partition to reset
     * @throws NoOffsetForPartitionException if the strategy is neither EARLIEST nor LATEST,
     *                                       or the lookup returns no entry for the partition
     */
    private void resetOffset(TopicPartition topicPartition) {
        OffsetResetStrategy strategy = this.subscriptions.resetStrategy(topicPartition);
        log.debug("Resetting offset for partition {} to {} offset.", topicPartition, strategy.name().toLowerCase(Locale.ROOT));

        final long sentinelTimestamp;
        if (strategy == OffsetResetStrategy.EARLIEST) {
            sentinelTimestamp = -2L;
        } else if (strategy == OffsetResetStrategy.LATEST) {
            sentinelTimestamp = -1L;
        } else {
            throw new NoOffsetForPartitionException(topicPartition);
        }

        Map<TopicPartition, Long> query = Collections.singletonMap(topicPartition, Long.valueOf(sentinelTimestamp));
        OffsetData found = retrieveOffsetsByTimes(query, Long.MAX_VALUE, false).get(topicPartition);
        if (found == null) {
            throw new NoOffsetForPartitionException(topicPartition);
        }

        // The assignment is checked again after the lookup before moving the position.
        if (this.subscriptions.isAssigned(topicPartition)) {
            this.subscriptions.seek(topicPartition, found.offset);
        }
    }

    /**
     * Looks up offsets by timestamp and converts each result to an {@link OffsetAndTimestamp}.
     * Partitions whose lookup yielded null are mapped to null.
     *
     * @param map target timestamp per partition
     * @param j   timeout in milliseconds
     * @return offset and timestamp per partition
     */
    public Map<TopicPartition, OffsetAndTimestamp> getOffsetsByTimes(Map<TopicPartition, Long> map, long j) {
        Map<TopicPartition, OffsetData> looked = retrieveOffsetsByTimes(map, j, true);
        Map<TopicPartition, OffsetAndTimestamp> converted = new HashMap<>(looked.size());
        for (Map.Entry<TopicPartition, OffsetData> lookup : looked.entrySet()) {
            OffsetData data = lookup.getValue();
            OffsetAndTimestamp result = null;
            if (data != null) {
                result = new OffsetAndTimestamp(data.offset, data.timestamp.longValue());
            }
            converted.put(lookup.getKey(), result);
        }
        return converted;
    }

    /**
     * Sends list-offset requests and retries retriable failures until a result arrives or
     * the time budget is used up.
     *
     * @param map per-partition timestamps to search for
     * @param j   total time budget in milliseconds
     * @param z   flag forwarded unchanged to {@code sendListOffsetRequests}
     * @return the lookup result; an empty map if {@code map} is empty
     * @throws TimeoutException if no result was obtained within the budget
     */
    private Map<TopicPartition, OffsetData> retrieveOffsetsByTimes(Map<TopicPartition, Long> map, long j, boolean z) {
        if (map.isEmpty()) {
            return Collections.emptyMap();
        }

        final long startMs = this.time.milliseconds();
        long remainingMs = j;
        do {
            RequestFuture<Map<TopicPartition, OffsetData>> future = sendListOffsetRequests(z, map);
            this.client.poll(future, remainingMs);

            // Poll returned without completing the future: the budget is spent.
            if (!future.isDone()) {
                break;
            }
            if (future.succeeded()) {
                return future.value();
            }
            if (!future.isRetriable()) {
                throw future.exception();
            }

            long leftMs = j - (this.time.milliseconds() - startMs);
            if (leftMs <= 0) {
                break;
            }

            // Stale metadata is cured by a metadata refresh; anything else by a plain backoff.
            if (future.exception() instanceof InvalidMetadataException) {
                this.client.awaitMetadataUpdate(leftMs);
            } else {
                this.time.sleep(Math.min(leftMs, this.retryBackoffMs));
            }

            remainingMs = j - (this.time.milliseconds() - startMs);
        } while (remainingMs > 0);

        throw new TimeoutException("Failed to get offsets by times in " + j + " ms");
    }

    /**
     * Looks up offsets for the given partitions using the sentinel timestamp -2, the value
     * {@code resetOffset} uses for the EARLIEST strategy.
     *
     * @param collection partitions to look up
     * @param j          timeout in milliseconds
     * @return offset per partition
     */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, long j) {
        final long earliestSentinel = -2L;
        return beginningOrEndOffset(collection, earliestSentinel, j);
    }

    /**
     * Looks up offsets for the given partitions using the sentinel timestamp -1, the value
     * {@code resetOffset} uses for the LATEST strategy.
     *
     * @param collection partitions to look up
     * @param j          timeout in milliseconds
     * @return offset per partition
     */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, long j) {
        final long latestSentinel = -1L;
        return beginningOrEndOffset(collection, latestSentinel, j);
    }

    /**
     * Looks up, for every given partition, the offset that corresponds to one shared
     * timestamp and returns just the offsets.
     *
     * @param collection partitions to look up
     * @param j          timestamp (or sentinel) searched for on every partition
     * @param j2         timeout in milliseconds
     * @return offset per partition
     */
    private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> collection, long j, long j2) {
        Map<TopicPartition, Long> timestampsToSearch = new HashMap<>();
        for (TopicPartition tp : collection) {
            timestampsToSearch.put(tp, Long.valueOf(j));
        }

        Map<TopicPartition, OffsetData> found = retrieveOffsetsByTimes(timestampsToSearch, j2, false);
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetData> hit : found.entrySet()) {
            offsets.put(hit.getKey(), Long.valueOf(hit.getValue().offset));
        }
        return offsets;
    }

    /**
     * Returns up to {@code maxPollRecords} records from the queued completed fetches,
     * grouped by partition.
     *
     * <p>A parse failure is thrown immediately if nothing has been collected yet. Otherwise
     * it is stored and thrown at the start of the next call, provided the partition is still
     * fetchable and its position still equals the failed fetch's offset.
     *
     * @return the drained records per partition; empty if nothing is available
     * @throws KafkaException if parsing a completed fetch failed, now or on the previous call
     */
    public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
        if (this.nextInLineExceptionMetadata != null) {
            ExceptionMetadata deferred = this.nextInLineExceptionMetadata;
            this.nextInLineExceptionMetadata = null;
            TopicPartition failedPartition = deferred.partition;
            // Only surface the stored failure if it is still relevant to the current position.
            if (this.subscriptions.isFetchable(failedPartition)
                    && this.subscriptions.position(failedPartition).longValue() == deferred.fetchedOffset) {
                throw deferred.exception;
            }
        }

        // Fully typed: a raw map here makes the get() below return Object, which does not
        // type-check against List<ConsumerRecord<K, V>>.
        Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
        int recordsRemaining = this.maxPollRecords;
        while (recordsRemaining > 0) {
            if (this.nextInLineRecords == null || this.nextInLineRecords.isDrained()) {
                CompletedFetch completedFetch = this.completedFetches.poll();
                if (completedFetch == null) {
                    break;
                }
                try {
                    this.nextInLineRecords = parseCompletedFetch(completedFetch);
                } catch (KafkaException e) {
                    if (drained.isEmpty()) {
                        throw e;
                    }
                    // Records were already collected: hand those out first and keep the
                    // failure for the next call.
                    this.nextInLineExceptionMetadata = new ExceptionMetadata(completedFetch.partition, completedFetch.fetchedOffset, e);
                }
            } else {
                TopicPartition partition = this.nextInLineRecords.partition;
                List<ConsumerRecord<K, V>> batch = drainRecords(this.nextInLineRecords, recordsRemaining);
                if (!batch.isEmpty()) {
                    List<ConsumerRecord<K, V>> existing = drained.get(partition);
                    if (existing == null) {
                        drained.put(partition, batch);
                    } else {
                        // Drained batches are unmodifiable views, so merge into a fresh list.
                        List<ConsumerRecord<K, V>> merged = new ArrayList<>(batch.size() + existing.size());
                        merged.addAll(existing);
                        merged.addAll(batch);
                        drained.put(partition, merged);
                    }
                    recordsRemaining -= batch.size();
                }
            }
        }
        return drained;
    }

    /**
     * Hands out up to {@code i} records of the given partition if it is still assigned and
     * fetchable and the records start exactly at the current position. On success the
     * position is advanced past the last returned record and the partition lag, when known,
     * is recorded. In every other case the records are discarded and an empty list is
     * returned.
     *
     * @param partitionRecords parsed records to drain from
     * @param i                maximum number of records to return
     * @return the drained records, possibly empty
     */
    private List<ConsumerRecord<K, V>> drainRecords(PartitionRecords<K, V> partitionRecords, int i) {
        final TopicPartition tp = partitionRecords.partition;
        if (!this.subscriptions.isAssigned(tp)) {
            log.debug("Not returning fetched records for partition {} since it is no longer assigned", tp);
        } else if (!this.subscriptions.isFetchable(tp)) {
            log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", tp);
        } else {
            // The position is read (and unboxed) only on the fetchable path, where it is
            // actually used. updateFetchPositions shows that an assigned partition may lack
            // a valid position, so unboxing it before the fetchable check risked a
            // NullPointerException.
            // NOTE(review): assumes a fetchable partition always has a non-null position;
            // confirm against SubscriptionState.
            long position = this.subscriptions.position(tp).longValue();
            if (partitionRecords.fetchOffset == position) {
                List<ConsumerRecord<K, V>> batch = partitionRecords.drainRecords(i);
                if (!batch.isEmpty()) {
                    long nextOffset = batch.get(batch.size() - 1).offset() + 1;
                    log.trace("Returning fetched records at offset {} for assigned partition {} and update position to {}", new Object[]{Long.valueOf(position), tp, Long.valueOf(nextOffset)});
                    this.subscriptions.position(tp, nextOffset);
                }
                Long lag = this.subscriptions.partitionLag(tp);
                if (lag != null) {
                    this.sensors.recordPartitionLag(tp, lag.longValue());
                }
                return batch;
            }
            // The records no longer line up with the current position, so they are dropped.
            log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", new Object[]{tp, Long.valueOf(partitionRecords.fetchOffset), Long.valueOf(position)});
        }
        partitionRecords.drain();
        return Collections.emptyList();
    }

    /**
     * Groups the requested partitions by their current leader and sends one list-offset request
     * per leader via {@link #sendListOffsetRequest}.
     *
     * <p>If a partition is missing from the cached cluster metadata its topic is added to the
     * metadata and a stale-metadata future is returned; if a partition has no leader a
     * leader-not-available future is returned. In both cases no request is sent.
     *
     * <p>The returned future completes with the merged results once every per-leader request has
     * succeeded, or fails with the first failure reported by any of them.
     *
     * @param z   forwarded unchanged to {@link #sendListOffsetRequest}
     * @param map partitions to look up, mapped to the value sent as their target time
     * @return future holding the combined per-partition offset data
     */
    private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequests(boolean z, Map<TopicPartition, Long> map) {
        final Map<Node, Map<TopicPartition, Long>> targetTimesByNode = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            PartitionInfo partition = this.metadata.fetch().partition(key);
            if (partition == null) {
                this.metadata.add(key.topic());
                log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", key);
                return RequestFuture.staleMetadata();
            }
            Node leader = partition.leader();
            if (leader == null) {
                log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", key);
                return RequestFuture.leaderNotAvailable();
            }
            Map<TopicPartition, Long> targetTimes = targetTimesByNode.get(leader);
            if (targetTimes == null) {
                targetTimes = new HashMap<>();
                targetTimesByNode.put(leader, targetTimes);
            }
            targetTimes.put(key, entry.getValue());
        }

        final RequestFuture<Map<TopicPartition, OffsetData>> requestFuture = new RequestFuture<>();
        final Map<TopicPartition, OffsetData> fetchedOffsets = new HashMap<>();
        // Counts the per-leader responses that are still outstanding.
        final AtomicInteger remainingResponses = new AtomicInteger(targetTimesByNode.size());
        for (Map.Entry<Node, Map<TopicPartition, Long>> nodeEntry : targetTimesByNode.entrySet()) {
            sendListOffsetRequest(nodeEntry.getKey(), nodeEntry.getValue(), z).addListener(new RequestFutureListener<Map<TopicPartition, OffsetData>>() {
                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onSuccess(Map<TopicPartition, OffsetData> map2) {
                    synchronized (requestFuture) {
                        fetchedOffsets.putAll(map2);
                        // Complete only after the last response, and only if no failure was raised first.
                        if (remainingResponses.decrementAndGet() == 0 && !requestFuture.isDone()) {
                            requestFuture.complete(fetchedOffsets);
                        }
                    }
                }

                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onFailure(RuntimeException runtimeException) {
                    synchronized (requestFuture) {
                        // Only the first failure is propagated.
                        if (!requestFuture.isDone()) {
                            requestFuture.raise(runtimeException);
                        }
                    }
                }
            });
        }
        return requestFuture;
    }

    /**
     * Sends a single list-offset request for the given partitions to {@code node} and adapts the
     * raw client response into per-partition offset data through
     * {@link #handleListOffsetResponse}.
     *
     * @param node broker the request is sent to
     * @param map  partitions to look up, mapped to the target time set on the request builder
     * @param z    when {@code true} the builder's minimum version is set to 1, otherwise to 0
     * @return future that is completed or failed by {@link #handleListOffsetResponse}
     */
    private RequestFuture<Map<TopicPartition, OffsetData>> sendListOffsetRequest(final Node node, final Map<TopicPartition, Long> map, boolean z) {
        short minVersion = 0;
        if (z) {
            minVersion = 1;
        }
        ListOffsetRequest.Builder builder = new ListOffsetRequest.Builder().setTargetTimes(map);
        builder.setMinVersion(minVersion);
        log.trace("Sending ListOffsetRequest {} to broker {}", builder, node);

        RequestFuture<ClientResponse> pending = this.client.send(node, builder);
        return pending.compose(new RequestFutureAdapter<ClientResponse, Map<TopicPartition, OffsetData>>() {
            @Override // org.apache.kafka.clients.consumer.internals.RequestFutureAdapter
            public void onSuccess(ClientResponse clientResponse, RequestFuture<Map<TopicPartition, OffsetData>> requestFuture) {
                ListOffsetResponse response = (ListOffsetResponse) clientResponse.responseBody();
                Fetcher.log.trace("Received ListOffsetResponse {} from broker {}", response, node);
                Fetcher.this.handleListOffsetResponse(map, response, requestFuture);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Translates a list-offset response into per-partition offset data and completes or fails
     * {@code requestFuture} accordingly.
     *
     * <p>For every partition in {@code map}:
     * <ul>
     *   <li>no error, {@code offsets == null}: the single {@code offset}/{@code timestamp} pair is
     *       used, unless the offset is {@code -1}, in which case the partition is left out;</li>
     *   <li>no error, {@code offsets != null} (logged as a v0 response): at most one offset is
     *       accepted; an empty list is treated like offset {@code -1};</li>
     *   <li>{@code UNSUPPORTED_FOR_MESSAGE_FORMAT}: the partition is mapped to {@code null};</li>
     *   <li>any other error: the future is failed and processing stops.</li>
     * </ul>
     *
     * <p>Processing stops at the first failure so that the future is never raised twice when
     * several partitions in the same response carry errors. This matches the
     * "more than one offset" branch, which already returns right after raising.
     *
     * @param map                partitions that were requested; only the keys are used
     * @param listOffsetResponse response received from the broker
     * @param requestFuture      future to complete with the result map or to fail
     */
    public void handleListOffsetResponse(Map<TopicPartition, Long> map, ListOffsetResponse listOffsetResponse, RequestFuture<Map<TopicPartition, OffsetData>> requestFuture) {
        Map<TopicPartition, OffsetData> offsetsByPartition = new HashMap<>();
        for (TopicPartition key : map.keySet()) {
            ListOffsetResponse.PartitionData partitionData = listOffsetResponse.responseData().get(key);
            Errors forCode = Errors.forCode(partitionData.errorCode);
            if (forCode == Errors.NONE) {
                if (partitionData.offsets == null) {
                    log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}", key, partitionData.offset, partitionData.timestamp);
                    if (partitionData.offset.longValue() != -1) {
                        offsetsByPartition.put(key, new OffsetData(partitionData.offset.longValue(), partitionData.timestamp));
                    }
                } else {
                    if (partitionData.offsets.size() > 1) {
                        requestFuture.raise(new IllegalStateException("Unexpected partitionData response of length " + partitionData.offsets.size()));
                        return;
                    }
                    long offset = partitionData.offsets.isEmpty() ? -1L : partitionData.offsets.get(0).longValue();
                    log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}", key, offset);
                    if (offset != -1) {
                        offsetsByPartition.put(key, new OffsetData(offset, null));
                    }
                }
            } else if (forCode == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
                log.debug("Cannot search by timestamp for partition {} because the message format version is before 0.10.0", key);
                offsetsByPartition.put(key, null);
            } else if (forCode == Errors.NOT_LEADER_FOR_PARTITION) {
                log.debug("Attempt to fetch offsets for partition {} failed due to obsolete leadership information, retrying.", key);
                requestFuture.raise(forCode);
                return;
            } else if (forCode == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                log.warn("Received unknown topic or partition error in ListOffset request for partition {}. The topic/partition may not exist or the user may not have Describe access to it", key);
                requestFuture.raise(forCode);
                return;
            } else {
                log.warn("Attempt to fetch offsets for partition {} failed due to: {}", key, forCode.message());
                requestFuture.raise(new StaleMetadataException());
                return;
            }
        }
        if (!requestFuture.isDone()) {
            requestFuture.complete(offsetsByPartition);
        }
    }

    /**
     * Returns the partitions reported as fetchable by the subscription state, minus those that
     * already have data waiting locally: the partition of {@code nextInLineRecords} (when it is
     * not drained) and the partition of every entry in {@code completedFetches}.
     *
     * @return the list obtained from {@code subscriptions.fetchablePartitions()} with the
     *         buffered partitions removed from it
     */
    private List<TopicPartition> fetchablePartitions() {
        Set<TopicPartition> buffered = new HashSet<>();
        List<TopicPartition> candidates = this.subscriptions.fetchablePartitions();

        boolean nextInLinePending = this.nextInLineRecords != null && !this.nextInLineRecords.isDrained();
        if (nextInLinePending) {
            buffered.add(this.nextInLineRecords.partition);
        }
        for (CompletedFetch completedFetch : this.completedFetches) {
            buffered.add(completedFetch.partition);
        }

        candidates.removeAll(buffered);
        return candidates;
    }

    /**
     * Builds one fetch request per broker covering every partition returned by
     * {@link #fetchablePartitions()}.
     *
     * <p>A partition whose leader is unknown triggers a metadata update and is skipped. A
     * partition whose leader already has pending requests is skipped as well. Within a request
     * the partitions keep the iteration order of {@link #fetchablePartitions()}.
     *
     * @return fetch request builders keyed by the destination node; empty when nothing can be
     *         fetched right now
     */
    private Map<Node, FetchRequest.Builder> createFetchRequests() {
        Cluster cluster = this.metadata.fetch();
        Map<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> fetchable = new LinkedHashMap<>();
        for (TopicPartition topicPartition : fetchablePartitions()) {
            Node leader = cluster.leaderFor(topicPartition);
            if (leader == null) {
                this.metadata.requestUpdate();
            } else if (this.client.pendingRequestCount(leader) == 0) {
                LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetchData = fetchable.get(leader);
                if (fetchData == null) {
                    fetchData = new LinkedHashMap<>();
                    fetchable.put(leader, fetchData);
                }
                long position = this.subscriptions.position(topicPartition).longValue();
                fetchData.put(topicPartition, new FetchRequest.PartitionData(position, this.fetchSize));
                log.trace("Added fetch request for partition {} at offset {} to node {}", topicPartition, position, leader);
            } else {
                log.trace("Skipping fetch for partition {} because there is an in-flight request to {}", topicPartition, leader);
            }
        }

        Map<Node, FetchRequest.Builder> requests = new HashMap<>();
        for (Map.Entry<Node, LinkedHashMap<TopicPartition, FetchRequest.PartitionData>> entry : fetchable.entrySet()) {
            FetchRequest.Builder builder = new FetchRequest.Builder(this.maxWaitMs, this.minBytes, entry.getValue()).setMaxBytes(this.maxBytes);
            requests.put(entry.getKey(), builder);
        }
        return requests;
    }

    /**
     * Converts a completed fetch for one partition into buffered consumer records, or reacts to
     * the error code carried by the response.
     *
     * <p>Outcome by case:
     * <ul>
     *   <li>partition no longer fetchable: nothing is parsed, {@code null} is returned;</li>
     *   <li>no error but the current position is {@code null} or differs from the fetched
     *       offset: the response is discarded and {@code null} is returned;</li>
     *   <li>no error otherwise: every entry at or after the position is parsed, and the high
     *       watermark is updated when it is non-negative;</li>
     *   <li>{@code NOT_LEADER_FOR_PARTITION} / {@code UNKNOWN_TOPIC_OR_PARTITION}: a metadata
     *       update is requested;</li>
     *   <li>{@code OFFSET_OUT_OF_RANGE}: ignored when the position has moved, otherwise an offset
     *       reset is requested or, without a default reset policy, an exception is thrown;</li>
     *   <li>{@code UNKNOWN}: only logged.</li>
     * </ul>
     *
     * <p>The fetch metric aggregator is always notified: with the parsed byte and record counts
     * on the normal path, and with zeros on the stale-discard path and when an exception
     * propagates.
     *
     * @param completedFetch the fetch result to process
     * @return the parsed records, or {@code null} when nothing was parsed
     * @throws KafkaException              if a non-empty response (version 3 or later) contains
     *                                     no complete record
     * @throws RecordTooLargeException     same situation for response versions below 3
     * @throws TopicAuthorizationException on {@code TOPIC_AUTHORIZATION_FAILED}
     * @throws OffsetOutOfRangeException   on {@code OFFSET_OUT_OF_RANGE} without a default reset
     *                                     policy
     * @throws IllegalStateException       on any other unexpected error code
     */
    private PartitionRecords<K, V> parseCompletedFetch(CompletedFetch completedFetch) {
        TopicPartition topicPartition = completedFetch.partition;
        FetchResponse.PartitionData partitionData = completedFetch.partitionData;
        long fetchOffset = completedFetch.fetchedOffset;
        int bytes = 0;
        int recordsCount = 0;
        PartitionRecords<K, V> partitionRecords = null;
        Errors error = Errors.forCode(partitionData.errorCode);
        try {
            if (!this.subscriptions.isFetchable(topicPartition)) {
                log.debug("Ignoring fetched records for partition {} since it is no longer fetchable", topicPartition);
            } else if (error == Errors.NONE) {
                Long position = this.subscriptions.position(topicPartition);
                if (position == null || position.longValue() != fetchOffset) {
                    log.debug("Discarding stale fetch response for partition {} since its offset {} does not match the expected offset {}", topicPartition, fetchOffset, position);
                    completedFetch.metricAggregator.record(topicPartition, 0, 0);
                    return null;
                }
                List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
                // Set when at least one entry before the current position was skipped.
                boolean skippedRecords = false;
                for (LogEntry logEntry : partitionData.records.deepEntries()) {
                    if (logEntry.offset() >= position.longValue()) {
                        parsed.add(parseRecord(topicPartition, logEntry));
                        bytes += logEntry.sizeInBytes();
                    } else {
                        skippedRecords = true;
                    }
                }
                recordsCount = parsed.size();
                log.trace("Adding fetched record for partition {} with offset {} to buffered record list", topicPartition, position);
                partitionRecords = new PartitionRecords<>(fetchOffset, topicPartition, parsed);
                // Bytes were returned but not a single complete entry could be read from them.
                if (parsed.isEmpty() && !skippedRecords && partitionData.records.sizeInBytes() > 0) {
                    if (completedFetch.responseVersion >= 3) {
                        throw new KafkaException("Failed to make progress reading messages at " + topicPartition + "=" + fetchOffset + ". Received a non-empty fetch response from the server, but no complete records were found.");
                    }
                    Map<TopicPartition, Long> recordTooLargePartitions = Collections.singletonMap(topicPartition, fetchOffset);
                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + recordTooLargePartitions + " whose size is larger than the fetch size " + this.fetchSize + " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")", recordTooLargePartitions);
                }
                if (partitionData.highWatermark >= 0) {
                    log.trace("Received {} records in fetch response for partition {} with offset {}", parsed.size(), topicPartition, position);
                    this.subscriptions.updateHighWatermark(topicPartition, partitionData.highWatermark);
                }
            } else if (error == Errors.NOT_LEADER_FOR_PARTITION) {
                log.debug("Error in fetch for partition {}: {}", topicPartition, error.exceptionName());
                this.metadata.requestUpdate();
            } else if (error == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                log.warn("Received unknown topic or partition error in fetch for partition {}. The topic/partition may not exist or the user may not have Describe access to it", topicPartition);
                this.metadata.requestUpdate();
            } else if (error == Errors.OFFSET_OUT_OF_RANGE) {
                if (fetchOffset != this.subscriptions.position(topicPartition).longValue()) {
                    log.debug("Discarding stale fetch response for partition {} since the fetched offset {} does not match the current offset {}", topicPartition, fetchOffset, this.subscriptions.position(topicPartition));
                } else if (this.subscriptions.hasDefaultOffsetResetPolicy()) {
                    log.info("Fetch offset {} is out of range for partition {}, resetting offset", fetchOffset, topicPartition);
                    this.subscriptions.needOffsetReset(topicPartition);
                } else {
                    throw new OffsetOutOfRangeException(Collections.singletonMap(topicPartition, fetchOffset));
                }
            } else if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                log.warn("Not authorized to read from topic {}.", topicPartition.topic());
                throw new TopicAuthorizationException(Collections.singleton(topicPartition.topic()));
            } else if (error == Errors.UNKNOWN) {
                log.warn("Unknown error fetching data for topic-partition {}", topicPartition);
            } else {
                throw new IllegalStateException("Unexpected error code " + error.code() + " while fetching data");
            }
            completedFetch.metricAggregator.record(topicPartition, bytes, recordsCount);
            // Move the partition to the end if some bytes were received or an error was reported.
            if (bytes > 0 || error != Errors.NONE) {
                this.subscriptions.movePartitionToEnd(topicPartition);
            }
            return partitionRecords;
        } catch (Throwable th) {
            completedFetch.metricAggregator.record(topicPartition, 0, 0);
            throw th;
        }
    }

    /**
     * Turns a single log entry into a {@link ConsumerRecord}, deserializing key and value with
     * the configured deserializers.
     *
     * <p>When {@code checkCrcs} is set the record is validated first. A {@code null} key or value
     * buffer is passed through as {@code null}, and its serialized size is reported as
     * {@code -1}.
     *
     * @param topicPartition partition the entry was fetched from
     * @param logEntry       entry to convert
     * @return the deserialized consumer record
     * @throws KafkaException         if validation is enabled and the record is invalid; the
     *                                original {@code InvalidRecordException} is attached as cause
     * @throws SerializationException if reading or deserializing the record throws any
     *                                {@link RuntimeException}
     */
    private ConsumerRecord<K, V> parseRecord(TopicPartition topicPartition, LogEntry logEntry) {
        Record record = logEntry.record();
        if (this.checkCrcs) {
            try {
                record.ensureValid();
            } catch (InvalidRecordException e) {
                // Keep the original exception as cause so its stack trace is not lost.
                throw new KafkaException("Record for partition " + topicPartition + " at offset " + logEntry.offset() + " is invalid, cause: " + e.getMessage(), e);
            }
        }
        try {
            long offset = logEntry.offset();
            long timestamp = record.timestamp();
            TimestampType timestampType = record.timestampType();
            ByteBuffer keyBuffer = record.key();
            byte[] keyBytes = keyBuffer == null ? null : Utils.toArray(keyBuffer);
            K key = keyBuffer == null ? null : this.keyDeserializer.deserialize(topicPartition.topic(), keyBytes);
            ByteBuffer valueBuffer = record.value();
            byte[] valueBytes = valueBuffer == null ? null : Utils.toArray(valueBuffer);
            return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), offset, timestamp, timestampType, record.checksum(), keyBytes == null ? -1 : keyBytes.length, valueBytes == null ? -1 : valueBytes.length, key, valueBuffer == null ? null : this.valueDeserializer.deserialize(topicPartition.topic(), valueBytes));
        } catch (RuntimeException e2) {
            throw new SerializationException("Error deserializing key/value for partition " + topicPartition + " at offset " + logEntry.offset(), e2);
        }
    }

    /**
     * Subscription listener callback: forwards the given partition set to
     * {@code sensors.updatePartitionLagSensors}.
     *
     * @param set partitions passed in by the subscription state
     */
    @Override // org.apache.kafka.clients.consumer.internals.SubscriptionState.Listener
    public void onAssignment(Set<TopicPartition> set) {
        sensors.updatePartitionLagSensors(set);
    }
}
