package datahub.shaded.org.apache.kafka.clients.consumer.internals;

import datahub.shaded.org.apache.kafka.clients.consumer.ConsumerConfig;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.ErrorEvent;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.metrics.HeartbeatMetricsManager;
import datahub.shaded.org.apache.kafka.common.errors.GroupAuthorizationException;
import datahub.shaded.org.apache.kafka.common.errors.RetriableException;
import datahub.shaded.org.apache.kafka.common.errors.UnsupportedVersionException;
import datahub.shaded.org.apache.kafka.common.protocol.ApiKeys;
import datahub.shaded.org.apache.kafka.common.protocol.Errors;
import datahub.shaded.org.apache.kafka.common.requests.AbstractResponse;
import datahub.shaded.org.apache.kafka.common.utils.LogContext;
import datahub.shaded.org.apache.kafka.common.utils.Time;
import datahub.shaded.org.apache.kafka.common.utils.Timer;
import datahub.shaded.org.slf4j.Logger;
import java.util.Collections;

/* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/AbstractHeartbeatRequestManager.class */
/**
 * Base class for request managers that generate group heartbeat requests and react to their
 * outcome.
 *
 * <p>On every {@link #poll(long)} the manager decides whether a heartbeat has to be sent, based on
 * the heartbeat interval/backoff tracked by {@link HeartbeatRequestState}, the state reported by
 * {@link #membershipManager()}, and the poll timer (initialised from {@code max.poll.interval.ms}).
 * Responses and failures are translated into calls on the membership manager, the coordinator
 * request manager and the background event handler.
 *
 * <p>Subclasses supply the protocol-specific pieces: building the request and extracting the error,
 * error message and heartbeat interval from a response of type {@code R}.
 *
 * @param <R> concrete heartbeat response type handled by the subclass
 */
public abstract class AbstractHeartbeatRequestManager<R extends AbstractResponse> implements RequestManager {
    protected final Logger logger;
    /** Value of {@code max.poll.interval.ms}; used to (re)arm the poll timer. */
    protected final int maxPollIntervalMs;
    protected final CoordinatorRequestManager coordinatorRequestManager;
    private final HeartbeatRequestState heartbeatRequestState;
    private final BackgroundEventHandler backgroundEventHandler;
    /** Tracks the time elapsed since the last {@link #resetPollTimer(long)}. */
    private final Timer pollTimer;
    private final HeartbeatMetricsManager metricsManager;
    public static final String CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG = "The cluster does not support the new CONSUMER group protocol. Set group.protocol=classic on the consumer configs to revert to the CLASSIC protocol until the cluster is upgraded.";

    /**
     * Request state that combines the backoff handling inherited from {@link RequestState} with a
     * timer measuring the heartbeat interval.
     *
     * <p>Decompiler note: the access modifier was changed from package-private.
     */
    public static class HeartbeatRequestState extends RequestState {
        /** Counts down the current heartbeat interval. */
        private final Timer heartbeatTimer;
        /** Interval the timer is reset to; may be changed via {@link #updateHeartbeatIntervalMs(long)}. */
        private long heartbeatIntervalMs;

        /**
         * @param logContext          log context forwarded to {@link RequestState}
         * @param time                source of the heartbeat timer
         * @param heartbeatIntervalMs initial heartbeat interval, also the initial timer duration
         * @param retryBackoffMs      forwarded to {@link RequestState}; the enclosing class passes
         *                            {@code retry.backoff.ms} here
         * @param retryBackoffMaxMs   forwarded to {@link RequestState}; the enclosing class passes
         *                            {@code retry.backoff.max.ms} here
         * @param jitter              forwarded to {@link RequestState} as its last argument;
         *                            presumably the backoff jitter - TODO confirm against RequestState
         */
        HeartbeatRequestState(LogContext logContext, Time time, long heartbeatIntervalMs, long retryBackoffMs, long retryBackoffMaxMs, double jitter) {
            super(logContext, HeartbeatRequestState.class.getName(), retryBackoffMs, 2, retryBackoffMaxMs, jitter);
            this.heartbeatIntervalMs = heartbeatIntervalMs;
            this.heartbeatTimer = time.timer(heartbeatIntervalMs);
        }

        /** Advances the heartbeat timer to {@code currentTimeMs}. */
        private void update(long currentTimeMs) {
            this.heartbeatTimer.update(currentTimeMs);
        }

        /** Restarts the heartbeat timer with the current heartbeat interval. */
        void resetTimer() {
            this.heartbeatTimer.reset(this.heartbeatIntervalMs);
        }

        /**
         * Extends the base description with the time remaining on the heartbeat timer and the
         * current heartbeat interval.
         */
        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestState
        public String toStringBase() {
            // The interval itself must be appended here (not the base string a second time).
            return super.toStringBase()
                    + ", remainingMs=" + this.heartbeatTimer.remainingMs()
                    + ", heartbeatIntervalMs=" + this.heartbeatIntervalMs;
        }

        /**
         * Updates the heartbeat timer and reports whether a request may be sent.
         *
         * @return {@code true} only if the heartbeat timer has expired and the base class also
         *         allows sending at {@code currentTimeMs}
         */
        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestState
        public boolean canSendRequest(long currentTimeMs) {
            update(currentTimeMs);
            return this.heartbeatTimer.isExpired() && super.canSendRequest(currentTimeMs);
        }

        /**
         * @return the remaining backoff if the heartbeat timer has already expired, otherwise the
         *         time remaining on the heartbeat timer
         */
        long timeToNextHeartbeatMs(long currentTimeMs) {
            if (this.heartbeatTimer.isExpired()) {
                return remainingBackoffMs(currentTimeMs);
            }
            return this.heartbeatTimer.remainingMs();
        }

        /**
         * Resets the heartbeat timer to zero before delegating to the base class; presumably so
         * that only the backoff delays the retry.
         */
        @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestState
        public void onFailedAttempt(long currentTimeMs) {
            this.heartbeatTimer.reset(0L);
            super.onFailedAttempt(currentTimeMs);
        }

        /** Stores a new heartbeat interval and re-arms the timer; a no-op when the value is unchanged. */
        private void updateHeartbeatIntervalMs(long newHeartbeatIntervalMs) {
            if (this.heartbeatIntervalMs == newHeartbeatIntervalMs) {
                return;
            }
            this.heartbeatIntervalMs = newHeartbeatIntervalMs;
            this.heartbeatTimer.updateAndReset(newHeartbeatIntervalMs);
        }
    }

    /**
     * Creates a manager with its own {@link HeartbeatRequestState} (initial heartbeat interval 0)
     * and a poll timer of {@code max.poll.interval.ms}.
     *
     * <p>Decompiler note: the access modifier was changed from package-private.
     */
    public AbstractHeartbeatRequestManager(LogContext logContext, Time time, ConsumerConfig consumerConfig, CoordinatorRequestManager coordinatorRequestManager, BackgroundEventHandler backgroundEventHandler, HeartbeatMetricsManager heartbeatMetricsManager) {
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.logger = logContext.logger(getClass());
        this.backgroundEventHandler = backgroundEventHandler;
        this.maxPollIntervalMs = consumerConfig.getInt("max.poll.interval.ms").intValue();
        long retryBackoffMs = consumerConfig.getLong("retry.backoff.ms").longValue();
        long retryBackoffMaxMs = consumerConfig.getLong("retry.backoff.max.ms").longValue();
        // NOTE(review): max.poll.interval.ms is passed as the last RequestState argument;
        // confirm this is the value RequestState expects in that position.
        this.heartbeatRequestState = new HeartbeatRequestState(logContext, time, 0L, retryBackoffMs, retryBackoffMaxMs, this.maxPollIntervalMs);
        this.pollTimer = time.timer(this.maxPollIntervalMs);
        this.metricsManager = heartbeatMetricsManager;
    }

    /**
     * Creates a manager from an externally supplied poll timer and heartbeat request state.
     *
     * <p>Decompiler note: the access modifier was changed from package-private.
     */
    public AbstractHeartbeatRequestManager(LogContext logContext, Timer timer, ConsumerConfig consumerConfig, CoordinatorRequestManager coordinatorRequestManager, HeartbeatRequestState heartbeatRequestState, BackgroundEventHandler backgroundEventHandler, HeartbeatMetricsManager heartbeatMetricsManager) {
        this.logger = logContext.logger(getClass());
        this.maxPollIntervalMs = consumerConfig.getInt("max.poll.interval.ms").intValue();
        this.coordinatorRequestManager = coordinatorRequestManager;
        this.heartbeatRequestState = heartbeatRequestState;
        this.backgroundEventHandler = backgroundEventHandler;
        this.pollTimer = timer;
        this.metricsManager = heartbeatMetricsManager;
    }

    /**
     * Decides whether a heartbeat request must be sent at {@code currentTimeMs}.
     *
     * <ul>
     *   <li>No known coordinator, or the membership manager asks to skip: nothing is sent and any
     *       pending fatal coordinator error is propagated.</li>
     *   <li>Poll timer expired while the member is not already leaving: the member is transitioned
     *       to sending a leave-group and a heartbeat whose response is only logged is returned.</li>
     *   <li>Otherwise a regular heartbeat is returned when the request state allows it or the
     *       member must heartbeat immediately; if not, only the time to the next heartbeat is
     *       returned.</li>
     * </ul>
     */
    @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestManager
    public NetworkClientDelegate.PollResult poll(long currentTimeMs) {
        if (this.coordinatorRequestManager.coordinator().isEmpty() || membershipManager().shouldSkipHeartbeat()) {
            membershipManager().onHeartbeatRequestSkipped();
            maybePropagateCoordinatorFatalErrorEvent();
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        this.pollTimer.update(currentTimeMs);

        if (this.pollTimer.isExpired() && !membershipManager().isLeavingGroup()) {
            this.logger.warn("Consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.");
            membershipManager().transitionToSendingLeaveGroup(true);
            NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequest(currentTimeMs, true);
            // The response to this request is only logged, so the request state is reset right away.
            this.heartbeatRequestState.reset();
            resetHeartbeatState();
            return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(leaveHeartbeat));
        }

        // Evaluated before canSendRequest(), which updates the heartbeat timer as a side effect.
        boolean heartbeatNow = membershipManager().state() == MemberState.LEAVING
                || (membershipManager().shouldHeartbeatNow() && !this.heartbeatRequestState.requestInFlight());
        if (this.heartbeatRequestState.canSendRequest(currentTimeMs) || heartbeatNow) {
            NetworkClientDelegate.UnsentRequest heartbeat = makeHeartbeatRequest(currentTimeMs, false);
            return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(heartbeat));
        }
        return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
    }

    /** @return the membership manager whose state drives, and is updated by, the heartbeats */
    public abstract AbstractMembershipManager<R> membershipManager();

    /**
     * When the member is leaving the group, returns one last heartbeat whose response is only
     * logged; otherwise returns an empty result.
     */
    @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestManager
    public NetworkClientDelegate.PollResult pollOnClose(long currentTimeMs) {
        if (!membershipManager().isLeavingGroup()) {
            return NetworkClientDelegate.PollResult.EMPTY;
        }
        NetworkClientDelegate.UnsentRequest leaveHeartbeat = makeHeartbeatRequest(currentTimeMs, true);
        return new NetworkClientDelegate.PollResult(this.heartbeatRequestState.heartbeatIntervalMs, Collections.singletonList(leaveHeartbeat));
    }

    /**
     * @return 0 when the poll timer has expired or an immediate heartbeat is due with no request
     *         in flight; otherwise the smaller of half the remaining poll time and the time to
     *         the next heartbeat
     */
    @Override // datahub.shaded.org.apache.kafka.clients.consumer.internals.RequestManager
    public long maximumTimeToWait(long currentTimeMs) {
        this.pollTimer.update(currentTimeMs);
        if (this.pollTimer.isExpired()) {
            return 0L;
        }
        boolean immediateHeartbeatDue = membershipManager().shouldHeartbeatNow() && !this.heartbeatRequestState.requestInFlight();
        if (immediateHeartbeatDue) {
            return 0L;
        }
        return Math.min(this.pollTimer.remainingMs() / 2, this.heartbeatRequestState.timeToNextHeartbeatMs(currentTimeMs));
    }

    /**
     * Re-arms the poll timer with {@code max.poll.interval.ms}. If the timer had already expired,
     * a warning is logged and the membership manager is asked to rejoin the stale member first.
     */
    public void resetPollTimer(long currentTimeMs) {
        this.pollTimer.update(currentTimeMs);
        if (this.pollTimer.isExpired()) {
            this.logger.warn("Time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, exceeded approximately by {} ms. Member {} will rejoin the group now.", Long.valueOf(this.pollTimer.isExpiredBy()), membershipManager().memberId());
            membershipManager().maybeRejoinStaleMember();
        }
        this.pollTimer.reset(this.maxPollIntervalMs);
    }

    /** Forwards a pending fatal error of the coordinator request manager, if any, as an {@link ErrorEvent}. */
    private void maybePropagateCoordinatorFatalErrorEvent() {
        this.coordinatorRequestManager.getAndClearFatalError().ifPresent(fatalError -> {
            this.backgroundEventHandler.add(new ErrorEvent(fatalError));
        });
    }

    /**
     * Builds a heartbeat request and records the send attempt: notifies the request state and the
     * membership manager, records the metric and restarts the heartbeat timer.
     *
     * @param ignoreResponse when {@code true} the response is only logged
     */
    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(long currentTimeMs, boolean ignoreResponse) {
        NetworkClientDelegate.UnsentRequest request = makeHeartbeatRequest(ignoreResponse);
        this.heartbeatRequestState.onSendAttempt(currentTimeMs);
        membershipManager().onHeartbeatRequestGenerated();
        this.metricsManager.recordHeartbeatSentMs(currentTimeMs);
        this.heartbeatRequestState.resetTimer();
        return request;
    }

    /**
     * Builds the request and attaches the completion callback: log-only when
     * {@code ignoreResponse} is set, full response/failure handling otherwise.
     */
    @SuppressWarnings("unchecked")
    private NetworkClientDelegate.UnsentRequest makeHeartbeatRequest(boolean ignoreResponse) {
        NetworkClientDelegate.UnsentRequest request = buildHeartbeatRequest();
        if (ignoreResponse) {
            return logResponse(request);
        }
        return request.whenComplete((clientResponse, exception) -> {
            long completionTimeMs = request.handler().completionTimeMs();
            if (clientResponse == null) {
                onFailure(exception, completionTimeMs);
            } else {
                this.metricsManager.recordRequestLatency(clientResponse.requestLatencyMs());
                // NOTE(review): assumes responseBody() returns the non-generic response type;
                // the cast narrows it to R and is unchecked because R is erased at runtime.
                onResponse((R) clientResponse.responseBody(), completionTimeMs);
            }
        });
    }

    /** Attaches a callback that records the latency and logs the outcome without changing any state. */
    @SuppressWarnings("unchecked")
    private NetworkClientDelegate.UnsentRequest logResponse(NetworkClientDelegate.UnsentRequest unsentRequest) {
        return unsentRequest.whenComplete((clientResponse, exception) -> {
            if (clientResponse == null) {
                this.logger.error("{} failed because of unexpected exception.", heartbeatRequestName(), exception);
                return;
            }
            this.metricsManager.recordRequestLatency(clientResponse.requestLatencyMs());
            // NOTE(review): same assumption about responseBody() as in makeHeartbeatRequest(boolean).
            Errors error = errorForResponse((R) clientResponse.responseBody());
            if (error == Errors.NONE) {
                this.logger.debug("{} responded successfully: {}", heartbeatRequestName(), clientResponse);
            } else {
                this.logger.error("{} failed because of {}: {}", heartbeatRequestName(), error, clientResponse);
            }
        });
    }

    /**
     * Handles a heartbeat that completed without a response. Retriable exceptions are reported to
     * the coordinator request manager as a disconnect; anything else is treated as fatal. The
     * membership manager is notified in both cases.
     */
    private void onFailure(Throwable exception, long responseTimeMs) {
        this.heartbeatRequestState.onFailedAttempt(responseTimeMs);
        resetHeartbeatState();
        boolean retriable = exception instanceof RetriableException;
        if (retriable) {
            this.coordinatorRequestManager.handleCoordinatorDisconnect(exception, responseTimeMs);
            this.logger.debug(String.format("%s failed because of the retriable exception. Will retry in %s ms: %s", heartbeatRequestName(), Long.valueOf(this.heartbeatRequestState.remainingBackoffMs(responseTimeMs)), exception.getMessage()));
        } else {
            this.logger.error("{} failed due to fatal error: {}", heartbeatRequestName(), exception.getMessage());
            if (isHBApiUnsupportedErrorMsg(exception)) {
                // Replace the low-level message with guidance on how to fall back to the classic protocol.
                handleFatalFailure(new UnsupportedVersionException(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG, exception));
            } else {
                handleFatalFailure(exception);
            }
        }
        membershipManager().onHeartbeatFailure(retriable);
    }

    /**
     * @return {@code true} if {@code exception} is an {@link UnsupportedVersionException} whose
     *         message states that the node does not support the consumer group heartbeat API
     */
    private boolean isHBApiUnsupportedErrorMsg(Throwable exception) {
        if (!(exception instanceof UnsupportedVersionException)) {
            return false;
        }
        String unsupportedApiMessage = "The node does not support " + String.valueOf(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
        // Compare from the constant side: getMessage() may legitimately be null.
        return unsupportedApiMessage.equals(exception.getMessage());
    }

    /**
     * Handles a received response: error responses are delegated to
     * {@link #onErrorResponse(AbstractResponse, long)}; successful ones update the heartbeat
     * interval, mark the attempt successful and notify the membership manager.
     */
    private void onResponse(R response, long responseTimeMs) {
        if (errorForResponse(response) != Errors.NONE) {
            onErrorResponse(response, responseTimeMs);
            return;
        }
        this.heartbeatRequestState.updateHeartbeatIntervalMs(heartbeatIntervalForResponse(response));
        this.heartbeatRequestState.onSuccessfulAttempt(responseTimeMs);
        membershipManager().onHeartbeatSuccess(response);
    }

    /**
     * Reacts to an error code carried by a heartbeat response. Depending on the error the
     * coordinator is marked unknown, the member is fenced, the error is surfaced as an
     * {@link ErrorEvent}, or the failure is treated as fatal. Errors not listed here are first
     * offered to {@link #handleSpecificError(AbstractResponse, long)}. The membership manager is
     * always notified of the failure at the end.
     */
    private void onErrorResponse(R response, long currentTimeMs) {
        Errors error = errorForResponse(response);
        String errorMessage = errorMessageForResponse(response);
        resetHeartbeatState();
        this.heartbeatRequestState.onFailedAttempt(currentTimeMs);
        switch (error) {
            case NOT_COORDINATOR:
                logInfo(String.format("%s failed because the group coordinator %s is incorrect. Will attempt to find the coordinator again and retry", heartbeatRequestName(), this.coordinatorRequestManager.coordinator()), response, currentTimeMs);
                this.coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
                this.heartbeatRequestState.reset();
                break;
            case COORDINATOR_NOT_AVAILABLE:
                logInfo(String.format("%s failed because the group coordinator %s is not available. Will attempt to find the coordinator again and retry", heartbeatRequestName(), this.coordinatorRequestManager.coordinator()), response, currentTimeMs);
                this.coordinatorRequestManager.markCoordinatorUnknown(errorMessage, currentTimeMs);
                this.heartbeatRequestState.reset();
                break;
            case COORDINATOR_LOAD_IN_PROGRESS:
                // Nothing else to do: the failed attempt recorded above already applies the backoff.
                logInfo(String.format("%s failed because the group coordinator %s is still loading. Will retry", heartbeatRequestName(), this.coordinatorRequestManager.coordinator()), response, currentTimeMs);
                break;
            case GROUP_AUTHORIZATION_FAILED:
                GroupAuthorizationException authorizationException = GroupAuthorizationException.forGroupId(membershipManager().groupId());
                this.logger.error("{} failed due to group authorization failure: {}", heartbeatRequestName(), authorizationException.getMessage());
                handleFatalFailure(error.exception(authorizationException.getMessage()));
                break;
            case TOPIC_AUTHORIZATION_FAILED:
                // Surfaced as an error event only; the member is not transitioned to fatal here.
                this.logger.error("{} failed for member {} with state {} due to {}: {}", heartbeatRequestName(), membershipManager().memberId, membershipManager().state, error, errorMessage);
                this.backgroundEventHandler.add(new ErrorEvent(error.exception()));
                break;
            case INVALID_REQUEST:
            case GROUP_MAX_SIZE_REACHED:
            case UNSUPPORTED_ASSIGNOR:
                this.logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
                handleFatalFailure(error.exception(errorMessage));
                break;
            case UNSUPPORTED_VERSION:
                this.logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
                handleFatalFailure(error.exception(CONSUMER_PROTOCOL_NOT_SUPPORTED_MSG));
                break;
            case FENCED_MEMBER_EPOCH:
                logInfo(String.format("%s failed for member %s because epoch %s is fenced.", heartbeatRequestName(), membershipManager().memberId(), Integer.valueOf(membershipManager().memberEpoch())), response, currentTimeMs);
                membershipManager().transitionToFenced();
                this.heartbeatRequestState.reset();
                break;
            case UNKNOWN_MEMBER_ID:
                logInfo(String.format("%s failed because member %s is unknown.", heartbeatRequestName(), membershipManager().memberId()), response, currentTimeMs);
                membershipManager().transitionToFenced();
                this.heartbeatRequestState.reset();
                break;
            case INVALID_REGULAR_EXPRESSION:
                this.logger.error("{} failed due to {}: {}", heartbeatRequestName(), error, errorMessage);
                handleFatalFailure(error.exception("Invalid RE2J SubscriptionPattern provided in the call to subscribe. " + errorMessage));
                break;
            default:
                // Give the subclass a chance first; unhandled errors are fatal.
                if (!handleSpecificError(response, currentTimeMs)) {
                    this.logger.error("{} failed due to unexpected error {}: {}", heartbeatRequestName(), error, errorMessage);
                    handleFatalFailure(error.exception(errorMessage));
                }
                break;
        }
        membershipManager().onHeartbeatFailure(false);
    }

    /**
     * Logs {@code message} at info level together with the remaining backoff and the error message
     * extracted from {@code response}.
     */
    protected void logInfo(String message, R response, long currentTimeMs) {
        this.logger.info("{} in {}ms: {}", message, Long.valueOf(this.heartbeatRequestState.remainingBackoffMs(currentTimeMs)), errorMessageForResponse(response));
    }

    /**
     * Publishes {@code error} as an {@link ErrorEvent} and transitions the member to the fatal state.
     *
     * <p>Decompiler note: the access modifier was changed from protected.
     */
    public void handleFatalFailure(Throwable error) {
        this.backgroundEventHandler.add(new ErrorEvent(error));
        membershipManager().transitionToFatal();
    }

    /**
     * Hook for errors this class does not handle itself. The default implementation handles
     * nothing.
     *
     * @return {@code true} if the error was handled; {@code false} makes the caller treat it as an
     *         unexpected, fatal error
     */
    public boolean handleSpecificError(R response, long currentTimeMs) {
        return false;
    }

    /**
     * Hook invoked after a failed heartbeat and after the poll-timeout leave request has been
     * built; what exactly is reset is defined by the subclass.
     */
    public abstract void resetHeartbeatState();

    /** @return a new, not yet sent heartbeat request */
    public abstract NetworkClientDelegate.UnsentRequest buildHeartbeatRequest();

    /** @return the request name used as a prefix in log messages */
    public abstract String heartbeatRequestName();

    /** @return the error carried by {@code response}; {@link Errors#NONE} marks success */
    public abstract Errors errorForResponse(R response);

    /** @return the error message carried by {@code response} */
    public abstract String errorMessageForResponse(R response);

    /** @return the heartbeat interval carried by {@code response}, applied to the request state on success */
    public abstract long heartbeatIntervalForResponse(R response);
}
