/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
import org.apache.kafka.clients.consumer.internals.ConsumerInterceptors;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.clients.consumer.internals.DelayedTask;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.consumer.internals.RequestFuture;
import org.apache.kafka.clients.consumer.internals.RequestFutureListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.internals.TopicConstants;
import org.apache.kafka.common.metrics.Measurable;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Count;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchRequest;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class ConsumerCoordinator
extends AbstractCoordinator {
    // Class-wide SLF4J logger, also used by the nested classes of this file.
    private static final Logger log = LoggerFactory.getLogger(ConsumerCoordinator.class);
    // Assignors advertised by metadata() and searched by name in lookupAssignor().
    private final List<PartitionAssignor> assignors;
    // Cluster metadata handle; a listener is registered on it in addMetadataListener().
    private final Metadata metadata;
    // Commit-latency sensor and assigned-partitions metric registered by this coordinator.
    private final ConsumerCoordinatorMetrics sensors;
    // Subscription/assignment state that this coordinator reads and updates.
    private final SubscriptionState subscriptions;
    // Callback used by commitOffsetsAsync() when the caller passes a null callback.
    private final OffsetCommitCallback defaultOffsetCommitCallback;
    // Whether offsets are committed automatically (periodically, and before rejoin/close).
    private final boolean autoCommitEnabled;
    // Periodic commit task; null when autoCommitEnabled is false.
    private final AutoCommitTask autoCommitTask;
    // Optional interceptors notified after successful commits; may be null (null-checked at use).
    private final ConsumerInterceptors<?, ?> interceptors;
    // When true, filterTopic() rejects topics listed in TopicConstants.INTERNAL_TOPICS.
    private final boolean excludeInternalTopics;
    // Latest partition-count snapshot; replaced by the metadata listener when it changes.
    private MetadataSnapshot metadataSnapshot;
    // Snapshot captured in performAssignment(); reset to null in onJoinPrepare().
    private MetadataSnapshot assignmentSnapshot;

    public ConsumerCoordinator(ConsumerNetworkClient client, String groupId, int sessionTimeoutMs, int heartbeatIntervalMs, List<PartitionAssignor> assignors, Metadata metadata, SubscriptionState subscriptions, Metrics metrics, String metricGrpPrefix, Time time, long retryBackoffMs, OffsetCommitCallback defaultOffsetCommitCallback, boolean autoCommitEnabled, long autoCommitIntervalMs, ConsumerInterceptors<?, ?> interceptors, boolean excludeInternalTopics) {
        super(client, groupId, sessionTimeoutMs, heartbeatIntervalMs, metrics, metricGrpPrefix, time, retryBackoffMs);
        this.metadata = metadata;
        this.metadata.requestUpdate();
        this.metadataSnapshot = new MetadataSnapshot(subscriptions, metadata.fetch());
        this.subscriptions = subscriptions;
        this.defaultOffsetCommitCallback = defaultOffsetCommitCallback;
        this.autoCommitEnabled = autoCommitEnabled;
        this.assignors = assignors;
        this.addMetadataListener();
        if (autoCommitEnabled) {
            this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
            this.autoCommitTask.reschedule();
        } else {
            this.autoCommitTask = null;
        }
        this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
        this.interceptors = interceptors;
        this.excludeInternalTopics = excludeInternalTopics;
    }

    /**
     * Returns the protocol type name reported by this coordinator.
     *
     * @return the literal string {@code "consumer"}
     */
    @Override
    public String protocolType() {
        return "consumer";
    }

    /**
     * Builds one protocol-metadata entry per configured assignor. Each entry pairs the
     * assignor's name with that assignor's subscription for the currently subscribed topics,
     * serialized through {@link ConsumerProtocol#serializeSubscription}.
     *
     * @return the entries, in the iteration order of the configured assignors
     */
    @Override
    public List<JoinGroupRequest.ProtocolMetadata> metadata() {
        List<JoinGroupRequest.ProtocolMetadata> protocols = new ArrayList<JoinGroupRequest.ProtocolMetadata>();
        for (PartitionAssignor assignor : assignors) {
            ByteBuffer serialized = ConsumerProtocol.serializeSubscription(assignor.subscription(subscriptions.subscription()));
            protocols.add(new JoinGroupRequest.ProtocolMetadata(assignor.name(), serialized));
        }
        return protocols;
    }

    /**
     * Registers a listener on the cluster metadata that, on every update:
     * <ol>
     *   <li>for pattern subscriptions, throws {@link TopicAuthorizationException} if any
     *       unauthorized topic passes {@link #filterTopic}, otherwise re-subscribes to every
     *       cluster topic that passes the filter and pushes the group subscription to the metadata;</li>
     *   <li>for other subscriptions, throws {@link TopicAuthorizationException} if the cluster
     *       reports any unauthorized topic;</li>
     *   <li>when partitions are auto-assigned, replaces the stored snapshot and requests a
     *       reassignment if the new snapshot differs from the stored one.</li>
     * </ol>
     */
    private void addMetadataListener() {
        metadata.addListener(new Metadata.Listener(){

            @Override
            public void onMetadataUpdate(Cluster cluster) {
                if (subscriptions.hasPatternSubscription()) {
                    // Only unauthorized topics that the pattern would actually select are fatal.
                    Set<String> deniedMatches = new HashSet<String>();
                    for (String candidate : cluster.unauthorizedTopics()) {
                        if (filterTopic(candidate)) {
                            deniedMatches.add(candidate);
                        }
                    }
                    if (!deniedMatches.isEmpty()) {
                        throw new TopicAuthorizationException(deniedMatches);
                    }

                    List<String> matchedTopics = new ArrayList<String>();
                    for (String candidate : cluster.topics()) {
                        if (filterTopic(candidate)) {
                            matchedTopics.add(candidate);
                        }
                    }
                    subscriptions.changeSubscription(matchedTopics);
                    metadata.setTopics(subscriptions.groupSubscription());
                } else if (!cluster.unauthorizedTopics().isEmpty()) {
                    throw new TopicAuthorizationException(new HashSet<String>(cluster.unauthorizedTopics()));
                }

                if (!subscriptions.partitionsAutoAssigned()) {
                    return;
                }
                // A changed snapshot means the current assignment is stale.
                MetadataSnapshot latest = new MetadataSnapshot(subscriptions, cluster);
                if (!latest.equals(metadataSnapshot)) {
                    metadataSnapshot = latest;
                    subscriptions.needReassignment();
                }
            }
        });
    }

    /**
     * Decides whether a topic is selected by the current pattern subscription.
     *
     * @param topic topic name to test
     * @return {@code true} when the subscribed pattern matches the whole name and the topic is
     *         not an excluded internal topic (internal topics are excluded only when
     *         {@code excludeInternalTopics} is set)
     */
    private boolean filterTopic(String topic) {
        if (!subscriptions.getSubscribedPattern().matcher(topic).matches()) {
            return false;
        }
        return !(excludeInternalTopics && TopicConstants.INTERNAL_TOPICS.contains(topic));
    }

    /**
     * Finds the first configured assignor whose {@code name()} equals the given name.
     *
     * @param name assignor name to look for
     * @return the matching assignor, or {@code null} when none matches
     */
    private PartitionAssignor lookupAssignor(String name) {
        for (PartitionAssignor candidate : assignors) {
            if (candidate.name().equals(name)) {
                return candidate;
            }
        }
        return null;
    }

    /**
     * Applies the assignment received after a successful group join.
     * <p>
     * If this member performed the assignment and the metadata snapshot has changed since then,
     * the assignment is discarded and a reassignment is requested instead. Otherwise the
     * assignment is deserialized and installed, the assignor is notified, the auto-commit task is
     * rescheduled (when enabled) and the user's rebalance listener is invoked.
     *
     * @throws IllegalStateException if {@code assignmentStrategy} names no configured assignor
     * @throws WakeupException       if the rebalance listener throws it; any other listener
     *                               exception is logged and swallowed
     */
    @Override
    protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) {
        boolean metadataChangedSinceAssignment = assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot);
        if (metadataChangedSinceAssignment) {
            subscriptions.needReassignment();
            return;
        }

        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        if (assignor == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        }

        PartitionAssignor.Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
        subscriptions.needRefreshCommits();
        subscriptions.assignFromSubscribed(assignment.partitions());
        assignor.onAssignment(assignment);

        // NOTE(review): this schedules another run on every join; nothing visible in this file
        // unschedules the previous one -- confirm the client de-duplicates scheduled tasks.
        if (autoCommitEnabled) {
            autoCommitTask.reschedule();
        }

        ConsumerRebalanceListener listener = subscriptions.listener();
        log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
        try {
            // Hand the listener its own copy of the assigned partitions.
            Set<TopicPartition> assignedCopy = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
            listener.onPartitionsAssigned(assignedCopy);
        } catch (WakeupException wakeup) {
            throw wakeup;
        } catch (Exception failure) {
            log.error("User provided listener {} for group {} failed on partition assignment", listener.getClass().getName(), groupId, failure);
        }
    }

    /**
     * Computes the group assignment with the selected assignor.
     * <p>
     * Every member subscription is deserialized, the union of their topics becomes the group
     * subscription, metadata is refreshed, the current metadata snapshot is remembered as the
     * assignment snapshot, and the assignor's result is serialized per member.
     *
     * @param leaderId           id of the group leader (not used by this implementation)
     * @param assignmentStrategy name of the assignor to use
     * @param allSubscriptions   serialized subscription of each member, keyed by member id
     * @return serialized assignment for each key returned by the assignor
     * @throws IllegalStateException if {@code assignmentStrategy} names no configured assignor
     */
    @Override
    protected Map<String, ByteBuffer> performAssignment(String leaderId, String assignmentStrategy, Map<String, ByteBuffer> allSubscriptions) {
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        if (assignor == null) {
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        }

        Set<String> topicsAcrossGroup = new HashSet<String>();
        Map<String, PartitionAssignor.Subscription> memberSubscriptions = new HashMap<String, PartitionAssignor.Subscription>();
        for (Map.Entry<String, ByteBuffer> member : allSubscriptions.entrySet()) {
            PartitionAssignor.Subscription parsed = ConsumerProtocol.deserializeSubscription(member.getValue());
            memberSubscriptions.put(member.getKey(), parsed);
            topicsAcrossGroup.addAll(parsed.topics());
        }

        // Track metadata for every topic in the group, not just this member's topics.
        subscriptions.groupSubscribe(topicsAcrossGroup);
        metadata.setTopics(subscriptions.groupSubscription());
        client.ensureFreshMetadata();
        // Remember the snapshot the assignment is based on; onJoinComplete compares against it.
        assignmentSnapshot = metadataSnapshot;

        log.debug("Performing assignment for group {} using strategy {} with subscriptions {}", groupId, assignor.name(), memberSubscriptions);
        Map<String, PartitionAssignor.Assignment> computed = assignor.assign(metadata.fetch(), memberSubscriptions);
        log.debug("Finished assignment for group {}: {}", groupId, computed);

        Map<String, ByteBuffer> serializedByMember = new HashMap<String, ByteBuffer>();
        for (Map.Entry<String, PartitionAssignor.Assignment> memberAssignment : computed.entrySet()) {
            ByteBuffer serialized = ConsumerProtocol.serializeAssignment(memberAssignment.getValue());
            serializedByMember.put(memberAssignment.getKey(), serialized);
        }
        return serializedByMember;
    }

    /**
     * Prepares for a (re)join: synchronously auto-commits offsets when enabled, notifies the
     * user's rebalance listener that the current partitions are revoked, clears the assignment
     * snapshot and marks the subscription as needing reassignment.
     *
     * @throws WakeupException if the commit or the rebalance listener throws it; any other
     *                         listener exception is logged and swallowed
     */
    @Override
    protected void onJoinPrepare(int generation, String memberId) {
        maybeAutoCommitOffsetsSync();

        ConsumerRebalanceListener listener = subscriptions.listener();
        log.info("Revoking previously assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
        try {
            // Hand the listener its own copy of the partitions being revoked.
            Set<TopicPartition> revokedCopy = new HashSet<TopicPartition>(subscriptions.assignedPartitions());
            listener.onPartitionsRevoked(revokedCopy);
        } catch (WakeupException wakeup) {
            throw wakeup;
        } catch (Exception failure) {
            log.error("User provided listener {} for group {} failed on partition revocation", listener.getClass().getName(), groupId, failure);
        }

        assignmentSnapshot = null;
        subscriptions.needReassignment();
    }

    /**
     * Reports whether this member must (re)join the group.
     *
     * @return {@code false} when partitions are not auto-assigned; otherwise {@code true} if
     *         the superclass requires a rejoin or a partition assignment is needed
     */
    @Override
    public boolean needRejoin() {
        if (!subscriptions.partitionsAutoAssigned()) {
            return false;
        }
        return super.needRejoin() || subscriptions.partitionAssignmentNeeded();
    }

    /**
     * Re-fetches committed offsets for the assigned partitions when the subscription state
     * reports that a refresh is needed, records them for partitions that are still assigned,
     * and then marks the commits as refreshed. Does nothing when no refresh is needed.
     */
    public void refreshCommittedOffsetsIfNeeded() {
        if (!subscriptions.refreshCommitsNeeded()) {
            return;
        }
        Map<TopicPartition, OffsetAndMetadata> committed = fetchCommittedOffsets(subscriptions.assignedPartitions());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> fetched : committed.entrySet()) {
            TopicPartition partition = fetched.getKey();
            // The assignment may have changed while the fetch was in flight.
            if (subscriptions.isAssigned(partition)) {
                subscriptions.committed(partition, fetched.getValue());
            }
        }
        subscriptions.commitsRefreshed();
    }

    /**
     * Fetches the committed offsets for the given partitions, blocking until the request
     * succeeds. Retriable failures are retried after sleeping {@code retryBackoffMs}.
     *
     * @param partitions partitions to look up
     * @return the value of the successful offset-fetch future
     * @throws RuntimeException the future's exception when the failure is not retriable
     */
    public Map<TopicPartition, OffsetAndMetadata> fetchCommittedOffsets(Set<TopicPartition> partitions) {
        RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future;
        for (;;) {
            ensureCoordinatorReady();
            future = sendOffsetFetchRequest(partitions);
            client.poll(future);
            if (future.succeeded()) {
                break;
            }
            if (!future.isRetriable()) {
                throw future.exception();
            }
            // Retriable failure: back off, then try again from the top.
            time.sleep(retryBackoffMs);
        }
        return future.value();
    }

    /**
     * Ensures the group is active when partitions are auto-assigned; a no-op otherwise.
     * For pattern subscriptions the metadata is refreshed first.
     */
    public void ensurePartitionAssignment() {
        if (!subscriptions.partitionsAutoAssigned()) {
            return;
        }
        if (subscriptions.hasPatternSubscription()) {
            client.ensureFreshMetadata();
        }
        ensureActiveGroup();
    }

    /**
     * Closes the coordinator: disables wakeups on the client, attempts a final synchronous
     * auto-commit (when enabled) and then always delegates to {@code super.close()}, even if
     * the commit attempt throws.
     * <p>
     * NOTE(review): this file was produced by a decompiler (CFR), which reported
     * "Removed try catching itself - possible behaviour change" for this method; compare the
     * try/finally below against the original source if exact fidelity matters.
     */
    @Override
    public void close() {
        client.disableWakeups();
        try {
            maybeAutoCommitOffsetsSync();
        } finally {
            super.close();
        }
    }

    /**
     * Sends an offset commit without waiting for the result.
     * <p>
     * On success the interceptors (if any) are notified and the callback is completed with a
     * {@code null} exception. On failure the callback receives the exception, wrapped in a
     * {@link RetriableCommitFailedException} when it is a {@link RetriableException}.
     *
     * @param offsets  offsets to commit
     * @param callback completion callback; when {@code null} the default callback is used
     */
    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
        subscriptions.needRefreshCommits();
        RequestFuture<Void> future = sendOffsetCommitRequest(offsets);

        final OffsetCommitCallback cb;
        if (callback != null) {
            cb = callback;
        } else {
            cb = defaultOffsetCommitCallback;
        }

        future.addListener(new RequestFutureListener<Void>(){

            @Override
            public void onSuccess(Void ignored) {
                if (interceptors != null) {
                    interceptors.onCommit(offsets);
                }
                cb.onComplete(offsets, null);
            }

            @Override
            public void onFailure(RuntimeException e) {
                Exception reported = e;
                if (e instanceof RetriableException) {
                    reported = new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e);
                }
                cb.onComplete(offsets, reported);
            }
        });

        client.pollNoWakeup();
    }

    /**
     * Commits the given offsets and blocks until the commit succeeds. Retriable failures are
     * retried after sleeping {@code retryBackoffMs}; on success the interceptors (if any) are
     * notified. An empty offsets map returns immediately without contacting the coordinator.
     *
     * @param offsets offsets to commit
     * @throws RuntimeException the future's exception when the failure is not retriable
     */
    public void commitOffsetsSync(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (offsets.isEmpty()) {
            return;
        }
        for (;;) {
            ensureCoordinatorReady();
            RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
            client.poll(future);
            if (future.succeeded()) {
                break;
            }
            if (!future.isRetriable()) {
                throw future.exception();
            }
            // Retriable failure: back off, then try again from the top.
            time.sleep(retryBackoffMs);
        }
        if (interceptors != null) {
            interceptors.onCommit(offsets);
        }
    }

    /**
     * Synchronously commits all consumed offsets when auto-commit is enabled; a no-op otherwise.
     * A {@link WakeupException} is propagated; any other exception is logged at WARN level
     * (message only) and swallowed.
     */
    private void maybeAutoCommitOffsetsSync() {
        if (!autoCommitEnabled) {
            return;
        }
        try {
            commitOffsetsSync(subscriptions.allConsumed());
        } catch (WakeupException wakeup) {
            throw wakeup;
        } catch (Exception failure) {
            log.warn("Auto offset commit failed for group {}: {}", groupId, failure.getMessage());
        }
    }

    /**
     * Builds and sends an offset-commit request to the coordinator.
     *
     * @param offsets offsets (with metadata) to commit, keyed by partition
     * @return a coordinator-not-available future when the coordinator is unknown, an already
     *         successful future when {@code offsets} is empty, otherwise the future of the
     *         sent request composed with an {@link OffsetCommitResponseHandler}
     */
    private RequestFuture<Void> sendOffsetCommitRequest(Map<TopicPartition, OffsetAndMetadata> offsets) {
        if (coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        if (offsets.isEmpty()) {
            return RequestFuture.voidSuccess();
        }

        // Translate the caller's offsets into the request's per-partition payload.
        Map<TopicPartition, OffsetCommitRequest.PartitionData> payload = new HashMap<TopicPartition, OffsetCommitRequest.PartitionData>(offsets.size());
        for (Map.Entry<TopicPartition, OffsetAndMetadata> requested : offsets.entrySet()) {
            OffsetAndMetadata position = requested.getValue();
            payload.put(requested.getKey(), new OffsetCommitRequest.PartitionData(position.offset(), position.metadata()));
        }

        OffsetCommitRequest request = new OffsetCommitRequest(groupId, generation, memberId, -1L, payload);
        log.trace("Sending offset-commit request with {} to coordinator {} for group {}", offsets, coordinator, groupId);
        return client.send(coordinator, ApiKeys.OFFSET_COMMIT, request).compose(new OffsetCommitResponseHandler(offsets));
    }

    /**
     * Builds and sends an offset-fetch request to the coordinator.
     *
     * @param partitions partitions whose committed offsets are requested
     * @return a coordinator-not-available future when the coordinator is unknown, otherwise
     *         the future of the sent request composed with an {@link OffsetFetchResponseHandler}
     */
    private RequestFuture<Map<TopicPartition, OffsetAndMetadata>> sendOffsetFetchRequest(Set<TopicPartition> partitions) {
        if (coordinatorUnknown()) {
            return RequestFuture.coordinatorNotAvailable();
        }
        log.debug("Group {} fetching committed offsets for partitions: {}", groupId, partitions);
        // The request receives its own list copy of the partition set.
        List<TopicPartition> requested = new ArrayList<TopicPartition>(partitions);
        OffsetFetchRequest fetchRequest = new OffsetFetchRequest(groupId, requested);
        return client.send(coordinator, ApiKeys.OFFSET_FETCH, fetchRequest).compose(new OffsetFetchResponseHandler());
    }

    /**
     * Value object recording the partition count of every topic in the group subscription at
     * the time it was built. Two snapshots are equal when their topic-to-count maps are equal.
     */
    private static class MetadataSnapshot {
        private final Map<String, Integer> partitionsPerTopic;

        /**
         * Captures {@code cluster.partitionCountForTopic(topic)} for each topic returned by
         * {@code subscription.groupSubscription()}.
         */
        public MetadataSnapshot(SubscriptionState subscription, Cluster cluster) {
            Map<String, Integer> counts = new HashMap<String, Integer>();
            for (String topic : subscription.groupSubscription()) {
                counts.put(topic, cluster.partitionCountForTopic(topic));
            }
            this.partitionsPerTopic = counts;
        }

        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (o == null || o.getClass() != getClass()) {
                return false;
            }
            Map<String, Integer> other = ((MetadataSnapshot) o).partitionsPerTopic;
            if (partitionsPerTopic == null) {
                return other == null;
            }
            return partitionsPerTopic.equals(other);
        }

        @Override
        public int hashCode() {
            if (partitionsPerTopic == null) {
                return 0;
            }
            return partitionsPerTopic.hashCode();
        }
    }

    /**
     * Registers this coordinator's metrics: a {@code commit-latency} sensor (average, max and
     * rate) and an {@code assigned-partitions} gauge reading the current assignment size.
     * All metric names live in the group {@code <prefix>-coordinator-metrics}.
     */
    private class ConsumerCoordinatorMetrics {
        public final Metrics metrics;
        public final String metricGrpName;
        public final Sensor commitLatency;

        public ConsumerCoordinatorMetrics(Metrics metrics, String metricGrpPrefix) {
            this.metrics = metrics;
            this.metricGrpName = metricGrpPrefix + "-coordinator-metrics";

            Sensor latency = metrics.sensor("commit-latency");
            this.commitLatency = latency;
            latency.add(metrics.metricName("commit-latency-avg", metricGrpName, "The average time taken for a commit request"), new Avg());
            latency.add(metrics.metricName("commit-latency-max", metricGrpName, "The max time taken for a commit request"), new Max());
            latency.add(metrics.metricName("commit-rate", metricGrpName, "The number of commit calls per second"), new Rate(new Count()));

            // Gauge evaluated on demand: size of the current partition assignment.
            Measurable assignedCount = new Measurable(){

                @Override
                public double measure(MetricConfig config, long now) {
                    return subscriptions.assignedPartitions().size();
                }
            };
            metrics.addMetric(metrics.metricName("assigned-partitions", metricGrpName, "The number of partitions currently assigned to this consumer"), assignedCount);
        }
    }

    private class OffsetFetchResponseHandler
    extends AbstractCoordinator.CoordinatorResponseHandler<OffsetFetchResponse, Map<TopicPartition, OffsetAndMetadata>> {
        private OffsetFetchResponseHandler() {
        }

        @Override
        public OffsetFetchResponse parse(ClientResponse response) {
            return new OffsetFetchResponse(response.responseBody());
        }

        @Override
        public void handle(OffsetFetchResponse response, RequestFuture<Map<TopicPartition, OffsetAndMetadata>> future) {
            HashMap<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>(response.responseData().size());
            for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> entry2 : response.responseData().entrySet()) {
                TopicPartition tp = entry2.getKey();
                OffsetFetchResponse.PartitionData data = entry2.getValue();
                if (data.hasError()) {
                    Errors error = Errors.forCode(data.errorCode);
                    log.debug("Group {} failed to fetch offset for partition {}: {}", new Object[]{ConsumerCoordinator.this.groupId, tp, error.message()});
                    if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                        future.raise(error);
                    } else if (error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                        ConsumerCoordinator.this.coordinatorDead();
                        future.raise(error);
                    } else if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION) {
                        ConsumerCoordinator.this.subscriptions.needReassignment();
                        future.raise(error);
                    } else {
                        future.raise(new KafkaException("Unexpected error in fetch offset response: " + error.message()));
                    }
                    return;
                }
                if (data.offset >= 0L) {
                    offsets.put(tp, new OffsetAndMetadata(data.offset, data.metadata));
                    continue;
                }
                log.debug("Group {} has no committed offset for partition {}", (Object)ConsumerCoordinator.this.groupId, (Object)tp);
            }
            future.complete(offsets);
        }
    }

    private class OffsetCommitResponseHandler
    extends AbstractCoordinator.CoordinatorResponseHandler<OffsetCommitResponse, Void> {
        private final Map<TopicPartition, OffsetAndMetadata> offsets;

        public OffsetCommitResponseHandler(Map<TopicPartition, OffsetAndMetadata> offsets) {
            this.offsets = offsets;
        }

        @Override
        public OffsetCommitResponse parse(ClientResponse response) {
            return new OffsetCommitResponse(response.responseBody());
        }

        @Override
        public void handle(OffsetCommitResponse commitResponse, RequestFuture<Void> future) {
            ((ConsumerCoordinator)ConsumerCoordinator.this).sensors.commitLatency.record(this.response.requestLatencyMs());
            HashSet<String> unauthorizedTopics = new HashSet<String>();
            for (Map.Entry<TopicPartition, Short> entry2 : commitResponse.responseData().entrySet()) {
                TopicPartition tp = entry2.getKey();
                OffsetAndMetadata offsetAndMetadata = this.offsets.get(tp);
                long offset = offsetAndMetadata.offset();
                Errors error = Errors.forCode(entry2.getValue());
                if (error == Errors.NONE) {
                    log.debug("Group {} committed offset {} for partition {}", new Object[]{ConsumerCoordinator.this.groupId, offset, tp});
                    if (!ConsumerCoordinator.this.subscriptions.isAssigned(tp)) continue;
                    ConsumerCoordinator.this.subscriptions.committed(tp, offsetAndMetadata);
                    continue;
                }
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    log.error("Not authorized to commit offsets for group {}", (Object)ConsumerCoordinator.this.groupId);
                    future.raise(new GroupAuthorizationException(ConsumerCoordinator.this.groupId));
                    return;
                }
                if (error == Errors.TOPIC_AUTHORIZATION_FAILED) {
                    unauthorizedTopics.add(tp.topic());
                    continue;
                }
                if (error == Errors.OFFSET_METADATA_TOO_LARGE || error == Errors.INVALID_COMMIT_OFFSET_SIZE) {
                    log.debug("Offset commit for group {} failed on partition {}: {}", new Object[]{ConsumerCoordinator.this.groupId, tp, error.message()});
                    future.raise(error);
                    return;
                }
                if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                    log.debug("Offset commit for group {} failed: {}", (Object)ConsumerCoordinator.this.groupId, (Object)error.message());
                    future.raise(error);
                    return;
                }
                if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP || error == Errors.REQUEST_TIMED_OUT) {
                    log.debug("Offset commit for group {} failed: {}", (Object)ConsumerCoordinator.this.groupId, (Object)error.message());
                    ConsumerCoordinator.this.coordinatorDead();
                    future.raise(error);
                    return;
                }
                if (error == Errors.UNKNOWN_MEMBER_ID || error == Errors.ILLEGAL_GENERATION || error == Errors.REBALANCE_IN_PROGRESS) {
                    log.debug("Offset commit for group {} failed: {}", (Object)ConsumerCoordinator.this.groupId, (Object)error.message());
                    ConsumerCoordinator.this.subscriptions.needReassignment();
                    future.raise(new CommitFailedException("Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records."));
                    return;
                }
                log.error("Group {} failed to commit partition {} at offset {}: {}", new Object[]{ConsumerCoordinator.this.groupId, tp, offset, error.message()});
                future.raise(new KafkaException("Unexpected error in commit: " + error.message()));
                return;
            }
            if (!unauthorizedTopics.isEmpty()) {
                log.error("Not authorized to commit to topics {} for group {}", unauthorizedTopics, (Object)ConsumerCoordinator.this.groupId);
                future.raise(new TopicAuthorizationException(unauthorizedTopics));
            } else {
                future.complete(null);
            }
        }
    }

    public static class DefaultOffsetCommitCallback
    implements OffsetCommitCallback {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
            if (exception != null) {
                log.error("Offset commit failed.", (Throwable)exception);
            }
        }
    }

    /**
     * Delayed task that asynchronously commits all consumed offsets and then schedules its
     * own next run on the network client.
     */
    private class AutoCommitTask
    implements DelayedTask {
        private final long interval;

        public AutoCommitTask(long interval) {
            this.interval = interval;
        }

        /** Schedules the next run one interval after the current time. */
        private void reschedule() {
            reschedule(time.milliseconds() + interval);
        }

        /** Schedules the next run at the given absolute time. */
        private void reschedule(long at) {
            client.schedule(this, at);
        }

        /**
         * Runs one auto-commit cycle.
         * <ul>
         *   <li>Coordinator unknown: retry after {@code retryBackoffMs} without committing.</li>
         *   <li>Rejoin needed: skip this commit and run again after one interval.</li>
         *   <li>Otherwise: commit asynchronously and, once the commit completes (successfully
         *       or not), schedule the next run at {@code now + interval}; failures are logged
         *       at WARN level.</li>
         * </ul>
         */
        @Override
        public void run(final long now) {
            if (coordinatorUnknown()) {
                log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId);
                reschedule(now + retryBackoffMs);
                return;
            }

            final long nextRun = now + interval;
            if (needRejoin()) {
                reschedule(nextRun);
                return;
            }

            commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback(){

                @Override
                public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                    if (exception != null) {
                        log.warn("Auto offset commit failed for group {}: {}", groupId, exception.getMessage());
                    }
                    // Success or failure, the task keeps its regular cadence.
                    reschedule(nextRun);
                }
            });
        }
    }
}

