package datahub.shaded.org.apache.kafka.clients.consumer.internals;

import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.BackgroundEventHandler;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackCompletedEvent;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.events.ConsumerRebalanceListenerCallbackNeededEvent;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.metrics.ConsumerRebalanceMetricsManager;
import datahub.shaded.org.apache.kafka.clients.consumer.internals.metrics.RebalanceMetricsManager;
import datahub.shaded.org.apache.kafka.common.KafkaException;
import datahub.shaded.org.apache.kafka.common.TopicPartition;
import datahub.shaded.org.apache.kafka.common.Uuid;
import datahub.shaded.org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import datahub.shaded.org.apache.kafka.common.metrics.Metrics;
import datahub.shaded.org.apache.kafka.common.protocol.Errors;
import datahub.shaded.org.apache.kafka.common.requests.ConsumerGroupHeartbeatResponse;
import datahub.shaded.org.apache.kafka.common.utils.LogContext;
import datahub.shaded.org.apache.kafka.common.utils.Time;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

/* loaded from: input_file:datahub/shaded/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.class */
/**
 * Membership manager specialization that reacts to {@link ConsumerGroupHeartbeatResponse}s.
 *
 * <p>On top of the behavior inherited from {@link AbstractMembershipManager}, this class:
 * <ul>
 *   <li>turns the assignment carried by a successful heartbeat into a map of topic id to sorted
 *       partition numbers and hands it to {@code processAssignmentReceived};</li>
 *   <li>delegates auto-commit handling around reconciliation to the {@link CommitRequestManager};</li>
 *   <li>enqueues {@link ConsumerRebalanceListenerCallbackNeededEvent}s on the
 *       {@link BackgroundEventHandler} whenever a rebalance listener callback has to run, and
 *       completes the matching future when the completion event comes back.</li>
 * </ul>
 */
public class ConsumerMembershipManager extends AbstractMembershipManager<ConsumerGroupHeartbeatResponse> {
    /** Epoch reported by {@link #joinGroupEpoch()}. */
    private static final int JOIN_GROUP_EPOCH = 0;

    /** Epoch reported by {@link #leaveGroupEpoch()} when no group instance id is configured. */
    private static final int LEAVE_GROUP_EPOCH = -1;

    /**
     * Epoch reported by {@link #leaveGroupEpoch()} when a group instance id is configured.
     * NOTE(review): presumably mirrors the protocol constant for static members - confirm against
     * the heartbeat request definition.
     */
    private static final int LEAVE_GROUP_EPOCH_WITH_INSTANCE_ID = -2;

    /** Group instance id supplied at construction time; may be empty. */
    protected final Optional<String> groupInstanceId;

    /** Timeout used to derive the deadline for the auto-commit issued before a reconciliation. */
    private final int rebalanceTimeoutMs;

    /** Server-side assignor name supplied at construction time; may be empty. */
    private final Optional<String> serverAssignor;

    /** Used to auto-commit before a rebalance and to reset the auto-commit timer afterwards. */
    private final CommitRequestManager commitRequestManager;

    /** Queue through which rebalance listener callback requests are handed off. */
    private final BackgroundEventHandler backgroundEventHandler;

    /**
     * Creates a manager whose rebalance metrics are backed by a new
     * {@link ConsumerRebalanceMetricsManager} built from {@code metrics}.
     *
     * @param str                    forwarded unchanged to the superclass constructor
     *                               (presumably the group id - confirm)
     * @param groupInstanceId        optional group instance id
     * @param rebalanceTimeoutMs     rebalance timeout in milliseconds
     * @param serverAssignor         optional server-side assignor name
     * @param subscriptionState      subscription state, forwarded to the superclass
     * @param commitRequestManager   manager used for auto-commit handling
     * @param consumerMetadata       metadata, forwarded to the superclass
     * @param logContext             source of the logger handed to the superclass
     * @param backgroundEventHandler handler that receives callback-needed events
     * @param time                   time source, forwarded to the superclass
     * @param metrics                registry wrapped in a {@link ConsumerRebalanceMetricsManager}
     * @param z                      boolean flag forwarded unchanged to the superclass constructor
     */
    public ConsumerMembershipManager(String str, Optional<String> groupInstanceId, int rebalanceTimeoutMs, Optional<String> serverAssignor, SubscriptionState subscriptionState, CommitRequestManager commitRequestManager, ConsumerMetadata consumerMetadata, LogContext logContext, BackgroundEventHandler backgroundEventHandler, Time time, Metrics metrics, boolean z) {
        this(str, groupInstanceId, rebalanceTimeoutMs, serverAssignor, subscriptionState, commitRequestManager, consumerMetadata, logContext, backgroundEventHandler, time, new ConsumerRebalanceMetricsManager(metrics), z);
    }

    /**
     * Package-private variant that accepts an explicit {@link RebalanceMetricsManager} instead of
     * building one from a {@link Metrics} registry. All other parameters have the same meaning as
     * in the public constructor.
     */
    ConsumerMembershipManager(String str, Optional<String> groupInstanceId, int rebalanceTimeoutMs, Optional<String> serverAssignor, SubscriptionState subscriptionState, CommitRequestManager commitRequestManager, ConsumerMetadata consumerMetadata, LogContext logContext, BackgroundEventHandler backgroundEventHandler, Time time, RebalanceMetricsManager rebalanceMetricsManager, boolean z) {
        super(str, subscriptionState, consumerMetadata, logContext.logger(ConsumerMembershipManager.class), time, rebalanceMetricsManager, z);
        this.groupInstanceId = groupInstanceId;
        this.rebalanceTimeoutMs = rebalanceTimeoutMs;
        this.serverAssignor = serverAssignor;
        this.commitRequestManager = commitRequestManager;
        this.backgroundEventHandler = backgroundEventHandler;
    }

    /**
     * @return the group instance id given at construction time
     */
    public Optional<String> groupInstanceId() {
        return this.groupInstanceId;
    }

    /**
     * Handles a heartbeat response that is expected to carry no error.
     *
     * <p>The response is ignored when the member is leaving, when it has just completed a leave
     * that was in progress, or when it is not part of the group. Otherwise the member epoch is
     * updated and, if the response carries an assignment and the current state can accept one,
     * the assignment is forwarded to {@code processAssignmentReceived}.
     *
     * @param consumerGroupHeartbeatResponse response received for a heartbeat request
     * @throws IllegalArgumentException if the response carries an error code other than
     *                                  {@link Errors#NONE}
     */
    @Override
    public void onHeartbeatSuccess(ConsumerGroupHeartbeatResponse consumerGroupHeartbeatResponse) {
        ConsumerGroupHeartbeatResponseData data = consumerGroupHeartbeatResponse.data();
        if (data.errorCode() != Errors.NONE.code()) {
            throw new IllegalArgumentException(String.format("Unexpected error in Heartbeat response. Expected no error, but received: %s", Errors.forCode(data.errorCode())));
        }

        // The state is sampled once, before the epoch update below, and used for every check.
        MemberState state = state();
        if (state == MemberState.LEAVING) {
            this.log.debug("Ignoring heartbeat response received from broker. Member {} with epoch {} is already leaving the group.", this.memberId, this.memberEpoch);
            return;
        }
        if (state == MemberState.UNSUBSCRIBED && maybeCompleteLeaveInProgress()) {
            this.log.debug("Member {} with epoch {} received a successful response to the heartbeat to leave the group and completed the leave operation. ", this.memberId, this.memberEpoch);
            return;
        }
        if (isNotInGroup()) {
            this.log.debug("Ignoring heartbeat response received from broker. Member {} is in {} state so it's not a member of the group. ", this.memberId, state);
            return;
        }

        updateMemberEpoch(data.memberEpoch());

        ConsumerGroupHeartbeatResponseData.Assignment assignment = data.assignment();
        if (assignment == null) {
            return;
        }
        if (!state.canHandleNewAssignment()) {
            this.log.debug("Ignoring new assignment {} received from server because member is in {} state.", assignment, state);
            return;
        }

        // Topic id -> sorted partition numbers; a later entry for the same topic id replaces an earlier one.
        Map<Uuid, SortedSet<Integer>> newAssignment = new HashMap<>();
        for (ConsumerGroupHeartbeatResponseData.TopicPartitions topicPartitions : assignment.topicPartitions()) {
            newAssignment.put(topicPartitions.topicId(), new TreeSet<>(topicPartitions.partitions()));
        }
        processAssignmentReceived(newAssignment);
    }

    /**
     * Hook overridden from {@link AbstractMembershipManager}: asks the commit request manager to
     * auto-commit before the rebalance, bounded by a deadline derived from the rebalance timeout.
     *
     * @return the future returned by the commit request manager
     */
    @Override
    protected CompletableFuture<Void> signalReconciliationStarted() {
        return this.commitRequestManager.maybeAutoCommitSyncBeforeRebalance(getDeadlineMsForTimeout(this.rebalanceTimeoutMs));
    }

    /**
     * Hook overridden from {@link AbstractMembershipManager}: resets the auto-commit timer.
     */
    @Override
    protected void signalReconciliationCompleting() {
        this.commitRequestManager.resetAutoCommitTimer();
    }

    /**
     * Hook overridden from {@link AbstractMembershipManager}: releases the current assignment by
     * triggering the revoked or lost callback.
     *
     * @return future tracking the release of the assignment
     */
    @Override
    protected CompletableFuture<Void> signalMemberLeavingGroup() {
        return invokeOnPartitionsRevokedOrLostToReleaseAssignment();
    }

    /**
     * Hook overridden from {@link AbstractMembershipManager}: triggers the partitions-lost callback.
     *
     * @param set partitions that were lost
     * @return future tracking the callback
     */
    @Override
    protected CompletableFuture<Void> signalPartitionsLost(Set<TopicPartition> set) {
        return invokeOnPartitionsLostCallback(set);
    }

    /**
     * Releases every currently assigned partition. With nothing assigned the result is already
     * complete; with a positive member epoch the partitions are revoked; otherwise the
     * partitions-lost callback is triggered.
     */
    private CompletableFuture<Void> invokeOnPartitionsRevokedOrLostToReleaseAssignment() {
        TreeSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        droppedPartitions.addAll(this.subscriptions.assignedPartitions());
        this.log.info("Member {} is triggering callbacks to release assignment {} and leave group", this.memberId, droppedPartitions);

        if (droppedPartitions.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        if (this.memberEpoch > 0) {
            return revokePartitions(droppedPartitions);
        }
        return invokeOnPartitionsLostCallback(droppedPartitions);
    }

    /**
     * @return the server-side assignor name given at construction time
     */
    public Optional<String> serverAssignor() {
        return this.serverAssignor;
    }

    /**
     * Enqueues the partitions-revoked callback, unless {@code set} is empty or no rebalance
     * listener is registered, in which case an already completed future is returned.
     */
    private CompletableFuture<Void> invokeOnPartitionsRevokedCallback(Set<TopicPartition> set) {
        if (set.isEmpty() || !this.subscriptions.rebalanceListener().isPresent()) {
            return CompletableFuture.completedFuture(null);
        }
        return enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_REVOKED, set);
    }

    /**
     * Enqueues the partitions-assigned callback whenever a rebalance listener is registered.
     * Unlike the revoked and lost variants, an empty {@code set} is not skipped.
     */
    private CompletableFuture<Void> invokeOnPartitionsAssignedCallback(Set<TopicPartition> set) {
        if (!this.subscriptions.rebalanceListener().isPresent()) {
            return CompletableFuture.completedFuture(null);
        }
        return enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_ASSIGNED, set);
    }

    /**
     * Enqueues the partitions-lost callback, unless {@code set} is empty or no rebalance listener
     * is registered, in which case an already completed future is returned.
     */
    private CompletableFuture<Void> invokeOnPartitionsLostCallback(Set<TopicPartition> set) {
        if (set.isEmpty() || !this.subscriptions.rebalanceListener().isPresent()) {
            return CompletableFuture.completedFuture(null);
        }
        return enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName.ON_PARTITIONS_LOST, set);
    }

    /**
     * Hook overridden from {@link AbstractMembershipManager}: triggers the partitions-assigned
     * callback.
     *
     * @param set partitions that were assigned
     * @return future tracking the callback
     */
    @Override
    public CompletableFuture<Void> signalPartitionsAssigned(Set<TopicPartition> set) {
        return invokeOnPartitionsAssignedCallback(set);
    }

    /**
     * Hook overridden from {@link AbstractMembershipManager}: logs which of the partitions about
     * to be revoked are currently paused.
     *
     * @param set partitions about to be revoked
     */
    @Override
    public void signalPartitionsBeingRevoked(Set<TopicPartition> set) {
        logPausedPartitionsBeingRevoked(set);
    }

    /**
     * Hook overridden from {@link AbstractMembershipManager}: triggers the partitions-revoked
     * callback.
     *
     * @param set partitions that were revoked
     * @return future tracking the callback
     */
    @Override
    public CompletableFuture<Void> signalPartitionsRevoked(Set<TopicPartition> set) {
        return invokeOnPartitionsRevokedCallback(set);
    }

    /**
     * Logs, at info level, the paused partitions that are contained in {@code set}. Nothing is
     * logged when there are none.
     *
     * <p>The set returned by the subscription state is only read, never modified, so this method
     * does not depend on that accessor handing out a private mutable copy.
     */
    private void logPausedPartitionsBeingRevoked(Set<TopicPartition> set) {
        List<String> pausedAndRevoked = this.subscriptions.pausedPartitions().stream()
                .filter(set::contains)
                .map(TopicPartition::toString)
                .collect(Collectors.toList());
        if (pausedAndRevoked.isEmpty()) {
            return;
        }
        this.log.info("The pause flag in partitions [{}] will be removed due to revocation.", String.join(", ", pausedAndRevoked));
    }

    /**
     * Builds a callback-needed event for the given listener method, using a copy of {@code set}
     * ordered by {@code TOPIC_PARTITION_COMPARATOR}, and adds it to the background event handler.
     *
     * @return the future exposed by the enqueued event
     */
    private CompletableFuture<Void> enqueueConsumerRebalanceListenerCallback(ConsumerRebalanceListenerMethodName consumerRebalanceListenerMethodName, Set<TopicPartition> set) {
        TreeSet<TopicPartition> sortedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
        sortedPartitions.addAll(set);
        ConsumerRebalanceListenerCallbackNeededEvent callbackNeeded = new ConsumerRebalanceListenerCallbackNeededEvent(consumerRebalanceListenerMethodName, sortedPartitions);
        this.backgroundEventHandler.add(callbackNeeded);
        this.log.debug("The event to trigger the {} method execution was enqueued successfully", consumerRebalanceListenerMethodName.fullyQualifiedMethodName());
        return callbackNeeded.future();
    }

    /**
     * Completes the future carried by a callback-completed event: exceptionally with the reported
     * error when one is present, normally otherwise.
     *
     * @param consumerRebalanceListenerCallbackCompletedEvent event describing the callback outcome
     */
    public void consumerRebalanceListenerCallbackCompleted(ConsumerRebalanceListenerCallbackCompletedEvent consumerRebalanceListenerCallbackCompletedEvent) {
        ConsumerRebalanceListenerMethodName methodName = consumerRebalanceListenerCallbackCompletedEvent.methodName();
        Optional<KafkaException> error = consumerRebalanceListenerCallbackCompletedEvent.error();
        CompletableFuture<Void> future = consumerRebalanceListenerCallbackCompletedEvent.future();

        if (error.isPresent()) {
            KafkaException callbackFailure = error.get();
            this.log.warn("The {} method completed with an error ({}); signaling to continue to the next phase of rebalance", methodName.fullyQualifiedMethodName(), callbackFailure.getMessage());
            future.completeExceptionally(callbackFailure);
            return;
        }
        this.log.debug("The {} method completed successfully; signaling to continue to the next phase of rebalance", methodName.fullyQualifiedMethodName());
        future.complete(null);
    }

    /**
     * @return the epoch value this manager reports for joining the group (always {@code 0})
     */
    @Override
    public int joinGroupEpoch() {
        return JOIN_GROUP_EPOCH;
    }

    /**
     * @return {@code -2} when a group instance id is configured, {@code -1} otherwise
     */
    @Override
    public int leaveGroupEpoch() {
        return this.groupInstanceId.isPresent() ? LEAVE_GROUP_EPOCH_WITH_INSTANCE_ID : LEAVE_GROUP_EPOCH;
    }
}
