package datahub.shaded.org.apache.kafka.clients.consumer.internals;

import com.ibm.icu.text.DateFormat;
import datahub.shaded.org.apache.kafka.clients.ApiVersions;
import datahub.shaded.org.apache.kafka.clients.ClientResponse;
import datahub.shaded.org.apache.kafka.clients.Metadata;
import datahub.shaded.org.apache.kafka.clients.NodeApiVersions;
import datahub.shaded.org.apache.kafka.clients.StaleMetadataException;
import datahub.shaded.org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState;
import datahub.shaded.org.apache.kafka.common.IsolationLevel;
import datahub.shaded.org.apache.kafka.common.Node;
import datahub.shaded.org.apache.kafka.common.TopicPartition;
import datahub.shaded.org.apache.kafka.common.errors.RetriableException;
import datahub.shaded.org.apache.kafka.common.errors.TimeoutException;
import datahub.shaded.org.apache.kafka.common.message.ListOffsetsRequestData;
import datahub.shaded.org.apache.kafka.common.requests.ListOffsetsRequest;
import datahub.shaded.org.apache.kafka.common.requests.ListOffsetsResponse;
import datahub.shaded.org.apache.kafka.common.utils.LogContext;
import datahub.shaded.org.apache.kafka.common.utils.Time;
import datahub.shaded.org.apache.kafka.common.utils.Timer;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;

/* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/OffsetFetcher.class */
/**
 * Looks up partition offsets on behalf of a consumer by sending ListOffsets and
 * OffsetsForLeaderEpoch requests through a {@link ConsumerNetworkClient}.
 *
 * <p>Three kinds of work are offered:
 * <ul>
 *   <li>asynchronous position resets ({@link #resetPositionsIfNeeded()}),</li>
 *   <li>asynchronous position validation ({@link #validatePositionsIfNeeded()}),</li>
 *   <li>blocking lookups bounded by a {@link Timer} ({@link #offsetsForTimes},
 *       {@link #beginningOffsets}, {@link #endOffsets}).</li>
 * </ul>
 *
 * <p>Response interpretation and subscription-state bookkeeping are delegated to
 * {@link OffsetFetcherUtils}.
 */
public class OffsetFetcher {
    /** ListOffsets sentinel timestamp used by {@link #beginningOffsets}; the Kafka protocol value for "earliest". */
    private static final long EARLIEST_TIMESTAMP = -2L;

    /** ListOffsets sentinel timestamp used by {@link #endOffsets}; the Kafka protocol value for "latest". */
    private static final long LATEST_TIMESTAMP = -1L;

    private final Logger log;
    private final ConsumerMetadata metadata;
    private final SubscriptionState subscriptions;
    private final ConsumerNetworkClient client;
    private final Time time;
    private final long requestTimeoutMs;
    private final IsolationLevel isolationLevel;
    private final OffsetsForLeaderEpochClient offsetsForLeaderEpochClient;
    private final ApiVersions apiVersions;
    private final OffsetFetcherUtils offsetFetcherUtils;

    /**
     * Creates a fetcher bound to the given client, metadata and subscription state.
     *
     * @param logContext            source of this class's logger; also handed to the helper objects
     * @param consumerNetworkClient client used to send requests and to poll for their completion
     * @param consumerMetadata      metadata used to find partition leaders
     * @param subscriptionState     subscription state whose positions are reset / validated
     * @param time                  clock used to compute retry deadlines
     * @param j                     forwarded unchanged to {@link OffsetFetcherUtils}
     *                              (presumably the retry backoff in ms - confirm against that class)
     * @param j2                    request timeout in ms; added to "now" to compute the next allowed retry
     * @param isolationLevel        isolation level placed on ListOffsets requests
     * @param apiVersions           per-node API versions, consulted before position validation
     */
    public OffsetFetcher(LogContext logContext, ConsumerNetworkClient consumerNetworkClient, ConsumerMetadata consumerMetadata, SubscriptionState subscriptionState, Time time, long j, long j2, IsolationLevel isolationLevel, ApiVersions apiVersions) {
        this.log = logContext.logger(getClass());
        this.time = time;
        this.client = consumerNetworkClient;
        this.metadata = consumerMetadata;
        this.subscriptions = subscriptionState;
        this.requestTimeoutMs = j2;
        this.isolationLevel = isolationLevel;
        this.apiVersions = apiVersions;
        this.offsetsForLeaderEpochClient = new OffsetsForLeaderEpochClient(consumerNetworkClient, logContext);
        this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, consumerMetadata, subscriptionState, time, j, apiVersions);
    }

    /**
     * Sends asynchronous ListOffsets requests for every partition that
     * {@link OffsetFetcherUtils#getOffsetResetTimestamp()} reports; does nothing when
     * that map is empty.
     */
    public void resetPositionsIfNeeded() {
        Map<TopicPartition, Long> resetTimestamps = offsetFetcherUtils.getOffsetResetTimestamp();
        if (!resetTimestamps.isEmpty()) {
            resetPositionsAsync(resetTimestamps);
        }
    }

    /**
     * Starts asynchronous validation for the positions returned by
     * {@link OffsetFetcherUtils#getPartitionsToValidate()}.
     */
    public void validatePositionsIfNeeded() {
        validatePositionsAsync(offsetFetcherUtils.getPartitionsToValidate());
    }

    /**
     * Looks up offsets for the given per-partition timestamps, blocking until done or
     * until {@code timer} runs out.
     *
     * <p>The topics involved are registered as transient metadata topics for the
     * duration of the call and cleared again on every exit path.
     *
     * @param map   timestamp to search for, per partition
     * @param timer bounds how long the lookup may block
     * @return the result built by {@link OffsetFetcherUtils#buildOffsetsForTimesResult}
     * @throws TimeoutException if the lookup does not complete before the timer expires
     */
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> map, Timer timer) {
        metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(map.keySet()));
        try {
            Map<TopicPartition, OffsetFetcherUtils.ListOffsetData> fetched = fetchOffsetsByTimes(map, timer, true).fetchedOffsets;
            return OffsetFetcherUtils.buildOffsetsForTimesResult(map, fetched);
        } finally {
            metadata.clearTransientTopics();
        }
    }

    /**
     * Repeatedly sends ListOffsets requests until every requested partition has been
     * answered, the timer expires, or a request fails with a non-retriable error.
     *
     * @param timestampsToSearch timestamp to search for, per partition
     * @param timer              bounds the total time spent; a zero timeout returns
     *                           whatever has been collected without polling
     * @param requireTimestamps  passed through to the request builder
     * @return the accumulated offsets
     * @throws TimeoutException if the timer expires before all partitions are resolved
     */
    private OffsetFetcherUtils.ListOffsetResult fetchOffsetsByTimes(Map<TopicPartition, Long> timestampsToSearch, Timer timer, boolean requireTimestamps) {
        final OffsetFetcherUtils.ListOffsetResult result = new OffsetFetcherUtils.ListOffsetResult();
        if (timestampsToSearch.isEmpty()) {
            return result;
        }

        // Shrinks on each successful round to just the partitions that must be retried.
        final Map<TopicPartition, Long> remainingToSearch = new HashMap<>(timestampsToSearch);
        do {
            final RequestFuture<OffsetFetcherUtils.ListOffsetResult> future = sendListOffsetsRequests(remainingToSearch, requireTimestamps);
            future.addListener(new RequestFutureListener<OffsetFetcherUtils.ListOffsetResult>() {
                @Override
                public void onSuccess(OffsetFetcherUtils.ListOffsetResult value) {
                    synchronized (future) {
                        result.fetchedOffsets.putAll(value.fetchedOffsets);
                        remainingToSearch.keySet().retainAll(value.partitionsToRetry);
                        offsetFetcherUtils.updateSubscriptionState(value.fetchedOffsets, isolationLevel);
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    // Retriable failures are absorbed so the surrounding loop can try again.
                    if (!(e instanceof RetriableException)) {
                        throw future.exception();
                    }
                }
            });

            // With a zero timeout the network client is never polled.
            if (timer.timeoutMs() == 0L) {
                return result;
            }

            client.poll(future, timer);
            if (!future.isDone()) {
                // The poll returned without completing the future: give up and report a timeout.
                break;
            }
            if (remainingToSearch.isEmpty()) {
                return result;
            }
            // Some partitions still need a retry; wait for fresh metadata first.
            client.awaitMetadataUpdate(timer);
        } while (timer.notExpired());

        throw new TimeoutException("Failed to get offsets by times in " + timer.elapsedMs() + "ms");
    }

    /**
     * Fetches the beginning offset of each given partition.
     *
     * @param collection partitions to look up (duplicates are ignored)
     * @param timer      bounds how long the lookup may block
     * @return offset per partition for which an offset was fetched
     * @throws TimeoutException if the lookup does not complete before the timer expires
     */
    public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> collection, Timer timer) {
        return beginningOrEndOffset(collection, EARLIEST_TIMESTAMP, timer);
    }

    /**
     * Fetches the end offset of each given partition.
     *
     * @param collection partitions to look up (duplicates are ignored)
     * @param timer      bounds how long the lookup may block
     * @return offset per partition for which an offset was fetched
     * @throws TimeoutException if the lookup does not complete before the timer expires
     */
    public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> collection, Timer timer) {
        return beginningOrEndOffset(collection, LATEST_TIMESTAMP, timer);
    }

    /**
     * Shared implementation of {@link #beginningOffsets} and {@link #endOffsets}: asks for
     * the same sentinel timestamp on every partition and reduces each answer to its offset.
     * The topics involved are transient metadata topics for the duration of the call.
     */
    private Map<TopicPartition, Long> beginningOrEndOffset(Collection<TopicPartition> partitions, long timestamp, Timer timer) {
        metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(partitions));
        try {
            Map<TopicPartition, Long> timestampsToSearch = partitions.stream()
                    .distinct()
                    .collect(Collectors.toMap(Function.identity(), tp -> timestamp));
            OffsetFetcherUtils.ListOffsetResult result = fetchOffsetsByTimes(timestampsToSearch, timer, false);
            return result.fetchedOffsets.entrySet().stream()
                    .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset));
        } finally {
            metadata.clearTransientTopics();
        }
    }

    /**
     * Sends one ListOffsets request per leader node for the partitions to reset and lets
     * {@link OffsetFetcherUtils} handle each outcome. Before each send, the partitions'
     * next allowed retry is set to now plus the request timeout.
     */
    private void resetPositionsAsync(Map<TopicPartition, Long> partitionResetTimestamps) {
        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> requestsByNode =
                groupListOffsetRequests(partitionResetTimestamps, new HashSet<>());
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> nodeEntry : requestsByNode.entrySet()) {
            Node node = nodeEntry.getKey();
            final Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> resetTimestamps = nodeEntry.getValue();
            subscriptions.setNextAllowedRetry(resetTimestamps.keySet(), time.milliseconds() + requestTimeoutMs);

            RequestFuture<OffsetFetcherUtils.ListOffsetResult> future = sendListOffsetRequest(node, resetTimestamps, false);
            future.addListener(new RequestFutureListener<OffsetFetcherUtils.ListOffsetResult>() {
                @Override
                public void onSuccess(OffsetFetcherUtils.ListOffsetResult result) {
                    offsetFetcherUtils.onSuccessfulResponseForResettingPositions(resetTimestamps, result);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    offsetFetcherUtils.onFailedResponseForResettingPositions(resetTimestamps, e);
                }
            });
        }
    }

    /**
     * Groups the positions by leader and, per leader, does one of the following:
     * requests a metadata update (empty node), tries to connect (API versions unknown),
     * sends an OffsetsForLeaderEpoch request (supported version), or marks the positions
     * as validated without a request (unsupported version).
     */
    private void validatePositionsAsync(Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate) {
        Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> positionsByLeader =
                OffsetFetcherUtils.regroupFetchPositionsByLeader(partitionsToValidate);
        long nextResetTimeMs = time.milliseconds() + requestTimeoutMs;

        positionsByLeader.forEach((node, fetchPositions) -> {
            if (node.isEmpty()) {
                metadata.requestUpdate(true);
                return;
            }

            NodeApiVersions nodeApiVersions = apiVersions.get(node.idString());
            if (nodeApiVersions == null) {
                // No API versions recorded for this node yet; initiate a connection instead.
                client.tryConnect(node);
                return;
            }

            if (!OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not support the required protocol version (introduced in Kafka 2.3)", fetchPositions.keySet());
                for (TopicPartition partition : fetchPositions.keySet()) {
                    subscriptions.completeValidation(partition);
                }
                return;
            }

            subscriptions.setNextAllowedRetry(fetchPositions.keySet(), nextResetTimeMs);
            offsetsForLeaderEpochClient.sendAsyncRequest(node, fetchPositions).addListener(new RequestFutureListener<OffsetsForLeaderEpochUtils.OffsetForEpochResult>() {
                @Override
                public void onSuccess(OffsetsForLeaderEpochUtils.OffsetForEpochResult offsetsResult) {
                    offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(fetchPositions, offsetsResult);
                }

                @Override
                public void onFailure(RuntimeException e) {
                    offsetFetcherUtils.onFailedResponseForValidatingPositions(fetchPositions, e);
                }
            });
        });
    }

    /**
     * Fans a timestamp search out to every leader involved and merges the per-node
     * responses into one future.
     *
     * <p>The combined future completes once all per-node requests have succeeded, and
     * fails with the first per-node failure. It fails immediately with
     * {@link StaleMetadataException} when no request can be sent at all.
     *
     * @param timestampsToSearch timestamp to search for, per partition
     * @param requireTimestamps  passed through to the request builder
     * @return future for the merged result, including the partitions to retry
     */
    private RequestFuture<OffsetFetcherUtils.ListOffsetResult> sendListOffsetsRequests(Map<TopicPartition, Long> timestampsToSearch, boolean requireTimestamps) {
        // Seeded by groupListOffsetRequests with partitions whose leader is unknown or unavailable.
        final Set<TopicPartition> partitionsToRetry = new HashSet<>();
        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> requestsByNode =
                groupListOffsetRequests(timestampsToSearch, partitionsToRetry);
        if (requestsByNode.isEmpty()) {
            return RequestFuture.failure(new StaleMetadataException());
        }

        final RequestFuture<OffsetFetcherUtils.ListOffsetResult> combinedFuture = new RequestFuture<>();
        final Map<TopicPartition, OffsetFetcherUtils.ListOffsetData> fetchedOffsets = new HashMap<>();
        final AtomicInteger remainingResponses = new AtomicInteger(requestsByNode.size());

        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> nodeEntry : requestsByNode.entrySet()) {
            RequestFuture<OffsetFetcherUtils.ListOffsetResult> nodeFuture =
                    sendListOffsetRequest(nodeEntry.getKey(), nodeEntry.getValue(), requireTimestamps);
            nodeFuture.addListener(new RequestFutureListener<OffsetFetcherUtils.ListOffsetResult>() {
                @Override
                public void onSuccess(OffsetFetcherUtils.ListOffsetResult partialResult) {
                    synchronized (combinedFuture) {
                        fetchedOffsets.putAll(partialResult.fetchedOffsets);
                        partitionsToRetry.addAll(partialResult.partitionsToRetry);
                        // Complete only on the last outstanding response, and only if no failure got there first.
                        if (remainingResponses.decrementAndGet() == 0 && !combinedFuture.isDone()) {
                            combinedFuture.complete(new OffsetFetcherUtils.ListOffsetResult(fetchedOffsets, partitionsToRetry));
                        }
                    }
                }

                @Override
                public void onFailure(RuntimeException e) {
                    synchronized (combinedFuture) {
                        if (!combinedFuture.isDone()) {
                            combinedFuture.raise(e);
                        }
                    }
                }
            });
        }
        return combinedFuture;
    }

    /**
     * Builds the per-node ListOffsets payloads for the given search.
     *
     * <p>A partition with no known leader triggers a metadata update request and is added
     * to {@code partitionsToRetry}. A partition whose leader is currently unavailable is
     * also added to {@code partitionsToRetry}, after giving the client the chance to throw
     * an authentication failure for that node.
     *
     * @param timestampsToSearch timestamp to search for, per partition
     * @param partitionsToRetry  output set receiving partitions that could not be included
     * @return request payloads grouped by leader node
     */
    private Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> groupListOffsetRequests(Map<TopicPartition, Long> timestampsToSearch, Set<TopicPartition> partitionsToRetry) {
        Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> partitionData = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> search : timestampsToSearch.entrySet()) {
            TopicPartition tp = search.getKey();
            Long offset = search.getValue();
            Metadata.LeaderAndEpoch leaderAndEpoch = metadata.currentLeader(tp);

            if (!leaderAndEpoch.leader.isPresent()) {
                log.debug("Leader for partition {} is unknown for fetching offset {}", tp, offset);
                metadata.requestUpdate(true);
                partitionsToRetry.add(tp);
                continue;
            }

            Node leader = leaderAndEpoch.leader.get();
            if (client.isUnavailable(leader)) {
                client.maybeThrowAuthFailure(leader);
                log.debug("Leader {} for partition {} is unavailable for fetching offset until reconnect backoff expires", leader, tp);
                partitionsToRetry.add(tp);
            } else {
                // -1 is sent as the leader epoch when metadata carries none for this partition.
                int currentLeaderEpoch = leaderAndEpoch.epoch.orElse(-1);
                partitionData.put(tp, new ListOffsetsRequestData.ListOffsetsPartition()
                        .setPartitionIndex(tp.partition())
                        .setTimestamp(offset)
                        .setCurrentLeaderEpoch(currentLeaderEpoch));
            }
        }
        return offsetFetcherUtils.regroupPartitionMapByNode(partitionData);
    }

    /**
     * Sends a single ListOffsets request to one node.
     *
     * @param node              destination broker
     * @param timestampsToSearch per-partition request payloads for that broker
     * @param requireTimestamp  passed to {@code ListOffsetsRequest.Builder.forConsumer}
     * @return future completed with the parsed response, or failed if parsing throws
     */
    private RequestFuture<OffsetFetcherUtils.ListOffsetResult> sendListOffsetRequest(final Node node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> timestampsToSearch, boolean requireTimestamp) {
        ListOffsetsRequest.Builder builder = ListOffsetsRequest.Builder
                .forConsumer(requireTimestamp, isolationLevel)
                .setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(timestampsToSearch));
        log.debug("Sending ListOffsetRequest {} to broker {}", builder, node);

        return client.send(node, builder).compose(new RequestFutureAdapter<ClientResponse, OffsetFetcherUtils.ListOffsetResult>() {
            @Override
            public void onSuccess(ClientResponse response, RequestFuture<OffsetFetcherUtils.ListOffsetResult> future) {
                ListOffsetsResponse lor = (ListOffsetsResponse) response.responseBody();
                log.trace("Received ListOffsetResponse {} from broker {}", lor, node);
                handleListOffsetResponse(lor, future);
            }
        });
    }

    /**
     * Parses a ListOffsets response via {@link OffsetFetcherUtils#handleListOffsetResponse}
     * and completes {@code requestFuture} with the result; a {@link RuntimeException}
     * thrown during parsing is raised on the future instead of propagating.
     *
     * @param listOffsetsResponse response to interpret
     * @param requestFuture       future to complete or fail
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void handleListOffsetResponse(ListOffsetsResponse listOffsetsResponse, RequestFuture<OffsetFetcherUtils.ListOffsetResult> requestFuture) {
        try {
            requestFuture.complete(offsetFetcherUtils.handleListOffsetResponse(listOffsetsResponse));
        } catch (RuntimeException e) {
            requestFuture.raise(e);
        }
    }

    /**
     * Delegates to {@link OffsetFetcherUtils#validatePositionsOnMetadataChange()}.
     */
    public void validatePositionsOnMetadataChange() {
        offsetFetcherUtils.validatePositionsOnMetadataChange();
    }
}
