package datahub.shaded.org.apache.kafka.clients.consumer.internals;

import datahub.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import datahub.shaded.org.apache.kafka.common.errors.GroupAuthorizationException;
import datahub.shaded.org.apache.kafka.common.errors.RetriableException;
import datahub.shaded.org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import datahub.shaded.org.apache.kafka.common.metrics.Metrics;
import datahub.shaded.org.apache.kafka.common.protocol.Errors;
import datahub.shaded.org.apache.kafka.common.requests.ShareGroupHeartbeatRequest;
import datahub.shaded.org.apache.kafka.common.requests.ShareGroupHeartbeatResponse;
import datahub.shaded.org.apache.kafka.common.utils.LogContext;
import datahub.shaded.org.apache.kafka.common.utils.Time;
import datahub.shaded.org.apache.kafka.common.utils.Timer;
import datahub.shaded.org.slf4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.TreeSet;

/* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager.class */
public class ShareHeartbeatRequestManager implements RequestManager {
    private final Logger logger;
    private final int maxPollIntervalMs;
    private final CoordinatorRequestManager coordinatorRequestManager;
    private final HeartbeatRequestState heartbeatRequestState;
    private final HeartbeatState heartbeatState;
    private final ShareMembershipManager shareMembershipManager;
    private final BackgroundEventHandler backgroundEventHandler;
    private final Timer pollTimer;
    private final HeartbeatMetricsManager metricsManager;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager$HeartbeatRequestState.class */
    /**
     * Request state for the share group heartbeat. In addition to whatever the
     * {@link RequestState} base class tracks, it owns a timer that gates sending on the
     * heartbeat interval.
     */
    public static class HeartbeatRequestState extends RequestState {
        /** Counts down the heartbeat interval; a heartbeat may only be sent once it has expired. */
        private final Timer heartbeatTimer;

        /** Interval used whenever the heartbeat timer is re-armed. */
        private long heartbeatIntervalMs;

        /**
         * @param logContext          passed through to the {@link RequestState} constructor
         * @param time                source of the heartbeat timer
         * @param heartbeatIntervalMs initial heartbeat interval, also the timer's initial timeout
         * @param retryBackoffMs      passed through to the {@link RequestState} constructor
         * @param retryBackoffMaxMs   passed through to the {@link RequestState} constructor
         * @param jitter              passed through as the last {@link RequestState} constructor
         *                            argument (presumably the backoff jitter; confirm against
         *                            RequestState)
         */
        public HeartbeatRequestState(LogContext logContext, Time time, long heartbeatIntervalMs, long retryBackoffMs, long retryBackoffMaxMs, double jitter) {
            super(logContext, HeartbeatRequestState.class.getName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter);
            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
            this.heartbeatIntervalMs = heartbeatIntervalMs;
        }

        /** Brings the heartbeat timer up to the given time. */
        private void update(long currentTimeMs) {
            heartbeatTimer.update(currentTimeMs);
        }

        /** Re-arms the heartbeat timer with the current heartbeat interval. */
        public void resetTimer() {
            heartbeatTimer.reset(heartbeatIntervalMs);
        }

        /** Appends the heartbeat timer and interval to the base class description. */
        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestState
        public String toStringBase() {
            String base = super.toStringBase();
            return base + ", heartbeatTimer=" + heartbeatTimer + ", heartbeatIntervalMs=" + heartbeatIntervalMs;
        }

        /**
         * Updates the heartbeat timer to {@code currentTimeMs} and reports whether a heartbeat
         * may be sent: the timer must have expired and the base class must also allow it.
         */
        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestState
        public boolean canSendRequest(long currentTimeMs) {
            update(currentTimeMs);
            if (!heartbeatTimer.isExpired()) {
                return false;
            }
            return super.canSendRequest(currentTimeMs);
        }

        /**
         * Returns the remaining time on the heartbeat timer, or the remaining backoff once the
         * timer has expired. The timer itself is not updated here.
         */
        public long timeToNextHeartbeatMs(long currentTimeMs) {
            if (heartbeatTimer.isExpired()) {
                return remainingBackoffMs(currentTimeMs);
            }
            return heartbeatTimer.remainingMs();
        }

        /**
         * Records a failed attempt. The heartbeat timer is reset with a zero timeout first,
         * presumably so that the heartbeat interval does not delay the retry.
         */
        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestState
        public void onFailedAttempt(long currentTimeMs) {
            heartbeatTimer.reset(0L);
            super.onFailedAttempt(currentTimeMs);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Applies a new heartbeat interval and restarts the timer with it; no-op when unchanged. */
        public void updateHeartbeatIntervalMs(long newHeartbeatIntervalMs) {
            if (this.heartbeatIntervalMs != newHeartbeatIntervalMs) {
                this.heartbeatIntervalMs = newHeartbeatIntervalMs;
                heartbeatTimer.updateAndReset(newHeartbeatIntervalMs);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/ShareHeartbeatRequestManager$HeartbeatState.class */
    /**
     * Builds the payload of each share group heartbeat and remembers which optional fields
     * were already sent, so that they are only included again when needed.
     */
    public static class HeartbeatState {
        private final SubscriptionState subscriptions;
        private final ShareMembershipManager shareMembershipManager;
        private final SentFields sentFields = new SentFields();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Values of the optional request fields as last placed in a heartbeat; {@code null} means not sent. */
        public static class SentFields {
            private String rackId = null;
            private TreeSet<String> subscribedTopicNames = null;

            SentFields() {
            }

            /** Forgets everything that was sent. */
            void reset() {
                this.rackId = null;
                this.subscribedTopicNames = null;
            }
        }

        /**
         * @param subscriptionState      source of the currently subscribed topic names
         * @param shareMembershipManager source of group id, member id, member epoch, rack id and member state
         */
        public HeartbeatState(SubscriptionState subscriptionState, ShareMembershipManager shareMembershipManager) {
            this.subscriptions = subscriptionState;
            this.shareMembershipManager = shareMembershipManager;
        }

        /** Clears the record of sent fields so the next request carries the optional fields again. */
        public void reset() {
            this.sentFields.reset();
        }

        /**
         * Builds the data for the next heartbeat request and records the optional fields it
         * contains as sent.
         *
         * <p>Group id, member id and member epoch are always set. The rack id is set only while
         * no rack id has been recorded as sent. The subscribed topic names are set when the
         * member is in {@link MemberState#JOINING} or when they differ from the set recorded as
         * sent.
         *
         * @return the populated request data
         */
        public ShareGroupHeartbeatRequestData buildRequestData() {
            ShareGroupHeartbeatRequestData data = new ShareGroupHeartbeatRequestData();
            data.setGroupId(this.shareMembershipManager.groupId());
            data.setMemberId(this.shareMembershipManager.memberId());
            data.setMemberEpoch(this.shareMembershipManager.memberEpoch());

            if (this.sentFields.rackId == null) {
                // Read the rack id once so the value sent and the value recorded are the same.
                String rackId = this.shareMembershipManager.rackId();
                data.setRackId(rackId);
                this.sentFields.rackId = rackId;
            }

            boolean joining = this.shareMembershipManager.state() == MemberState.JOINING;
            TreeSet<String> currentTopicNames = new TreeSet<>(this.subscriptions.subscription());
            if (joining || !currentTopicNames.equals(this.sentFields.subscribedTopicNames)) {
                // Build the outgoing list from the same snapshot that is recorded as sent, so
                // the two cannot disagree if the subscription changes between reads.
                data.setSubscribedTopicNames(new ArrayList<>(currentTopicNames));
                this.sentFields.subscribedTopicNames = currentTopicNames;
            }
            return data;
        }
    }

    /**
     * Creates a manager that builds its own heartbeat state, heartbeat request state and poll
     * timer from the given configuration.
     *
     * <p>Reads {@code max.poll.interval.ms}, {@code retry.backoff.ms} and
     * {@code retry.backoff.max.ms} from {@code consumerConfig}. The heartbeat request state
     * starts with a heartbeat interval of 0.
     *
     * <p>NOTE(review): {@code maxPollIntervalMs} is also passed as the last (double) argument of
     * {@link HeartbeatRequestState} - confirm this is the intended value.
     */
    public ShareHeartbeatRequestManager(LogContext logContext, Time time, ConsumerConfig consumerConfig, CoordinatorRequestManager coordinatorRequestManager, SubscriptionState subscriptionState, ShareMembershipManager shareMembershipManager, BackgroundEventHandler backgroundEventHandler, Metrics metrics) {
        // Collaborators handed in by the caller.
        this.logger = logContext.logger(getClass());
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.shareMembershipManager = shareMembershipManager;
        this.backgroundEventHandler = backgroundEventHandler;

        // Configuration values.
        this.maxPollIntervalMs = consumerConfig.getInt("max.poll.interval.ms");
        final long retryBackoffMs = consumerConfig.getLong("retry.backoff.ms");
        final long retryBackoffMaxMs = consumerConfig.getLong("retry.backoff.max.ms");

        // State owned by this manager.
        this.heartbeatState = new HeartbeatState(subscriptionState, shareMembershipManager);
        this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0L, retryBackoffMs, retryBackoffMaxMs, this.maxPollIntervalMs);
        this.pollTimer = time.timer(this.maxPollIntervalMs);
        this.metricsManager = new HeartbeatMetricsManager(metrics, ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX);
    }

    /**
     * Package-private constructor that accepts an existing poll timer, heartbeat state and
     * heartbeat request state instead of creating them. Only {@code max.poll.interval.ms} is
     * read from {@code consumerConfig}.
     */
    ShareHeartbeatRequestManager(LogContext logContext, Timer timer, ConsumerConfig consumerConfig, CoordinatorRequestManager coordinatorRequestManager, ShareMembershipManager shareMembershipManager, HeartbeatState heartbeatState, HeartbeatRequestState heartbeatRequestState, BackgroundEventHandler backgroundEventHandler, Metrics metrics) {
        this.logger = logContext.logger(getClass());
        this.maxPollIntervalMs = consumerConfig.getInt("max.poll.interval.ms");

        // Collaborators.
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.shareMembershipManager = shareMembershipManager;
        this.backgroundEventHandler = backgroundEventHandler;

        // State supplied by the caller rather than built here.
        this.pollTimer = timer;
        this.heartbeatState = heartbeatState;
        this.heartbeatRequestState = heartbeatRequestState;

        this.metricsManager = new HeartbeatMetricsManager(metrics, ConsumerUtils.CONSUMER_SHARE_METRIC_GROUP_PREFIX);
    }

    /**
     * Decides whether a share group heartbeat has to be sent at {@code currentTimeMs}.
     *
     * <ul>
     *   <li>If no coordinator is known, or the membership manager asks to skip the heartbeat,
     *       the skip is reported to the membership manager and an empty result is returned.</li>
     *   <li>If the poll timer has expired and the member is not already leaving the group, a
     *       warning is logged, the member is transitioned to sending a leave-group heartbeat,
     *       and that heartbeat is returned; its response is only logged.</li>
     *   <li>Otherwise a regular heartbeat is returned when the request state allows sending, or
     *       when the membership manager wants a heartbeat now and none is in flight. If neither
     *       holds, the result only carries the time until the next heartbeat.</li>
     * </ul>
     *
     * <p>Expiry of the poll timer must not be part of the skip check: the timer is also updated
     * by {@link #maximumTimeToWait(long)}, so it may already be expired when this method is
     * entered. Treating that as "skip" would return early and the leave-group heartbeat below
     * would never be built.
     *
     * @param currentTimeMs current time in milliseconds
     * @return the poll result, possibly carrying one heartbeat request
     */
    @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestManager
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        if (!this.coordinatorRequestManager.coordinator().isPresent() || this.shareMembershipManager.shouldSkipHeartbeat()) {
            this.shareMembershipManager.onHeartbeatRequestSkipped();
            return NetworkClientDelegate.PollResult.EMPTY;
        }

        this.pollTimer.update(currentTimeMs);
        if (this.pollTimer.isExpired() && !this.shareMembershipManager.isLeavingGroup()) {
            this.logger.warn("Share consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.");
            this.shareMembershipManager.transitionToSendingLeaveGroup(true);
            NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequest(currentTimeMs, true);
            // The leave response is not processed, so both request state and sent-field
            // tracking are cleared right away.
            this.heartbeatRequestState.reset();
            this.heartbeatState.reset();
            return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(leaveHeartbeat));
        }

        boolean heartbeatNow = this.shareMembershipManager.shouldHeartbeatNow() && !this.heartbeatRequestState.requestInFlight();
        if (!this.heartbeatRequestState.canSendRequest(currentTimeMs) && !heartbeatNow) {
            return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
        }

        NetworkClientDelegate.UnsentRequest heartbeat = makeHeartbeatRequest(currentTimeMs, false);
        return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(heartbeat));
    }

    /**
     * @return the share membership manager this heartbeat manager works with
     */
    public ShareMembershipManager membershipManager() {
        return shareMembershipManager;
    }

    /**
     * Computes how long the caller may wait before this manager needs attention again.
     *
     * <p>Updates the poll timer to {@code currentTimeMs}. Returns 0 when the poll timer has
     * expired, or when the membership manager wants a heartbeat now and none is in flight.
     * Otherwise returns the smaller of half the remaining poll time and the time to the next
     * heartbeat.
     *
     * @param currentTimeMs current time in milliseconds
     * @return the maximum wait in milliseconds
     */
    @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestManager
    public long maximumTimeToWait(long currentTimeMs) {
        pollTimer.update(currentTimeMs);
        if (pollTimer.isExpired()) {
            return 0L;
        }

        boolean heartbeatDueNow = shareMembershipManager.shouldHeartbeatNow() && !heartbeatRequestState.requestInFlight();
        if (heartbeatDueNow) {
            return 0L;
        }

        long halfOfRemainingPollMs = pollTimer.remainingMs() / 2;
        return Math.min(halfOfRemainingPollMs, heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
    }

    /**
     * Restarts the poll timer with the configured max poll interval.
     *
     * <p>If the timer had already expired, this is logged at debug level and the membership
     * manager is asked to rejoin a possibly stale member before the timer is restarted.
     *
     * @param pollMs time, in milliseconds, to update the timer to before resetting it
     */
    public void resetPollTimer(long pollMs) {
        final boolean hadExpired = pollTimer.isExpired();
        if (hadExpired) {
            logger.debug("Poll timer has been reset after it had expired");
            shareMembershipManager.maybeRejoinStaleMember();
        }
        pollTimer.update(pollMs);
        pollTimer.reset(maxPollIntervalMs);
    }

    /**
     * Builds a heartbeat request and performs the bookkeeping for sending it: the send attempt
     * is recorded on the request state, the membership manager is notified, the send time is
     * recorded in the metrics and the heartbeat timer is re-armed.
     *
     * @param currentTimeMs  time of the send attempt in milliseconds
     * @param ignoreResponse when {@code true} the response is only logged, not processed
     * @return the request to hand to the network layer
     */
    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(long currentTimeMs, boolean ignoreResponse) {
        final NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);

        // Bookkeeping happens after the request has been built, in this order.
        heartbeatRequestState.onSendAttempt(currentTimeMs);
        shareMembershipManager.onHeartbeatRequestSent();
        metricsManager.recordHeartbeatSentMs(currentTimeMs);
        heartbeatRequestState.resetTimer();

        return request;
    }

    /**
     * Creates the unsent heartbeat request addressed to the current coordinator and attaches
     * its completion handler.
     *
     * @param ignoreResponse when {@code true} the completion handler only logs the outcome;
     *                       otherwise failures go to {@code onFailure} and responses go to
     *                       {@code onResponse} after the request latency has been recorded
     * @return the request with its completion handler attached
     */
    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(boolean ignoreResponse) {
        ShareGroupHeartbeatRequest.Builder builder = new ShareGroupHeartbeatRequest.Builder(heartbeatState.buildRequestData());
        NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(builder, coordinatorRequestManager.coordinator());

        if (ignoreResponse) {
            return logResponse(request);
        }

        return request.whenComplete((response, exception) -> {
            long completionTimeMs = request.handler().completionTimeMs();
            if (response != null) {
                metricsManager.recordRequestLatency(response.requestLatencyMs());
                onResponse((ShareGroupHeartbeatResponse) response.responseBody(), completionTimeMs);
            } else {
                onFailure(exception, completionTimeMs);
            }
        });
    }

    /**
     * Attaches a completion handler that only logs the outcome of the request. A missing
     * response is logged as an error together with the exception; otherwise the request
     * latency is recorded and the response is logged at debug level on success or at error
     * level when it carries an error code.
     *
     * @param request the request to attach the handler to
     * @return the result of attaching the handler
     */
    private NetworkClientDelegate.UnsentRequest logResponse(NetworkClientDelegate.UnsentRequest request) {
        return request.whenComplete((response, exception) -> {
            if (response != null) {
                metricsManager.recordRequestLatency(response.requestLatencyMs());
                ShareGroupHeartbeatResponse heartbeatResponse = (ShareGroupHeartbeatResponse) response.responseBody();
                Errors error = Errors.forCode(heartbeatResponse.data().errorCode());
                if (error != Errors.NONE) {
                    logger.error("ShareGroupHeartbeat failed because of {}: {}", error, response);
                } else {
                    logger.debug("ShareGroupHeartbeat responded successfully: {}", response);
                }
            } else {
                logger.error("ShareGroupHeartbeat failed because of unexpected exception.", exception);
            }
        });
    }

    /**
     * Handles a heartbeat that completed without a response. The failed attempt is recorded and
     * the sent-field tracking is cleared. A {@link RetriableException} is only logged at debug
     * level; any other throwable is logged as an error and treated as fatal.
     *
     * @param exception      the failure reported for the request
     * @param responseTimeMs completion time of the request in milliseconds
     */
    private void onFailure(Throwable exception, long responseTimeMs) {
        heartbeatRequestState.onFailedAttempt(responseTimeMs);
        heartbeatState.reset();

        if (!(exception instanceof RetriableException)) {
            logger.error("ShareGroupHeartbeatRequest failed due to fatal error: {}", exception.getMessage());
            handleFatalFailure(exception);
            return;
        }

        long backoffMs = heartbeatRequestState.remainingBackoffMs(responseTimeMs);
        String message = String.format("ShareGroupHeartbeatRequest failed because of retriable exception. Will retry in %s ms: %s", backoffMs, exception.getMessage());
        logger.debug(message);
    }

    /**
     * Handles a received heartbeat response. A response without an error code updates the
     * heartbeat interval from the response, records the successful attempt and passes the
     * response data to the membership manager; any other response goes to
     * {@code onErrorResponse}.
     *
     * @param response       the heartbeat response
     * @param currentTimeMs  completion time of the request in milliseconds
     */
    private void onResponse(ShareGroupHeartbeatResponse response, long currentTimeMs) {
        boolean succeeded = Errors.forCode(response.data().errorCode()) == Errors.NONE;
        if (succeeded) {
            heartbeatRequestState.updateHeartbeatIntervalMs(response.data().heartbeatIntervalMs());
            heartbeatRequestState.onSuccessfulAttempt(currentTimeMs);
            shareMembershipManager.onHeartbeatSuccess(response.data());
        } else {
            onErrorResponse(response, currentTimeMs);
        }
    }

    /**
     * Handles a heartbeat response that carries an error code.
     *
     * <p>For every error the sent-field tracking is cleared and a failed attempt is recorded
     * first. After that:
     * <ul>
     *   <li>{@code NOT_COORDINATOR}, {@code COORDINATOR_NOT_AVAILABLE}: the coordinator is
     *       marked unknown and the request state is reset.</li>
     *   <li>{@code COORDINATOR_LOAD_IN_PROGRESS}: only logged; the failed attempt recorded
     *       above stays in effect.</li>
     *   <li>{@code GROUP_AUTHORIZATION_FAILED}, {@code INVALID_REQUEST},
     *       {@code GROUP_MAX_SIZE_REACHED}, {@code UNSUPPORTED_VERSION} and any unlisted error:
     *       logged as an error and treated as fatal.</li>
     *   <li>{@code UNKNOWN_MEMBER_ID}: the member is transitioned to fenced and the request
     *       state is reset.</li>
     * </ul>
     *
     * @param shareGroupHeartbeatResponse the response carrying the error
     * @param j                           completion time of the request in milliseconds
     */
    private void onErrorResponse(ShareGroupHeartbeatResponse shareGroupHeartbeatResponse, long j) {
        Errors forCode = Errors.forCode(shareGroupHeartbeatResponse.data().errorCode());
        String errorMessage = shareGroupHeartbeatResponse.data().errorMessage();
        this.heartbeatState.reset();
        this.heartbeatRequestState.onFailedAttempt(j);
        switch (forCode) {
            case NOT_COORDINATOR:
                logInfo(String.format("ShareGroupHeartbeatRequest failed because the group coordinator %s is incorrect. Will attempt to find the coordinator again and retry", this.coordinatorRequestManager.coordinator()), shareGroupHeartbeatResponse, j);
                this.coordinatorRequestManager.markCoordinatorUnknown(errorMessage, j);
                // Resetting the request state follows the log call, so the logged backoff is
                // the one computed for the failed attempt.
                this.heartbeatRequestState.reset();
                return;
            case COORDINATOR_NOT_AVAILABLE:
                logInfo(String.format("ShareGroupHeartbeatRequest failed because the group coordinator %s is not available. Will attempt to find the coordinator again and retry", this.coordinatorRequestManager.coordinator()), shareGroupHeartbeatResponse, j);
                this.coordinatorRequestManager.markCoordinatorUnknown(errorMessage, j);
                this.heartbeatRequestState.reset();
                return;
            case COORDINATOR_LOAD_IN_PROGRESS:
                // Message fixed: the original lacked the space between the two sentences.
                logInfo(String.format("ShareGroupHeartbeatRequest failed because the group coordinator %s is still loading. Will retry", this.coordinatorRequestManager.coordinator()), shareGroupHeartbeatResponse, j);
                return;
            case GROUP_AUTHORIZATION_FAILED:
                // The exception is built only for its message, which names the group id.
                GroupAuthorizationException forGroupId = GroupAuthorizationException.forGroupId(this.shareMembershipManager.groupId());
                this.logger.error("ShareGroupHeartbeatRequest failed due to group authorization failure: {}", forGroupId.getMessage());
                handleFatalFailure(forCode.exception(forGroupId.getMessage()));
                return;
            case INVALID_REQUEST:
            case GROUP_MAX_SIZE_REACHED:
            case UNSUPPORTED_VERSION:
                this.logger.error("ShareGroupHeartbeatRequest failed due to {}: {}", forCode, errorMessage);
                handleFatalFailure(forCode.exception(errorMessage));
                return;
            case UNKNOWN_MEMBER_ID:
                logInfo(String.format("ShareGroupHeartbeatRequest failed because member %s is unknown.", this.shareMembershipManager.memberId()), shareGroupHeartbeatResponse, j);
                this.shareMembershipManager.transitionToFenced();
                this.heartbeatRequestState.reset();
                return;
            default:
                this.logger.error("ShareGroupHeartbeatRequest failed due to unexpected error {}: {}", forCode, errorMessage);
                handleFatalFailure(forCode.exception(errorMessage));
                return;
        }
    }

    /**
     * Logs, at info level, the given message followed by the remaining backoff of the request
     * state and the error message carried by the response.
     *
     * @param message        text placed at the start of the log line
     * @param response       response whose error message is appended
     * @param currentTimeMs  time used to compute the remaining backoff
     */
    private void logInfo(String message, ShareGroupHeartbeatResponse response, long currentTimeMs) {
        long backoffMs = heartbeatRequestState.remainingBackoffMs(currentTimeMs);
        String responseError = response.data().errorMessage();
        logger.info("{} in {}ms: {}", message, backoffMs, responseError);
    }

    private void handleFatalFailure(Throwable th) {
        this.backgroundEventHandler.add(new ErrorEvent(th));
        this.shareMembershipManager.transitionToFatal();
    }
}
