package org.apache.kafka.clients.consumer.internals;

import java.io.Closeable;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.StaleMetadataException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Meter;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.metrics.stats.WindowedCount;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;
import org.slf4j.helpers.MessageFormatter;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher.class */
public class Fetcher<K, V> implements Closeable {
    private final Logger log;
    private final LogContext logContext;
    private final ConsumerNetworkClient client;
    private final Time time;
    private final int minBytes;
    private final int maxBytes;
    private final int maxWaitMs;
    private final int fetchSize;
    private final long retryBackoffMs;
    private final long requestTimeoutMs;
    private final int maxPollRecords;
    private final boolean checkCrcs;
    private final String clientRackId;
    private final ConsumerMetadata metadata;
    private final FetchManagerMetrics sensors;
    private final SubscriptionState subscriptions;
    private final Deserializer<K> keyDeserializer;
    private final Deserializer<V> valueDeserializer;
    private final IsolationLevel isolationLevel;
    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
    private final ApiVersions apiVersions;
    private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);
    private Fetcher<K, V>.CompletedFetch nextInLineFetch = null;
    private final ConcurrentLinkedQueue<Fetcher<K, V>.CompletedFetch> completedFetches = new ConcurrentLinkedQueue<>();
    private final Map<Integer, FetchSessionHandler> sessionHandlers = new HashMap();
    private final Set<Integer> nodesWithPendingFetchRequests = new HashSet();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.kafka.clients.consumer.internals.Fetcher$7, reason: invalid class name */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$7.class */
    public static /* synthetic */ class AnonymousClass7 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$kafka$common$protocol$Errors = new int[Errors.values().length];

        static {
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NONE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.NOT_LEADER_OR_FOLLOWER.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.REPLICA_NOT_AVAILABLE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.KAFKA_STORAGE_ERROR.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.OFFSET_NOT_AVAILABLE.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.LEADER_NOT_AVAILABLE.ordinal()] = 7;
            } catch (NoSuchFieldError e7) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.FENCED_LEADER_EPOCH.ordinal()] = 8;
            } catch (NoSuchFieldError e8) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.UNKNOWN_LEADER_EPOCH.ordinal()] = 9;
            } catch (NoSuchFieldError e9) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.UNKNOWN_TOPIC_OR_PARTITION.ordinal()] = 10;
            } catch (NoSuchFieldError e10) {
            }
            try {
                $SwitchMap$org$apache$kafka$common$protocol$Errors[Errors.TOPIC_AUTHORIZATION_FAILED.ordinal()] = 11;
            } catch (NoSuchFieldError e11) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$CompletedFetch.class */
    public class CompletedFetch {
        private final TopicPartition partition;
        private final Iterator<? extends RecordBatch> batches;
        private final Set<Long> abortedProducerIds;
        private final PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions;
        private final FetchResponseData.PartitionData partitionData;
        private final FetchResponseMetricAggregator metricAggregator;
        private final short responseVersion;
        private int recordsRead;
        private int bytesRead;
        private RecordBatch currentBatch;
        private Record lastRecord;
        private CloseableIterator<Record> records;
        private long nextFetchOffset;
        private Optional<Integer> lastEpoch;
        private boolean isConsumed;
        private Exception cachedRecordException;
        private boolean corruptLastRecord;
        private boolean initialized;

        private CompletedFetch(TopicPartition topicPartition, FetchResponseData.PartitionData partitionData, FetchResponseMetricAggregator fetchResponseMetricAggregator, Iterator<? extends RecordBatch> it, Long l, short s) {
            this.isConsumed = false;
            this.cachedRecordException = null;
            this.corruptLastRecord = false;
            this.initialized = false;
            this.partition = topicPartition;
            this.partitionData = partitionData;
            this.metricAggregator = fetchResponseMetricAggregator;
            this.batches = it;
            this.nextFetchOffset = l.longValue();
            this.responseVersion = s;
            this.lastEpoch = Optional.empty();
            this.abortedProducerIds = new HashSet();
            this.abortedTransactions = abortedTransactions(partitionData);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void drain() {
            if (this.isConsumed) {
                return;
            }
            maybeCloseRecordStream();
            this.cachedRecordException = null;
            this.isConsumed = true;
            this.metricAggregator.record(this.partition, this.bytesRead, this.recordsRead);
            if (this.bytesRead > 0) {
                Fetcher.this.subscriptions.movePartitionToEnd(this.partition);
            }
        }

        private void maybeEnsureValid(RecordBatch recordBatch) {
            if (!Fetcher.this.checkCrcs || this.currentBatch.magic() < 2) {
                return;
            }
            try {
                recordBatch.ensureValid();
            } catch (CorruptRecordException e) {
                throw new KafkaException("Record batch for partition " + this.partition + " at offset " + recordBatch.baseOffset() + " is invalid, cause: " + e.getMessage());
            }
        }

        private void maybeEnsureValid(Record record) {
            if (Fetcher.this.checkCrcs) {
                try {
                    record.ensureValid();
                } catch (CorruptRecordException e) {
                    throw new KafkaException("Record for partition " + this.partition + " at offset " + record.offset() + " is invalid, cause: " + e.getMessage());
                }
            }
        }

        private void maybeCloseRecordStream() {
            if (this.records != null) {
                this.records.close();
                this.records = null;
            }
        }

        private Record nextFetchedRecord() {
            while (true) {
                if (this.records == null || !this.records.hasNext()) {
                    maybeCloseRecordStream();
                    if (!this.batches.hasNext()) {
                        if (this.currentBatch != null) {
                            this.nextFetchOffset = this.currentBatch.nextOffset();
                        }
                        drain();
                        return null;
                    }
                    this.currentBatch = this.batches.next();
                    this.lastEpoch = this.currentBatch.partitionLeaderEpoch() == -1 ? Optional.empty() : Optional.of(Integer.valueOf(this.currentBatch.partitionLeaderEpoch()));
                    maybeEnsureValid(this.currentBatch);
                    if (Fetcher.this.isolationLevel == IsolationLevel.READ_COMMITTED && this.currentBatch.hasProducerId()) {
                        consumeAbortedTransactionsUpTo(this.currentBatch.lastOffset());
                        long producerId = this.currentBatch.producerId();
                        if (containsAbortMarker(this.currentBatch)) {
                            this.abortedProducerIds.remove(Long.valueOf(producerId));
                        } else if (isBatchAborted(this.currentBatch)) {
                            Fetcher.this.log.debug("Skipping aborted record batch from partition {} with producerId {} and offsets {} to {}", new Object[]{this.partition, Long.valueOf(producerId), Long.valueOf(this.currentBatch.baseOffset()), Long.valueOf(this.currentBatch.lastOffset())});
                            this.nextFetchOffset = this.currentBatch.nextOffset();
                        }
                    }
                    this.records = this.currentBatch.streamingIterator(Fetcher.this.decompressionBufferSupplier);
                } else {
                    Record next = this.records.next();
                    if (next.offset() >= this.nextFetchOffset) {
                        maybeEnsureValid(next);
                        if (!this.currentBatch.isControlBatch()) {
                            return next;
                        }
                        this.nextFetchOffset = next.offset() + 1;
                    } else {
                        continue;
                    }
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public List<ConsumerRecord<K, V>> fetchRecords(int i) {
            if (this.corruptLastRecord) {
                throw new KafkaException("Received exception when fetching the next record from " + this.partition + ". If needed, please seek past the record to continue consumption.", this.cachedRecordException);
            }
            if (this.isConsumed) {
                return Collections.emptyList();
            }
            ArrayList arrayList = new ArrayList();
            for (int i2 = 0; i2 < i; i2++) {
                try {
                    if (this.cachedRecordException == null) {
                        this.corruptLastRecord = true;
                        this.lastRecord = nextFetchedRecord();
                        this.corruptLastRecord = false;
                    }
                    if (this.lastRecord == null) {
                        break;
                    }
                    arrayList.add(Fetcher.this.parseRecord(this.partition, this.currentBatch, this.lastRecord));
                    this.recordsRead++;
                    this.bytesRead += this.lastRecord.sizeInBytes();
                    this.nextFetchOffset = this.lastRecord.offset() + 1;
                    this.cachedRecordException = null;
                } catch (SerializationException e) {
                    this.cachedRecordException = e;
                    if (arrayList.isEmpty()) {
                        throw e;
                    }
                } catch (KafkaException e2) {
                    this.cachedRecordException = e2;
                    if (arrayList.isEmpty()) {
                        throw new KafkaException("Received exception when fetching the next record from " + this.partition + ". If needed, please seek past the record to continue consumption.", e2);
                    }
                }
            }
            return arrayList;
        }

        private void consumeAbortedTransactionsUpTo(long j) {
            if (this.abortedTransactions == null) {
                return;
            }
            while (!this.abortedTransactions.isEmpty() && this.abortedTransactions.peek().firstOffset() <= j) {
                this.abortedProducerIds.add(Long.valueOf(this.abortedTransactions.poll().producerId()));
            }
        }

        private boolean isBatchAborted(RecordBatch recordBatch) {
            return recordBatch.isTransactional() && this.abortedProducerIds.contains(Long.valueOf(recordBatch.producerId()));
        }

        private PriorityQueue<FetchResponseData.AbortedTransaction> abortedTransactions(FetchResponseData.PartitionData partitionData) {
            if (partitionData.abortedTransactions() == null || partitionData.abortedTransactions().isEmpty()) {
                return null;
            }
            PriorityQueue<FetchResponseData.AbortedTransaction> priorityQueue = new PriorityQueue<>(partitionData.abortedTransactions().size(), Comparator.comparingLong((v0) -> {
                return v0.firstOffset();
            }));
            priorityQueue.addAll(partitionData.abortedTransactions());
            return priorityQueue;
        }

        private boolean containsAbortMarker(RecordBatch recordBatch) {
            if (!recordBatch.isControlBatch()) {
                return false;
            }
            Iterator<Record> it = recordBatch.iterator();
            if (it.hasNext()) {
                return ControlRecordType.ABORT == ControlRecordType.parse(it.next().key());
            }
            return false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean notInitialized() {
            return !this.initialized;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$FetchManagerMetrics.class */
    public static class FetchManagerMetrics {
        private final Metrics metrics;
        private FetcherMetricsRegistry metricsRegistry;
        private final Sensor bytesFetched;
        private final Sensor recordsFetched;
        private final Sensor fetchLatency;
        private final Sensor recordsFetchLag;
        private final Sensor recordsFetchLead;
        private int assignmentId;
        private Set<TopicPartition> assignedPartitions;

        private FetchManagerMetrics(Metrics metrics, FetcherMetricsRegistry fetcherMetricsRegistry) {
            this.assignmentId = 0;
            this.assignedPartitions = Collections.emptySet();
            this.metrics = metrics;
            this.metricsRegistry = fetcherMetricsRegistry;
            this.bytesFetched = metrics.sensor("bytes-fetched");
            this.bytesFetched.add(metrics.metricInstance(fetcherMetricsRegistry.fetchSizeAvg, new String[0]), new Avg());
            this.bytesFetched.add(metrics.metricInstance(fetcherMetricsRegistry.fetchSizeMax, new String[0]), new Max());
            this.bytesFetched.add(new Meter(metrics.metricInstance(fetcherMetricsRegistry.bytesConsumedRate, new String[0]), metrics.metricInstance(fetcherMetricsRegistry.bytesConsumedTotal, new String[0])));
            this.recordsFetched = metrics.sensor("records-fetched");
            this.recordsFetched.add(metrics.metricInstance(fetcherMetricsRegistry.recordsPerRequestAvg, new String[0]), new Avg());
            this.recordsFetched.add(new Meter(metrics.metricInstance(fetcherMetricsRegistry.recordsConsumedRate, new String[0]), metrics.metricInstance(fetcherMetricsRegistry.recordsConsumedTotal, new String[0])));
            this.fetchLatency = metrics.sensor("fetch-latency");
            this.fetchLatency.add(metrics.metricInstance(fetcherMetricsRegistry.fetchLatencyAvg, new String[0]), new Avg());
            this.fetchLatency.add(metrics.metricInstance(fetcherMetricsRegistry.fetchLatencyMax, new String[0]), new Max());
            this.fetchLatency.add(new Meter(new WindowedCount(), metrics.metricInstance(fetcherMetricsRegistry.fetchRequestRate, new String[0]), metrics.metricInstance(fetcherMetricsRegistry.fetchRequestTotal, new String[0])));
            this.recordsFetchLag = metrics.sensor("records-lag");
            this.recordsFetchLag.add(metrics.metricInstance(fetcherMetricsRegistry.recordsLagMax, new String[0]), new Max());
            this.recordsFetchLead = metrics.sensor("records-lead");
            this.recordsFetchLead.add(metrics.metricInstance(fetcherMetricsRegistry.recordsLeadMin, new String[0]), new Min());
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void recordTopicFetchMetrics(String str, int i, int i2) {
            String str2 = "topic." + str + ".bytes-fetched";
            Sensor sensor = this.metrics.getSensor(str2);
            if (sensor == null) {
                Map<String, String> singletonMap = Collections.singletonMap("topic", str.replace('.', '_'));
                sensor = this.metrics.sensor(str2);
                sensor.add(this.metrics.metricInstance(this.metricsRegistry.topicFetchSizeAvg, singletonMap), new Avg());
                sensor.add(this.metrics.metricInstance(this.metricsRegistry.topicFetchSizeMax, singletonMap), new Max());
                sensor.add(new Meter(this.metrics.metricInstance(this.metricsRegistry.topicBytesConsumedRate, singletonMap), this.metrics.metricInstance(this.metricsRegistry.topicBytesConsumedTotal, singletonMap)));
            }
            sensor.record(i);
            String str3 = "topic." + str + ".records-fetched";
            Sensor sensor2 = this.metrics.getSensor(str3);
            if (sensor2 == null) {
                HashMap hashMap = new HashMap(1);
                hashMap.put("topic", str.replace('.', '_'));
                sensor2 = this.metrics.sensor(str3);
                sensor2.add(this.metrics.metricInstance(this.metricsRegistry.topicRecordsPerRequestAvg, hashMap), new Avg());
                sensor2.add(new Meter(this.metrics.metricInstance(this.metricsRegistry.topicRecordsConsumedRate, hashMap), this.metrics.metricInstance(this.metricsRegistry.topicRecordsConsumedTotal, hashMap)));
            }
            sensor2.record(i2);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void maybeUpdateAssignment(SubscriptionState subscriptionState) {
            int assignmentId = subscriptionState.assignmentId();
            if (this.assignmentId != assignmentId) {
                Set<TopicPartition> assignedPartitions = subscriptionState.assignedPartitions();
                for (TopicPartition topicPartition : this.assignedPartitions) {
                    if (!assignedPartitions.contains(topicPartition)) {
                        this.metrics.removeSensor(partitionLagMetricName(topicPartition));
                        this.metrics.removeSensor(partitionLeadMetricName(topicPartition));
                        this.metrics.removeMetric(partitionPreferredReadReplicaMetricName(topicPartition));
                    }
                }
                for (TopicPartition topicPartition2 : assignedPartitions) {
                    if (!this.assignedPartitions.contains(topicPartition2)) {
                        this.metrics.addMetricIfAbsent(partitionPreferredReadReplicaMetricName(topicPartition2), null, (metricConfig, j) -> {
                            return subscriptionState.preferredReadReplica(topicPartition2, 0L).orElse(-1);
                        });
                    }
                }
                this.assignedPartitions = assignedPartitions;
                this.assignmentId = assignmentId;
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void recordPartitionLead(TopicPartition topicPartition, long j) {
            this.recordsFetchLead.record(j);
            String partitionLeadMetricName = partitionLeadMetricName(topicPartition);
            Sensor sensor = this.metrics.getSensor(partitionLeadMetricName);
            if (sensor == null) {
                Map<String, String> map = topicPartitionTags(topicPartition);
                sensor = this.metrics.sensor(partitionLeadMetricName);
                sensor.add(this.metrics.metricInstance(this.metricsRegistry.partitionRecordsLead, map), new Value());
                sensor.add(this.metrics.metricInstance(this.metricsRegistry.partitionRecordsLeadMin, map), new Min());
                sensor.add(this.metrics.metricInstance(this.metricsRegistry.partitionRecordsLeadAvg, map), new Avg());
            }
            sensor.record(j);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void recordPartitionLag(TopicPartition topicPartition, long j) {
            this.recordsFetchLag.record(j);
            String partitionLagMetricName = partitionLagMetricName(topicPartition);
            Sensor sensor = this.metrics.getSensor(partitionLagMetricName);
            if (sensor == null) {
                Map<String, String> map = topicPartitionTags(topicPartition);
                sensor = this.metrics.sensor(partitionLagMetricName);
                sensor.add(this.metrics.metricInstance(this.metricsRegistry.partitionRecordsLag, map), new Value());
                sensor.add(this.metrics.metricInstance(this.metricsRegistry.partitionRecordsLagMax, map), new Max());
                sensor.add(this.metrics.metricInstance(this.metricsRegistry.partitionRecordsLagAvg, map), new Avg());
            }
            sensor.record(j);
        }

        private static String partitionLagMetricName(TopicPartition topicPartition) {
            return topicPartition + ".records-lag";
        }

        private static String partitionLeadMetricName(TopicPartition topicPartition) {
            return topicPartition + ".records-lead";
        }

        private MetricName partitionPreferredReadReplicaMetricName(TopicPartition topicPartition) {
            return this.metrics.metricInstance(this.metricsRegistry.partitionPreferredReadReplica, topicPartitionTags(topicPartition));
        }

        private Map<String, String> topicPartitionTags(TopicPartition topicPartition) {
            HashMap hashMap = new HashMap(2);
            hashMap.put("topic", topicPartition.topic().replace('.', '_'));
            hashMap.put("partition", String.valueOf(topicPartition.partition()));
            return hashMap;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$FetchResponseMetricAggregator.class */
    public static class FetchResponseMetricAggregator {
        private final FetchManagerMetrics sensors;
        private final Set<TopicPartition> unrecordedPartitions;
        private final FetchMetrics fetchMetrics;
        private final Map<String, FetchMetrics> topicFetchMetrics;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$FetchResponseMetricAggregator$FetchMetrics.class */
        public static class FetchMetrics {
            private int fetchBytes;
            private int fetchRecords;

            private FetchMetrics() {
            }

            protected void increment(int i, int i2) {
                this.fetchBytes += i;
                this.fetchRecords += i2;
            }
        }

        private FetchResponseMetricAggregator(FetchManagerMetrics fetchManagerMetrics, Set<TopicPartition> set) {
            this.fetchMetrics = new FetchMetrics();
            this.topicFetchMetrics = new HashMap();
            this.sensors = fetchManagerMetrics;
            this.unrecordedPartitions = set;
        }

        public void record(TopicPartition topicPartition, int i, int i2) {
            this.unrecordedPartitions.remove(topicPartition);
            this.fetchMetrics.increment(i, i2);
            String str = topicPartition.topic();
            FetchMetrics fetchMetrics = this.topicFetchMetrics.get(str);
            if (fetchMetrics == null) {
                fetchMetrics = new FetchMetrics();
                this.topicFetchMetrics.put(str, fetchMetrics);
            }
            fetchMetrics.increment(i, i2);
            if (this.unrecordedPartitions.isEmpty()) {
                this.sensors.bytesFetched.record(this.fetchMetrics.fetchBytes);
                this.sensors.recordsFetched.record(this.fetchMetrics.fetchRecords);
                for (Map.Entry<String, FetchMetrics> entry : this.topicFetchMetrics.entrySet()) {
                    FetchMetrics value = entry.getValue();
                    this.sensors.recordTopicFetchMetrics(entry.getKey(), value.fetchBytes, value.fetchRecords);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$ListOffsetData.class */
    public static class ListOffsetData {
        final long offset;
        final Long timestamp;
        final Optional<Integer> leaderEpoch;

        ListOffsetData(long j, Long l, Optional<Integer> optional) {
            this.offset = j;
            this.timestamp = l;
            this.leaderEpoch = optional;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/Fetcher$ListOffsetResult.class */
    public static class ListOffsetResult {
        private final Map<TopicPartition, ListOffsetData> fetchedOffsets;
        private final Set<TopicPartition> partitionsToRetry;

        ListOffsetResult(Map<TopicPartition, ListOffsetData> map, Set<TopicPartition> set) {
            this.fetchedOffsets = map;
            this.partitionsToRetry = set;
        }

        ListOffsetResult() {
            this.fetchedOffsets = new HashMap();
            this.partitionsToRetry = new HashSet();
        }
    }

    public Fetcher(LogContext logContext, ConsumerNetworkClient consumerNetworkClient, int i, int i2, int i3, int i4, int i5, boolean z, String str, Deserializer<K> deserializer, Deserializer<V> deserializer2, ConsumerMetadata consumerMetadata, SubscriptionState subscriptionState, Metrics metrics, FetcherMetricsRegistry fetcherMetricsRegistry, Time time, long j, long j2, IsolationLevel isolationLevel, ApiVersions apiVersions) {
        this.log = logContext.logger(Fetcher.class);
        this.logContext = logContext;
        this.time = time;
        this.client = consumerNetworkClient;
        this.metadata = consumerMetadata;
        this.subscriptions = subscriptionState;
        this.minBytes = i;
        this.maxBytes = i2;
        this.maxWaitMs = i3;
        this.fetchSize = i4;
        this.maxPollRecords = i5;
        this.checkCrcs = z;
        this.clientRackId = str;
        this.keyDeserializer = deserializer;
        this.valueDeserializer = deserializer2;
        this.sensors = new FetchManagerMetrics(metrics, fetcherMetricsRegistry);
        this.retryBackoffMs = j;
        this.requestTimeoutMs = j2;
        this.isolationLevel = isolationLevel;
        this.apiVersions = apiVersions;
        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(consumerNetworkClient, logContext);
    }

    protected boolean hasCompletedFetches() {
        return !this.completedFetches.isEmpty();
    }

    public boolean hasAvailableFetches() {
        return this.completedFetches.stream().anyMatch(completedFetch -> {
            return this.subscriptions.isFetchable(completedFetch.partition);
        });
    }

    public synchronized int sendFetches() {
        this.sensors.maybeUpdateAssignment(this.subscriptions);
        Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests = prepareFetchRequests();
        for (Map.Entry<Node, FetchSessionHandler.FetchRequestData> entry : prepareFetchRequests.entrySet()) {
            final Node key = entry.getKey();
            final FetchSessionHandler.FetchRequestData value = entry.getValue();
            FetchRequest.Builder rackId = FetchRequest.Builder.forConsumer(!value.canUseTopicIds() ? (short) 12 : ApiKeys.FETCH.latestVersion(), this.maxWaitMs, this.minBytes, value.toSend()).isolationLevel(this.isolationLevel).setMaxBytes(this.maxBytes).metadata(value.metadata()).removed(value.toForget()).replaced(value.toReplace()).rackId(this.clientRackId);
            if (this.log.isDebugEnabled()) {
                this.log.debug("Sending {} {} to broker {}", new Object[]{this.isolationLevel, value.toString(), key});
            }
            RequestFuture<ClientResponse> send = this.client.send(key, rackId);
            this.nodesWithPendingFetchRequests.add(Integer.valueOf(entry.getKey().id()));
            send.addListener(new RequestFutureListener<ClientResponse>() { // from class: org.apache.kafka.clients.consumer.internals.Fetcher.1
                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onSuccess(ClientResponse clientResponse) {
                    synchronized (Fetcher.this) {
                        try {
                            FetchResponse fetchResponse = (FetchResponse) clientResponse.responseBody();
                            FetchSessionHandler sessionHandler = Fetcher.this.sessionHandler(key.id());
                            if (sessionHandler == null) {
                                Fetcher.this.log.error("Unable to find FetchSessionHandler for node {}. Ignoring fetch response.", Integer.valueOf(key.id()));
                                Fetcher.this.nodesWithPendingFetchRequests.remove(Integer.valueOf(key.id()));
                                return;
                            }
                            if (!sessionHandler.handleResponse(fetchResponse, clientResponse.requestHeader().apiVersion())) {
                                if (fetchResponse.error() == Errors.FETCH_SESSION_TOPIC_ID_ERROR) {
                                    Fetcher.this.metadata.requestUpdate();
                                }
                                return;
                            }
                            LinkedHashMap<TopicPartition, FetchResponseData.PartitionData> responseData = fetchResponse.responseData(sessionHandler.sessionTopicNames(), clientResponse.requestHeader().apiVersion());
                            FetchResponseMetricAggregator fetchResponseMetricAggregator = new FetchResponseMetricAggregator(Fetcher.this.sensors, new HashSet(responseData.keySet()));
                            for (Map.Entry<TopicPartition, FetchResponseData.PartitionData> entry2 : responseData.entrySet()) {
                                TopicPartition key2 = entry2.getKey();
                                FetchRequest.PartitionData partitionData = value.sessionPartitions().get(key2);
                                if (partitionData == null) {
                                    throw new IllegalStateException(value.metadata().isFull() ? MessageFormatter.arrayFormat("Response for missing full request partition: partition={}; metadata={}", new Object[]{key2, value.metadata()}).getMessage() : MessageFormatter.arrayFormat("Response for missing session request partition: partition={}; metadata={}; toSend={}; toForget={}; toReplace={}", new Object[]{key2, value.metadata(), value.toSend(), value.toForget(), value.toReplace()}).getMessage());
                                }
                                long j = partitionData.fetchOffset;
                                FetchResponseData.PartitionData value2 = entry2.getValue();
                                Fetcher.this.log.debug("Fetch {} at offset {} for partition {} returned fetch data {}", new Object[]{Fetcher.this.isolationLevel, Long.valueOf(j), key2, value2});
                                Fetcher.this.completedFetches.add(new CompletedFetch(key2, value2, fetchResponseMetricAggregator, FetchResponse.recordsOrFail(value2).batches().iterator(), Long.valueOf(j), clientResponse.requestHeader().apiVersion()));
                            }
                            Fetcher.this.sensors.fetchLatency.record(clientResponse.requestLatencyMs());
                            Fetcher.this.nodesWithPendingFetchRequests.remove(Integer.valueOf(key.id()));
                        } finally {
                            Fetcher.this.nodesWithPendingFetchRequests.remove(Integer.valueOf(key.id()));
                        }
                    }
                }

                /* JADX WARN: Finally extract failed */
                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onFailure(RuntimeException runtimeException) {
                    synchronized (Fetcher.this) {
                        try {
                            FetchSessionHandler sessionHandler = Fetcher.this.sessionHandler(key.id());
                            if (sessionHandler != null) {
                                sessionHandler.handleError(runtimeException);
                            }
                            Fetcher.this.nodesWithPendingFetchRequests.remove(Integer.valueOf(key.id()));
                        } catch (Throwable th) {
                            Fetcher.this.nodesWithPendingFetchRequests.remove(Integer.valueOf(key.id()));
                            throw th;
                        }
                    }
                }
            });
        }
        return prepareFetchRequests.size();
    }

    public Map<String, List<PartitionInfo>> getAllTopicMetadata(Timer timer) {
        return getTopicMetadata(MetadataRequest.Builder.allTopics(), timer);
    }

    public Map<String, List<PartitionInfo>> getTopicMetadata(MetadataRequest.Builder builder, Timer timer) {
        if (!builder.isAllTopics() && builder.emptyTopicList()) {
            return Collections.emptyMap();
        }
        do {
            RequestFuture<ClientResponse> sendMetadataRequest = sendMetadataRequest(builder);
            this.client.poll(sendMetadataRequest, timer);
            if (sendMetadataRequest.failed() && !sendMetadataRequest.isRetriable()) {
                throw sendMetadataRequest.exception();
            }
            if (sendMetadataRequest.succeeded()) {
                MetadataResponse metadataResponse = (MetadataResponse) sendMetadataRequest.value().responseBody();
                Cluster buildCluster = metadataResponse.buildCluster();
                Set<String> unauthorizedTopics = buildCluster.unauthorizedTopics();
                if (!unauthorizedTopics.isEmpty()) {
                    throw new TopicAuthorizationException(unauthorizedTopics);
                }
                boolean z = false;
                Map<String, Errors> errors = metadataResponse.errors();
                if (!errors.isEmpty()) {
                    this.log.debug("Topic metadata fetch included errors: {}", errors);
                    for (Map.Entry<String, Errors> entry : errors.entrySet()) {
                        String key = entry.getKey();
                        Errors value = entry.getValue();
                        if (value == Errors.INVALID_TOPIC_EXCEPTION) {
                            throw new InvalidTopicException("Topic '" + key + "' is invalid");
                        }
                        if (value != Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                            if (!(value.exception() instanceof RetriableException)) {
                                throw new KafkaException("Unexpected error fetching metadata for topic " + key, value.exception());
                            }
                            z = true;
                        }
                    }
                }
                if (!z) {
                    HashMap hashMap = new HashMap();
                    for (String str : buildCluster.topics()) {
                        hashMap.put(str, buildCluster.partitionsForTopic(str));
                    }
                    return hashMap;
                }
            }
            timer.sleep(this.retryBackoffMs);
        } while (timer.notExpired());
        throw new TimeoutException("Timeout expired while fetching topic metadata");
    }

    private RequestFuture<ClientResponse> sendMetadataRequest(MetadataRequest.Builder builder) {
        Node leastLoadedNode = this.client.leastLoadedNode();
        return leastLoadedNode == null ? RequestFuture.noBrokersAvailable() : this.client.send(leastLoadedNode, builder);
    }

    private Long offsetResetStrategyTimestamp(TopicPartition topicPartition) {
        OffsetResetStrategy resetStrategy = this.subscriptions.resetStrategy(topicPartition);
        if (resetStrategy == OffsetResetStrategy.EARLIEST) {
            return -2L;
        }
        return resetStrategy == OffsetResetStrategy.LATEST ? -1L : null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public OffsetResetStrategy timestampToOffsetResetStrategy(long j) {
        if (j == -2) {
            return OffsetResetStrategy.EARLIEST;
        }
        if (j == -1) {
            return OffsetResetStrategy.LATEST;
        }
        return null;
    }

    public void resetOffsetsIfNeeded() {
        RuntimeException andSet = this.cachedListOffsetsException.getAndSet(null);
        if (andSet != null) {
            throw andSet;
        }
        Set<TopicPartition> partitionsNeedingReset = this.subscriptions.partitionsNeedingReset(this.time.milliseconds());
        if (partitionsNeedingReset.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : partitionsNeedingReset) {
            Long offsetResetStrategyTimestamp = offsetResetStrategyTimestamp(topicPartition);
            if (offsetResetStrategyTimestamp != null) {
                hashMap.put(topicPartition, offsetResetStrategyTimestamp);
            }
        }
        resetOffsetsAsync(hashMap);
    }

    public void validateOffsetsIfNeeded() {
        RuntimeException andSet = this.cachedOffsetForLeaderException.getAndSet(null);
        if (andSet != null) {
            throw andSet;
        }
        validatePositionsOnMetadataChange();
        Stream<TopicPartition> filter = this.subscriptions.partitionsNeedingValidation(this.time.milliseconds()).stream().filter(topicPartition -> {
            return this.subscriptions.position(topicPartition) != null;
        });
        Function identity = Function.identity();
        SubscriptionState subscriptionState = this.subscriptions;
        Objects.requireNonNull(subscriptionState);
        validateOffsetsAsync((Map) filter.collect(Collectors.toMap(identity, subscriptionState::position)));
    }

    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Timer timer) {
        this.metadata.addTransientTopics(topicsForPartitions(map.keySet()));
        try {
            Map map2 = fetchOffsetsByTimes(map, timer, true).fetchedOffsets;
            HashMap hashMap = new HashMap(map.size());
            Iterator<Map.Entry<TopicPartition, Long>> it = map.entrySet().iterator();
            while (it.hasNext()) {
                hashMap.put(it.next().getKey(), null);
            }
            for (Map.Entry<K, V> entry : map2.entrySet()) {
                ListOffsetData listOffsetData = (ListOffsetData) entry.getValue();
                hashMap.put((TopicPartition) entry.getKey(), new OffsetAndTimestamp(listOffsetData.offset, listOffsetData.timestamp.longValue(), listOffsetData.leaderEpoch));
            }
            return hashMap;
        } finally {
            this.metadata.clearTransientTopics();
        }
    }

    private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> map, Timer timer, boolean z) {
        final ListOffsetResult listOffsetResult = new ListOffsetResult();
        if (map.isEmpty()) {
            return listOffsetResult;
        }
        final HashMap hashMap = new HashMap(map);
        do {
            final RequestFuture<ListOffsetResult> sendListOffsetsRequests = sendListOffsetsRequests(hashMap, z);
            sendListOffsetsRequests.addListener(new RequestFutureListener<ListOffsetResult>() { // from class: org.apache.kafka.clients.consumer.internals.Fetcher.2
                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onSuccess(ListOffsetResult listOffsetResult2) {
                    synchronized (sendListOffsetsRequests) {
                        listOffsetResult.fetchedOffsets.putAll(listOffsetResult2.fetchedOffsets);
                        hashMap.keySet().retainAll(listOffsetResult2.partitionsToRetry);
                        for (Map.Entry<K, V> entry : listOffsetResult2.fetchedOffsets.entrySet()) {
                            TopicPartition topicPartition = (TopicPartition) entry.getKey();
                            if (Fetcher.this.subscriptions.isAssigned(topicPartition)) {
                                long j = ((ListOffsetData) entry.getValue()).offset;
                                if (Fetcher.this.isolationLevel == IsolationLevel.READ_COMMITTED) {
                                    Fetcher.this.log.trace("Updating last stable offset for partition {} to {}", topicPartition, Long.valueOf(j));
                                    Fetcher.this.subscriptions.updateLastStableOffset(topicPartition, j);
                                } else {
                                    Fetcher.this.log.trace("Updating high watermark for partition {} to {}", topicPartition, Long.valueOf(j));
                                    Fetcher.this.subscriptions.updateHighWatermark(topicPartition, j);
                                }
                            }
                        }
                    }
                }

                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onFailure(RuntimeException runtimeException) {
                    if (!(runtimeException instanceof RetriableException)) {
                        throw sendListOffsetsRequests.exception();
                    }
                }
            });
            if (timer.timeoutMs() != 0) {
                this.client.poll(sendListOffsetsRequests, timer);
                if (!sendListOffsetsRequests.isDone()) {
                    break;
                }
                if (hashMap.isEmpty()) {
                    return listOffsetResult;
                }
                this.client.awaitMetadataUpdate(timer);
            } else {
                return listOffsetResult;
            }
        } while (timer.notExpired());
        throw new TimeoutException("Failed to get offsets by times in " + timer.elapsedMs() + "ms");
    }

    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Timer timer) {
        return beginningOrEndOffset(collection, -2L, timer);
    }

    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Timer timer) {
        return beginningOrEndOffset(collection, -1L, timer);
    }

    private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> collection, long j, Timer timer) {
        this.metadata.addTransientTopics(topicsForPartitions(collection));
        try {
            Map<TopicPartition, Long> map = (Map) fetchOffsetsByTimes((Map) collection.stream().distinct().collect(Collectors.toMap(Function.identity(), topicPartition -> {
                return Long.valueOf(j);
            })), timer, false).fetchedOffsets.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                return Long.valueOf(((ListOffsetData) entry.getValue()).offset);
            }));
            this.metadata.clearTransientTopics();
            return map;
        } catch (Throwable th) {
            this.metadata.clearTransientTopics();
            throw th;
        }
    }

    public Fetch<K, V> collectFetch() {
        Fetch<K, V> empty = Fetch.empty();
        ArrayDeque arrayDeque = new ArrayDeque();
        int i = this.maxPollRecords;
        while (i > 0) {
            try {
                try {
                    if (this.nextInLineFetch == null || ((CompletedFetch) this.nextInLineFetch).isConsumed) {
                        Fetcher<K, V>.CompletedFetch peek = this.completedFetches.peek();
                        if (peek == null) {
                            break;
                        }
                        if (peek.notInitialized()) {
                            try {
                                this.nextInLineFetch = initializeCompletedFetch(peek);
                            } catch (Exception e) {
                                FetchResponseData.PartitionData partitionData = ((CompletedFetch) peek).partitionData;
                                if (empty.isEmpty() && FetchResponse.recordsOrFail(partitionData).sizeInBytes() == 0) {
                                    this.completedFetches.poll();
                                }
                                throw e;
                            }
                        } else {
                            this.nextInLineFetch = peek;
                        }
                        this.completedFetches.poll();
                    } else if (this.subscriptions.isPaused(((CompletedFetch) this.nextInLineFetch).partition)) {
                        this.log.debug("Skipping fetching records for assigned partition {} because it is paused", ((CompletedFetch) this.nextInLineFetch).partition);
                        arrayDeque.add(this.nextInLineFetch);
                        this.nextInLineFetch = null;
                    } else {
                        Fetch<K, V> fetchRecords = fetchRecords(this.nextInLineFetch, i);
                        i -= fetchRecords.numRecords();
                        empty.add(fetchRecords);
                    }
                } catch (KafkaException e2) {
                    if (empty.isEmpty()) {
                        throw e2;
                    }
                    this.completedFetches.addAll(arrayDeque);
                }
            } catch (Throwable th) {
                this.completedFetches.addAll(arrayDeque);
                throw th;
            }
        }
        this.completedFetches.addAll(arrayDeque);
        return empty;
    }

    private Fetch<K, V> fetchRecords(Fetcher<K, V>.CompletedFetch completedFetch, int i) {
        if (!this.subscriptions.isAssigned(((CompletedFetch) completedFetch).partition)) {
            this.log.debug("Not returning fetched records for partition {} since it is no longer assigned", ((CompletedFetch) completedFetch).partition);
        } else if (this.subscriptions.isFetchable(((CompletedFetch) completedFetch).partition)) {
            SubscriptionState.FetchPosition position = this.subscriptions.position(((CompletedFetch) completedFetch).partition);
            if (position == null) {
                throw new IllegalStateException("Missing position for fetchable partition " + ((CompletedFetch) completedFetch).partition);
            }
            if (((CompletedFetch) completedFetch).nextFetchOffset == position.offset) {
                List fetchRecords = completedFetch.fetchRecords(i);
                this.log.trace("Returning {} fetched records at offset {} for assigned partition {}", new Object[]{Integer.valueOf(fetchRecords.size()), position, ((CompletedFetch) completedFetch).partition});
                boolean z = false;
                if (((CompletedFetch) completedFetch).nextFetchOffset > position.offset) {
                    SubscriptionState.FetchPosition fetchPosition = new SubscriptionState.FetchPosition(((CompletedFetch) completedFetch).nextFetchOffset, ((CompletedFetch) completedFetch).lastEpoch, position.currentLeader);
                    this.log.trace("Updating fetch position from {} to {} for partition {} and returning {} records from `poll()`", new Object[]{position, fetchPosition, ((CompletedFetch) completedFetch).partition, Integer.valueOf(fetchRecords.size())});
                    this.subscriptions.position(((CompletedFetch) completedFetch).partition, fetchPosition);
                    z = true;
                }
                Long partitionLag = this.subscriptions.partitionLag(((CompletedFetch) completedFetch).partition, this.isolationLevel);
                if (partitionLag != null) {
                    this.sensors.recordPartitionLag(((CompletedFetch) completedFetch).partition, partitionLag.longValue());
                }
                Long partitionLead = this.subscriptions.partitionLead(((CompletedFetch) completedFetch).partition);
                if (partitionLead != null) {
                    this.sensors.recordPartitionLead(((CompletedFetch) completedFetch).partition, partitionLead.longValue());
                }
                return Fetch.forPartition(((CompletedFetch) completedFetch).partition, fetchRecords, z);
            }
            this.log.debug("Ignoring fetched records for {} at offset {} since the current position is {}", new Object[]{((CompletedFetch) completedFetch).partition, Long.valueOf(((CompletedFetch) completedFetch).nextFetchOffset), position});
        } else {
            this.log.debug("Not returning fetched records for assigned partition {} since it is no longer fetchable", ((CompletedFetch) completedFetch).partition);
        }
        this.log.trace("Draining fetched records for partition {}", ((CompletedFetch) completedFetch).partition);
        completedFetch.drain();
        return Fetch.empty();
    }

    void resetOffsetIfNeeded(TopicPartition topicPartition, OffsetResetStrategy offsetResetStrategy, ListOffsetData listOffsetData) {
        SubscriptionState.FetchPosition fetchPosition = new SubscriptionState.FetchPosition(listOffsetData.offset, Optional.empty(), this.metadata.currentLeader(topicPartition));
        listOffsetData.leaderEpoch.ifPresent(num -> {
            this.metadata.updateLastSeenEpochIfNewer(topicPartition, num.intValue());
        });
        this.subscriptions.maybeSeekUnvalidated(topicPartition, fetchPosition, offsetResetStrategy);
    }

    private void resetOffsetsAsync(Map<TopicPartition, Long> map) {
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : groupListOffsetRequests(map, new HashSet()).entrySet()) {
            Node key = entry.getKey();
            final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> value = entry.getValue();
            this.subscriptions.setNextAllowedRetry(value.keySet(), this.time.milliseconds() + this.requestTimeoutMs);
            sendListOffsetRequest(key, value, false).addListener(new RequestFutureListener<ListOffsetResult>() { // from class: org.apache.kafka.clients.consumer.internals.Fetcher.3
                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onSuccess(ListOffsetResult listOffsetResult) {
                    if (!listOffsetResult.partitionsToRetry.isEmpty()) {
                        Fetcher.this.subscriptions.requestFailed(listOffsetResult.partitionsToRetry, Fetcher.this.time.milliseconds() + Fetcher.this.retryBackoffMs);
                        Fetcher.this.metadata.requestUpdate();
                    }
                    for (Map.Entry<K, V> entry2 : listOffsetResult.fetchedOffsets.entrySet()) {
                        TopicPartition topicPartition = (TopicPartition) entry2.getKey();
                        Fetcher.this.resetOffsetIfNeeded(topicPartition, Fetcher.this.timestampToOffsetResetStrategy(((ListOffsetsRequestData.ListOffsetsPartition) value.get(topicPartition)).timestamp()), (ListOffsetData) entry2.getValue());
                    }
                }

                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onFailure(RuntimeException runtimeException) {
                    Fetcher.this.subscriptions.requestFailed(value.keySet(), Fetcher.this.time.milliseconds() + Fetcher.this.retryBackoffMs);
                    Fetcher.this.metadata.requestUpdate();
                    if ((runtimeException instanceof RetriableException) || Fetcher.this.cachedListOffsetsException.compareAndSet(null, runtimeException)) {
                        return;
                    }
                    Fetcher.this.log.error("Discarding error in ListOffsetResponse because another error is pending", runtimeException);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) {
        ApiVersionsResponseData.ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
        if (apiVersion == null) {
            return false;
        }
        return OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion());
    }

    private void validateOffsetsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> map) {
        Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regroupFetchPositionsByLeader = regroupFetchPositionsByLeader(map);
        long milliseconds = this.time.milliseconds() + this.requestTimeoutMs;
        regroupFetchPositionsByLeader.forEach((node, map2) -> {
            if (node.isEmpty()) {
                this.metadata.requestUpdate();
                return;
            }
            NodeApiVersions nodeApiVersions = this.apiVersions.get(node.idString());
            if (nodeApiVersions == null) {
                this.client.tryConnect(node);
                return;
            }
            if (hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                this.subscriptions.setNextAllowedRetry(map2.keySet(), milliseconds);
                this.offsetsForLeaderEpochClient.sendAsyncRequest(node, map2).addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() { // from class: org.apache.kafka.clients.consumer.internals.Fetcher.4
                    @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                    public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetForEpochResult) {
                        ArrayList arrayList = new ArrayList();
                        if (!offsetForEpochResult.partitionsToRetry().isEmpty()) {
                            Fetcher.this.subscriptions.setNextAllowedRetry(offsetForEpochResult.partitionsToRetry(), Fetcher.this.time.milliseconds() + Fetcher.this.retryBackoffMs);
                            Fetcher.this.metadata.requestUpdate();
                        }
                        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> endOffsets = offsetForEpochResult.endOffsets();
                        Map map2 = map2;
                        endOffsets.forEach((topicPartition, epochEndOffset) -> {
                            Optional<SubscriptionState.LogTruncation> maybeCompleteValidation = Fetcher.this.subscriptions.maybeCompleteValidation(topicPartition, (SubscriptionState.FetchPosition) map2.get(topicPartition), epochEndOffset);
                            Objects.requireNonNull(arrayList);
                            maybeCompleteValidation.ifPresent((v1) -> {
                                r1.add(v1);
                            });
                        });
                        if (arrayList.isEmpty()) {
                            return;
                        }
                        Fetcher.this.maybeSetOffsetForLeaderException(Fetcher.this.buildLogTruncationException(arrayList));
                    }

                    @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                    public void onFailure(RuntimeException runtimeException) {
                        Fetcher.this.subscriptions.requestFailed(map2.keySet(), Fetcher.this.time.milliseconds() + Fetcher.this.retryBackoffMs);
                        Fetcher.this.metadata.requestUpdate();
                        if (runtimeException instanceof RetriableException) {
                            return;
                        }
                        Fetcher.this.maybeSetOffsetForLeaderException(runtimeException);
                    }
                });
                return;
            }
            this.log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not support the required protocol version (introduced in Kafka 2.3)", map2.keySet());
            Iterator<K> it = map2.keySet().iterator();
            while (it.hasNext()) {
                this.subscriptions.completeValidation((TopicPartition) it.next());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public LogTruncationException buildLogTruncationException(List<SubscriptionState.LogTruncation> list) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (SubscriptionState.LogTruncation logTruncation : list) {
            logTruncation.divergentOffsetOpt.ifPresent(offsetAndMetadata -> {
                hashMap.put(logTruncation.topicPartition, offsetAndMetadata);
            });
            hashMap2.put(logTruncation.topicPartition, Long.valueOf(logTruncation.fetchPosition.offset));
        }
        return new LogTruncationException("Detected truncated partitions: " + list, hashMap2, hashMap);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void maybeSetOffsetForLeaderException(RuntimeException runtimeException) {
        if (this.cachedOffsetForLeaderException.compareAndSet(null, runtimeException)) {
            return;
        }
        this.log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", runtimeException);
    }

    private RequestFuture<ListOffsetResult> sendListOffsetsRequests(Map<TopicPartition, Long> map, boolean z) {
        final HashSet hashSet = new HashSet();
        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> groupListOffsetRequests = groupListOffsetRequests(map, hashSet);
        if (groupListOffsetRequests.isEmpty()) {
            return RequestFuture.failure(new StaleMetadataException());
        }
        final RequestFuture<ListOffsetResult> requestFuture = new RequestFuture<>();
        final HashMap hashMap = new HashMap();
        final AtomicInteger atomicInteger = new AtomicInteger(groupListOffsetRequests.size());
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : groupListOffsetRequests.entrySet()) {
            sendListOffsetRequest(entry.getKey(), entry.getValue(), z).addListener(new RequestFutureListener<ListOffsetResult>() { // from class: org.apache.kafka.clients.consumer.internals.Fetcher.5
                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onSuccess(ListOffsetResult listOffsetResult) {
                    synchronized (requestFuture) {
                        hashMap.putAll(listOffsetResult.fetchedOffsets);
                        hashSet.addAll(listOffsetResult.partitionsToRetry);
                        if (atomicInteger.decrementAndGet() == 0 && !requestFuture.isDone()) {
                            requestFuture.complete(new ListOffsetResult(hashMap, hashSet));
                        }
                    }
                }

                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onFailure(RuntimeException runtimeException) {
                    synchronized (requestFuture) {
                        if (!requestFuture.isDone()) {
                            requestFuture.raise(runtimeException);
                        }
                    }
                }
            });
        }
        return requestFuture;
    }

    private Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> groupListOffsetRequests(Map<TopicPartition, Long> map, Set<TopicPartition> set) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            Long value = entry.getValue();
            Metadata.LeaderAndEpoch currentLeader = this.metadata.currentLeader(key);
            if (currentLeader.leader.isPresent()) {
                Node node = currentLeader.leader.get();
                if (this.client.isUnavailable(node)) {
                    this.client.maybeThrowAuthFailure(node);
                    this.log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires", node, key);
                    set.add(key);
                } else {
                    hashMap.put(key, new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(key.partition()).setTimestamp(value.longValue()).setCurrentLeaderEpoch(currentLeader.epoch.orElse(-1).intValue()));
                }
            } else {
                this.log.debug("Leader for partition {} is unknown for fetching offset {}", key, value);
                this.metadata.requestUpdate();
                set.add(key);
            }
        }
        return regroupPartitionMapByNode(hashMap);
    }

    private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> map, boolean z) {
        ListOffsetsRequest.Builder targetTimes = ListOffsetsRequest.Builder.forConsumer(z, this.isolationLevel, false).setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(map));
        this.log.debug("Sending ListOffsetRequest {} to broker {}", targetTimes, node);
        return this.client.send(node, targetTimes).compose(new RequestFutureAdapter<ClientResponse, ListOffsetResult>() { // from class: org.apache.kafka.clients.consumer.internals.Fetcher.6
            @Override // org.apache.kafka.clients.consumer.internals.RequestFutureAdapter
            public void onSuccess(ClientResponse clientResponse, RequestFuture<ListOffsetResult> requestFuture) {
                ListOffsetsResponse listOffsetsResponse = (ListOffsetsResponse) clientResponse.responseBody();
                Fetcher.this.log.trace("Received ListOffsetResponse {} from broker {}", listOffsetsResponse, node);
                Fetcher.this.handleListOffsetResponse(listOffsetsResponse, requestFuture);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleListOffsetResponse(ListOffsetsResponse listOffsetsResponse, RequestFuture<ListOffsetResult> requestFuture) {
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (ListOffsetsResponseData.ListOffsetsTopicResponse listOffsetsTopicResponse : listOffsetsResponse.topics()) {
            for (ListOffsetsResponseData.ListOffsetsPartitionResponse listOffsetsPartitionResponse : listOffsetsTopicResponse.partitions()) {
                TopicPartition topicPartition = new TopicPartition(listOffsetsTopicResponse.name(), listOffsetsPartitionResponse.partitionIndex());
                Errors forCode = Errors.forCode(listOffsetsPartitionResponse.errorCode());
                switch (AnonymousClass7.$SwitchMap$org$apache$kafka$common$protocol$Errors[forCode.ordinal()]) {
                    case 1:
                        if (listOffsetsPartitionResponse.oldStyleOffsets().isEmpty()) {
                            this.log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}", new Object[]{topicPartition, Long.valueOf(listOffsetsPartitionResponse.offset()), Long.valueOf(listOffsetsPartitionResponse.timestamp())});
                            if (listOffsetsPartitionResponse.offset() == -1) {
                                break;
                            } else {
                                hashMap.put(topicPartition, new ListOffsetData(listOffsetsPartitionResponse.offset(), Long.valueOf(listOffsetsPartitionResponse.timestamp()), listOffsetsPartitionResponse.leaderEpoch() == -1 ? Optional.empty() : Optional.of(Integer.valueOf(listOffsetsPartitionResponse.leaderEpoch()))));
                                break;
                            }
                        } else {
                            if (listOffsetsPartitionResponse.oldStyleOffsets().size() > 1) {
                                requestFuture.raise(new IllegalStateException("Unexpected partitionData response of length " + listOffsetsPartitionResponse.oldStyleOffsets().size()));
                                return;
                            }
                            long longValue = listOffsetsPartitionResponse.oldStyleOffsets().get(0).longValue();
                            this.log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}", topicPartition, Long.valueOf(longValue));
                            if (longValue != -1) {
                                hashMap.put(topicPartition, new ListOffsetData(longValue, null, Optional.empty()));
                                break;
                            } else {
                                break;
                            }
                        }
                    case 2:
                        this.log.debug("Cannot search by timestamp for partition {} because the message format version is before 0.10.0", topicPartition);
                        break;
                    case 3:
                    case 4:
                    case 5:
                    case 6:
                    case 7:
                    case 8:
                    case 9:
                        this.log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", topicPartition, forCode);
                        hashSet.add(topicPartition);
                        break;
                    case LegacyRecord.KEY_OFFSET_V0 /* 10 */:
                        this.log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
                        hashSet.add(topicPartition);
                        break;
                    case 11:
                        hashSet2.add(topicPartition.topic());
                        break;
                    default:
                        this.log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.", topicPartition, forCode.message());
                        hashSet.add(topicPartition);
                        break;
                }
            }
        }
        if (hashSet2.isEmpty()) {
            requestFuture.complete(new ListOffsetResult(hashMap, hashSet));
        } else {
            requestFuture.raise(new TopicAuthorizationException(hashSet2));
        }
    }

    private List<TopicPartition> fetchablePartitions() {
        HashSet hashSet = new HashSet();
        if (this.nextInLineFetch != null && !((CompletedFetch) this.nextInLineFetch).isConsumed) {
            hashSet.add(((CompletedFetch) this.nextInLineFetch).partition);
        }
        Iterator<Fetcher<K, V>.CompletedFetch> it = this.completedFetches.iterator();
        while (it.hasNext()) {
            hashSet.add(((CompletedFetch) it.next()).partition);
        }
        return this.subscriptions.fetchablePartitions(topicPartition -> {
            return !hashSet.contains(topicPartition);
        });
    }

    Node selectReadReplica(TopicPartition topicPartition, Node node, long j) {
        Optional<Integer> preferredReadReplica = this.subscriptions.preferredReadReplica(topicPartition, j);
        if (!preferredReadReplica.isPresent()) {
            return node;
        }
        Optional<U> flatMap = preferredReadReplica.flatMap(num -> {
            return this.metadata.fetch().nodeIfOnline(topicPartition, num.intValue());
        });
        if (flatMap.isPresent()) {
            return (Node) flatMap.get();
        }
        this.log.trace("Not fetching from {} for partition {} since it is marked offline or is missing from our metadata, using the leader instead.", preferredReadReplica, topicPartition);
        this.subscriptions.clearPreferredReadReplica(topicPartition);
        return node;
    }

    private void validatePositionsOnMetadataChange() {
        int updateVersion = this.metadata.updateVersion();
        if (this.metadataUpdateVersion.getAndSet(updateVersion) != updateVersion) {
            this.subscriptions.assignedPartitions().forEach(topicPartition -> {
                this.subscriptions.maybeValidatePositionForCurrentLeader(this.apiVersions, topicPartition, this.metadata.currentLeader(topicPartition));
            });
        }
    }

    private Map<Node, FetchSessionHandler.FetchRequestData> prepareFetchRequests() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        validatePositionsOnMetadataChange();
        long milliseconds = this.time.milliseconds();
        Map<String, Uuid> map = this.metadata.topicIds();
        for (TopicPartition topicPartition : fetchablePartitions()) {
            SubscriptionState.FetchPosition position = this.subscriptions.position(topicPartition);
            if (position == null) {
                throw new IllegalStateException("Missing position for fetchable partition " + topicPartition);
            }
            Optional<Node> optional = position.currentLeader.leader;
            if (optional.isPresent()) {
                Node selectReadReplica = selectReadReplica(topicPartition, optional.get(), milliseconds);
                if (this.client.isUnavailable(selectReadReplica)) {
                    this.client.maybeThrowAuthFailure(selectReadReplica);
                    this.log.trace("Skipping fetch for partition {} because node {} is awaiting reconnect backoff", topicPartition, selectReadReplica);
                } else if (this.nodesWithPendingFetchRequests.contains(Integer.valueOf(selectReadReplica.id()))) {
                    this.log.trace("Skipping fetch for partition {} because previous request to {} has not been processed", topicPartition, selectReadReplica);
                } else {
                    FetchSessionHandler.Builder builder = (FetchSessionHandler.Builder) linkedHashMap.get(selectReadReplica);
                    if (builder == null) {
                        int id = selectReadReplica.id();
                        FetchSessionHandler sessionHandler = sessionHandler(id);
                        if (sessionHandler == null) {
                            sessionHandler = new FetchSessionHandler(this.logContext, id);
                            this.sessionHandlers.put(Integer.valueOf(id), sessionHandler);
                        }
                        builder = sessionHandler.newBuilder();
                        linkedHashMap.put(selectReadReplica, builder);
                    }
                    builder.add(topicPartition, new FetchRequest.PartitionData(map.getOrDefault(topicPartition.topic(), Uuid.ZERO_UUID), position.offset, -1L, this.fetchSize, position.currentLeader.epoch, Optional.empty()));
                    this.log.debug("Added {} fetch request for partition {} at position {} to node {}", new Object[]{this.isolationLevel, topicPartition, position, selectReadReplica});
                }
            } else {
                this.log.debug("Requesting metadata update for partition {} since the position {} is missing the current leader node", topicPartition, position);
                this.metadata.requestUpdate();
            }
        }
        LinkedHashMap linkedHashMap2 = new LinkedHashMap();
        for (Map.Entry<K, V> entry : linkedHashMap.entrySet()) {
            linkedHashMap2.put((Node) entry.getKey(), ((FetchSessionHandler.Builder) entry.getValue()).build());
        }
        return linkedHashMap2;
    }

    private Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regroupFetchPositionsByLeader(Map<TopicPartition, SubscriptionState.FetchPosition> map) {
        return (Map) map.entrySet().stream().filter(entry -> {
            return ((SubscriptionState.FetchPosition) entry.getValue()).currentLeader.leader.isPresent();
        }).collect(Collectors.groupingBy(entry2 -> {
            return ((SubscriptionState.FetchPosition) entry2.getValue()).currentLeader.leader.get();
        }, Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })));
    }

    private <T> Map<Node, Map<TopicPartition, T>> regroupPartitionMapByNode(Map<TopicPartition, T> map) {
        return (Map) map.entrySet().stream().collect(Collectors.groupingBy(entry -> {
            return this.metadata.fetch().leaderFor((TopicPartition) entry.getKey());
        }, Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })));
    }

    private Fetcher<K, V>.CompletedFetch initializeCompletedFetch(Fetcher<K, V>.CompletedFetch completedFetch) {
        TopicPartition topicPartition = ((CompletedFetch) completedFetch).partition;
        FetchResponseData.PartitionData partitionData = ((CompletedFetch) completedFetch).partitionData;
        long j = ((CompletedFetch) completedFetch).nextFetchOffset;
        Fetcher<K, V>.CompletedFetch completedFetch2 = null;
        Errors forCode = Errors.forCode(partitionData.errorCode());
        try {
            if (!this.subscriptions.hasValidPosition(topicPartition)) {
                this.log.debug("Ignoring fetched records for partition {} since it no longer has valid position", topicPartition);
            } else if (forCode == Errors.NONE) {
                SubscriptionState.FetchPosition position = this.subscriptions.position(topicPartition);
                if (position == null || position.offset != j) {
                    this.log.debug("Discarding stale fetch response for partition {} since its offset {} does not match the expected offset {}", new Object[]{topicPartition, Long.valueOf(j), position});
                    if (0 == 0) {
                        ((CompletedFetch) completedFetch).metricAggregator.record(topicPartition, 0, 0);
                    }
                    if (forCode != Errors.NONE) {
                        this.subscriptions.movePartitionToEnd(topicPartition);
                    }
                    return null;
                }
                this.log.trace("Preparing to read {} bytes of data for partition {} with offset {}", new Object[]{Integer.valueOf(FetchResponse.recordsSize(partitionData)), topicPartition, position});
                completedFetch2 = completedFetch;
                if (!FetchResponse.recordsOrFail(partitionData).batches().iterator().hasNext() && FetchResponse.recordsSize(partitionData) > 0) {
                    if (((CompletedFetch) completedFetch2).responseVersion >= 3) {
                        throw new KafkaException("Failed to make progress reading messages at " + topicPartition + "=" + j + ". Received a non-empty fetch response from the server, but no complete records were found.");
                    }
                    Map singletonMap = Collections.singletonMap(topicPartition, Long.valueOf(j));
                    throw new RecordTooLargeException("There are some messages at [Partition=Offset]: " + singletonMap + " whose size is larger than the fetch size " + this.fetchSize + " and hence cannot be returned. Please considering upgrading your broker to 0.10.1.0 or newer to avoid this issue. Alternately, increase the fetch size on the client (using " + ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG + ")", (Map<TopicPartition, Long>) singletonMap);
                }
                if (partitionData.highWatermark() >= 0) {
                    this.log.trace("Updating high watermark for partition {} to {}", topicPartition, Long.valueOf(partitionData.highWatermark()));
                    this.subscriptions.updateHighWatermark(topicPartition, partitionData.highWatermark());
                }
                if (partitionData.logStartOffset() >= 0) {
                    this.log.trace("Updating log start offset for partition {} to {}", topicPartition, Long.valueOf(partitionData.logStartOffset()));
                    this.subscriptions.updateLogStartOffset(topicPartition, partitionData.logStartOffset());
                }
                if (partitionData.lastStableOffset() >= 0) {
                    this.log.trace("Updating last stable offset for partition {} to {}", topicPartition, Long.valueOf(partitionData.lastStableOffset()));
                    this.subscriptions.updateLastStableOffset(topicPartition, partitionData.lastStableOffset());
                }
                if (FetchResponse.isPreferredReplica(partitionData)) {
                    this.subscriptions.updatePreferredReadReplica(((CompletedFetch) completedFetch2).partition, partitionData.preferredReadReplica(), () -> {
                        long milliseconds = this.time.milliseconds() + this.metadata.metadataExpireMs();
                        this.log.debug("Updating preferred read replica for partition {} to {}, set to expire at {}", new Object[]{topicPartition, Integer.valueOf(partitionData.preferredReadReplica()), Long.valueOf(milliseconds)});
                        return milliseconds;
                    });
                }
                ((CompletedFetch) completedFetch).initialized = true;
            } else if (forCode == Errors.NOT_LEADER_OR_FOLLOWER || forCode == Errors.REPLICA_NOT_AVAILABLE || forCode == Errors.KAFKA_STORAGE_ERROR || forCode == Errors.FENCED_LEADER_EPOCH || forCode == Errors.OFFSET_NOT_AVAILABLE) {
                this.log.debug("Error in fetch for partition {}: {}", topicPartition, forCode.exceptionName());
                this.metadata.requestUpdate();
            } else if (forCode == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                this.log.warn("Received unknown topic or partition error in fetch for partition {}", topicPartition);
                this.metadata.requestUpdate();
            } else if (forCode == Errors.UNKNOWN_TOPIC_ID) {
                this.log.warn("Received unknown topic ID error in fetch for partition {}", topicPartition);
                this.metadata.requestUpdate();
            } else if (forCode == Errors.INCONSISTENT_TOPIC_ID) {
                this.log.warn("Received inconsistent topic ID error in fetch for partition {}", topicPartition);
                this.metadata.requestUpdate();
            } else if (forCode == Errors.OFFSET_OUT_OF_RANGE) {
                Optional<Integer> clearPreferredReadReplica = this.subscriptions.clearPreferredReadReplica(topicPartition);
                if (clearPreferredReadReplica.isPresent()) {
                    this.log.debug("Unset the preferred read replica {} for partition {} since we got {} when fetching {}", new Object[]{clearPreferredReadReplica.get(), topicPartition, forCode, Long.valueOf(j)});
                } else {
                    SubscriptionState.FetchPosition position2 = this.subscriptions.position(topicPartition);
                    if (position2 == null || j != position2.offset) {
                        this.log.debug("Discarding stale fetch response for partition {} since the fetched offset {} does not match the current offset {}", new Object[]{topicPartition, Long.valueOf(j), position2});
                    } else {
                        handleOffsetOutOfRange(position2, topicPartition);
                    }
                }
            } else {
                if (forCode == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    this.log.warn("Not authorized to read from partition {}.", topicPartition);
                    throw new TopicAuthorizationException((Set<String>) Collections.singleton(topicPartition.topic()));
                }
                if (forCode == Errors.UNKNOWN_LEADER_EPOCH) {
                    this.log.debug("Received unknown leader epoch error in fetch for partition {}", topicPartition);
                } else {
                    if (forCode != Errors.UNKNOWN_SERVER_ERROR) {
                        if (forCode == Errors.CORRUPT_MESSAGE) {
                            throw new KafkaException("Encountered corrupt message when fetching offset " + j + " for topic-partition " + topicPartition);
                        }
                        throw new IllegalStateException("Unexpected error code " + ((int) forCode.code()) + " while fetching at offset " + j + " from topic-partition " + topicPartition);
                    }
                    this.log.warn("Unknown server error while fetching offset {} for topic-partition {}", Long.valueOf(j), topicPartition);
                }
            }
            completedFetch2 = completedFetch2;
            return completedFetch2;
        } finally {
            if (0 == 0) {
                ((CompletedFetch) completedFetch).metricAggregator.record(topicPartition, 0, 0);
            }
            if (forCode != Errors.NONE) {
                this.subscriptions.movePartitionToEnd(topicPartition);
            }
        }
    }

    private void handleOffsetOutOfRange(SubscriptionState.FetchPosition fetchPosition, TopicPartition topicPartition) {
        String str = "Fetch position " + fetchPosition + " is out of range for partition " + topicPartition;
        if (!this.subscriptions.hasDefaultOffsetResetPolicy()) {
            this.log.info("{}, raising error to the application since no reset policy is configured", str);
            throw new OffsetOutOfRangeException(str, Collections.singletonMap(topicPartition, Long.valueOf(fetchPosition.offset)));
        }
        this.log.info("{}, resetting offset", str);
        this.subscriptions.requestOffsetReset(topicPartition);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public ConsumerRecord<K, V> parseRecord(TopicPartition topicPartition, RecordBatch recordBatch, Record record) {
        try {
            long offset = record.offset();
            long timestamp = record.timestamp();
            Optional<Integer> maybeLeaderEpoch = maybeLeaderEpoch(recordBatch.partitionLeaderEpoch());
            TimestampType timestampType = recordBatch.timestampType();
            RecordHeaders recordHeaders = new RecordHeaders(record.headers());
            ByteBuffer key = record.key();
            byte[] array = key == null ? null : org.apache.kafka.common.utils.Utils.toArray(key);
            K deserialize = key == null ? null : this.keyDeserializer.deserialize(topicPartition.topic(), recordHeaders, array);
            ByteBuffer value = record.value();
            byte[] array2 = value == null ? null : org.apache.kafka.common.utils.Utils.toArray(value);
            return new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), offset, timestamp, timestampType, array == null ? -1 : array.length, array2 == null ? -1 : array2.length, deserialize, value == null ? null : this.valueDeserializer.deserialize(topicPartition.topic(), recordHeaders, array2), recordHeaders, maybeLeaderEpoch);
        } catch (RuntimeException e) {
            throw new RecordDeserializationException(topicPartition, record.offset(), "Error deserializing key/value for partition " + topicPartition + " at offset " + record.offset() + ". If needed, please seek past the record to continue consumption.", e);
        }
    }

    private Optional<Integer> maybeLeaderEpoch(int i) {
        return i == -1 ? Optional.empty() : Optional.of(Integer.valueOf(i));
    }

    public void clearBufferedDataForUnassignedPartitions(Collection<TopicPartition> collection) {
        Iterator<Fetcher<K, V>.CompletedFetch> it = this.completedFetches.iterator();
        while (it.hasNext()) {
            Fetcher<K, V>.CompletedFetch next = it.next();
            if (!collection.contains(((CompletedFetch) next).partition)) {
                next.drain();
                it.remove();
            }
        }
        if (this.nextInLineFetch == null || collection.contains(((CompletedFetch) this.nextInLineFetch).partition)) {
            return;
        }
        this.nextInLineFetch.drain();
        this.nextInLineFetch = null;
    }

    public void clearBufferedDataForUnassignedTopics(Collection<String> collection) {
        HashSet hashSet = new HashSet();
        for (TopicPartition topicPartition : this.subscriptions.assignedPartitions()) {
            if (collection.contains(topicPartition.topic())) {
                hashSet.add(topicPartition);
            }
        }
        clearBufferedDataForUnassignedPartitions(hashSet);
    }

    protected FetchSessionHandler sessionHandler(int i) {
        return this.sessionHandlers.get(Integer.valueOf(i));
    }

    public static Sensor throttleTimeSensor(Metrics metrics, FetcherMetricsRegistry fetcherMetricsRegistry) {
        Sensor sensor = metrics.sensor("fetch-throttle-time");
        sensor.add(metrics.metricInstance(fetcherMetricsRegistry.fetchThrottleTimeAvg, new String[0]), new Avg());
        sensor.add(metrics.metricInstance(fetcherMetricsRegistry.fetchThrottleTimeMax, new String[0]), new Max());
        return sensor;
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        if (this.nextInLineFetch != null) {
            this.nextInLineFetch.drain();
        }
        this.decompressionBufferSupplier.close();
    }

    private Set<String> topicsForPartitions(Collection<TopicPartition> collection) {
        return (Set) collection.stream().map((v0) -> {
            return v0.topic();
        }).collect(Collectors.toSet());
    }
}
