package datahub.shaded.org.apache.kafka.clients.consumer.internals;

import datahub.shaded.org.apache.kafka.clients.ApiVersions;
import datahub.shaded.org.apache.kafka.clients.Metadata;
import datahub.shaded.org.apache.kafka.clients.NodeApiVersions;
import datahub.shaded.org.apache.kafka.clients.StaleMetadataException;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochUtils;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.SubscriptionState;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import datahub.shaded.org.apache.kafka.common.ClusterResource;
import datahub.shaded.org.apache.kafka.common.ClusterResourceListener;
import datahub.shaded.org.apache.kafka.common.IsolationLevel;
import datahub.shaded.org.apache.kafka.common.Node;
import datahub.shaded.org.apache.kafka.common.TopicPartition;
import datahub.shaded.org.apache.kafka.common.message.ListOffsetsRequestData;
import datahub.shaded.org.apache.kafka.common.requests.AbstractRequest;
import datahub.shaded.org.apache.kafka.common.requests.ListOffsetsRequest;
import datahub.shaded.org.apache.kafka.common.requests.ListOffsetsResponse;
import datahub.shaded.org.apache.kafka.common.requests.OffsetsForLeaderEpochRequest;
import datahub.shaded.org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse;
import datahub.shaded.org.apache.kafka.common.utils.LogContext;
import datahub.shaded.org.apache.kafka.common.utils.Time;
import datahub.shaded.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager.class */
public class OffsetsRequestManager implements RequestManager, ClusterResourceListener {
    private final ConsumerMetadata metadata;
    private final IsolationLevel isolationLevel;
    private final Logger log;
    private final OffsetFetcherUtils offsetFetcherUtils;
    private final SubscriptionState subscriptionState;
    private final Set<ListOffsetsRequestState> requestsToRetry;
    private final List<NetworkClientDelegate.UnsentRequest> requestsToSend;
    private final long requestTimeoutMs;
    private final Time time;
    private final ApiVersions apiVersions;
    private final NetworkClientDelegate networkClientDelegate;
    private final BackgroundEventHandler backgroundEventHandler;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager$ListOffsetsRequestState.class */
    public static class ListOffsetsRequestState {
        private final Map<TopicPartition, Long> timestampsToSearch;
        private final Map<TopicPartition, OffsetFetcherUtils.ListOffsetData> fetchedOffsets;
        private final Map<TopicPartition, Long> remainingToSearch;
        private final CompletableFuture<OffsetFetcherUtils.ListOffsetResult> globalResult;
        final boolean requireTimestamps;
        final OffsetFetcherUtils offsetFetcherUtils;
        final IsolationLevel isolationLevel;

        private ListOffsetsRequestState(Map<TopicPartition, Long> map, boolean z, OffsetFetcherUtils offsetFetcherUtils, IsolationLevel isolationLevel) {
            this.remainingToSearch = new HashMap();
            this.fetchedOffsets = new HashMap();
            this.globalResult = new CompletableFuture<>();
            this.timestampsToSearch = map;
            this.requireTimestamps = z;
            this.offsetFetcherUtils = offsetFetcherUtils;
            this.isolationLevel = isolationLevel;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addPartitionsToRetry(Set<TopicPartition> set) {
            Map<TopicPartition, Long> map = this.remainingToSearch;
            Stream<TopicPartition> stream = set.stream();
            Function function = topicPartition -> {
                return topicPartition;
            };
            Map<TopicPartition, Long> map2 = this.timestampsToSearch;
            map2.getClass();
            map.putAll((Map) stream.collect(Collectors.toMap(function, (v1) -> {
                return r3.get(v1);
            })));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/OffsetsRequestManager$MultiNodeRequest.class */
    public static class MultiNodeRequest {
        final Map<TopicPartition, OffsetFetcherUtils.ListOffsetData> fetchedTimestampOffsets;
        final Set<TopicPartition> partitionsToRetry;
        final AtomicInteger expectedResponses;
        final CompletableFuture<OffsetFetcherUtils.ListOffsetResult> resultFuture;

        private MultiNodeRequest(int i) {
            this.fetchedTimestampOffsets = new HashMap();
            this.partitionsToRetry = new HashSet();
            this.expectedResponses = new AtomicInteger(i);
            this.resultFuture = new CompletableFuture<>();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void onComplete(BiConsumer<? super OffsetFetcherUtils.ListOffsetResult, ? super Throwable> biConsumer) {
            this.resultFuture.whenComplete(biConsumer);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void addPartialResult(OffsetFetcherUtils.ListOffsetResult listOffsetResult) {
            try {
                this.fetchedTimestampOffsets.putAll(listOffsetResult.fetchedOffsets);
                this.partitionsToRetry.addAll(listOffsetResult.partitionsToRetry);
                if (this.expectedResponses.decrementAndGet() == 0) {
                    this.resultFuture.complete(new OffsetFetcherUtils.ListOffsetResult(this.fetchedTimestampOffsets, this.partitionsToRetry));
                }
            } catch (RuntimeException e) {
                this.resultFuture.completeExceptionally(e);
            }
        }
    }

    public OffsetsRequestManager(SubscriptionState subscriptionState, ConsumerMetadata consumerMetadata, IsolationLevel isolationLevel, Time time, long j, long j2, ApiVersions apiVersions, NetworkClientDelegate networkClientDelegate, BackgroundEventHandler backgroundEventHandler, LogContext logContext) {
        Objects.requireNonNull(subscriptionState);
        Objects.requireNonNull(consumerMetadata);
        Objects.requireNonNull(isolationLevel);
        Objects.requireNonNull(time);
        Objects.requireNonNull(apiVersions);
        Objects.requireNonNull(networkClientDelegate);
        Objects.requireNonNull(backgroundEventHandler);
        Objects.requireNonNull(logContext);
        this.metadata = consumerMetadata;
        this.isolationLevel = isolationLevel;
        this.log = logContext.logger(getClass());
        this.requestsToRetry = new HashSet();
        this.requestsToSend = new ArrayList();
        this.subscriptionState = subscriptionState;
        this.time = time;
        this.requestTimeoutMs = j2;
        this.apiVersions = apiVersions;
        this.networkClientDelegate = networkClientDelegate;
        this.backgroundEventHandler = backgroundEventHandler;
        this.offsetFetcherUtils = new OffsetFetcherUtils(logContext, consumerMetadata, subscriptionState, time, j, apiVersions);
        this.metadata.addClusterUpdateListener(this);
    }

    @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestManager
    public NetworkClientDelegate.PollResult poll(long j) {
        ArrayList arrayList = new ArrayList(this.requestsToSend);
        this.requestsToSend.clear();
        return new NetworkClientDelegate.PollResult(arrayList);
    }

    public CompletableFuture<Map<TopicPartition, OffsetAndTimestampInternal>> fetchOffsets(Map<TopicPartition, Long> map, boolean z) {
        if (map.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        this.metadata.addTransientTopics(OffsetFetcherUtils.topicsForPartitions(map.keySet()));
        ListOffsetsRequestState listOffsetsRequestState = new ListOffsetsRequestState(map, z, this.offsetFetcherUtils, this.isolationLevel);
        listOffsetsRequestState.globalResult.whenComplete((listOffsetResult, th) -> {
            this.metadata.clearTransientTopics();
            if (th != null) {
                this.log.debug("Fetch offsets completed with error for partitions and timestamps {}.", map, th);
            } else {
                this.log.debug("Fetch offsets completed successfully for partitions and timestamps {}. Result {}", map, listOffsetResult);
            }
        });
        prepareFetchOffsetsRequests(map, z, listOffsetsRequestState);
        return listOffsetsRequestState.globalResult.thenApply(listOffsetResult2 -> {
            return OffsetFetcherUtils.buildOffsetsForTimeInternalResult(map, listOffsetResult2.fetchedOffsets);
        });
    }

    public CompletableFuture<Void> resetPositionsIfNeeded() {
        try {
            Map<TopicPartition, Long> offsetResetTimestamp = this.offsetFetcherUtils.getOffsetResetTimestamp();
            return offsetResetTimestamp.isEmpty() ? CompletableFuture.completedFuture(null) : sendListOffsetsRequestsAndResetPositions(offsetResetTimestamp);
        } catch (Exception e) {
            this.backgroundEventHandler.add(new ErrorEvent(e));
            return CompletableFuture.completedFuture(null);
        }
    }

    public CompletableFuture<Void> validatePositionsIfNeeded() {
        Map<TopicPartition, SubscriptionState.FetchPosition> partitionsToValidate = this.offsetFetcherUtils.getPartitionsToValidate();
        return partitionsToValidate.isEmpty() ? CompletableFuture.completedFuture(null) : sendOffsetsForLeaderEpochRequestsAndValidatePositions(partitionsToValidate);
    }

    private void prepareFetchOffsetsRequests(Map<TopicPartition, Long> map, boolean z, ListOffsetsRequestState listOffsetsRequestState) {
        try {
            this.requestsToSend.addAll(buildListOffsetsRequests(map, z, listOffsetsRequestState));
        } catch (StaleMetadataException e) {
            this.requestsToRetry.add(listOffsetsRequestState);
        }
    }

    @Override // datahub.shaded.org.apache.kafka.common.ClusterResourceListener
    public void onUpdate(ClusterResource clusterResource) {
        ArrayList arrayList = new ArrayList(this.requestsToRetry);
        this.requestsToRetry.clear();
        arrayList.forEach(listOffsetsRequestState -> {
            HashMap hashMap = new HashMap(listOffsetsRequestState.remainingToSearch);
            listOffsetsRequestState.remainingToSearch.clear();
            prepareFetchOffsetsRequests(hashMap, listOffsetsRequestState.requireTimestamps, listOffsetsRequestState);
        });
    }

    private List<NetworkClientDelegate.UnsentRequest> buildListOffsetsRequests(Map<TopicPartition, Long> map, boolean z, ListOffsetsRequestState listOffsetsRequestState) {
        this.log.debug("Building ListOffsets request for partitions {}", map);
        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> groupListOffsetRequests = groupListOffsetRequests(map, Optional.of(listOffsetsRequestState));
        if (groupListOffsetRequests.isEmpty()) {
            throw new StaleMetadataException();
        }
        ArrayList arrayList = new ArrayList();
        MultiNodeRequest multiNodeRequest = new MultiNodeRequest(groupListOffsetRequests.size());
        multiNodeRequest.onComplete((listOffsetResult, th) -> {
            if (th != null) {
                this.log.debug("ListOffsets request failed with error", th);
                listOffsetsRequestState.globalResult.completeExceptionally(th);
                return;
            }
            listOffsetsRequestState.fetchedOffsets.putAll(listOffsetResult.fetchedOffsets);
            listOffsetsRequestState.addPartitionsToRetry(listOffsetResult.partitionsToRetry);
            this.offsetFetcherUtils.updateSubscriptionState(listOffsetResult.fetchedOffsets, this.isolationLevel);
            if (!listOffsetsRequestState.remainingToSearch.isEmpty()) {
                this.requestsToRetry.add(listOffsetsRequestState);
            } else {
                listOffsetsRequestState.globalResult.complete(new OffsetFetcherUtils.ListOffsetResult(listOffsetsRequestState.fetchedOffsets, listOffsetsRequestState.remainingToSearch.keySet()));
            }
        });
        for (Map.Entry<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> entry : groupListOffsetRequests.entrySet()) {
            buildListOffsetRequestToNode(entry.getKey(), entry.getValue(), z, arrayList).whenComplete((listOffsetResult2, th2) -> {
                if (th2 != null) {
                    multiNodeRequest.resultFuture.completeExceptionally(th2);
                } else {
                    multiNodeRequest.addPartialResult(listOffsetResult2);
                }
            });
        }
        return arrayList;
    }

    private CompletableFuture<OffsetFetcherUtils.ListOffsetResult> buildListOffsetRequestToNode(Node node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition> map, boolean z, List<NetworkClientDelegate.UnsentRequest> list) {
        ListOffsetsRequest.Builder targetTimes = ListOffsetsRequest.Builder.forConsumer(z, this.isolationLevel).setTargetTimes(ListOffsetsRequest.toListOffsetsTopics(map));
        this.log.debug("Creating ListOffset request {} for broker {} to reset positions", targetTimes, node);
        NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(targetTimes, Optional.ofNullable(node));
        list.add(unsentRequest);
        CompletableFuture<OffsetFetcherUtils.ListOffsetResult> completableFuture = new CompletableFuture<>();
        unsentRequest.whenComplete((clientResponse, th) -> {
            if (th != null) {
                this.log.debug("Sending ListOffset request {} to broker {} failed", targetTimes, node, th);
                completableFuture.completeExceptionally(th);
                return;
            }
            ListOffsetsResponse listOffsetsResponse = (ListOffsetsResponse) clientResponse.responseBody();
            this.log.trace("Received ListOffsetResponse {} from broker {}", listOffsetsResponse, node);
            try {
                completableFuture.complete(this.offsetFetcherUtils.handleListOffsetResponse(listOffsetsResponse));
            } catch (RuntimeException e) {
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture;
    }

    private CompletableFuture<Void> sendListOffsetsRequestsAndResetPositions(Map<TopicPartition, Long> map) {
        Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> groupListOffsetRequests = groupListOffsetRequests(map, Optional.empty());
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        ArrayList arrayList = new ArrayList();
        groupListOffsetRequests.forEach((node, map2) -> {
            this.subscriptionState.setNextAllowedRetry(map2.keySet(), this.time.milliseconds() + this.requestTimeoutMs);
            buildListOffsetRequestToNode(node, map2, false, arrayList).whenComplete((listOffsetResult, th) -> {
                if (th == null) {
                    this.offsetFetcherUtils.onSuccessfulResponseForResettingPositions(map2, listOffsetResult);
                } else {
                    this.offsetFetcherUtils.onFailedResponseForResettingPositions(map2, th instanceof RuntimeException ? (RuntimeException) th : new RuntimeException("Unexpected failure in ListOffsets request for resetting positions", th));
                }
                if (atomicInteger.decrementAndGet() == 0) {
                    completableFuture.complete(null);
                }
            });
        });
        if (arrayList.isEmpty()) {
            completableFuture.complete(null);
        } else {
            atomicInteger.set(arrayList.size());
            this.requestsToSend.addAll(arrayList);
        }
        return completableFuture;
    }

    private CompletableFuture<Void> sendOffsetsForLeaderEpochRequestsAndValidatePositions(Map<TopicPartition, SubscriptionState.FetchPosition> map) {
        Map<Node, Map<TopicPartition, SubscriptionState.FetchPosition>> regroupFetchPositionsByLeader = OffsetFetcherUtils.regroupFetchPositionsByLeader(map);
        long milliseconds = this.time.milliseconds() + this.requestTimeoutMs;
        AtomicInteger atomicInteger = new AtomicInteger(0);
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        ArrayList arrayList = new ArrayList();
        regroupFetchPositionsByLeader.forEach((node, map2) -> {
            if (node.isEmpty()) {
                this.metadata.requestUpdate(true);
                return;
            }
            NodeApiVersions nodeApiVersions = this.apiVersions.get(node.idString());
            if (nodeApiVersions == null) {
                this.networkClientDelegate.tryConnect(node);
                return;
            }
            if (OffsetFetcherUtils.hasUsableOffsetForLeaderEpochVersion(nodeApiVersions)) {
                this.subscriptionState.setNextAllowedRetry(map2.keySet(), milliseconds);
                buildOffsetsForLeaderEpochRequestToNode(node, map2, arrayList).whenComplete((offsetForEpochResult, th) -> {
                    if (th == null) {
                        this.offsetFetcherUtils.onSuccessfulResponseForValidatingPositions(map2, offsetForEpochResult);
                    } else {
                        this.offsetFetcherUtils.onFailedResponseForValidatingPositions(map2, th instanceof RuntimeException ? (RuntimeException) th : new RuntimeException("Unexpected failure in OffsetsForLeaderEpoch request for validating positions", th));
                    }
                    if (atomicInteger.decrementAndGet() == 0) {
                        completableFuture.complete(null);
                    }
                });
                return;
            }
            this.log.debug("Skipping validation of fetch offsets for partitions {} since the broker does not support the required protocol version (introduced in Kafka 2.3)", map2.keySet());
            Iterator it = map2.keySet().iterator();
            while (it.hasNext()) {
                this.subscriptionState.completeValidation((TopicPartition) it.next());
            }
        });
        if (arrayList.isEmpty()) {
            completableFuture.complete(null);
        } else {
            atomicInteger.set(arrayList.size());
            this.requestsToSend.addAll(arrayList);
        }
        return completableFuture;
    }

    private CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> buildOffsetsForLeaderEpochRequestToNode(Node node, Map<TopicPartition, SubscriptionState.FetchPosition> map, List<NetworkClientDelegate.UnsentRequest> list) {
        AbstractRequest.Builder<OffsetsForLeaderEpochRequest> prepareRequest = OffsetsForLeaderEpochUtils.prepareRequest(map);
        this.log.debug("Creating OffsetsForLeaderEpoch request request {} to broker {}", prepareRequest, node);
        NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(prepareRequest, Optional.ofNullable(node));
        list.add(unsentRequest);
        CompletableFuture<OffsetsForLeaderEpochUtils.OffsetForEpochResult> completableFuture = new CompletableFuture<>();
        unsentRequest.whenComplete((clientResponse, th) -> {
            if (th != null) {
                this.log.debug("Sending OffsetsForLeaderEpoch request {} to broker {} failed", prepareRequest, node, th);
                completableFuture.completeExceptionally(th);
                return;
            }
            OffsetsForLeaderEpochResponse offsetsForLeaderEpochResponse = (OffsetsForLeaderEpochResponse) clientResponse.responseBody();
            this.log.trace("Received OffsetsForLeaderEpoch response {} from broker {}", offsetsForLeaderEpochResponse, node);
            try {
                completableFuture.complete(OffsetsForLeaderEpochUtils.handleResponse(map, offsetsForLeaderEpochResponse));
            } catch (RuntimeException e) {
                completableFuture.completeExceptionally(e);
            }
        });
        return completableFuture;
    }

    private Map<Node, Map<TopicPartition, ListOffsetsRequestData.ListOffsetsPartition>> groupListOffsetRequests(Map<TopicPartition, Long> map, Optional<ListOffsetsRequestState> optional) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<TopicPartition, Long> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            Long value = entry.getValue();
            Metadata.LeaderAndEpoch currentLeader = this.metadata.currentLeader(key);
            if (currentLeader.leader.isPresent()) {
                hashMap.put(key, new ListOffsetsRequestData.ListOffsetsPartition().setPartitionIndex(key.partition()).setTimestamp(value.longValue()).setCurrentLeaderEpoch(currentLeader.epoch.orElse(-1).intValue()));
            } else {
                this.log.debug("Leader for partition {} is unknown for fetching offset {}", key, value);
                this.metadata.requestUpdate(true);
                optional.ifPresent(listOffsetsRequestState -> {
                });
            }
        }
        return this.offsetFetcherUtils.regroupPartitionMapByNode(hashMap);
    }

    int requestsToRetry() {
        return this.requestsToRetry.size();
    }

    int requestsToSend() {
        return this.requestsToSend.size();
    }
}
