package org.apache.kafka.clients.consumer.internals;

import io.micrometer.core.instrument.binder.BaseUnits;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.StaleMetadataException;
import org.apache.kafka.clients.consumer.LogTruncationException;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.ListOffsetsRequestData;
import org.apache.kafka.common.message.ListOffsetsResponseData;
import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Timer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/OffsetFetcher.class */
public class OffsetFetcher {
    private final Logger log;
    private final ConsumerMetadata metadata;
    private final SubscriptionState subscriptions;
    private final ConsumerNetworkClient client;
    private final Time time;
    private final long retryBackoffMs;
    private final long requestTimeoutMs;
    private final IsolationLevel isolationLevel;
    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
    private final ApiVersions apiVersions;
    private final AtomicReference<RuntimeException> cachedListOffsetsException = new AtomicReference<>();
    private final AtomicReference<RuntimeException> cachedOffsetForLeaderException = new AtomicReference<>();
    private final AtomicInteger metadataUpdateVersion = new AtomicInteger(-1);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/OffsetFetcher$ListOffsetData.class */
    public static class ListOffsetData {
        final long offset;
        final Long timestamp;
        final Optional<Integer> leaderEpoch;

        ListOffsetData(long j, Long l, Optional<Integer> optional) {
            this.offset = j;
            this.timestamp = l;
            this.leaderEpoch = optional;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/OffsetFetcher$ListOffsetResult.class */
    public static class ListOffsetResult {
        private final Map<TopicPartition, ListOffsetData> fetchedOffsets;
        private final Set<TopicPartition> partitionsToRetry;

        ListOffsetResult(Map<TopicPartition, ListOffsetData> map, Set<TopicPartition> set) {
            this.fetchedOffsets = map;
            this.partitionsToRetry = set;
        }

        ListOffsetResult() {
            this.fetchedOffsets = new HashMap();
            this.partitionsToRetry = new HashSet();
        }
    }

    public OffsetFetcher(LogContext logContext, ConsumerNetworkClient consumerNetworkClient, ConsumerMetadata consumerMetadata, SubscriptionState subscriptionState, Time time, long j, long j2, IsolationLevel isolationLevel, ApiVersions apiVersions) {
        this.log = logContext.logger(getClass());
        this.time = time;
        this.client = consumerNetworkClient;
        this.metadata = consumerMetadata;
        this.subscriptions = subscriptionState;
        this.retryBackoffMs = j;
        this.requestTimeoutMs = j2;
        this.isolationLevel = isolationLevel;
        this.apiVersions = apiVersions;
        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(consumerNetworkClient, logContext);
    }

    private Long offsetResetStrategyTimestamp(TopicPartition topicPartition) {
        OffsetResetStrategy resetStrategy = this.subscriptions.resetStrategy(topicPartition);
        if (resetStrategy == OffsetResetStrategy.EARLIEST) {
            return -2L;
        }
        return resetStrategy == OffsetResetStrategy.LATEST ? -1L : null;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public OffsetResetStrategy timestampToOffsetResetStrategy(long j) {
        if (j == -2) {
            return OffsetResetStrategy.EARLIEST;
        }
        if (j == -1) {
            return OffsetResetStrategy.LATEST;
        }
        return null;
    }

    public void resetPositionsIfNeeded() {
        RuntimeException andSet = this.cachedListOffsetsException.getAndSet(null);
        if (andSet != null) {
            throw andSet;
        }
        Set<TopicPartition> partitionsNeedingReset = this.subscriptions.partitionsNeedingReset(this.time.milliseconds());
        if (partitionsNeedingReset.isEmpty()) {
            return;
        }
        HashMap hashMap = new HashMap();
        for (TopicPartition topicPartition : partitionsNeedingReset) {
            Long offsetResetStrategyTimestamp = offsetResetStrategyTimestamp(topicPartition);
            if (offsetResetStrategyTimestamp != null) {
                hashMap.put(topicPartition, offsetResetStrategyTimestamp);
            }
        }
        resetPositionsAsync(hashMap);
    }

    public void validatePositionsIfNeeded() {
        RuntimeException andSet = this.cachedOffsetForLeaderException.getAndSet(null);
        if (andSet != null) {
            throw andSet;
        }
        validatePositionsOnMetadataChange();
        Stream<TopicPartition> filter = this.subscriptions.partitionsNeedingValidation(this.time.milliseconds()).stream().filter(topicPartition -> {
            return this.subscriptions.position(topicPartition) != null;
        });
        Function identity = Function.identity();
        SubscriptionState subscriptionState = this.subscriptions;
        subscriptionState.getClass();
        validatePositionsAsync((Map) filter.collect(Collectors.toMap(identity, subscriptionState::position)));
    }

    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Timer timer) {
        this.metadata.addTransientTopics(topicsForPartitions(map.keySet()));
        try {
            Map map2 = fetchOffsetsByTimes(map, timer, true).fetchedOffsets;
            HashMap hashMap = new HashMap(map.size());
            Iterator<Map.Entry<TopicPartition, Long>> it = map.entrySet().iterator();
            while (it.hasNext()) {
                hashMap.put(it.next().getKey(), null);
            }
            for (Map.Entry entry : map2.entrySet()) {
                ListOffsetData listOffsetData = (ListOffsetData) entry.getValue();
                hashMap.put(entry.getKey(), new OffsetAndTimestamp(listOffsetData.offset, listOffsetData.timestamp.longValue(), listOffsetData.leaderEpoch));
            }
            return hashMap;
        } finally {
            this.metadata.clearTransientTopics();
        }
    }

    private ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> map, Timer timer, boolean z) {
        final ListOffsetResult listOffsetResult = new ListOffsetResult();
        if (map.isEmpty()) {
            return listOffsetResult;
        }
        final HashMap hashMap = new HashMap(map);
        do {
            final RequestFuture<ListOffsetResult> sendListOffsetsRequests = sendListOffsetsRequests(hashMap, z);
            sendListOffsetsRequests.addListener(new RequestFutureListener<ListOffsetResult>() { // from class: org.apache.kafka.clients.consumer.internals.OffsetFetcher.1
                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onSuccess(ListOffsetResult listOffsetResult2) {
                    synchronized (sendListOffsetsRequests) {
                        listOffsetResult.fetchedOffsets.putAll(listOffsetResult2.fetchedOffsets);
                        hashMap.keySet().retainAll(listOffsetResult2.partitionsToRetry);
                        for (Map.Entry entry : listOffsetResult2.fetchedOffsets.entrySet()) {
                            TopicPartition topicPartition = (TopicPartition) entry.getKey();
                            if (OffsetFetcher.this.subscriptions.isAssigned(topicPartition)) {
                                long j = ((ListOffsetData) entry.getValue()).offset;
                                if (OffsetFetcher.this.isolationLevel == IsolationLevel.READ_COMMITTED) {
                                    OffsetFetcher.this.log.trace("Updating last stable offset for partition {} to {}", topicPartition, Long.valueOf(j));
                                    OffsetFetcher.this.subscriptions.updateLastStableOffset(topicPartition, j);
                                } else {
                                    OffsetFetcher.this.log.trace("Updating high watermark for partition {} to {}", topicPartition, Long.valueOf(j));
                                    OffsetFetcher.this.subscriptions.updateHighWatermark(topicPartition, j);
                                }
                            }
                        }
                    }
                }

                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onFailure(RuntimeException runtimeException) {
                    if (!(runtimeException instanceof RetriableException)) {
                        throw sendListOffsetsRequests.exception();
                    }
                }
            });
            if (timer.timeoutMs() != 0) {
                this.client.poll(sendListOffsetsRequests, timer);
                if (!sendListOffsetsRequests.isDone()) {
                    break;
                }
                if (hashMap.isEmpty()) {
                    return listOffsetResult;
                }
                this.client.awaitMetadataUpdate(timer);
            } else {
                return listOffsetResult;
            }
        } while (timer.notExpired());
        throw new TimeoutException("Failed to get offsets by times in " + timer.elapsedMs() + BaseUnits.MILLISECONDS);
    }

    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Timer timer) {
        return beginningOrEndOffset(collection, -2L, timer);
    }

    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Timer timer) {
        return beginningOrEndOffset(collection, -1L, timer);
    }

    private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> collection, long j, Timer timer) {
        this.metadata.addTransientTopics(topicsForPartitions(collection));
        try {
            Map<TopicPartition, Long> map = (Map) fetchOffsetsByTimes((Map) collection.stream().distinct().collect(Collectors.toMap(Function.identity(), topicPartition -> {
                return Long.valueOf(j);
            })), timer, false).fetchedOffsets.entrySet().stream().collect(Collectors.toMap((v0) -> {
                return v0.getKey();
            }, entry -> {
                return Long.valueOf(((ListOffsetData) entry.getValue()).offset);
            }));
            this.metadata.clearTransientTopics();
            return map;
        } catch (Throwable th) {
            this.metadata.clearTransientTopics();
            throw th;
        }
    }

    void resetPositionIfNeeded(TopicPartition topicPartition, OffsetResetStrategy offsetResetStrategy, ListOffsetData listOffsetData) {
        SubscriptionState.FetchPosition fetchPosition = new SubscriptionState.FetchPosition(listOffsetData.offset, Optional.empty(), this.metadata.currentLeader(topicPartition));
        listOffsetData.leaderEpoch.ifPresent(num -> {
            this.metadata.updateLastSeenEpochIfNewer(topicPartition, num.intValue());
        });
        this.subscriptions.maybeSeekUnvalidated(topicPartition, fetchPosition, offsetResetStrategy);
    }

    private void resetPositionsAsync(Map<TopicPartition, Long> map) {
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : groupListOffsetRequests(map, new HashSet()).entrySet()) {
            Node key = entry.getKey();
            final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> value = entry.getValue();
            this.subscriptions.setNextAllowedRetry(value.keySet(), this.time.milliseconds() + this.requestTimeoutMs);
            sendListOffsetRequest(key, value, false).addListener(new RequestFutureListener<ListOffsetResult>() { // from class: org.apache.kafka.clients.consumer.internals.OffsetFetcher.2
                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onSuccess(ListOffsetResult listOffsetResult) {
                    if (!listOffsetResult.partitionsToRetry.isEmpty()) {
                        OffsetFetcher.this.subscriptions.requestFailed(listOffsetResult.partitionsToRetry, OffsetFetcher.this.time.milliseconds() + OffsetFetcher.this.retryBackoffMs);
                        OffsetFetcher.this.metadata.requestUpdate();
                    }
                    for (Map.Entry entry2 : listOffsetResult.fetchedOffsets.entrySet()) {
                        TopicPartition topicPartition = (TopicPartition) entry2.getKey();
                        OffsetFetcher.this.resetPositionIfNeeded(topicPartition, OffsetFetcher.this.timestampToOffsetResetStrategy(((ListOffsetsRequestData.ListOffsetsPartition) value.get(topicPartition)).timestamp()), (ListOffsetData) entry2.getValue());
                    }
                }

                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onFailure(RuntimeException runtimeException) {
                    OffsetFetcher.this.subscriptions.requestFailed(value.keySet(), OffsetFetcher.this.time.milliseconds() + OffsetFetcher.this.retryBackoffMs);
                    OffsetFetcher.this.metadata.requestUpdate();
                    if ((runtimeException instanceof RetriableException) || OffsetFetcher.this.cachedListOffsetsException.compareAndSet(null, runtimeException)) {
                        return;
                    }
                    OffsetFetcher.this.log.error("Discarding error in ListOffsetResponse because another error is pending", (Throwable) runtimeException);
                }
            });
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean hasUsableOffsetForLeaderEpochVersion(NodeApiVersions nodeApiVersions) {
        ApiVersionsResponseData.ApiVersion apiVersion = nodeApiVersions.apiVersion(ApiKeys.OFFSET_FOR_LEADER_EPOCH);
        if (apiVersion == null) {
            return false;
        }
        return OffsetsForLeaderEpochRequest.supportsTopicPermission(apiVersion.maxVersion());
    }

    private void validatePositionsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> map) {
        Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regroupFetchPositionsByLeader = regroupFetchPositionsByLeader(map);
        long milliseconds = this.time.milliseconds() + this.requestTimeoutMs;
        regroupFetchPositionsByLeader.forEach((node, map2) -> {
            if (node.isEmpty()) {
                this.metadata.requestUpdate();
                return;
            }
            NodeApiVersions nodeApiVersions = this.apiVersions.get(node.idString());
            if (nodeApiVersions == null) {
                this.client.tryConnect(node);
                return;
            }
            if (hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                this.subscriptions.setNextAllowedRetry(map2.keySet(), milliseconds);
                this.offsetsForLeaderEpochClient.sendAsyncRequest(node, map2).addListener(new RequestFutureListener<OffsetsForLeaderEpochClient.OffsetForEpochResult>() { // from class: org.apache.kafka.clients.consumer.internals.OffsetFetcher.3
                    @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                    public void onSuccess(OffsetsForLeaderEpochClient.OffsetForEpochResult offsetForEpochResult) {
                        ArrayList arrayList = new ArrayList();
                        if (!offsetForEpochResult.partitionsToRetry().isEmpty()) {
                            OffsetFetcher.this.subscriptions.setNextAllowedRetry(offsetForEpochResult.partitionsToRetry(), OffsetFetcher.this.time.milliseconds() + OffsetFetcher.this.retryBackoffMs);
                            OffsetFetcher.this.metadata.requestUpdate();
                        }
                        Map<TopicPartition, OffsetForLeaderEpochResponseData.EpochEndOffset> endOffsets = offsetForEpochResult.endOffsets();
                        Map map2 = map2;
                        endOffsets.forEach((topicPartition, epochEndOffset) -> {
                            Optional<SubscriptionState.LogTruncation> maybeCompleteValidation = OffsetFetcher.this.subscriptions.maybeCompleteValidation(topicPartition, (SubscriptionState.FetchPosition) map2.get(topicPartition), epochEndOffset);
                            arrayList.getClass();
                            maybeCompleteValidation.ifPresent((v1) -> {
                                r1.add(v1);
                            });
                        });
                        if (arrayList.isEmpty()) {
                            return;
                        }
                        OffsetFetcher.this.maybeSetOffsetForLeaderException(OffsetFetcher.this.buildLogTruncationException(arrayList));
                    }

                    @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                    public void onFailure(RuntimeException runtimeException) {
                        OffsetFetcher.this.subscriptions.requestFailed(map2.keySet(), OffsetFetcher.this.time.milliseconds() + OffsetFetcher.this.retryBackoffMs);
                        OffsetFetcher.this.metadata.requestUpdate();
                        if (runtimeException instanceof RetriableException) {
                            return;
                        }
                        OffsetFetcher.this.maybeSetOffsetForLeaderException(runtimeException);
                    }
                });
                return;
            }
            this.log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not support the required protocol version (introduced in Kafka 2.3)", map2.keySet());
            Iterator it = map2.keySet().iterator();
            while (it.hasNext()) {
                this.subscriptions.completeValidation((TopicPartition) it.next());
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public LogTruncationException buildLogTruncationException(List<SubscriptionState.LogTruncation> list) {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        for (SubscriptionState.LogTruncation logTruncation : list) {
            logTruncation.divergentOffsetOpt.ifPresent(offsetAndMetadata -> {
            });
            hashMap2.put(logTruncation.topicPartition, Long.valueOf(logTruncation.fetchPosition.offset));
        }
        return new LogTruncationException("Detected truncated partitions: " + list, hashMap2, hashMap);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void maybeSetOffsetForLeaderException(RuntimeException runtimeException) {
        if (this.cachedOffsetForLeaderException.compareAndSet(null, runtimeException)) {
            return;
        }
        this.log.error("Discarding error in OffsetsForLeaderEpoch because another error is pending", (Throwable) runtimeException);
    }

    private RequestFuture<ListOffsetResult> sendListOffsetsRequests(Map<TopicPartition, Long> map, boolean z) {
        final HashSet hashSet = new HashSet();
        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> groupListOffsetRequests = groupListOffsetRequests(map, hashSet);
        if (groupListOffsetRequests.isEmpty()) {
            return RequestFuture.failure(new StaleMetadataException());
        }
        final RequestFuture<ListOffsetResult> requestFuture = new RequestFuture<>();
        final HashMap hashMap = new HashMap();
        final AtomicInteger atomicInteger = new AtomicInteger(groupListOffsetRequests.size());
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : groupListOffsetRequests.entrySet()) {
            sendListOffsetRequest(entry.getKey(), entry.getValue(), z).addListener(new RequestFutureListener<ListOffsetResult>() { // from class: org.apache.kafka.clients.consumer.internals.OffsetFetcher.4
                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onSuccess(ListOffsetResult listOffsetResult) {
                    synchronized (requestFuture) {
                        hashMap.putAll(listOffsetResult.fetchedOffsets);
                        hashSet.addAll(listOffsetResult.partitionsToRetry);
                        if (atomicInteger.decrementAndGet() == 0 && !requestFuture.isDone()) {
                            requestFuture.complete(new ListOffsetResult(hashMap, hashSet));
                        }
                    }
                }

                @Override // org.apache.kafka.clients.consumer.internals.RequestFutureListener
                public void onFailure(RuntimeException runtimeException) {
                    synchronized (requestFuture) {
                        if (!requestFuture.isDone()) {
                            requestFuture.raise(runtimeException);
                        }
                    }
                }
            });
        }
        return requestFuture;
    }

    private Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> groupListOffsetRequests(Map<TopicPartition, Long> map, Set<TopicPartition> set) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            Long value = entry.getValue();
            Metadata.LeaderAndEpoch currentLeader = this.metadata.currentLeader(key);
            if (currentLeader.leader.isPresent()) {
                Node node = currentLeader.leader.get();
                if (this.client.isUnavailable(node)) {
                    this.client.maybeThrowAuthFailure(node);
                    this.log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires", node, key);
                    set.add(key);
                } else {
                    hashMap.put(key, new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(key.partition()).setTimestamp(value.longValue()).setCurrentLeaderEpoch(currentLeader.epoch.orElse(-1).intValue()));
                }
            } else {
                this.log.debug("Leader for partition {} is unknown for fetching offset {}", key, value);
                this.metadata.requestUpdate();
                set.add(key);
            }
        }
        return regroupPartitionMapByNode(hashMap);
    }

    private RequestFuture<ListOffsetResult> sendListOffsetRequest(final Node node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> map, boolean z) {
        ListOffsetsRequest.Builder targetTimes = ListOffsetsRequest.Builder.forConsumer(z, this.isolationLevel, false).setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(map));
        this.log.debug("Sending ListOffsetRequest {} to broker {}", targetTimes, node);
        return this.client.send(node, targetTimes).compose(new RequestFutureAdapter<ClientResponse, ListOffsetResult>() { // from class: org.apache.kafka.clients.consumer.internals.OffsetFetcher.5
            @Override // org.apache.kafka.clients.consumer.internals.RequestFutureAdapter
            public void onSuccess(ClientResponse clientResponse, RequestFuture<ListOffsetResult> requestFuture) {
                ListOffsetsResponse listOffsetsResponse = (ListOffsetsResponse) clientResponse.responseBody();
                OffsetFetcher.this.log.trace("Received ListOffsetResponse {} from broker {}", listOffsetsResponse, node);
                OffsetFetcher.this.handleListOffsetResponse(listOffsetsResponse, requestFuture);
            }
        });
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleListOffsetResponse(ListOffsetsResponse listOffsetsResponse, RequestFuture<ListOffsetResult> requestFuture) {
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        for (ListOffsetsResponseData.ListOffsetsTopicResponse listOffsetsTopicResponse : listOffsetsResponse.topics()) {
            for (ListOffsetsResponseData.ListOffsetsPartitionResponse listOffsetsPartitionResponse : listOffsetsTopicResponse.partitions()) {
                TopicPartition topicPartition = new TopicPartition(listOffsetsTopicResponse.name(), listOffsetsPartitionResponse.partitionIndex());
                Errors forCode = Errors.forCode(listOffsetsPartitionResponse.errorCode());
                switch (forCode) {
                    case NONE:
                        if (listOffsetsPartitionResponse.oldStyleOffsets().isEmpty()) {
                            this.log.debug("Handling ListOffsetResponse response for {}. Fetched offset {}, timestamp {}", topicPartition, Long.valueOf(listOffsetsPartitionResponse.offset()), Long.valueOf(listOffsetsPartitionResponse.timestamp()));
                            if (listOffsetsPartitionResponse.offset() == -1) {
                                break;
                            } else {
                                hashMap.put(topicPartition, new ListOffsetData(listOffsetsPartitionResponse.offset(), Long.valueOf(listOffsetsPartitionResponse.timestamp()), listOffsetsPartitionResponse.leaderEpoch() == -1 ? Optional.empty() : Optional.of(Integer.valueOf(listOffsetsPartitionResponse.leaderEpoch()))));
                                break;
                            }
                        } else {
                            if (listOffsetsPartitionResponse.oldStyleOffsets().size() > 1) {
                                requestFuture.raise(new IllegalStateException("Unexpected partitionData response of length " + listOffsetsPartitionResponse.oldStyleOffsets().size()));
                                return;
                            }
                            long longValue = listOffsetsPartitionResponse.oldStyleOffsets().get(0).longValue();
                            this.log.debug("Handling v0 ListOffsetResponse response for {}. Fetched offset {}", topicPartition, Long.valueOf(longValue));
                            if (longValue != -1) {
                                hashMap.put(topicPartition, new ListOffsetData(longValue, null, Optional.empty()));
                                break;
                            } else {
                                break;
                            }
                        }
                    case UNSUPPORTED_FOR_MESSAGE_FORMAT:
                        this.log.debug("Cannot search by timestamp for partition {} because the message format version is before 0.10.0", topicPartition);
                        break;
                    case NOT_LEADER_OR_FOLLOWER:
                    case REPLICA_NOT_AVAILABLE:
                    case KAFKA_STORAGE_ERROR:
                    case OFFSET_NOT_AVAILABLE:
                    case LEADER_NOT_AVAILABLE:
                    case FENCED_LEADER_EPOCH:
                    case UNKNOWN_LEADER_EPOCH:
                        this.log.debug("Attempt to fetch offsets for partition {} failed due to {}, retrying.", topicPartition, forCode);
                        hashSet.add(topicPartition);
                        break;
                    case UNKNOWN_TOPIC_OR_PARTITION:
                        this.log.warn("Received unknown topic or partition error in ListOffset request for partition {}", topicPartition);
                        hashSet.add(topicPartition);
                        break;
                    case TOPIC_AUTHORIZATION_FAILED:
                        hashSet2.add(topicPartition.topic());
                        break;
                    default:
                        this.log.warn("Attempt to fetch offsets for partition {} failed due to unexpected exception: {}, retrying.", topicPartition, forCode.message());
                        hashSet.add(topicPartition);
                        break;
                }
            }
        }
        if (hashSet2.isEmpty()) {
            requestFuture.complete(new ListOffsetResult(hashMap, hashSet));
        } else {
            requestFuture.raise(new TopicAuthorizationException(hashSet2));
        }
    }

    public void validatePositionsOnMetadataChange() {
        int updateVersion = this.metadata.updateVersion();
        if (this.metadataUpdateVersion.getAndSet(updateVersion) != updateVersion) {
            this.subscriptions.assignedPartitions().forEach(topicPartition -> {
                this.subscriptions.maybeValidatePositionForCurrentLeader(this.apiVersions, topicPartition, this.metadata.currentLeader(topicPartition));
            });
        }
    }

    private Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regroupFetchPositionsByLeader(Map<TopicPartition, SubscriptionState.FetchPosition> map) {
        return (Map) map.entrySet().stream().filter(entry -> {
            return ((SubscriptionState.FetchPosition) entry.getValue()).currentLeader.leader.isPresent();
        }).collect(Collectors.groupingBy(entry2 -> {
            return ((SubscriptionState.FetchPosition) entry2.getValue()).currentLeader.leader.get();
        }, Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })));
    }

    private <T> Map<Node, Map<TopicPartition, T>> regroupPartitionMapByNode(Map<TopicPartition, T> map) {
        return (Map) map.entrySet().stream().collect(Collectors.groupingBy(entry -> {
            return this.metadata.fetch().leaderFor((TopicPartition) entry.getKey());
        }, Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        })));
    }

    private Set<String> topicsForPartitions(Collection<TopicPartition> collection) {
        return (Set) collection.stream().map((v0) -> {
            return v0.topic();
        }).collect(Collectors.toSet());
    }
}
