package datahub.shaded.org.apache.kafka.clients.consumer.internals;

import datahub.shaded.org.apache.kafka.clients.ClientResponse;
import datahub.shaded.org.apache.kafka.clients.consumer.CommitFailedException;
import datahub.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import datahub.shaded.org.apache.kafka.clients.consumer.OffsetAndMetadata;
import datahub.shaded.org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.metrics.OffsetCommitMetricsManager;
import datahub.shaded.org.apache.kafka.common.KafkaException;
import datahub.shaded.org.apache.kafka.common.TopicPartition;
import datahub.shaded.org.apache.kafka.common.errors.DisconnectException;
import datahub.shaded.org.apache.kafka.common.errors.GroupAuthorizationException;
import datahub.shaded.org.apache.kafka.common.errors.RetriableException;
import datahub.shaded.org.apache.kafka.common.errors.StaleMemberEpochException;
import datahub.shaded.org.apache.kafka.common.errors.TimeoutException;
import datahub.shaded.org.apache.kafka.common.errors.TopicAuthorizationException;
import datahub.shaded.org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import datahub.shaded.org.apache.kafka.common.errors.UnstableOffsetCommitException;
import datahub.shaded.org.apache.kafka.common.message.OffsetCommitRequestData;
import datahub.shaded.org.apache.kafka.common.message.OffsetCommitResponseData;
import datahub.shaded.org.apache.kafka.common.metrics.Metrics;
import datahub.shaded.org.apache.kafka.common.network.ClientInformation;
import datahub.shaded.org.apache.kafka.common.protocol.Errors;
import datahub.shaded.org.apache.kafka.common.requests.AbstractRequest;
import datahub.shaded.org.apache.kafka.common.requests.OffsetCommitRequest;
import datahub.shaded.org.apache.kafka.common.requests.OffsetCommitResponse;
import datahub.shaded.org.apache.kafka.common.requests.OffsetFetchRequest;
import datahub.shaded.org.apache.kafka.common.requests.OffsetFetchResponse;
import datahub.shaded.org.apache.kafka.common.utils.LogContext;
import datahub.shaded.org.apache.kafka.common.utils.Time;
import datahub.shaded.org.apache.kafka.common.utils.Timer;
import datahub.shaded.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalDouble;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;

/* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/CommitRequestManager.class */
public class CommitRequestManager implements RequestManager, MemberStateListener {
    private final Time time;
    private final SubscriptionState subscriptions;
    private final LogContext logContext;
    private final Logger log;
    private final Optional<AutoCommitState> autoCommitState;
    private final CoordinatorRequestManager coordinatorRequestManager;
    private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
    private final OffsetCommitMetricsManager metricsManager;
    private final long retryBackoffMs;
    private final String groupId;
    private final Optional<String> groupInstanceId;
    private final long retryBackoffMaxMs;
    private final OptionalDouble jitter;
    private final boolean throwOnFetchStableOffsetUnsupported;
    final PendingRequests pendingRequests;
    private boolean closing;
    private Optional<Integer> lastEpochSentOnCommit;
    private final MemberInfo memberInfo;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/CommitRequestManager$AutoCommitState.class */
    /**
     * Bookkeeping for interval-based auto-commit: a countdown timer plus a flag that records
     * whether an earlier auto-commit is still in flight.
     */
    public static class AutoCommitState {
        private final Timer timer;
        private final long autoCommitInterval;
        private boolean hasInflightCommit = false;
        private final Logger log;

        /**
         * @param time               source used to create the countdown timer
         * @param autoCommitInterval interval used both as the initial timer length and as the
         *                           length applied by {@link #resetTimer()}
         * @param logContext         factory for this class's logger
         */
        public AutoCommitState(Time time, long autoCommitInterval, LogContext logContext) {
            this.autoCommitInterval = autoCommitInterval;
            this.timer = time.timer(autoCommitInterval);
            this.log = logContext.logger(getClass());
        }

        /**
         * @return {@code true} only when the timer has expired and no auto-commit is in flight
         */
        public boolean shouldAutoCommit() {
            if (this.timer.isExpired()) {
                if (this.hasInflightCommit) {
                    this.log.trace("Skipping auto-commit on the interval because a previous one is still in-flight.");
                    return false;
                }
                return true;
            }
            return false;
        }

        /** Restarts the countdown using the configured auto-commit interval. */
        public void resetTimer() {
            this.timer.reset(this.autoCommitInterval);
        }

        /** Restarts the countdown using the given length instead of the configured interval. */
        public void resetTimer(long intervalMs) {
            this.timer.reset(intervalMs);
        }

        /**
         * Updates the timer with the given time and reports what is left on it.
         *
         * @return the timer's remaining milliseconds after the update
         */
        public long remainingMs(long currentTimeMs) {
            updateTimerTo(currentTimeMs);
            return this.timer.remainingMs();
        }

        /** Updates the timer with the given time. */
        public void updateTimer(long currentTimeMs) {
            updateTimerTo(currentTimeMs);
        }

        /** Records whether an auto-commit request is currently in flight. */
        public void setInflightCommitStatus(boolean inflight) {
            this.hasInflightCommit = inflight;
        }

        /** Single place where the underlying timer is advanced. */
        private void updateTimerTo(long currentTimeMs) {
            this.timer.update(currentTimeMs);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/CommitRequestManager$MemberInfo.class */
    /**
     * Mutable holder for the member id and member epoch attached to commit/fetch requests.
     * Both values start out empty.
     */
    public static class MemberInfo {
        Optional<String> memberId = Optional.empty();
        Optional<Integer> memberEpoch = Optional.empty();

        MemberInfo() {
        }

        /**
         * @return {@code memberId=<id>, memberEpoch=<epoch>}, printing {@code undefined} for
         *         whichever value is absent
         */
        @Override
        public String toString() {
            String idText = this.memberId.orElse("undefined");
            String epochText = this.memberEpoch.map(String::valueOf).orElse("undefined");
            return "memberId=" + idText + ", memberEpoch=" + epochText;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/CommitRequestManager$OffsetCommitRequestState.class */
    /**
     * State of one OffsetCommit request: the offsets to commit, the group identity they are
     * sent under, and the future through which the outcome is reported.
     */
    public class OffsetCommitRequestState extends RetriableRequestState {
        private Map<TopicPartition, OffsetAndMetadata> offsets;
        private final String groupId;
        private final Optional<String> groupInstanceId;
        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;

        /**
         * @param offsets           offsets to commit, keyed by partition
         * @param groupId           group id written into the request
         * @param groupInstanceId   optional group instance id written into the request
         * @param deadlineMs        value handed to {@code deadlineTimer} to build the request timer
         * @param retryBackoffMs    forwarded to the superclass (presumably the initial retry backoff)
         * @param retryBackoffMaxMs forwarded to the superclass (presumably the maximum retry backoff)
         * @param memberInfo        member id/epoch holder consulted when the request is built
         */
        OffsetCommitRequestState(Map<TopicPartition, OffsetAndMetadata> offsets, String groupId, Optional<String> groupInstanceId, long deadlineMs, long retryBackoffMs, long retryBackoffMaxMs, MemberInfo memberInfo) {
            super(CommitRequestManager.this.logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, retryBackoffMaxMs, memberInfo, deadlineTimer(CommitRequestManager.this.time, deadlineMs));
            this.future = new CompletableFuture<>();
            this.groupInstanceId = groupInstanceId;
            this.groupId = groupId;
            this.offsets = offsets;
        }

        /**
         * Same as the other constructor, but additionally forwards the constant {@code 2} and
         * {@code jitter} to the superclass (presumably the backoff exponent base and jitter).
         */
        OffsetCommitRequestState(Map<TopicPartition, OffsetAndMetadata> offsets, String groupId, Optional<String> groupInstanceId, long deadlineMs, long retryBackoffMs, long retryBackoffMaxMs, double jitter, MemberInfo memberInfo) {
            super(CommitRequestManager.this.logContext, CommitRequestManager.class.getSimpleName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter, memberInfo, deadlineTimer(CommitRequestManager.this.time, deadlineMs));
            this.future = new CompletableFuture<>();
            this.groupInstanceId = groupInstanceId;
            this.groupId = groupId;
            this.offsets = offsets;
        }

        /**
         * Builds the OffsetCommit request for the held offsets, grouping partitions by topic.
         * Also records on the enclosing manager which member epoch (if any) was sent.
         *
         * @return the unsent request with response handling attached
         */
        public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
            Map<String, OffsetCommitRequestData.OffsetCommitRequestTopic> topicsByName = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndMetadata> committed : this.offsets.entrySet()) {
                TopicPartition tp = committed.getKey();
                OffsetAndMetadata offsetAndMetadata = committed.getValue();
                OffsetCommitRequestData.OffsetCommitRequestPartition partitionData = new OffsetCommitRequestData.OffsetCommitRequestPartition()
                        .setPartitionIndex(tp.partition())
                        .setCommittedOffset(offsetAndMetadata.offset())
                        // -1 is sent when the offset carries no leader epoch.
                        .setCommittedLeaderEpoch(offsetAndMetadata.leaderEpoch().orElse(-1))
                        .setCommittedMetadata(offsetAndMetadata.metadata());
                topicsByName
                        .computeIfAbsent(tp.topic(), name -> new OffsetCommitRequestData.OffsetCommitRequestTopic().setName(name))
                        .partitions()
                        .add(partitionData);
            }

            OffsetCommitRequestData requestData = new OffsetCommitRequestData()
                    .setGroupId(this.groupId)
                    .setGroupInstanceId(this.groupInstanceId.orElse(null))
                    .setTopics(new ArrayList<>(topicsByName.values()));

            Optional<String> memberId = this.memberInfo.memberId;
            if (memberId.isPresent()) {
                requestData = requestData.setMemberId(memberId.get());
            }

            Optional<Integer> memberEpoch = this.memberInfo.memberEpoch;
            if (memberEpoch.isPresent()) {
                requestData = requestData.setGenerationIdOrMemberEpoch(memberEpoch.get());
                CommitRequestManager.this.lastEpochSentOnCommit = memberEpoch;
            } else {
                CommitRequestManager.this.lastEpochSentOnCommit = Optional.empty();
            }
            return buildRequestWithResponseHandling(new OffsetCommitRequest.Builder(requestData));
        }

        /**
         * Inspects every partition result of the OffsetCommit response and completes the future.
         * <p>
         * Each failed partition registers a failed attempt. {@code TOPIC_AUTHORIZATION_FAILED}
         * results are collected and reported together once all partitions were inspected; any
         * other error completes the future exceptionally right away and stops processing.
         * When no partition failed, the future completes with {@code null}.
         */
        @Override
        public void onResponse(ClientResponse clientResponse) {
            CommitRequestManager.this.metricsManager.recordRequestLatency(clientResponse.requestLatencyMs());
            long receivedTimeMs = clientResponse.receivedTimeMs();
            OffsetCommitResponse commitResponse = (OffsetCommitResponse) clientResponse.responseBody();
            Set<String> unauthorizedTopics = new HashSet<>();

            for (OffsetCommitResponseData.OffsetCommitResponseTopic topic : commitResponse.data().topics()) {
                for (OffsetCommitResponseData.OffsetCommitResponsePartition partition : topic.partitions()) {
                    TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
                    Errors error = Errors.forCode(partition.errorCode());
                    if (error == Errors.NONE) {
                        CommitRequestManager.this.log.debug("OffsetCommit completed successfully for offset {} partition {}", this.offsets.get(tp).offset(), tp);
                        continue;
                    }

                    onFailedAttempt(receivedTimeMs);
                    switch (error) {
                        case GROUP_AUTHORIZATION_FAILED:
                            this.future.completeExceptionally(GroupAuthorizationException.forGroupId(this.groupId));
                            return;

                        case COORDINATOR_NOT_AVAILABLE:
                        case NOT_COORDINATOR:
                        case REQUEST_TIMED_OUT:
                            // The coordinator is marked unknown before the failure is surfaced.
                            CommitRequestManager.this.coordinatorRequestManager.markCoordinatorUnknown(error.message(), receivedTimeMs);
                            this.future.completeExceptionally(error.exception());
                            return;

                        case FENCED_INSTANCE_ID: {
                            String fencedMessage = "OffsetCommit failed due to group instance id fenced: " + this.groupInstanceId;
                            CommitRequestManager.this.log.error(fencedMessage);
                            this.future.completeExceptionally(new CommitFailedException(fencedMessage));
                            return;
                        }

                        case OFFSET_METADATA_TOO_LARGE:
                        case INVALID_COMMIT_OFFSET_SIZE:
                        case COORDINATOR_LOAD_IN_PROGRESS:
                        case UNKNOWN_TOPIC_OR_PARTITION:
                            this.future.completeExceptionally(error.exception());
                            return;

                        case UNKNOWN_MEMBER_ID:
                            CommitRequestManager.this.log.error("OffsetCommit failed with {}", error);
                            this.future.completeExceptionally(new CommitFailedException("OffsetCommit failed with unknown member ID. " + error.message()));
                            return;

                        case STALE_MEMBER_EPOCH: {
                            Optional<Integer> lastEpoch = CommitRequestManager.this.lastEpochSentOnCommit;
                            Object lastEpochText = lastEpoch.isPresent() ? lastEpoch.get() : "undefined";
                            CommitRequestManager.this.log.error("OffsetCommit failed for member {} with stale member epoch error. Last epoch sent: {}", this.memberInfo.memberId.orElse("undefined"), lastEpochText);
                            this.future.completeExceptionally(error.exception());
                            return;
                        }

                        case TOPIC_AUTHORIZATION_FAILED:
                            // Keep scanning so that every unauthorized topic is reported at once.
                            unauthorizedTopics.add(tp.topic());
                            break;

                        default:
                            this.future.completeExceptionally(new KafkaException("Unexpected error in commit: " + error.message()));
                            return;
                    }
                }
            }

            if (!unauthorizedTopics.isEmpty()) {
                CommitRequestManager.this.log.error("OffsetCommit failed due to not authorized to commit to topics {}", unauthorizedTopics);
                this.future.completeExceptionally(new TopicAuthorizationException(unauthorizedTopics));
                return;
            }
            this.future.complete(null);
        }

        /** @return a human-readable description that includes the offsets being committed */
        @Override
        String requestDescription() {
            return "OffsetCommit request for offsets " + this.offsets;
        }

        /** @return the future currently associated with this request */
        @Override
        CompletableFuture<?> future() {
            return this.future;
        }

        /** Replaces the current future with a fresh, incomplete one. */
        void resetFuture() {
            this.future = new CompletableFuture<>();
        }

        /** Removes this request from the manager's unsent commit requests, warning if it was not there. */
        @Override
        void removeRequest() {
            boolean removed = CommitRequestManager.this.unsentOffsetCommitRequests().remove(this);
            if (!removed) {
                CommitRequestManager.this.log.warn("OffsetCommit request to remove not found in the outbound buffer: {}", this);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/CommitRequestManager$OffsetFetchRequestState.class */
    public class OffsetFetchRequestState extends RetriableRequestState {
        public final Set<TopicPartition> requestedPartitions;
        private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> future;

        public OffsetFetchRequestState(Set<TopicPartition> set, long j, long j2, long j3, MemberInfo memberInfo) {
            super(CommitRequestManager.this.logContext, CommitRequestManager.class.getSimpleName(), j, j2, memberInfo, deadlineTimer(CommitRequestManager.this.time, j3));
            this.requestedPartitions = set;
            this.future = new CompletableFuture<>();
        }

        public OffsetFetchRequestState(Set<TopicPartition> set, long j, long j2, long j3, double d, MemberInfo memberInfo) {
            super(CommitRequestManager.this.logContext, CommitRequestManager.class.getSimpleName(), j, 2, j2, d, memberInfo, deadlineTimer(CommitRequestManager.this.time, j3));
            this.requestedPartitions = set;
            this.future = new CompletableFuture<>();
        }

        public boolean sameRequest(OffsetFetchRequestState offsetFetchRequestState) {
            return this.requestedPartitions.equals(offsetFetchRequestState.requestedPartitions);
        }

        public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
            return buildRequestWithResponseHandling((this.memberInfo.memberId.isPresent() && this.memberInfo.memberEpoch.isPresent()) ? new OffsetFetchRequest.Builder(CommitRequestManager.this.groupId, this.memberInfo.memberId.get(), this.memberInfo.memberEpoch.get().intValue(), true, new ArrayList(this.requestedPartitions), CommitRequestManager.this.throwOnFetchStableOffsetUnsupported) : new OffsetFetchRequest.Builder(CommitRequestManager.this.groupId, true, new ArrayList(this.requestedPartitions), CommitRequestManager.this.throwOnFetchStableOffsetUnsupported));
        }

        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.CommitRequestManager.RetriableRequestState
        void onResponse(ClientResponse clientResponse) {
            long receivedTimeMs = clientResponse.receivedTimeMs();
            OffsetFetchResponse offsetFetchResponse = (OffsetFetchResponse) clientResponse.responseBody();
            Errors groupLevelError = offsetFetchResponse.groupLevelError(CommitRequestManager.this.groupId);
            if (groupLevelError != Errors.NONE) {
                onFailure(receivedTimeMs, groupLevelError);
            } else {
                onSuccess(receivedTimeMs, offsetFetchResponse);
            }
        }

        private void onFailure(long j, Errors errors) {
            CommitRequestManager.this.log.debug("Offset fetch failed: {}", errors.message());
            onFailedAttempt(j);
            if (errors == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
                this.future.completeExceptionally(errors.exception());
                return;
            }
            if (errors == Errors.UNKNOWN_MEMBER_ID) {
                CommitRequestManager.this.log.error("OffsetFetch failed with {} because the member is not part of the group anymore.", errors);
                this.future.completeExceptionally(errors.exception());
                return;
            }
            if (errors == Errors.STALE_MEMBER_EPOCH) {
                CommitRequestManager.this.log.error("OffsetFetch failed with {} and the consumer is not part of the group anymore (it probably left the group, got fenced or failed). The request cannot be retried and will fail.", errors);
                this.future.completeExceptionally(errors.exception());
            } else if (errors == Errors.NOT_COORDINATOR || errors == Errors.COORDINATOR_NOT_AVAILABLE) {
                CommitRequestManager.this.coordinatorRequestManager.markCoordinatorUnknown("error response " + errors.name(), j);
                this.future.completeExceptionally(errors.exception());
            } else if (errors == Errors.GROUP_AUTHORIZATION_FAILED) {
                this.future.completeExceptionally(GroupAuthorizationException.forGroupId(CommitRequestManager.this.groupId));
            } else {
                this.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response: " + errors.message()));
            }
        }

        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.CommitRequestManager.RetriableRequestState
        String requestDescription() {
            return "OffsetFetch request for partitions " + this.requestedPartitions;
        }

        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.CommitRequestManager.RetriableRequestState
        CompletableFuture<?> future() {
            return this.future;
        }

        void resetFuture() {
            this.future = new CompletableFuture<>();
        }

        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.CommitRequestManager.RetriableRequestState
        void removeRequest() {
            if (CommitRequestManager.this.unsentOffsetFetchRequests().remove(this)) {
                return;
            }
            CommitRequestManager.this.log.warn("OffsetFetch request to remove not found in the outbound buffer: {}", this);
        }

        private void onSuccess(long j, OffsetFetchResponse offsetFetchResponse) {
            HashSet hashSet = null;
            Map<TopicPartition, OffsetFetchResponse.PartitionData> partitionDataMap = offsetFetchResponse.partitionDataMap(CommitRequestManager.this.groupId);
            HashMap hashMap = new HashMap(partitionDataMap.size());
            HashSet hashSet2 = new HashSet();
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry : partitionDataMap.entrySet()) {
                TopicPartition key = entry.getKey();
                OffsetFetchResponse.PartitionData value = entry.getValue();
                if (value.hasError()) {
                    onFailedAttempt(j);
                    Errors errors = value.error;
                    CommitRequestManager.this.log.debug("Failed to fetch offset for partition {}: {}", key, errors.message());
                    if (errors == Errors.UNKNOWN_TOPIC_OR_PARTITION) {
                        this.future.completeExceptionally(new KafkaException("Topic or Partition " + key + " does not exist"));
                        return;
                    }
                    if (errors == Errors.TOPIC_AUTHORIZATION_FAILED) {
                        if (hashSet == null) {
                            hashSet = new HashSet();
                        }
                        hashSet.add(key.topic());
                    } else {
                        if (errors != Errors.UNSTABLE_OFFSET_COMMIT) {
                            this.future.completeExceptionally(new KafkaException("Unexpected error in fetch offset response for partition " + key + ": " + errors.message()));
                            return;
                        }
                        hashSet2.add(key);
                    }
                } else if (value.offset >= 0) {
                    hashMap.put(key, new OffsetAndMetadata(value.offset, value.leaderEpoch, value.metadata));
                } else {
                    CommitRequestManager.this.log.info("Found no committed offset for partition {}", key);
                    hashMap.put(key, null);
                }
            }
            if (hashSet != null) {
                this.future.completeExceptionally(new TopicAuthorizationException(hashSet));
            } else if (hashSet2.isEmpty()) {
                onSuccessfulAttempt(j);
                this.future.complete(hashMap);
            } else {
                CommitRequestManager.this.log.info("The following partitions still have unstable offsets which are not cleared on the broker side: {}, this could be either transactional offsets waiting for completion, or normal offsets waiting for replication after appending to local log", hashSet2);
                this.future.completeExceptionally(new UnstableOffsetCommitException("There are unstable offsets for the requested topic partitions"));
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void chainFuture(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> completableFuture) {
            this.future.whenComplete((map, th) -> {
                if (th != null) {
                    completableFuture.completeExceptionally(th);
                } else {
                    completableFuture.complete(map);
                }
            });
        }

        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.CommitRequestManager.RetriableRequestState, datahub.shaded.org.apache.kafka.clients.consumer.internals.TimedRequestState, datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestState
        public String toStringBase() {
            return super.toStringBase() + ", requestedPartitions=" + this.requestedPartitions;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/CommitRequestManager$PendingRequests.class */
    /**
     * Buffers of commit and fetch requests that have not been sent yet, plus the fetch requests
     * that were handed out by {@link #drain(long)} and are tracked as in flight.
     */
    public class PendingRequests {
        Queue<OffsetCommitRequestState> unsentOffsetCommits = new LinkedList<>();
        List<OffsetFetchRequestState> unsentOffsetFetches = new ArrayList<>();
        List<OffsetFetchRequestState> inflightOffsetFetches = new ArrayList<>();

        PendingRequests() {
        }

        /** @return {@code true} when at least one commit or fetch request is waiting to be sent */
        boolean hasUnsentRequests() {
            return !this.unsentOffsetCommits.isEmpty() || !this.unsentOffsetFetches.isEmpty();
        }

        /**
         * Appends the commit request to the unsent queue.
         *
         * @return the same request instance that was passed in
         */
        OffsetCommitRequestState addOffsetCommitRequest(OffsetCommitRequestState request) {
            CommitRequestManager.this.log.debug("Enqueuing OffsetCommit request for offsets: {}", request.offsets);
            this.unsentOffsetCommits.add(request);
            return request;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Enqueues the fetch request unless an unsent or in-flight request for the same
         * partitions already exists; in that case the new request's future is chained to the
         * existing one (an unsent duplicate is preferred over an in-flight one).
         *
         * @return the future of the request that was passed in
         */
        public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> addOffsetFetchRequest(OffsetFetchRequestState request) {
            Optional<OffsetFetchRequestState> unsentDuplicate = findSameRequest(this.unsentOffsetFetches, request);
            Optional<OffsetFetchRequestState> inflightDuplicate = findSameRequest(this.inflightOffsetFetches, request);

            if (!unsentDuplicate.isPresent() && !inflightDuplicate.isPresent()) {
                CommitRequestManager.this.log.debug("Enqueuing offset fetch request for partitions: {}", request.requestedPartitions);
                this.unsentOffsetFetches.add(request);
            } else {
                CommitRequestManager.this.log.debug("Duplicated unsent offset fetch request found for partitions: {}", request.requestedPartitions);
                OffsetFetchRequestState existing = unsentDuplicate.isPresent() ? unsentDuplicate.get() : inflightDuplicate.get();
                existing.chainFuture(request.future);
            }
            return request.future;
        }

        /** Looks for any request in {@code candidates} that asks for the same partitions as {@code request}. */
        private Optional<OffsetFetchRequestState> findSameRequest(List<OffsetFetchRequestState> candidates, OffsetFetchRequestState request) {
            return candidates.stream()
                    .filter(candidate -> candidate.sameRequest(request))
                    .findAny();
        }

        /**
         * Collects every request that can be sent at {@code currentTimeMs}, registers a send
         * attempt on each, and leaves the requests that cannot be sent yet in the unsent buffers.
         * Sendable fetch requests are additionally recorded as in flight.
         *
         * @return an unmodifiable list with commit requests first, followed by fetch requests
         */
        List<NetworkClientDelegate.UnsentRequest> drain(long currentTimeMs) {
            // NOTE(review): both "not ready" snapshots are taken before the expiry pass below,
            // so a request that expires in that pass but was not sendable is put back into the
            // unsent buffer at the end of this method. Confirm that this is intended.
            List<OffsetCommitRequestState> notReadyCommits = new ArrayList<>();
            for (OffsetCommitRequestState commit : this.unsentOffsetCommits) {
                if (!commit.canSendRequest(currentTimeMs)) {
                    notReadyCommits.add(commit);
                }
            }

            failAndRemoveExpiredCommitRequests();

            List<NetworkClientDelegate.UnsentRequest> outbound = new ArrayList<>();
            for (OffsetCommitRequestState commit : this.unsentOffsetCommits) {
                if (commit.canSendRequest(currentTimeMs)) {
                    commit.onSendAttempt(currentTimeMs);
                    outbound.add(commit.toUnsentRequest());
                }
            }

            List<OffsetFetchRequestState> readyFetches = new ArrayList<>();
            List<OffsetFetchRequestState> notReadyFetches = new ArrayList<>();
            for (OffsetFetchRequestState fetch : this.unsentOffsetFetches) {
                if (fetch.canSendRequest(currentTimeMs)) {
                    readyFetches.add(fetch);
                } else {
                    notReadyFetches.add(fetch);
                }
            }

            failAndRemoveExpiredFetchRequests();

            for (OffsetFetchRequestState fetch : readyFetches) {
                fetch.onSendAttempt(currentTimeMs);
                outbound.add(fetch.toUnsentRequest());
                this.inflightOffsetFetches.add(fetch);
            }

            // Rebuild the unsent buffers so that they hold only what could not be sent.
            clearAll();
            this.unsentOffsetFetches.addAll(notReadyFetches);
            this.unsentOffsetCommits.addAll(notReadyCommits);
            return Collections.unmodifiableList(outbound);
        }

        /**
         * Gives every unsent commit request the chance to expire. Iterates over a copy because
         * expiring a request calls {@code removeRequest()}, which presumably removes it from
         * this very queue.
         */
        private void failAndRemoveExpiredCommitRequests() {
            for (OffsetCommitRequestState commit : new ArrayList<>(this.unsentOffsetCommits)) {
                commit.maybeExpire();
            }
        }

        /**
         * Gives every unsent fetch request the chance to expire. Iterates over a copy because
         * expiring a request calls {@code removeRequest()}, which presumably removes it from
         * this very list.
         */
        private void failAndRemoveExpiredFetchRequests() {
            for (OffsetFetchRequestState fetch : new ArrayList<>(this.unsentOffsetFetches)) {
                fetch.maybeExpire();
            }
        }

        /** Empties both unsent buffers; the in-flight fetch list is left untouched. */
        private void clearAll() {
            this.unsentOffsetCommits.clear();
            this.unsentOffsetFetches.clear();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Converts every unsent commit request into an unsent network request, regardless of
         * whether it could be sent right now, and then empties both unsent buffers.
         *
         * @return a mutable list with one network request per previously unsent commit
         */
        public List<NetworkClientDelegate.UnsentRequest> drainPendingCommits() {
            List<NetworkClientDelegate.UnsentRequest> commitRequests = new ArrayList<>();
            for (OffsetCommitRequestState commit : this.unsentOffsetCommits) {
                commitRequests.add(commit.toUnsentRequest());
            }
            clearAll();
            return commitRequests;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/CommitRequestManager$RetriableRequestState.class */
    public abstract class RetriableRequestState extends TimedRequestState {
        final MemberInfo memberInfo;

        RetriableRequestState(LogContext logContext, String str, long j, long j2, MemberInfo memberInfo, Timer timer) {
            super(logContext, str, j, j2, timer);
            this.memberInfo = memberInfo;
        }

        RetriableRequestState(LogContext logContext, String str, long j, int i, long j2, double d, MemberInfo memberInfo, Timer timer) {
            super(logContext, str, j, i, j2, d, timer);
            this.memberInfo = memberInfo;
        }

        abstract String requestDescription();

        abstract CompletableFuture<?> future();

        void maybeExpire() {
            if (this.numAttempts <= 0 || !isExpired()) {
                return;
            }
            removeRequest();
            future().completeExceptionally(new TimeoutException(requestDescription() + " could not complete before timeout expired."));
        }

        NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(AbstractRequest.Builder<?> builder) {
            NetworkClientDelegate.UnsentRequest unsentRequest = new NetworkClientDelegate.UnsentRequest(builder, CommitRequestManager.this.coordinatorRequestManager.coordinator());
            unsentRequest.whenComplete((clientResponse, th) -> {
                handleClientResponse(clientResponse, th, unsentRequest.handler().completionTimeMs());
            });
            return unsentRequest;
        }

        private void handleClientResponse(ClientResponse clientResponse, Throwable th, long j) {
            try {
                if (th == null) {
                    onResponse(clientResponse);
                } else {
                    CommitRequestManager.this.log.debug("{} completed with error", requestDescription(), th);
                    onFailedAttempt(j);
                    CommitRequestManager.this.handleCoordinatorDisconnect(th, j);
                    future().completeExceptionally(th);
                }
            } catch (Throwable th2) {
                CommitRequestManager.this.log.error("Unexpected error handling response for {}", requestDescription(), th2);
                future().completeExceptionally(th2);
            }
        }

        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.TimedRequestState, datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestState
        public String toStringBase() {
            return super.toStringBase() + ", " + this.memberInfo;
        }

        abstract void onResponse(ClientResponse clientResponse);

        /**
         * Called from {@code maybeExpire()} right before the request's future is failed with a timeout.
         * NOTE(review): presumably drops this request from the manager's pending-request bookkeeping;
         * confirm against the concrete subclasses.
         */
        abstract void removeRequest();
    }

    /**
     * Creates a manager whose retry backoff bounds are read from the {@code retry.backoff.ms} and
     * {@code retry.backoff.max.ms} settings of the supplied config, and with no explicit jitter.
     * All other arguments are passed through unchanged to the full constructor.
     */
    public CommitRequestManager(Time time, LogContext logContext, SubscriptionState subscriptionState, ConsumerConfig consumerConfig, CoordinatorRequestManager coordinatorRequestManager, OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, String str, Optional<String> optional, Metrics metrics) {
        this(
                time,
                logContext,
                subscriptionState,
                consumerConfig,
                coordinatorRequestManager,
                offsetCommitCallbackInvoker,
                str,
                optional,
                consumerConfig.getLong("retry.backoff.ms").longValue(),
                consumerConfig.getLong("retry.backoff.max.ms").longValue(),
                OptionalDouble.empty(),
                metrics);
    }

    CommitRequestManager(Time time, LogContext logContext, SubscriptionState subscriptionState, ConsumerConfig consumerConfig, CoordinatorRequestManager coordinatorRequestManager, OffsetCommitCallbackInvoker offsetCommitCallbackInvoker, String str, Optional<String> optional, long j, long j2, OptionalDouble optionalDouble, Metrics metrics) {
        this.closing = false;
        Objects.requireNonNull(coordinatorRequestManager, "Coordinator is needed upon committing offsets");
        this.time = time;
        this.logContext = logContext;
        this.log = logContext.logger(getClass());
        this.pendingRequests = new PendingRequests();
        if (consumerConfig.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).booleanValue()) {
            this.autoCommitState = Optional.of(new AutoCommitState(time, Integer.toUnsignedLong(consumerConfig.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG).intValue()), logContext));
        } else {
            this.autoCommitState = Optional.empty();
        }
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.groupId = str;
        this.groupInstanceId = optional;
        this.subscriptions = subscriptionState;
        this.retryBackoffMs = j;
        this.retryBackoffMaxMs = j2;
        this.jitter = optionalDouble;
        this.throwOnFetchStableOffsetUnsupported = consumerConfig.getBoolean("internal.throw.on.fetch.stable.offset.unsupported").booleanValue();
        this.memberInfo = new MemberInfo();
        this.metricsManager = new OffsetCommitMetricsManager(metrics);
        this.offsetCommitCallbackInvoker = offsetCommitCallbackInvoker;
        this.lastEpochSentOnCommit = Optional.empty();
    }

    /**
     * Produces the offset commit/fetch requests that should be sent on this poll.
     *
     * <ul>
     *   <li>Returns an empty result while no coordinator is known.</li>
     *   <li>When closing has been signalled, returns only the pending offset commit requests.</li>
     *   <li>Otherwise triggers an auto-commit if one is due, drains the pending requests, and
     *       reports how long to wait before the next poll.</li>
     * </ul>
     *
     * @param j the time value passed to the pending-request drain and to the backoff computation
     * @return the requests to send together with the time until the next poll
     */
    @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestManager
    public NetworkClientDelegate.PollResult poll(long j) {
        if (!this.coordinatorRequestManager.coordinator().isPresent()) {
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        if (this.closing) {
            return drainPendingOffsetCommitRequests();
        }
        maybeAutoCommitAsync();
        if (!this.pendingRequests.hasUnsentRequests()) {
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        // Drain BEFORE computing the wait time. Java evaluates constructor arguments left to right,
        // so computing the minimum inline ahead of drain() would also consider the requests that are
        // being handed out right now instead of only those left waiting in the unsent queues.
        // NOTE(review): assumes drain() removes the requests it returns from the unsent queues - confirm.
        List<NetworkClientDelegate.UnsentRequest> requestsToSend = this.pendingRequests.drain(j);
        long timeUntilNextPollMs = Math.min(
                findMinTime(unsentOffsetCommitRequests(), j),
                findMinTime(unsentOffsetFetchRequests(), j));
        return new NetworkClientDelegate.PollResult(timeUntilNextPollMs, requestsToSend);
    }

    /**
     * Marks this manager as closing. Once set, {@code poll} only returns the pending offset
     * commit requests.
     */
    @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestManager
    public void signalClose() {
        closing = true;
    }

    /**
     * Returns the remaining time reported by the auto-commit state for the given time value, or
     * {@code PollResult.WAIT_FOREVER} when there is no auto-commit state.
     *
     * @param j the time value passed to the auto-commit state
     * @return the maximum time to wait
     */
    @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestManager
    public long maximumTimeToWait(long j) {
        if (!this.autoCommitState.isPresent()) {
            return NetworkClientDelegate.PollResult.WAIT_FOREVER;
        }
        return this.autoCommitState.get().remainingMs(j);
    }

    /**
     * Returns the smallest remaining backoff among the given request states, or
     * {@code PollResult.WAIT_FOREVER} when the collection is empty.
     *
     * @param collection the request states to inspect
     * @param j the time value passed to each state's {@code remainingBackoffMs}
     * @return the minimum remaining backoff
     */
    private static long findMinTime(Collection<? extends RequestState> collection, long j) {
        boolean foundAny = false;
        long smallest = 0L;
        for (RequestState state : collection) {
            final long remaining = state.remainingBackoffMs(j);
            if (!foundAny || remaining < smallest) {
                smallest = remaining;
                foundAny = true;
            }
        }
        return foundAny ? smallest : NetworkClientDelegate.PollResult.WAIT_FOREVER;
    }

    /**
     * Returns the throwable itself when it already is a {@link TimeoutException}; otherwise wraps
     * it as the cause of a new {@link TimeoutException}.
     *
     * @param th the failure to convert
     * @return a timeout exception representing the failure
     */
    private KafkaException maybeWrapAsTimeoutException(Throwable th) {
        if (th instanceof TimeoutException) {
            return (TimeoutException) th;
        }
        return new TimeoutException(th);
    }

    /**
     * Queues an auto-commit for the given request state. With no offsets to commit, an
     * already-completed future holding an empty map is returned and nothing is queued. Otherwise
     * the auto-commit state is flagged as having an inflight commit, the request is added to the
     * pending requests, and the auto-commit callback is attached to the queued request's future.
     *
     * <p>Requires the auto-commit state to be present; it is read unconditionally.
     *
     * @param offsetCommitRequestState the request carrying the offsets to commit
     * @return the future of the queued request, or a completed future when there are no offsets
     */
    private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> requestAutoCommit(OffsetCommitRequestState offsetCommitRequestState) {
        final AutoCommitState state = this.autoCommitState.get();
        if (offsetCommitRequestState.offsets.isEmpty()) {
            return CompletableFuture.completedFuture(Collections.emptyMap());
        }
        state.setInflightCommitStatus(true);
        final OffsetCommitRequestState queued = this.pendingRequests.addOffsetCommitRequest(offsetCommitRequestState);
        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = queued.future;
        result.whenComplete(autoCommitCallback(queued.offsets));
        return result;
    }

    /**
     * Starts an asynchronous auto-commit of all consumed offsets when auto-commit is enabled and
     * the auto-commit state says one is due. The auto-commit timer is reset right after the request
     * is issued, and a follow-up callback resets it again with the retry backoff if the commit
     * fails with a retriable commit failure.
     */
    public void maybeAutoCommitAsync() {
        if (!autoCommitEnabled() || !this.autoCommitState.get().shouldAutoCommit()) {
            return;
        }
        final OffsetCommitRequestState request =
                createOffsetCommitRequest(this.subscriptions.allConsumed(), NetworkClientDelegate.PollResult.WAIT_FOREVER);
        final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = requestAutoCommit(request);
        resetAutoCommitTimer();
        maybeResetTimerWithBackoff(result);
    }

    /**
     * Attaches a callback that logs the outcome of an asynchronous auto-commit and, when it failed
     * with a {@link RetriableCommitFailedException}, resets the auto-commit timer to the retry
     * backoff.
     *
     * @param completableFuture the future of the auto-commit to observe
     */
    private void maybeResetTimerWithBackoff(CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> completableFuture) {
        completableFuture.whenComplete((offsets, error) -> {
            if (error == null) {
                this.log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
                return;
            }
            if (error instanceof RetriableCommitFailedException) {
                this.log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error.", offsets, error);
                resetAutoCommitTimer(this.retryBackoffMs);
                return;
            }
            this.log.debug("Asynchronous auto-commit of offsets {} failed: {}", offsets, error.getMessage());
        });
    }

    /**
     * Commits all consumed offsets, with retries, when auto-commit is enabled; otherwise returns
     * an already-completed future.
     *
     * @param j the value passed to the created commit request (it governs when the request expires)
     * @return a future completed when the commit succeeds, or failed when it cannot be completed
     */
    public CompletableFuture<Void> maybeAutoCommitSyncBeforeRevocation(long j) {
        if (autoCommitEnabled()) {
            final CompletableFuture<Void> result = new CompletableFuture<>();
            final OffsetCommitRequestState request = createOffsetCommitRequest(this.subscriptions.allConsumed(), j);
            autoCommitSyncBeforeRevocationWithRetries(request, result);
            return result;
        }
        return CompletableFuture.completedFuture(null);
    }

    /**
     * Issues the auto-commit and retries it until it succeeds, fails fatally, or expires.
     *
     * <p>Outcome handling, in order:
     * <ol>
     *   <li>success completes the result future;</li>
     *   <li>an error that is neither a {@link RetriableException} nor a stale-epoch error with a
     *       valid epoch available fails the result with that error;</li>
     *   <li>an expired request fails the result with a timeout exception;</li>
     *   <li>an {@link UnknownTopicOrPartitionException} fails the result with that error;</li>
     *   <li>anything else refreshes the offsets to all consumed positions, resets the request's
     *       future and tries again.</li>
     * </ol>
     *
     * @param offsetCommitRequestState the request to send (mutated on retry)
     * @param completableFuture the future reporting the final outcome
     */
    private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState offsetCommitRequestState, CompletableFuture<Void> completableFuture) {
        requestAutoCommit(offsetCommitRequestState).whenComplete((committed, error) -> {
            if (error == null) {
                completableFuture.complete(null);
                return;
            }

            final boolean canRetry = error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error);
            if (!canRetry) {
                this.log.debug("Auto-commit sync before revocation failed with non-retriable error", error);
                completableFuture.completeExceptionally(error);
                return;
            }

            if (offsetCommitRequestState.isExpired()) {
                this.log.debug("Auto-commit sync before revocation timed out and won't be retried anymore");
                completableFuture.completeExceptionally(maybeWrapAsTimeoutException(error));
                return;
            }

            if (error instanceof UnknownTopicOrPartitionException) {
                this.log.debug("Auto-commit sync before revocation failed because topic or partition were deleted");
                completableFuture.completeExceptionally(error);
                return;
            }

            this.log.debug("Member {} will retry auto-commit of latest offsets after receiving retriable error {}", this.memberInfo.memberId.orElse("undefined"), error.getMessage());
            // Retry with the latest consumed positions rather than the ones captured earlier.
            offsetCommitRequestState.offsets = this.subscriptions.allConsumed();
            offsetCommitRequestState.resetFuture();
            autoCommitSyncBeforeRevocationWithRetries(offsetCommitRequestState, completableFuture);
        });
    }

    /**
     * Builds the completion callback for an auto-commit of the given offsets. The callback clears
     * the inflight-commit flag on the auto-commit state (if present) and then, depending on the
     * outcome, enqueues the interceptor invocation and logs success, or logs the failure (debug for
     * {@link RetriableCommitFailedException}, warn otherwise).
     *
     * @param map the offsets being committed; these, not the future's result, are what get
     *            reported and logged
     * @return the callback to attach to the commit future
     */
    private BiConsumer<? super Map<TopicPartition, OffsetAndMetadata>, ? super Throwable> autoCommitCallback(Map<TopicPartition, OffsetAndMetadata> map) {
        return (ignoredResult, error) -> {
            this.autoCommitState.ifPresent(state -> state.setInflightCommitStatus(false));
            if (error != null) {
                if (error instanceof RetriableCommitFailedException) {
                    this.log.debug("Auto-commit of offsets {} failed due to retriable error: {}", map, error.getMessage());
                } else {
                    this.log.warn("Auto-commit of offsets {} failed", map, error);
                }
                return;
            }
            this.offsetCommitCallbackInvoker.enqueueInterceptorInvocation(map);
            this.log.debug("Completed auto-commit of offsets {}", map);
        };
    }

    /**
     * Queues an asynchronous commit of the given offsets. An empty map is skipped and yields an
     * already-completed future. Failures are translated through
     * {@code commitAsyncExceptionForError} before being reported on the returned future.
     *
     * @param map the offsets to commit
     * @return a future completed with {@code null} on success or failed with the translated error
     */
    public CompletableFuture<Void> commitAsync(Map<TopicPartition, OffsetAndMetadata> map) {
        if (map.isEmpty()) {
            this.log.debug("Skipping commit of empty offsets");
            return CompletableFuture.completedFuture(null);
        }
        final OffsetCommitRequestState request = createOffsetCommitRequest(map, NetworkClientDelegate.PollResult.WAIT_FOREVER);
        this.pendingRequests.addOffsetCommitRequest(request);

        final CompletableFuture<Void> result = new CompletableFuture<>();
        request.future.whenComplete((committed, error) -> {
            if (error == null) {
                result.complete(null);
            } else {
                result.completeExceptionally(commitAsyncExceptionForError(error));
            }
        });
        return result;
    }

    /**
     * Commits the given offsets, retrying on retriable errors until the request expires.
     *
     * @param map the offsets to commit
     * @param j the value passed to the created commit request (it governs when the request expires)
     * @return a future reporting the final outcome of the commit
     */
    public CompletableFuture<Void> commitSync(Map<TopicPartition, OffsetAndMetadata> map, long j) {
        final CompletableFuture<Void> result = new CompletableFuture<>();
        final OffsetCommitRequestState request = createOffsetCommitRequest(map, j);
        commitSyncWithRetries(request, result);
        return result;
    }

    /**
     * Creates the request state for committing the given offsets, using this manager's group id,
     * group instance id, retry backoff bounds and member info. The configured jitter is passed
     * along only when one is present.
     *
     * @param map the offsets to commit
     * @param j the value forwarded to the request state's constructor
     * @return the new request state
     */
    private OffsetCommitRequestState createOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> map, long j) {
        if (this.jitter.isPresent()) {
            return new OffsetCommitRequestState(
                    map, this.groupId, this.groupInstanceId, j,
                    this.retryBackoffMs, this.retryBackoffMaxMs, this.jitter.getAsDouble(), this.memberInfo);
        }
        return new OffsetCommitRequestState(
                map, this.groupId, this.groupInstanceId, j,
                this.retryBackoffMs, this.retryBackoffMaxMs, this.memberInfo);
    }

    /**
     * Queues the commit request and re-queues it after every retriable failure until it succeeds,
     * fails with a non-retriable error, or expires.
     *
     * <p>Non-retriable errors are translated through {@code commitSyncExceptionForError}; an
     * expired request is reported as a timeout exception.
     *
     * @param offsetCommitRequestState the request to send (its future is reset before each retry)
     * @param completableFuture the future reporting the final outcome
     */
    private void commitSyncWithRetries(OffsetCommitRequestState offsetCommitRequestState, CompletableFuture<Void> completableFuture) {
        this.pendingRequests.addOffsetCommitRequest(offsetCommitRequestState);
        offsetCommitRequestState.future.whenComplete((committed, error) -> {
            if (error == null) {
                completableFuture.complete(null);
                return;
            }
            if (!(error instanceof RetriableException)) {
                completableFuture.completeExceptionally(commitSyncExceptionForError(error));
                return;
            }
            if (offsetCommitRequestState.isExpired()) {
                this.log.info("OffsetCommit timeout expired so it won't be retried anymore");
                completableFuture.completeExceptionally(maybeWrapAsTimeoutException(error));
                return;
            }
            offsetCommitRequestState.resetFuture();
            commitSyncWithRetries(offsetCommitRequestState, completableFuture);
        });
    }

    /**
     * Translates a synchronous-commit failure into the exception reported to the caller.
     * A {@link StaleMemberEpochException} becomes a {@link CommitFailedException} whose message
     * combines a fixed prefix with the standard stale-member-epoch error text; every other
     * throwable is returned unchanged.
     *
     * @param th the failure of the commit
     * @return the exception to report
     */
    private Throwable commitSyncExceptionForError(Throwable th) {
        if (th instanceof StaleMemberEpochException) {
            // A separating space keeps the two sentences from running together in the message.
            return new CommitFailedException("OffsetCommit failed with stale member epoch. " + Errors.STALE_MEMBER_EPOCH.message());
        }
        return th;
    }

    /**
     * Translates an asynchronous-commit failure into the exception reported to the caller:
     * a {@link RetriableException} is wrapped in a {@link RetriableCommitFailedException}, anything
     * else is returned unchanged.
     *
     * @param th the failure of the commit
     * @return the exception to report
     */
    private Throwable commitAsyncExceptionForError(Throwable th) {
        if (th instanceof RetriableException) {
            return new RetriableCommitFailedException(th);
        }
        return th;
    }

    /**
     * Fetches committed offsets for the given partitions, retrying on retriable errors. An empty
     * partition set yields an already-completed future holding an empty map.
     *
     * @param set the partitions to fetch offsets for
     * @param j the value passed to the created fetch request (it governs when the request expires)
     * @return a future with the fetched offsets, or failed when the fetch cannot be completed
     */
    public CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> fetchOffsets(Set<TopicPartition> set, long j) {
        if (!set.isEmpty()) {
            final CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> result = new CompletableFuture<>();
            final OffsetFetchRequestState request = createOffsetFetchRequest(set, j);
            fetchOffsetsWithRetries(request, result);
            return result;
        }
        return CompletableFuture.completedFuture(Collections.emptyMap());
    }

    /**
     * Creates the request state for fetching offsets of the given partitions, using this manager's
     * retry backoff bounds and member info. The configured jitter is passed along only when one is
     * present.
     *
     * @param set the partitions to fetch offsets for
     * @param j the value forwarded to the request state's constructor
     * @return the new request state
     */
    OffsetFetchRequestState createOffsetFetchRequest(Set<TopicPartition> set, long j) {
        if (this.jitter.isPresent()) {
            return new OffsetFetchRequestState(
                    set, this.retryBackoffMs, this.retryBackoffMaxMs, j, this.jitter.getAsDouble(), this.memberInfo);
        }
        return new OffsetFetchRequestState(set, this.retryBackoffMs, this.retryBackoffMaxMs, j, this.memberInfo);
    }

    /**
     * Queues the offset fetch and re-queues it after every retriable failure until it succeeds,
     * fails with a non-retriable error, or expires.
     *
     * <p>On every completion the request is first removed from the inflight fetches; a warning is
     * logged if it was not found there. An error is retried when it is a {@link RetriableException}
     * or a stale-epoch error with a valid epoch available; an expired request is reported as a
     * timeout exception.
     *
     * @param offsetFetchRequestState the request to send (its future is reset before each retry)
     * @param completableFuture the future reporting the final outcome
     */
    private void fetchOffsetsWithRetries(OffsetFetchRequestState offsetFetchRequestState, CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> completableFuture) {
        this.pendingRequests.addOffsetFetchRequest(offsetFetchRequestState).whenComplete((offsets, error) -> {
            final boolean wasInflight = this.pendingRequests.inflightOffsetFetches.remove(offsetFetchRequestState);
            if (!wasInflight) {
                this.log.warn("A duplicated, inflight, request was identified, but unable to find it in the outbound buffer:" + offsetFetchRequestState);
            }

            if (error == null) {
                completableFuture.complete(offsets);
                return;
            }

            final boolean canRetry = error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error);
            if (!canRetry) {
                completableFuture.completeExceptionally(error);
                return;
            }

            if (offsetFetchRequestState.isExpired()) {
                this.log.debug("OffsetFetch request for {} timed out and won't be retried anymore", offsetFetchRequestState.requestedPartitions);
                completableFuture.completeExceptionally(maybeWrapAsTimeoutException(error));
                return;
            }

            offsetFetchRequestState.resetFuture();
            fetchOffsetsWithRetries(offsetFetchRequestState, completableFuture);
        });
    }

    /**
     * Tells whether the error is a {@link StaleMemberEpochException} while this manager currently
     * holds a member epoch.
     *
     * @param th the error to inspect
     * @return {@code true} only when both conditions hold
     */
    private boolean isStaleEpochErrorAndValidEpochAvailable(Throwable th) {
        if (!(th instanceof StaleMemberEpochException)) {
            return false;
        }
        return this.memberInfo.memberEpoch.isPresent();
    }

    /**
     * Forwards the given time value to the auto-commit state's timer update; does nothing when
     * there is no auto-commit state.
     *
     * @param j the time value passed to the auto-commit state
     */
    public void updateAutoCommitTimer(long j) {
        this.autoCommitState.ifPresent(state -> state.updateTimer(j));
    }

    /**
     * Returns the queue of offset commit requests held by the pending requests that have not been
     * sent yet. The queue itself is returned, not a copy.
     */
    Queue<OffsetCommitRequestState> unsentOffsetCommitRequests() {
        final Queue<OffsetCommitRequestState> unsentCommits = this.pendingRequests.unsentOffsetCommits;
        return unsentCommits;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns the list of offset fetch requests held by the pending requests that have not been
     * sent yet. The list itself is returned, not a copy.
     */
    public List<OffsetFetchRequestState> unsentOffsetFetchRequests() {
        final List<OffsetFetchRequestState> unsentFetches = this.pendingRequests.unsentOffsetFetches;
        return unsentFetches;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Marks the coordinator as unknown when the given failure is a {@link DisconnectException};
     * any other failure is ignored.
     *
     * @param th the failure of a request
     * @param j the time value passed along when marking the coordinator unknown
     */
    public void handleCoordinatorDisconnect(Throwable th, long j) {
        if (!(th instanceof DisconnectException)) {
            return;
        }
        this.coordinatorRequestManager.markCoordinatorUnknown(th.getMessage(), j);
    }

    /**
     * Stores the new member epoch and member id. When the new epoch is absent while one was
     * previously held, an info message is logged first, using the previously stored member id.
     *
     * @param optional the new member epoch, possibly empty
     * @param optional2 the new member id, possibly empty
     */
    @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.MemberStateListener
    public void onMemberEpochUpdated(Optional<Integer> optional, Optional<String> optional2) {
        final boolean epochDropped = !optional.isPresent() && this.memberInfo.memberEpoch.isPresent();
        if (epochDropped) {
            this.log.info("Member {} won't include member id and epoch in following offset commit/fetch requests because it has left the group.", this.memberInfo.memberId.orElse(ClientInformation.UNKNOWN_NAME_OR_VERSION));
        }
        this.memberInfo.memberId = optional2;
        this.memberInfo.memberEpoch = optional;
    }

    /**
     * Tells whether auto-commit is enabled, i.e. whether this manager holds an auto-commit state.
     *
     * @return {@code true} when an auto-commit state is present
     */
    public boolean autoCommitEnabled() {
        final boolean hasAutoCommitState = this.autoCommitState.isPresent();
        return hasAutoCommitState;
    }

    /**
     * Resets the auto-commit timer through the auto-commit state's no-argument reset; does nothing
     * when there is no auto-commit state.
     */
    public void resetAutoCommitTimer() {
        this.autoCommitState.ifPresent(AutoCommitState::resetTimer);
    }

    /**
     * Resets the auto-commit timer, passing the given value to the auto-commit state's reset;
     * does nothing when there is no auto-commit state.
     *
     * @param j the value passed to the auto-commit state's timer reset
     */
    public void resetAutoCommitTimer(long j) {
        this.autoCommitState.ifPresent(state -> state.resetTimer(j));
    }

    /**
     * Returns a poll result holding the drained pending commit requests with a wait time of
     * {@code PollResult.WAIT_FOREVER}, or the empty result when no offset commit is waiting to be
     * sent.
     *
     * @return the poll result for the pending offset commits
     */
    public NetworkClientDelegate.PollResult drainPendingOffsetCommitRequests() {
        if (this.pendingRequests.unsentOffsetCommits.isEmpty()) {
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        return new NetworkClientDelegate.PollResult(
                NetworkClientDelegate.PollResult.WAIT_FOREVER,
                this.pendingRequests.drainPendingCommits());
    }

    /**
     * Returns the stored {@code lastEpochSentOnCommit} value, which is initialised to empty by the
     * constructor.
     */
    Optional<Integer> lastEpochSentOnCommit() {
        final Optional<Integer> lastEpoch = this.lastEpochSentOnCommit;
        return lastEpoch;
    }
}
