package org.apache.kafka.clients.consumer.internals;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.consumer.RetriableCommitFailedException;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ApiException;
import org.apache.kafka.common.errors.DisconnectException;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.GroupCoordinatorResponse;
import org.apache.kafka.common.requests.HeartbeatResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.JoinGroupResponse;
import org.apache.kafka.common.requests.LeaveGroupRequest;
import org.apache.kafka.common.requests.LeaveGroupResponse;
import org.apache.kafka.common.requests.OffsetCommitRequest;
import org.apache.kafka.common.requests.OffsetCommitResponse;
import org.apache.kafka.common.requests.OffsetFetchResponse;
import org.apache.kafka.common.requests.SyncGroupRequest;
import org.apache.kafka.common.requests.SyncGroupResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.class */
public class ConsumerCoordinatorTest {
    private MockTime time;
    private MockClient client;
    private SubscriptionState subscriptions;
    private Metadata metadata;
    private Metrics metrics;
    private ConsumerNetworkClient consumerClient;
    private MockRebalanceListener rebalanceListener;
    private MockCommitCallback mockOffsetCommitCallback;
    private ConsumerCoordinator coordinator;
    private String topic1 = "test1";
    private String topic2 = "test2";
    private String groupId = "test-group";
    private TopicPartition t1p = new TopicPartition(this.topic1, 0);
    private TopicPartition t2p = new TopicPartition(this.topic2, 0);
    private int rebalanceTimeoutMs = 60000;
    private int sessionTimeoutMs = 10000;
    private int heartbeatIntervalMs = 5000;
    private long retryBackoffMs = 100;
    private boolean autoCommitEnabled = false;
    private int autoCommitIntervalMs = 2000;
    private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
    private List<PartitionAssignor> assignors = Collections.singletonList(this.partitionAssignor);
    private Cluster cluster = TestUtils.clusterWith(1, new HashMap<String, Integer>() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.1
        {
            put(ConsumerCoordinatorTest.this.topic1, 1);
            put(ConsumerCoordinatorTest.this.topic2, 1);
        }
    });
    private Node node = (Node) this.cluster.nodes().get(0);

    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest$MockCommitCallback.class */
    private static class MockCommitCallback implements OffsetCommitCallback {
        public int invoked;
        public Exception exception;

        private MockCommitCallback() {
            this.invoked = 0;
            this.exception = null;
        }

        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
            this.invoked++;
            this.exception = exc;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest$MockRebalanceListener.class */
    public static class MockRebalanceListener implements ConsumerRebalanceListener {
        public Collection<TopicPartition> revoked;
        public Collection<TopicPartition> assigned;
        public int revokedCount;
        public int assignedCount;

        private MockRebalanceListener() {
            this.revokedCount = 0;
            this.assignedCount = 0;
        }

        public void onPartitionsAssigned(Collection<TopicPartition> collection) {
            this.assigned = collection;
            this.assignedCount++;
        }

        public void onPartitionsRevoked(Collection<TopicPartition> collection) {
            this.revoked = collection;
            this.revokedCount++;
        }
    }

    @Before
    public void setup() {
        this.time = new MockTime();
        this.subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
        this.metadata = new Metadata(0L, Long.MAX_VALUE);
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.client = new MockClient(this.time, this.metadata);
        this.consumerClient = new ConsumerNetworkClient(this.client, this.metadata, this.time, 100L, 1000L);
        this.metrics = new Metrics(this.time);
        this.rebalanceListener = new MockRebalanceListener();
        this.mockOffsetCommitCallback = new MockCommitCallback();
        this.partitionAssignor.clear();
        this.client.setNode(this.node);
        this.coordinator = buildCoordinator(this.metrics, this.assignors, true, this.autoCommitEnabled, true);
    }

    @After
    public void teardown() {
        this.metrics.close();
    }

    @Test
    public void testNormalHeartbeat() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse(heartbeatResponse(Errors.NONE.code()));
        this.consumerClient.poll(0L);
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.succeeded());
    }

    @Test(expected = GroupAuthorizationException.class)
    public void testGroupDescribeUnauthorized() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.GROUP_AUTHORIZATION_FAILED.code()));
        this.coordinator.ensureCoordinatorReady();
    }

    @Test(expected = GroupAuthorizationException.class)
    public void testGroupReadUnauthorized() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupLeaderResponse(0, "memberId", Collections.emptyMap(), Errors.GROUP_AUTHORIZATION_FAILED.code()));
        this.coordinator.poll(this.time.milliseconds());
    }

    @Test
    public void testCoordinatorNotAvailable() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse(heartbeatResponse(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()));
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.failed());
        Assert.assertEquals(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.exception(), sendHeartbeatRequest.exception());
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testNotCoordinator() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse(heartbeatResponse(Errors.NOT_COORDINATOR_FOR_GROUP.code()));
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.failed());
        Assert.assertEquals(Errors.NOT_COORDINATOR_FOR_GROUP.exception(), sendHeartbeatRequest.exception());
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
    }

    @Test
    public void testIllegalGeneration() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.subscriptions.assignFromSubscribed(Collections.singletonList(this.t1p));
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse(heartbeatResponse(Errors.ILLEGAL_GENERATION.code()));
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.failed());
        Assert.assertEquals(Errors.ILLEGAL_GENERATION.exception(), sendHeartbeatRequest.exception());
        Assert.assertTrue(this.coordinator.needRejoin());
    }

    @Test
    public void testUnknownConsumerId() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.subscriptions.assignFromSubscribed(Collections.singletonList(this.t1p));
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse(heartbeatResponse(Errors.UNKNOWN_MEMBER_ID.code()));
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.failed());
        Assert.assertEquals(Errors.UNKNOWN_MEMBER_ID.exception(), sendHeartbeatRequest.exception());
        Assert.assertTrue(this.coordinator.needRejoin());
    }

    @Test
    public void testCoordinatorDisconnect() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.time.sleep(this.sessionTimeoutMs);
        RequestFuture sendHeartbeatRequest = this.coordinator.sendHeartbeatRequest();
        Assert.assertEquals(1L, this.consumerClient.pendingRequestCount());
        Assert.assertFalse(sendHeartbeatRequest.isDone());
        this.client.prepareResponse((AbstractResponse) heartbeatResponse(Errors.NONE.code()), true);
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue(sendHeartbeatRequest.isDone());
        Assert.assertTrue(sendHeartbeatRequest.failed());
        Assert.assertTrue(sendHeartbeatRequest.exception() instanceof DisconnectException);
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
    }

    @Test(expected = ApiException.class)
    public void testJoinGroupInvalidGroupId() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupLeaderResponse(0, "leader", Collections.emptyMap(), Errors.INVALID_GROUP_ID.code()));
        this.coordinator.poll(this.time.milliseconds());
    }

    @Test
    public void testNormalJoinGroupLeader() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> singletonMap = Collections.singletonMap("leader", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(this.t1p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", singletonMap, Errors.NONE.code()));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.2
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.memberId().equals("leader") && syncGroupRequest.generationId() == 1 && syncGroupRequest.groupAssignment().containsKey("leader");
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.poll(this.time.milliseconds());
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
        Assert.assertEquals(Collections.singleton(this.topic1), this.subscriptions.groupSubscription());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testPatternJoinGroupLeader() {
        this.subscriptions.subscribe(Pattern.compile("test.*"), this.rebalanceListener);
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(TestUtils.singletonCluster(this.topic1, 1), Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> singletonMap = Collections.singletonMap("leader", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Arrays.asList(this.t1p, this.t2p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", singletonMap, Errors.NONE.code()));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.3
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.memberId().equals("leader") && syncGroupRequest.generationId() == 1 && syncGroupRequest.groupAssignment().containsKey("leader");
            }
        }, (AbstractResponse) syncGroupResponse(Arrays.asList(this.t1p, this.t2p), Errors.NONE.code()));
        this.client.prepareMetadataUpdate(this.cluster, Collections.emptySet());
        this.coordinator.poll(this.time.milliseconds());
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(2L, this.subscriptions.assignedPartitions().size());
        Assert.assertEquals(2L, this.subscriptions.groupSubscription().size());
        Assert.assertEquals(2L, this.subscriptions.subscription().size());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(2L, this.rebalanceListener.assigned.size());
    }

    @Test
    public void testMetadataRefreshDuringRebalance() {
        this.subscriptions.subscribe(Pattern.compile(".*"), this.rebalanceListener);
        this.metadata.needMetadataForAllTopics(true);
        this.metadata.update(TestUtils.singletonCluster(this.topic1, 1), Collections.emptySet(), this.time.milliseconds());
        Assert.assertEquals(Collections.singleton(this.topic1), this.subscriptions.subscription());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> singletonMap = Collections.singletonMap("leader", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(this.t1p)));
        final List asList = Arrays.asList(this.topic1, this.topic2);
        final HashSet hashSet = new HashSet(asList);
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", singletonMap, Errors.NONE.code()));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.4
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                HashMap hashMap = new HashMap();
                Iterator it = asList.iterator();
                while (it.hasNext()) {
                    hashMap.put((String) it.next(), 1);
                }
                ConsumerCoordinatorTest.this.metadata.update(TestUtils.clusterWith(1, hashMap), Collections.emptySet(), ConsumerCoordinatorTest.this.time.milliseconds());
                return true;
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        List<TopicPartition> asList2 = Arrays.asList(this.t1p, this.t2p);
        HashSet hashSet2 = new HashSet(asList2);
        Map<String, List<String>> singletonMap2 = Collections.singletonMap("leader", Arrays.asList(this.topic1, this.topic2));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", asList2));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.5
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                JoinGroupRequest.ProtocolMetadata protocolMetadata = (JoinGroupRequest.ProtocolMetadata) ((JoinGroupRequest) abstractRequest).groupProtocols().iterator().next();
                PartitionAssignor.Subscription deserializeSubscription = ConsumerProtocol.deserializeSubscription(protocolMetadata.metadata());
                protocolMetadata.metadata().rewind();
                return deserializeSubscription.topics().containsAll(hashSet);
            }
        }, (AbstractResponse) joinGroupLeaderResponse(2, "leader", singletonMap2, Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(asList2, Errors.NONE.code()));
        this.coordinator.poll(this.time.milliseconds());
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(hashSet, this.subscriptions.subscription());
        Assert.assertEquals(hashSet2, this.subscriptions.assignedPartitions());
        Assert.assertEquals(2L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.revoked);
        Assert.assertEquals(2L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(hashSet2, this.rebalanceListener.assigned);
    }

    @Test
    public void testWakeupDuringJoin() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> singletonMap = Collections.singletonMap("leader", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(this.t1p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", singletonMap, Errors.NONE.code()));
        this.consumerClient.wakeup();
        try {
            this.coordinator.poll(this.time.milliseconds());
        } catch (WakeupException e) {
        }
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.poll(this.time.milliseconds());
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testNormalJoinGroupFollower() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.6
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.memberId().equals("consumer") && syncGroupRequest.generationId() == 1 && syncGroupRequest.groupAssignment().isEmpty();
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
        Assert.assertEquals(Collections.singleton(this.topic1), this.subscriptions.groupSubscription());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.revoked);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testPatternJoinGroupFollower() {
        this.subscriptions.subscribe(Pattern.compile("test.*"), this.rebalanceListener);
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(TestUtils.singletonCluster(this.topic1, 1), Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.7
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                return syncGroupRequest.memberId().equals("consumer") && syncGroupRequest.generationId() == 1 && syncGroupRequest.groupAssignment().isEmpty();
            }
        }, (AbstractResponse) syncGroupResponse(Arrays.asList(this.t1p, this.t2p), Errors.NONE.code()));
        this.client.prepareMetadataUpdate(this.cluster, Collections.emptySet());
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(2L, this.subscriptions.assignedPartitions().size());
        Assert.assertEquals(2L, this.subscriptions.subscription().size());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(2L, this.rebalanceListener.assigned.size());
    }

    @Test
    public void testLeaveGroupOnClose() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.joinGroupIfNeeded();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.8
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                atomicBoolean.set(true);
                LeaveGroupRequest leaveGroupRequest = (LeaveGroupRequest) abstractRequest;
                return leaveGroupRequest.memberId().equals("consumer") && leaveGroupRequest.groupId().equals(ConsumerCoordinatorTest.this.groupId);
            }
        }, (AbstractResponse) new LeaveGroupResponse(Errors.NONE.code()));
        this.coordinator.close(0L);
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void testMaybeLeaveGroup() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.joinGroupIfNeeded();
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.9
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                atomicBoolean.set(true);
                LeaveGroupRequest leaveGroupRequest = (LeaveGroupRequest) abstractRequest;
                return leaveGroupRequest.memberId().equals("consumer") && leaveGroupRequest.groupId().equals(ConsumerCoordinatorTest.this.groupId);
            }
        }, (AbstractResponse) new LeaveGroupResponse(Errors.NONE.code()));
        this.coordinator.maybeLeaveGroup();
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertNull(this.coordinator.generation());
    }

    @Test(expected = KafkaException.class)
    public void testUnexpectedErrorOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN.code()));
        this.coordinator.joinGroupIfNeeded();
    }

    @Test
    public void testUnknownMemberIdOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.UNKNOWN_MEMBER_ID.code()));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.10
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                return ((JoinGroupRequest) abstractRequest).memberId().equals("");
            }
        }, (AbstractResponse) joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
    }

    @Test
    public void testRebalanceInProgressOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.REBALANCE_IN_PROGRESS.code()));
        this.client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
    }

    @Test
    public void testIllegalGenerationOnSyncGroup() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.ILLEGAL_GENERATION.code()));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.11
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                return ((JoinGroupRequest) abstractRequest).memberId().equals("");
            }
        }, (AbstractResponse) joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
    }

    @Test
    public void testMetadataChangeTriggersRebalance() {
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.metadata.update(this.cluster, Collections.emptySet(), this.time.milliseconds());
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> singletonMap = Collections.singletonMap("consumer", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.singletonMap("consumer", Collections.singletonList(this.t1p)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "consumer", singletonMap, Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.poll(this.time.milliseconds());
        Assert.assertFalse(this.coordinator.needRejoin());
        this.metadata.update(TestUtils.singletonCluster(this.topic1, 2), Collections.emptySet(), this.time.milliseconds());
        Assert.assertTrue(this.coordinator.needRejoin());
    }

    @Test
    public void testUpdateMetadataDuringRebalance() {
        TopicPartition topicPartition = new TopicPartition("topic1", 0);
        TopicPartition topicPartition2 = new TopicPartition("topic2", 0);
        List asList = Arrays.asList("topic1", "topic2");
        this.subscriptions.subscribe(new HashSet(asList), this.rebalanceListener);
        this.metadata.setTopics(asList);
        this.metadata.update(TestUtils.singletonCluster("topic1", 1), Collections.emptySet(), this.time.milliseconds());
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> singletonMap = Collections.singletonMap("leader", asList);
        this.partitionAssignor.prepare(Collections.singletonMap("leader", Collections.singletonList(topicPartition)));
        this.client.prepareResponse(joinGroupLeaderResponse(1, "leader", singletonMap, Errors.NONE.code()));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.12
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                SyncGroupRequest syncGroupRequest = (SyncGroupRequest) abstractRequest;
                if (!syncGroupRequest.memberId().equals("leader") || syncGroupRequest.generationId() != 1 || !syncGroupRequest.groupAssignment().containsKey("leader")) {
                    return false;
                }
                HashMap hashMap = new HashMap();
                hashMap.put("topic1", 1);
                hashMap.put("topic2", 1);
                ConsumerCoordinatorTest.this.metadata.update(TestUtils.singletonCluster(hashMap), Collections.emptySet(), ConsumerCoordinatorTest.this.time.milliseconds());
                return true;
            }
        }, (AbstractResponse) syncGroupResponse(Collections.singletonList(topicPartition), Errors.NONE.code()));
        this.client.prepareResponse(joinGroupLeaderResponse(2, "leader", singletonMap, Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Arrays.asList(topicPartition, topicPartition2), Errors.NONE.code()));
        this.coordinator.poll(this.time.milliseconds());
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(new HashSet(Arrays.asList(topicPartition, topicPartition2)), this.subscriptions.assignedPartitions());
    }

    @Test
    public void testRebalanceAfterTopicUnavailableWithSubscribe() {
        unavailableTopicTest(false, false, Collections.emptySet());
    }

    @Test
    public void testRebalanceAfterTopicUnavailableWithPatternSubscribe() {
        unavailableTopicTest(true, false, Collections.emptySet());
    }

    @Test
    public void testRebalanceAfterNotMatchingTopicUnavailableWithPatternSSubscribe() {
        unavailableTopicTest(true, false, Collections.singleton("notmatching"));
    }

    @Test
    public void testAssignWithTopicUnavailable() {
        unavailableTopicTest(true, false, Collections.emptySet());
    }

    private void unavailableTopicTest(boolean z, boolean z2, Set<String> set) {
        this.metadata.setTopics(Collections.singletonList(this.topic1));
        this.client.prepareMetadataUpdate(Cluster.empty(), Collections.singleton("test1"));
        if (z2) {
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        } else if (z) {
            this.subscriptions.subscribe(Pattern.compile("test.*"), this.rebalanceListener);
        } else {
            this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        }
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        Map<String, List<String>> singletonMap = Collections.singletonMap("consumer", Collections.singletonList(this.topic1));
        this.partitionAssignor.prepare(Collections.emptyMap());
        this.client.prepareResponse(joinGroupLeaderResponse(1, "consumer", singletonMap, Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.emptyList(), Errors.NONE.code()));
        this.coordinator.poll(this.time.milliseconds());
        if (!z2) {
            Assert.assertFalse(this.coordinator.needRejoin());
            Assert.assertEquals(Collections.emptySet(), this.rebalanceListener.assigned);
        }
        Assert.assertTrue("Metadata refresh not requested for unavailable partitions", this.metadata.updateRequested());
        this.client.prepareMetadataUpdate(this.cluster, set);
        this.client.poll(0L, this.time.milliseconds());
        this.client.prepareResponse(joinGroupLeaderResponse(2, "consumer", singletonMap, Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.poll(this.time.milliseconds());
        Assert.assertFalse("Metadata refresh requested unnecessarily", this.metadata.updateRequested());
        if (z2) {
            return;
        }
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testExcludeInternalTopicsConfigOption() {
        this.subscriptions.subscribe(Pattern.compile(".*"), this.rebalanceListener);
        this.metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.emptySet(), this.time.milliseconds());
        Assert.assertFalse(this.subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
    }

    @Test
    public void testIncludeInternalTopicsConfigOption() {
        this.coordinator = buildCoordinator(new Metrics(), this.assignors, false, false, true);
        this.subscriptions.subscribe(Pattern.compile(".*"), this.rebalanceListener);
        this.metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), Collections.emptySet(), this.time.milliseconds());
        Assert.assertTrue(this.subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME));
    }

    @Test
    public void testRejoinGroup() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertTrue(this.rebalanceListener.revoked.isEmpty());
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
        this.subscriptions.subscribe(new HashSet(Arrays.asList(this.topic1, "otherTopic")), this.rebalanceListener);
        this.client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertEquals(2L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.revoked);
        Assert.assertEquals(2L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test
    public void testDisconnectInJoin() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse) joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.coordinator.joinGroupIfNeeded();
        Assert.assertFalse(this.coordinator.needRejoin());
        Assert.assertEquals(Collections.singleton(this.t1p), this.subscriptions.assignedPartitions());
        Assert.assertEquals(1L, this.rebalanceListener.revokedCount);
        Assert.assertEquals(1L, this.rebalanceListener.assignedCount);
        Assert.assertEquals(Collections.singleton(this.t1p), this.rebalanceListener.assigned);
    }

    @Test(expected = ApiException.class)
    public void testInvalidSessionTimeout() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(0, "consumer", "", Errors.INVALID_SESSION_TIMEOUT.code()));
        this.coordinator.joinGroupIfNeeded();
    }

    @Test
    public void testCommitOffsetOnly() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), callback(atomicBoolean));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertEquals(100L, this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testAutoCommitDynamicAssignment() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, true, true);
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        buildCoordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        buildCoordinator.joinGroupIfNeeded();
        this.subscriptions.seek(this.t1p, 100L);
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        this.time.sleep(this.autoCommitIntervalMs);
        buildCoordinator.poll(this.time.milliseconds());
        Assert.assertEquals(100L, this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testAutoCommitDynamicAssignmentRebalance() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, true, true);
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        buildCoordinator.ensureCoordinatorReady();
        this.time.sleep(this.autoCommitIntervalMs);
        this.consumerClient.poll(0L);
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        buildCoordinator.joinGroupIfNeeded();
        this.subscriptions.seek(this.t1p, 100L);
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        this.time.sleep(this.autoCommitIntervalMs);
        buildCoordinator.poll(this.time.milliseconds());
        Assert.assertEquals(100L, this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testAutoCommitManualAssignment() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, true, true);
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.seek(this.t1p, 100L);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        buildCoordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        this.time.sleep(this.autoCommitIntervalMs);
        buildCoordinator.poll(this.time.milliseconds());
        Assert.assertEquals(100L, this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testAutoCommitManualAssignmentCoordinatorUnknown() {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, true, true);
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.seek(this.t1p, 100L);
        this.consumerClient.poll(0L);
        this.time.sleep(this.autoCommitIntervalMs);
        this.consumerClient.poll(0L);
        Assert.assertNull(this.subscriptions.committed(this.t1p));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        buildCoordinator.ensureCoordinatorReady();
        this.time.sleep(this.retryBackoffMs);
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        buildCoordinator.poll(this.time.milliseconds());
        Assert.assertEquals(100L, this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testCommitOffsetMetadata() {
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "hello")), callback(atomicBoolean));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertEquals(100L, this.subscriptions.committed(this.t1p).offset());
        Assert.assertEquals("hello", this.subscriptions.committed(this.t1p).metadata());
    }

    @Test
    public void testCommitOffsetAsyncWithDefaultCallback() {
        int i = this.mockOffsetCommitCallback.invoked;
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.mockOffsetCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals(i + 1, this.mockOffsetCommitCallback.invoked);
        Assert.assertNull(this.mockOffsetCommitCallback.exception);
    }

    @Test
    public void testCommitAfterLeaveGroup() {
        this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
        this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
        this.client.prepareMetadataUpdate(this.cluster, Collections.emptySet());
        this.coordinator.joinGroupIfNeeded();
        this.client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()));
        this.subscriptions.unsubscribe();
        this.coordinator.maybeLeaveGroup();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.13
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                OffsetCommitRequest offsetCommitRequest = (OffsetCommitRequest) abstractRequest;
                return offsetCommitRequest.memberId().equals("") && offsetCommitRequest.generationId() == -1;
            }
        }, (AbstractResponse) offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), callback(atomicBoolean));
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(atomicBoolean.get());
    }

    @Test
    public void testCommitOffsetAsyncFailedWithDefaultCallback() {
        int i = this.mockOffsetCommitCallback.invoked;
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), this.mockOffsetCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals(i + 1, this.mockOffsetCommitCallback.invoked);
        Assert.assertTrue(this.mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException);
    }

    @Test
    public void testCommitOffsetAsyncCoordinatorNotAvailable() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        MockCommitCallback mockCommitCallback = new MockCommitCallback();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), mockCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
        Assert.assertEquals(1L, mockCommitCallback.invoked);
        Assert.assertTrue(mockCommitCallback.exception instanceof RetriableCommitFailedException);
    }

    @Test
    public void testCommitOffsetAsyncNotCoordinator() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        MockCommitCallback mockCommitCallback = new MockCommitCallback();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NOT_COORDINATOR_FOR_GROUP.code()))));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), mockCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
        Assert.assertEquals(1L, mockCommitCallback.invoked);
        Assert.assertTrue(mockCommitCallback.exception instanceof RetriableCommitFailedException);
    }

    @Test
    public void testCommitOffsetAsyncDisconnected() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        MockCommitCallback mockCommitCallback = new MockCommitCallback();
        this.client.prepareResponse((AbstractResponse) offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))), true);
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), mockCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertTrue(this.coordinator.coordinatorUnknown());
        Assert.assertEquals(1L, mockCommitCallback.invoked);
        Assert.assertTrue(mockCommitCallback.exception instanceof RetriableCommitFailedException);
    }

    @Test
    public void testCommitOffsetSyncNotCoordinator() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NOT_COORDINATOR_FOR_GROUP.code()))));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
    }

    @Test
    public void testCommitOffsetSyncCoordinatorNotAvailable() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.GROUP_COORDINATOR_NOT_AVAILABLE.code()))));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
    }

    @Test
    public void testCommitOffsetSyncCoordinatorDisconnected() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse) offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))), true);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.NONE.code()))));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
    }

    @Test(expected = KafkaException.class)
    public void testCommitUnknownTopicOrPartition() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.UNKNOWN_TOPIC_OR_PARTITION.code()))));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
    }

    @Test(expected = OffsetMetadataTooLarge.class)
    public void testCommitOffsetMetadataTooLarge() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.OFFSET_METADATA_TOO_LARGE.code()))));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
    }

    @Test(expected = CommitFailedException.class)
    public void testCommitOffsetIllegalGeneration() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.ILLEGAL_GENERATION.code()))));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
    }

    @Test(expected = CommitFailedException.class)
    public void testCommitOffsetUnknownMemberId() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.UNKNOWN_MEMBER_ID.code()))));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
    }

    @Test(expected = CommitFailedException.class)
    public void testCommitOffsetRebalanceInProgress() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse(offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.REBALANCE_IN_PROGRESS.code()))));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
    }

    @Test(expected = KafkaException.class)
    public void testCommitOffsetSyncCallbackWithNonRetriableException() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.client.prepareResponse((AbstractResponse) offsetCommitResponse(Collections.singletonMap(this.t1p, Short.valueOf(Errors.UNKNOWN.code()))), false);
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
    }

    @Test(expected = IllegalArgumentException.class)
    public void testCommitSyncNegativeOffset() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.commitOffsetsSync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(-1L)), Long.MAX_VALUE);
    }

    @Test
    public void testCommitAsyncNegativeOffset() {
        int i = this.mockOffsetCommitCallback.invoked;
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.commitOffsetsAsync(Collections.singletonMap(this.t1p, new OffsetAndMetadata(-1L)), this.mockOffsetCommitCallback);
        this.coordinator.invokeCompletedOffsetCommitCallbacks();
        Assert.assertEquals(i + 1, this.mockOffsetCommitCallback.invoked);
        Assert.assertTrue(this.mockOffsetCommitCallback.exception instanceof IllegalArgumentException);
    }

    @Test
    public void testRefreshOffset() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded();
        Assert.assertFalse(this.subscriptions.refreshCommitsNeeded());
        Assert.assertEquals(100L, this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testRefreshOffsetLoadInProgress() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse(offsetFetchResponse(Errors.GROUP_LOAD_IN_PROGRESS));
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded();
        Assert.assertFalse(this.subscriptions.refreshCommitsNeeded());
        Assert.assertEquals(100L, this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testRefreshOffsetsGroupNotAuthorized() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse(offsetFetchResponse(Errors.GROUP_AUTHORIZATION_FAILED));
        try {
            this.coordinator.refreshCommittedOffsetsIfNeeded();
            Assert.fail("Expected group authorization error");
        } catch (GroupAuthorizationException e) {
            Assert.assertEquals(this.groupId, e.groupId());
        }
    }

    @Test(expected = KafkaException.class)
    public void testRefreshOffsetUnknownTopicOrPartition() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded();
    }

    @Test
    public void testRefreshOffsetNotCoordinatorForConsumer() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse(offsetFetchResponse(Errors.NOT_COORDINATOR_FOR_GROUP));
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.NONE, "", 100L));
        this.coordinator.refreshCommittedOffsetsIfNeeded();
        Assert.assertFalse(this.subscriptions.refreshCommitsNeeded());
        Assert.assertEquals(100L, this.subscriptions.committed(this.t1p).offset());
    }

    @Test
    public void testRefreshOffsetWithNoFetchableOffsets() {
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        this.coordinator.ensureCoordinatorReady();
        this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        this.subscriptions.needRefreshCommits();
        this.client.prepareResponse(offsetFetchResponse(this.t1p, Errors.NONE, "", -1L));
        this.coordinator.refreshCommittedOffsetsIfNeeded();
        Assert.assertFalse(this.subscriptions.refreshCommitsNeeded());
        Assert.assertEquals((Object) null, this.subscriptions.committed(this.t1p));
    }

    @Test
    public void testProtocolMetadataOrder() {
        Throwable th;
        PartitionAssignor roundRobinAssignor = new RoundRobinAssignor();
        PartitionAssignor rangeAssignor = new RangeAssignor();
        Metrics metrics = new Metrics(this.time);
        Throwable th2 = null;
        try {
            try {
                List metadata = buildCoordinator(metrics, Arrays.asList(roundRobinAssignor, rangeAssignor), true, false, true).metadata();
                Assert.assertEquals(2L, metadata.size());
                Assert.assertEquals(roundRobinAssignor.name(), ((JoinGroupRequest.ProtocolMetadata) metadata.get(0)).name());
                Assert.assertEquals(rangeAssignor.name(), ((JoinGroupRequest.ProtocolMetadata) metadata.get(1)).name());
                if (metrics != null) {
                    if (0 != 0) {
                        try {
                            metrics.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        metrics.close();
                    }
                }
                metrics = new Metrics(this.time);
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    List metadata2 = buildCoordinator(metrics, Arrays.asList(rangeAssignor, roundRobinAssignor), true, false, true).metadata();
                    Assert.assertEquals(2L, metadata2.size());
                    Assert.assertEquals(rangeAssignor.name(), ((JoinGroupRequest.ProtocolMetadata) metadata2.get(0)).name());
                    Assert.assertEquals(roundRobinAssignor.name(), ((JoinGroupRequest.ProtocolMetadata) metadata2.get(1)).name());
                    if (metrics != null) {
                        if (0 == 0) {
                            metrics.close();
                            return;
                        }
                        try {
                            metrics.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    }
                } catch (Throwable th6) {
                    th = th6;
                    throw th6;
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testCloseDynamicAssignment() throws Exception {
        gracefulCloseTest(prepareCoordinatorForCloseTest(true, true, true), true);
    }

    @Test
    public void testCloseManualAssignment() throws Exception {
        gracefulCloseTest(prepareCoordinatorForCloseTest(false, true, true), false);
    }

    @Test
    public void shouldNotLeaveGroupWhenLeaveGroupFlagIsFalse() throws Exception {
        gracefulCloseTest(prepareCoordinatorForCloseTest(true, true, false), false);
    }

    @Test
    public void testCloseCoordinatorNotKnownManualAssignment() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(false, true, true);
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.NOT_COORDINATOR_FOR_GROUP);
        this.time.sleep(this.autoCommitIntervalMs);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 1000L, 60000L, 1000L, 1000L);
    }

    @Test
    public void testCloseCoordinatorNotKnownNoCommits() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, false, true);
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.NOT_COORDINATOR_FOR_GROUP);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 1000L, 60000L, 0L, 0L);
    }

    @Test
    public void testCloseCoordinatorNotKnownWithCommits() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, true);
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.NOT_COORDINATOR_FOR_GROUP);
        this.time.sleep(this.autoCommitIntervalMs);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 1000L, 60000L, 1000L, 1000L);
    }

    @Test
    public void testCloseCoordinatorUnavailableNoCommits() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, false, true);
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 1000L, 60000L, 0L, 0L);
    }

    @Test
    public void testCloseTimeoutCoordinatorUnavailableForCommit() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, true);
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
        this.time.sleep(this.autoCommitIntervalMs);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 1000L, 60000L, 1000L, 1000L);
    }

    @Test
    public void testCloseMaxWaitCoordinatorUnavailableForCommit() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, true);
        makeCoordinatorUnknown(prepareCoordinatorForCloseTest, Errors.GROUP_COORDINATOR_NOT_AVAILABLE);
        this.time.sleep(this.autoCommitIntervalMs);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, Long.MAX_VALUE, 60000L, 60000L, 60000L);
    }

    @Test
    public void testCloseNoResponseForCommit() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, true);
        this.time.sleep(this.autoCommitIntervalMs);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, Long.MAX_VALUE, 60000L, 60000L, 60000L);
    }

    @Test
    public void testCloseNoResponseForLeaveGroup() throws Exception {
        closeVerifyTimeout(prepareCoordinatorForCloseTest(true, false, true), Long.MAX_VALUE, 60000L, 60000L, 60000L);
    }

    @Test
    public void testCloseNoWait() throws Exception {
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, true);
        this.time.sleep(this.autoCommitIntervalMs);
        closeVerifyTimeout(prepareCoordinatorForCloseTest, 0L, 60000L, 0L, 0L);
    }

    @Test
    public void testHeartbeatThreadClose() throws Exception {
        this.groupId = "testCloseTimeoutWithHeartbeatThread";
        ConsumerCoordinator prepareCoordinatorForCloseTest = prepareCoordinatorForCloseTest(true, true, true);
        prepareCoordinatorForCloseTest.ensureActiveGroup();
        this.time.sleep(this.heartbeatIntervalMs + 100);
        Thread.yield();
        closeVerifyTimeout(prepareCoordinatorForCloseTest, Long.MAX_VALUE, 60000L, 60000L, 60000L);
        Thread[] threadArr = new Thread[Thread.activeCount()];
        int enumerate = Thread.enumerate(threadArr);
        for (int i = 0; i < enumerate; i++) {
            Assert.assertFalse("Heartbeat thread active after close", threadArr[i].getName().contains(this.groupId));
        }
    }

    private ConsumerCoordinator prepareCoordinatorForCloseTest(boolean z, boolean z2, boolean z3) {
        ConsumerCoordinator buildCoordinator = buildCoordinator(new Metrics(), this.assignors, true, z2, z3);
        this.client.prepareResponse(groupCoordinatorResponse(this.node, Errors.NONE.code()));
        buildCoordinator.ensureCoordinatorReady();
        if (z) {
            this.subscriptions.subscribe(Collections.singleton(this.topic1), this.rebalanceListener);
            this.client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()));
            this.client.prepareResponse(syncGroupResponse(Collections.singletonList(this.t1p), Errors.NONE.code()));
            buildCoordinator.joinGroupIfNeeded();
        } else {
            this.subscriptions.assignFromUser(Collections.singleton(this.t1p));
        }
        this.subscriptions.seek(this.t1p, 100L);
        buildCoordinator.poll(this.time.milliseconds());
        return buildCoordinator;
    }

    private void makeCoordinatorUnknown(ConsumerCoordinator consumerCoordinator, Errors errors) {
        this.time.sleep(this.sessionTimeoutMs);
        consumerCoordinator.sendHeartbeatRequest();
        this.client.prepareResponse(heartbeatResponse(errors.code()));
        this.time.sleep(this.sessionTimeoutMs);
        this.consumerClient.poll(0L);
        Assert.assertTrue(consumerCoordinator.coordinatorUnknown());
    }

    private void closeVerifyTimeout(final ConsumerCoordinator consumerCoordinator, final long j, final long j2, long j3, long j4) throws Exception {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        try {
            Future<?> submit = newSingleThreadExecutor.submit(new Runnable() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.14
                @Override // java.lang.Runnable
                public void run() {
                    consumerCoordinator.close(Math.min(j, j2));
                }
            });
            if (consumerCoordinator.coordinatorUnknown()) {
                Thread.sleep(200L);
            } else {
                this.client.waitForRequests(1, 1000L);
            }
            if (j3 > 0) {
                this.time.sleep(j3 - 1);
                try {
                    submit.get(500L, TimeUnit.MILLISECONDS);
                    Assert.fail("Close completed ungracefully without waiting for timeout");
                } catch (TimeoutException e) {
                }
            }
            if (j4 >= 0) {
                this.time.sleep((j4 - j3) + 2);
            }
            submit.get(2000L, TimeUnit.MILLISECONDS);
            newSingleThreadExecutor.shutdownNow();
        } catch (Throwable th) {
            newSingleThreadExecutor.shutdownNow();
            throw th;
        }
    }

    private void gracefulCloseTest(ConsumerCoordinator consumerCoordinator, boolean z) throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.15
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                atomicBoolean.set(true);
                return ((OffsetCommitRequest) abstractRequest).groupId().equals(ConsumerCoordinatorTest.this.groupId);
            }
        }, (AbstractResponse) new OffsetCommitResponse(new HashMap()));
        this.client.prepareResponse(new MockClient.RequestMatcher() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.16
            @Override // org.apache.kafka.clients.MockClient.RequestMatcher
            public boolean matches(AbstractRequest abstractRequest) {
                atomicBoolean2.set(true);
                return ((LeaveGroupRequest) abstractRequest).groupId().equals(ConsumerCoordinatorTest.this.groupId);
            }
        }, (AbstractResponse) new LeaveGroupResponse(Errors.NONE.code()));
        consumerCoordinator.close();
        Assert.assertTrue("Commit not requested", atomicBoolean.get());
        Assert.assertEquals("leaveGroupRequested should be " + z, Boolean.valueOf(z), Boolean.valueOf(atomicBoolean2.get()));
    }

    private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> list, boolean z, boolean z2, boolean z3) {
        return new ConsumerCoordinator(this.consumerClient, this.groupId, this.rebalanceTimeoutMs, this.sessionTimeoutMs, this.heartbeatIntervalMs, list, this.metadata, this.subscriptions, metrics, "consumer" + this.groupId, this.time, this.retryBackoffMs, z2, this.autoCommitIntervalMs, (ConsumerInterceptors) null, z, z3);
    }

    private GroupCoordinatorResponse groupCoordinatorResponse(Node node, short s) {
        return new GroupCoordinatorResponse(s, node);
    }

    private HeartbeatResponse heartbeatResponse(short s) {
        return new HeartbeatResponse(s);
    }

    private JoinGroupResponse joinGroupLeaderResponse(int i, String str, Map<String, List<String>> map, short s) {
        HashMap hashMap = new HashMap();
        for (Map.Entry<String, List<String>> entry : map.entrySet()) {
            hashMap.put(entry.getKey(), ConsumerProtocol.serializeSubscription(new PartitionAssignor.Subscription(entry.getValue())));
        }
        return new JoinGroupResponse(s, i, this.partitionAssignor.name(), str, str, hashMap);
    }

    private JoinGroupResponse joinGroupFollowerResponse(int i, String str, String str2, short s) {
        return new JoinGroupResponse(s, i, this.partitionAssignor.name(), str, str2, Collections.emptyMap());
    }

    private SyncGroupResponse syncGroupResponse(List<TopicPartition> list, short s) {
        return new SyncGroupResponse(s, ConsumerProtocol.serializeAssignment(new PartitionAssignor.Assignment(list)));
    }

    private OffsetCommitResponse offsetCommitResponse(Map<TopicPartition, Short> map) {
        return new OffsetCommitResponse(map);
    }

    private OffsetFetchResponse offsetFetchResponse(Errors errors) {
        return new OffsetFetchResponse(errors, Collections.emptyMap());
    }

    private OffsetFetchResponse offsetFetchResponse(TopicPartition topicPartition, Errors errors, String str, long j) {
        return new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(topicPartition, new OffsetFetchResponse.PartitionData(j, str, errors)));
    }

    private OffsetCommitCallback callback(final AtomicBoolean atomicBoolean) {
        return new OffsetCommitCallback() { // from class: org.apache.kafka.clients.consumer.internals.ConsumerCoordinatorTest.17
            public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception exc) {
                if (exc == null) {
                    atomicBoolean.set(true);
                }
            }
        };
    }
}
