/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.coordinator.group.GroupMetadataManager;
import org.apache.kafka.coordinator.group.MockCoordinatorTimer;
import org.apache.kafka.coordinator.group.MockPartitionAssignor;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.RecordHelpers;
import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupBuilder;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.mockito.Mockito;

public class GroupMetadataManagerTestContext {
    // Mock clock; advanced by sleep(long).
    final MockTime time;
    // Mock coordinator timer; polled for expired timeouts right after the clock is advanced.
    final MockCoordinatorTimer<Void, Record> timer;
    // Snapshot registry used by commit() and rollback() to delete or revert snapshots.
    final SnapshotRegistry snapshotRegistry;
    // Metrics shard handed in at construction time.
    final GroupCoordinatorMetricsShard metrics;
    // The manager that the helper methods of this context call into.
    final GroupMetadataManager groupMetadataManager;
    // Time, in ms, the join helpers sleep for so that a classic group's initial rebalance can complete.
    final int classicGroupInitialRebalanceDelayMs;
    // NOTE(review): presumably the join timeout for new classic-group members; confirm against the builder.
    final int classicGroupNewMemberJoinTimeoutMs;
    // Offset that rollback() reverts to; commit() sets it to lastWrittenOffset.
    long lastCommittedOffset = 0L;
    // Last written offset; rollback() resets it to lastCommittedOffset.
    long lastWrittenOffset = 0L;

    /**
     * Asserts that at most one timeout expired and that every expired timeout
     * carries {@link GroupMetadataManager#EMPTY_RESULT} as its result.
     *
     * @param timeouts the expired timeouts to inspect
     */
    public static void assertNoOrEmptyResult(List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> timeouts) {
        Assertions.assertTrue(timeouts.size() <= 1);
        for (MockCoordinatorTimer.ExpiredTimeout<Void, Record> expired : timeouts) {
            Assertions.assertEquals(GroupMetadataManager.EMPTY_RESULT, expired.result);
        }
    }

    /**
     * Builds a join-group protocol collection with one entry per protocol name.
     * Each entry's metadata is a serialized consumer subscription to a single
     * topic, picked round-robin from "foo", "bar" and "baz".
     *
     * @param protocolNames the protocol names, in the order they should be added
     * @return the populated protocol collection
     */
    public static JoinGroupRequestData.JoinGroupRequestProtocolCollection toProtocols(String ... protocolNames) {
        List<String> topics = Arrays.asList("foo", "bar", "baz");
        JoinGroupRequestData.JoinGroupRequestProtocolCollection collection = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(0);
        int index = 0;
        for (String protocolName : protocolNames) {
            // Cycle through the topic list so consecutive protocols subscribe to different topics.
            String topic = topics.get(index % topics.size());
            ConsumerPartitionAssignor.Subscription subscription = new ConsumerPartitionAssignor.Subscription(Collections.singletonList(topic));
            byte[] metadata = ConsumerProtocol.serializeSubscription(subscription).array();
            collection.add(new JoinGroupRequestData.JoinGroupRequestProtocol().setName(protocolName).setMetadata(metadata));
            index++;
        }
        return collection;
    }

    /**
     * Creates a classic-group metadata record for the given group.
     *
     * @param groupId the group id stored in the record key
     * @param value the record value
     * @param metadataVersion the metadata version that selects the value's schema version
     * @return a record whose key is a {@link GroupMetadataKey} at version 2
     */
    public static Record newGroupMetadataRecord(String groupId, GroupMetadataValue value, MetadataVersion metadataVersion) {
        // The key version is a short; the explicit cast is required because an int
        // literal is not narrowed implicitly in a constructor invocation.
        ApiMessageAndVersion key = new ApiMessageAndVersion(new GroupMetadataKey().setGroup(groupId), (short) 2);
        ApiMessageAndVersion payload = new ApiMessageAndVersion(value, metadataVersion.groupMetadataValueVersion());
        return new Record(key, payload);
    }

    /**
     * Wires the context together from pre-built collaborators and registers a
     * snapshot at the initial written offset.
     *
     * @param time the mock clock
     * @param timer the mock coordinator timer
     * @param snapshotRegistry the snapshot registry
     * @param metrics the metrics shard
     * @param groupMetadataManager the manager the helpers call into
     * @param classicGroupInitialRebalanceDelayMs initial rebalance delay for classic groups, in ms
     * @param classicGroupNewMemberJoinTimeoutMs new member join timeout for classic groups, in ms
     */
    public GroupMetadataManagerTestContext(MockTime time, MockCoordinatorTimer<Void, Record> timer, SnapshotRegistry snapshotRegistry, GroupCoordinatorMetricsShard metrics, GroupMetadataManager groupMetadataManager, int classicGroupInitialRebalanceDelayMs, int classicGroupNewMemberJoinTimeoutMs) {
        this.groupMetadataManager = groupMetadataManager;
        this.snapshotRegistry = snapshotRegistry;
        this.metrics = metrics;
        this.timer = timer;
        this.time = time;
        this.classicGroupNewMemberJoinTimeoutMs = classicGroupNewMemberJoinTimeoutMs;
        this.classicGroupInitialRebalanceDelayMs = classicGroupInitialRebalanceDelayMs;
        // Create the snapshot at the starting offset so that it exists for a later rollback().
        this.snapshotRegistry.getOrCreateSnapshot(this.lastWrittenOffset);
    }

    /**
     * Promotes the last written offset to the last committed offset, then asks
     * the snapshot registry to delete snapshots up to the previously committed
     * offset.
     */
    public void commit() {
        final long previouslyCommitted = this.lastCommittedOffset;
        this.lastCommittedOffset = this.lastWrittenOffset;
        this.snapshotRegistry.deleteSnapshotsUpTo(previouslyCommitted);
    }

    /**
     * Resets the last written offset to the last committed offset and reverts
     * the snapshot registry to the snapshot taken at that offset.
     */
    public void rollback() {
        final long committed = this.lastCommittedOffset;
        this.lastWrittenOffset = committed;
        this.snapshotRegistry.revertToSnapshot(committed);
    }

    /**
     * Looks up an existing consumer group (without creating it) and returns its state.
     *
     * @param groupId the consumer group id
     * @return the current state of the group
     */
    public ConsumerGroup.ConsumerGroupState consumerGroupState(String groupId) {
        ConsumerGroup consumerGroup = this.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
        return consumerGroup.state();
    }

    /**
     * Looks up an existing member of an existing consumer group (creating
     * neither) and returns the member's state.
     *
     * @param groupId the consumer group id
     * @param memberId the member id
     * @return the current state of the member
     */
    public ConsumerGroupMember.MemberState consumerGroupMemberState(String groupId, String memberId) {
        ConsumerGroup consumerGroup = this.groupMetadataManager.getOrMaybeCreateConsumerGroup(groupId, false);
        ConsumerGroupMember groupMember = consumerGroup.getOrMaybeCreateMember(memberId, false);
        return groupMember.state();
    }

    /**
     * Sends a consumer group heartbeat to the manager using a synthetic request
     * context (latest API version, loopback address, anonymous principal,
     * PLAINTEXT listener) and replays every record of the result.
     *
     * @param request the heartbeat request
     * @return the coordinator result returned by the manager
     */
    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> consumerGroupHeartbeat(ConsumerGroupHeartbeatRequestData request) {
        RequestContext context = new RequestContext(new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion(), "client", 0), "1", InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        // Keep the result fully parameterized: a raw CoordinatorResult would erase the
        // record type and leave the replay method reference and the return unchecked.
        CoordinatorResult<ConsumerGroupHeartbeatResponseData, Record> result = this.groupMetadataManager.consumerGroupHeartbeat(context, request);
        result.records().forEach(this::replay);
        return result;
    }

    /**
     * Advances the mock clock, polls the timer, and replays the records of
     * every expired timeout whose result asks for its records to be replayed.
     *
     * @param ms how far to advance the clock, in milliseconds
     * @return the timeouts that expired
     */
    public List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> sleep(long ms) {
        this.time.sleep(ms);
        List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> expiredTimeouts = this.timer.poll();
        for (MockCoordinatorTimer.ExpiredTimeout<Void, Record> expired : expiredTimeouts) {
            if (!expired.result.replayRecords()) {
                continue;
            }
            for (Record record : expired.result.records()) {
                this.replay(record);
            }
        }
        return expiredTimeouts;
    }

    /**
     * Asserts that a consumer group session timeout is scheduled for the member
     * and that its deadline is {@code delayMs} after the current mock time.
     *
     * @param groupId the consumer group id
     * @param memberId the member id
     * @param delayMs expected delay from now until the deadline, in ms
     */
    public void assertSessionTimeout(String groupId, String memberId, long delayMs) {
        String timeoutKey = GroupMetadataManager.consumerGroupSessionTimeoutKey(groupId, memberId);
        MockCoordinatorTimer.ScheduledTimeout<Void, Record> scheduled = this.timer.timeout(timeoutKey);
        Assertions.assertNotNull(scheduled);
        long expectedDeadlineMs = this.time.milliseconds() + delayMs;
        Assertions.assertEquals(expectedDeadlineMs, scheduled.deadlineMs);
    }

    /**
     * Asserts that no consumer group session timeout is scheduled for the member.
     *
     * @param groupId the consumer group id
     * @param memberId the member id
     */
    public void assertNoSessionTimeout(String groupId, String memberId) {
        String timeoutKey = GroupMetadataManager.consumerGroupSessionTimeoutKey(groupId, memberId);
        Assertions.assertNull(this.timer.timeout(timeoutKey));
    }

    /**
     * Asserts that a consumer group revocation timeout is scheduled for the
     * member and that its deadline is {@code delayMs} after the current mock time.
     *
     * @param groupId the consumer group id
     * @param memberId the member id
     * @param delayMs expected delay from now until the deadline, in ms
     * @return the scheduled timeout that was found
     */
    public MockCoordinatorTimer.ScheduledTimeout<Void, Record> assertRevocationTimeout(String groupId, String memberId, long delayMs) {
        String timeoutKey = GroupMetadataManager.consumerGroupRevocationTimeoutKey(groupId, memberId);
        MockCoordinatorTimer.ScheduledTimeout<Void, Record> scheduled = this.timer.timeout(timeoutKey);
        Assertions.assertNotNull(scheduled);
        long expectedDeadlineMs = this.time.milliseconds() + delayMs;
        Assertions.assertEquals(expectedDeadlineMs, scheduled.deadlineMs);
        return scheduled;
    }

    /**
     * Asserts that no consumer group revocation timeout is scheduled for the member.
     *
     * @param groupId the consumer group id
     * @param memberId the member id
     */
    public void assertNoRevocationTimeout(String groupId, String memberId) {
        String timeoutKey = GroupMetadataManager.consumerGroupRevocationTimeoutKey(groupId, memberId);
        Assertions.assertNull(this.timer.timeout(timeoutKey));
    }

    /**
     * Fetches the classic group with the given id from the manager, asking it
     * to create the group if it does not exist yet.
     *
     * @param groupId the classic group id
     * @return the existing or newly created group
     */
    ClassicGroup createClassicGroup(String groupId) {
        final boolean createIfNotExists = true;
        ClassicGroup classicGroup = this.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, createIfNotExists);
        return classicGroup;
    }

    /**
     * Sends a classic group join without requiring a known member id.
     *
     * @param request the join request
     * @return the join future together with the coordinator result
     */
    public JoinResult sendClassicGroupJoin(JoinGroupRequestData request) {
        final boolean requireKnownMemberId = false;
        return this.sendClassicGroupJoin(request, requireKnownMemberId);
    }

    /**
     * Sends a classic group join without support for skipping assignment.
     *
     * @param request the join request
     * @param requireKnownMemberId whether to use a request version that requires a known member id
     * @return the join future together with the coordinator result
     */
    public JoinResult sendClassicGroupJoin(JoinGroupRequestData request, boolean requireKnownMemberId) {
        final boolean supportSkippingAssignment = false;
        return this.sendClassicGroupJoin(request, requireKnownMemberId, supportSkippingAssignment);
    }

    /**
     * Sends a classic group join to the manager using a synthetic request context.
     *
     * <p>The JoinGroup request version is chosen from the flags: version 3 by
     * default, version 4 when {@code requireKnownMemberId} is set, and the latest
     * version when both flags are set. {@code supportSkippingAssignment} has no
     * effect unless {@code requireKnownMemberId} is also set.
     *
     * @param request the join request
     * @param requireKnownMemberId whether to use a version that requires a known member id
     * @param supportSkippingAssignment whether to use the latest version (only with {@code requireKnownMemberId})
     * @return the join future together with the coordinator result
     */
    public JoinResult sendClassicGroupJoin(JoinGroupRequestData request, boolean requireKnownMemberId, boolean supportSkippingAssignment) {
        short joinGroupVersion = 3;
        if (requireKnownMemberId) {
            joinGroupVersion = 4;
            if (supportSkippingAssignment) {
                joinGroupVersion = ApiKeys.JOIN_GROUP.latestVersion();
            }
        }
        RequestContext context = new RequestContext(new RequestHeader(ApiKeys.JOIN_GROUP, joinGroupVersion, "client", 0), "1", InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        CompletableFuture<JoinGroupResponseData> responseFuture = new CompletableFuture<>();
        // Typed result instead of a raw CoordinatorResult followed by an unchecked cast.
        CoordinatorResult<Void, Record> coordinatorResult = this.groupMetadataManager.classicGroupJoin(context, request, responseFuture);
        return new JoinResult(responseFuture, coordinatorResult);
    }

    /**
     * Creates the classic group {@code groupId}, joins it with one dynamic
     * member, and syncs the group so it ends in the STABLE state.
     *
     * <p>The join and sync requests are addressed to {@code groupId}. They were
     * previously hard-coded to "group-id", so the helper only worked when the
     * caller passed exactly that id.
     *
     * @param groupId the classic group id
     * @return the leader's join response (generation 1)
     * @throws Exception if waiting on one of the response futures fails
     */
    public JoinGroupResponseData joinClassicGroupAsDynamicMemberAndCompleteRebalance(String groupId) throws Exception {
        ClassicGroup group = this.createClassicGroup(groupId);
        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
            .withGroupId(groupId)
            .withMemberId("")
            .withDefaultProtocolTypeAndProtocols()
            .withRebalanceTimeoutMs(10000)
            .withSessionTimeoutMs(5000)
            .build();
        JoinGroupResponseData leaderJoinResponse = this.joinClassicGroupAsDynamicMemberAndCompleteJoin(joinRequest);
        Assertions.assertEquals(1, leaderJoinResponse.generationId());
        Assertions.assertTrue(group.isInState(ClassicGroupState.COMPLETING_REBALANCE));

        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
            .withGroupId(groupId)
            .withMemberId(leaderJoinResponse.memberId())
            .withGenerationId(leaderJoinResponse.generationId())
            .build();
        SyncResult syncResult = this.sendClassicGroupSync(syncRequest);
        // The sync must produce exactly one group metadata record with the group's assignment.
        Assertions.assertEquals(Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, group.groupAssignment(), MetadataVersion.latestTesting())), syncResult.records);

        // Completing the append future stands in for the record write finishing,
        // after which the sync response is expected to be available.
        syncResult.appendFuture.complete(null);
        Assertions.assertTrue(syncResult.syncFuture.isDone());
        Assertions.assertEquals(Errors.NONE.code(), syncResult.syncFuture.get().errorCode());
        Assertions.assertTrue(group.isInState(ClassicGroupState.STABLE));
        return leaderJoinResponse;
    }

    /**
     * Joins a classic group as a dynamic member using a request version that
     * requires a known member id, and waits for the join to complete.
     *
     * <p>When the request carries an empty member id, a first join is sent and
     * is expected to fail with MEMBER_ID_REQUIRED; the member id from that
     * response is used for the second join. The clock is then advanced by the
     * initial rebalance delay, which must expire exactly one timeout.
     *
     * @param request the join request
     * @return the successful join response
     * @throws ExecutionException if a join future completed exceptionally
     * @throws InterruptedException if interrupted while reading a join future
     */
    public JoinGroupResponseData joinClassicGroupAsDynamicMemberAndCompleteJoin(JoinGroupRequestData request) throws ExecutionException, InterruptedException {
        String memberIdToUse = request.memberId();
        if (request.memberId().isEmpty()) {
            JoinResult initialJoin = this.sendClassicGroupJoin(request, true);
            Assertions.assertTrue(initialJoin.records.isEmpty());
            Assertions.assertTrue(initialJoin.joinFuture.isDone());
            JoinGroupResponseData initialResponse = initialJoin.joinFuture.get();
            Assertions.assertEquals(Errors.MEMBER_ID_REQUIRED.code(), initialResponse.errorCode());
            memberIdToUse = initialJoin.joinFuture.get().memberId();
        }

        // Re-issue the join with the (possibly newly assigned) member id.
        JoinGroupRequestData rejoinRequest = new JoinGroupRequestData()
            .setGroupId(request.groupId())
            .setMemberId(memberIdToUse)
            .setProtocolType(request.protocolType())
            .setProtocols(request.protocols())
            .setSessionTimeoutMs(request.sessionTimeoutMs())
            .setRebalanceTimeoutMs(request.rebalanceTimeoutMs())
            .setReason(request.reason());
        JoinResult rejoin = this.sendClassicGroupJoin(rejoinRequest, true);
        Assertions.assertTrue(rejoin.records.isEmpty());

        List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> expired = this.sleep(this.classicGroupInitialRebalanceDelayMs);
        Assertions.assertEquals(1, expired.size());
        Assertions.assertTrue(rejoin.joinFuture.isDone());
        Assertions.assertEquals(Errors.NONE.code(), rejoin.joinFuture.get().errorCode());
        return rejoin.joinFuture.get();
    }

    /**
     * Joins a classic group and completes the join, advancing the clock by the
     * configured initial rebalance delay.
     *
     * @param request the join request
     * @param requireKnownMemberId whether a known member id is required
     * @param supportSkippingAssignment whether skipping assignment is supported
     * @return the join response
     * @throws ExecutionException if a join future completed exceptionally
     * @throws InterruptedException if interrupted while reading a join future
     */
    public JoinGroupResponseData joinClassicGroupAndCompleteJoin(JoinGroupRequestData request, boolean requireKnownMemberId, boolean supportSkippingAssignment) throws ExecutionException, InterruptedException {
        final int advanceClockMs = this.classicGroupInitialRebalanceDelayMs;
        return this.joinClassicGroupAndCompleteJoin(request, requireKnownMemberId, supportSkippingAssignment, advanceClockMs);
    }

    /**
     * Joins a classic group and completes the join.
     *
     * <p>When a known member id is required and the request has no group
     * instance id (null or empty), the join is delegated to
     * {@link #joinClassicGroupAsDynamicMemberAndCompleteJoin(JoinGroupRequestData)}.
     * Otherwise a single join is sent, the clock is advanced by
     * {@code advanceClockMs}, and the join must have completed without error.
     *
     * @param request the join request
     * @param requireKnownMemberId whether a known member id is required
     * @param supportSkippingAssignment whether skipping assignment is supported
     * @param advanceClockMs how far to advance the clock after sending the join, in ms
     * @return the join response
     * @throws ExecutionException if the dynamic-member path fails reading a join future
     * @throws InterruptedException if the dynamic-member path is interrupted
     */
    public JoinGroupResponseData joinClassicGroupAndCompleteJoin(JoinGroupRequestData request, boolean requireKnownMemberId, boolean supportSkippingAssignment, int advanceClockMs) throws ExecutionException, InterruptedException {
        // A missing instance id may be null as well as empty; calling isEmpty()
        // directly on it would throw a NullPointerException.
        String groupInstanceId = request.groupInstanceId();
        boolean hasNoInstanceId = groupInstanceId == null || groupInstanceId.isEmpty();
        if (requireKnownMemberId && hasNoInstanceId) {
            return this.joinClassicGroupAsDynamicMemberAndCompleteJoin(request);
        }
        try {
            JoinResult joinResult = this.sendClassicGroupJoin(request, requireKnownMemberId, supportSkippingAssignment);
            this.sleep(advanceClockMs);
            Assertions.assertTrue(joinResult.joinFuture.isDone());
            Assertions.assertEquals(Errors.NONE.code(), joinResult.joinFuture.get().errorCode());
            return joinResult.joinFuture.get();
        }
        catch (Exception e) {
            // Attach the cause so the original stack trace is kept in the test report.
            Assertions.fail("Failed to due: " + e.getMessage(), e);
            return null;
        }
    }

    /**
     * Sends a classic group sync to the manager using a synthetic request
     * context at the latest SyncGroup version.
     *
     * @param request the sync request
     * @return the sync future together with the coordinator result
     */
    public SyncResult sendClassicGroupSync(SyncGroupRequestData request) {
        RequestContext context = new RequestContext(new RequestHeader(ApiKeys.SYNC_GROUP, ApiKeys.SYNC_GROUP.latestVersion(), "client", 0), "1", InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        CompletableFuture<SyncGroupResponseData> responseFuture = new CompletableFuture<>();
        // Typed result instead of a raw CoordinatorResult followed by an unchecked cast.
        CoordinatorResult<Void, Record> coordinatorResult = this.groupMetadataManager.classicGroupSync(context, request, responseFuture);
        return new SyncResult(responseFuture, coordinatorResult);
    }

    /**
     * Runs the two-static-member join and rebalance with a 10000 ms rebalance
     * timeout and a 5000 ms session timeout.
     *
     * @param groupId the classic group id
     * @param leaderInstanceId the group instance id of the leader
     * @param followerInstanceId the group instance id of the follower
     * @return the outcome of the rebalance
     * @throws Exception if waiting on one of the response futures fails
     */
    public RebalanceResult staticMembersJoinAndRebalance(String groupId, String leaderInstanceId, String followerInstanceId) throws Exception {
        final int rebalanceTimeoutMs = 10000;
        final int sessionTimeoutMs = 5000;
        return this.staticMembersJoinAndRebalance(groupId, leaderInstanceId, followerInstanceId, rebalanceTimeoutMs, sessionTimeoutMs);
    }

    /**
     * Joins two static members (a leader and a follower) to a classic group and
     * drives the group through a full rebalance until it is STABLE at generation 1.
     *
     * <p>The group is created under {@code groupId}. It was previously created
     * under the hard-coded id "group-id" while the requests used {@code groupId},
     * so the group assertions only held when the caller passed exactly that id.
     *
     * @param groupId the classic group id
     * @param leaderInstanceId the group instance id of the leader
     * @param followerInstanceId the group instance id of the follower
     * @param rebalanceTimeoutMs the rebalance timeout put in the join requests, in ms
     * @param sessionTimeoutMs the session timeout put in the join requests, in ms
     * @return generation id, member ids and assignments of both members
     * @throws Exception if waiting on one of the response futures fails
     */
    public RebalanceResult staticMembersJoinAndRebalance(String groupId, String leaderInstanceId, String followerInstanceId, int rebalanceTimeoutMs, int sessionTimeoutMs) throws Exception {
        ClassicGroup group = this.createClassicGroup(groupId);
        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
            .withGroupId(groupId)
            .withGroupInstanceId(leaderInstanceId)
            .withMemberId("")
            .withProtocolType("consumer")
            .withProtocolSuperset()
            .withRebalanceTimeoutMs(rebalanceTimeoutMs)
            .withSessionTimeoutMs(sessionTimeoutMs)
            .build();

        // Both members join with an empty member id; the same request object is
        // reused for the follower with only the instance id changed.
        JoinResult leaderJoinResult = this.sendClassicGroupJoin(joinRequest);
        JoinResult followerJoinResult = this.sendClassicGroupJoin(joinRequest.setGroupInstanceId(followerInstanceId));
        Assertions.assertTrue(leaderJoinResult.records.isEmpty());
        Assertions.assertTrue(followerJoinResult.records.isEmpty());
        Assertions.assertFalse(leaderJoinResult.joinFuture.isDone());
        Assertions.assertFalse(followerJoinResult.joinFuture.isDone());

        // Advance the clock by the initial rebalance delay twice; both joins must be done afterwards.
        GroupMetadataManagerTestContext.assertNoOrEmptyResult(this.sleep(this.classicGroupInitialRebalanceDelayMs));
        GroupMetadataManagerTestContext.assertNoOrEmptyResult(this.sleep(this.classicGroupInitialRebalanceDelayMs));
        Assertions.assertTrue(leaderJoinResult.joinFuture.isDone());
        Assertions.assertTrue(followerJoinResult.joinFuture.isDone());
        Assertions.assertEquals(Errors.NONE.code(), leaderJoinResult.joinFuture.get().errorCode());
        Assertions.assertEquals(Errors.NONE.code(), followerJoinResult.joinFuture.get().errorCode());
        Assertions.assertEquals(1, leaderJoinResult.joinFuture.get().generationId());
        Assertions.assertEquals(1, followerJoinResult.joinFuture.get().generationId());
        Assertions.assertEquals(2, group.size());
        Assertions.assertEquals(1, group.generationId());
        Assertions.assertTrue(group.isInState(ClassicGroupState.COMPLETING_REBALANCE));

        String leaderId = leaderJoinResult.joinFuture.get().memberId();
        String followerId = followerJoinResult.joinFuture.get().memberId();

        // The leader's sync carries the assignment for both members.
        List<SyncGroupRequestData.SyncGroupRequestAssignment> assignment = new ArrayList<>();
        assignment.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(leaderId).setAssignment(new byte[]{1}));
        assignment.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(followerId).setAssignment(new byte[]{2}));
        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
            .withGroupId(groupId)
            .withGroupInstanceId(leaderInstanceId)
            .withMemberId(leaderId)
            .withGenerationId(1)
            .withAssignment(assignment)
            .build();
        SyncResult leaderSyncResult = this.sendClassicGroupSync(syncRequest);
        Map<String, byte[]> groupAssignment = assignment.stream().collect(Collectors.toMap(SyncGroupRequestData.SyncGroupRequestAssignment::memberId, SyncGroupRequestData.SyncGroupRequestAssignment::assignment));
        Assertions.assertEquals(Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, groupAssignment, MetadataVersion.latestTesting())), leaderSyncResult.records);

        // Completing the append future stands in for the record write finishing.
        leaderSyncResult.appendFuture.complete(null);
        Assertions.assertTrue(leaderSyncResult.syncFuture.isDone());
        Assertions.assertEquals(Errors.NONE.code(), leaderSyncResult.syncFuture.get().errorCode());
        Assertions.assertTrue(group.isInState(ClassicGroupState.STABLE));

        // The follower syncs with an empty assignment list and must get an immediate response.
        SyncResult followerSyncResult = this.sendClassicGroupSync(syncRequest.setGroupInstanceId(followerInstanceId).setMemberId(followerId).setAssignments(Collections.emptyList()));
        Assertions.assertTrue(followerSyncResult.records.isEmpty());
        Assertions.assertTrue(followerSyncResult.syncFuture.isDone());
        Assertions.assertEquals(Errors.NONE.code(), followerSyncResult.syncFuture.get().errorCode());
        Assertions.assertTrue(group.isInState(ClassicGroupState.STABLE));
        Assertions.assertEquals(2, group.size());
        Assertions.assertEquals(1, group.generationId());
        return new RebalanceResult(1, leaderId, leaderSyncResult.syncFuture.get().assignment(), followerId, followerSyncResult.syncFuture.get().assignment());
    }

    /**
     * Drives {@code group} into PREPARING_REBALANCE with two members (a leader
     * and a follower) plus one pending member that has been told
     * MEMBER_ID_REQUIRED but has not rejoined yet.
     *
     * <p>All requests are addressed to {@code group.groupId()}. They were
     * previously hard-coded to "group-id", so the helper only worked for a group
     * created under exactly that id.
     *
     * @param group the classic group to set up
     * @return the leader id, the follower id and the pending member's join response
     * @throws Exception if waiting on one of the response futures fails
     */
    public PendingMemberGroupResult setupGroupWithPendingMember(ClassicGroup group) throws Exception {
        String groupId = group.groupId();

        // Step 1: the leader joins as a dynamic member and syncs its generation.
        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder()
            .withGroupId(groupId)
            .withMemberId("")
            .withDefaultProtocolTypeAndProtocols()
            .withRebalanceTimeoutMs(10000)
            .withSessionTimeoutMs(5000)
            .build();
        JoinGroupResponseData leaderJoinResponse = this.joinClassicGroupAsDynamicMemberAndCompleteJoin(joinRequest);
        List<SyncGroupRequestData.SyncGroupRequestAssignment> assignment = new ArrayList<>();
        assignment.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(leaderJoinResponse.memberId()));
        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder()
            .withGroupId(groupId)
            .withMemberId(leaderJoinResponse.memberId())
            .withGenerationId(leaderJoinResponse.generationId())
            .withAssignment(assignment)
            .build();
        SyncResult syncResult = this.sendClassicGroupSync(syncRequest);
        Assertions.assertEquals(Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, group.groupAssignment(), MetadataVersion.latestTesting())), syncResult.records);
        syncResult.appendFuture.complete(null);
        Assertions.assertTrue(syncResult.syncFuture.isDone());
        Assertions.assertEquals(Errors.NONE.code(), syncResult.syncFuture.get().errorCode());

        // Step 2: a follower joins with an empty member id; its join only completes
        // once the leader has rejoined.
        JoinResult followerJoinResult = this.sendClassicGroupJoin(joinRequest.setMemberId(""));
        Assertions.assertTrue(followerJoinResult.records.isEmpty());
        Assertions.assertFalse(followerJoinResult.joinFuture.isDone());
        JoinResult leaderJoinResult = this.sendClassicGroupJoin(joinRequest.setMemberId(leaderJoinResponse.memberId()));
        Assertions.assertTrue(leaderJoinResult.records.isEmpty());
        Assertions.assertTrue(group.isInState(ClassicGroupState.COMPLETING_REBALANCE));
        Assertions.assertTrue(leaderJoinResult.joinFuture.isDone());
        Assertions.assertTrue(followerJoinResult.joinFuture.isDone());
        Assertions.assertEquals(Errors.NONE.code(), leaderJoinResult.joinFuture.get().errorCode());
        Assertions.assertEquals(Errors.NONE.code(), followerJoinResult.joinFuture.get().errorCode());
        Assertions.assertEquals(leaderJoinResult.joinFuture.get().generationId(), followerJoinResult.joinFuture.get().generationId());
        Assertions.assertEquals(leaderJoinResponse.memberId(), leaderJoinResult.joinFuture.get().leader());
        Assertions.assertEquals(leaderJoinResponse.memberId(), followerJoinResult.joinFuture.get().leader());
        int nextGenerationId = leaderJoinResult.joinFuture.get().generationId();
        String followerId = followerJoinResult.joinFuture.get().memberId();

        // Step 3: the leader syncs the new generation, which stabilizes the group.
        syncResult = this.sendClassicGroupSync(syncRequest.setGenerationId(nextGenerationId));
        Assertions.assertEquals(Collections.singletonList(RecordHelpers.newGroupMetadataRecord(group, group.groupAssignment(), MetadataVersion.latestTesting())), syncResult.records);
        syncResult.appendFuture.complete(null);
        Assertions.assertTrue(syncResult.syncFuture.isDone());
        Assertions.assertEquals(Errors.NONE.code(), syncResult.syncFuture.get().errorCode());
        Assertions.assertTrue(group.isInState(ClassicGroupState.STABLE));

        // Step 4: the leader rejoins and the group goes back to PREPARING_REBALANCE.
        leaderJoinResult = this.sendClassicGroupJoin(joinRequest.setMemberId(leaderJoinResponse.memberId()));
        Assertions.assertTrue(leaderJoinResult.records.isEmpty());
        Assertions.assertFalse(leaderJoinResult.joinFuture.isDone());
        Assertions.assertTrue(group.isInState(ClassicGroupState.PREPARING_REBALANCE));

        // Step 5: a new member joins with requireKnownMemberId set; it is rejected
        // with MEMBER_ID_REQUIRED and is counted as a pending member.
        JoinResult pendingMemberJoinResult = this.sendClassicGroupJoin(joinRequest.setMemberId("").setSessionTimeoutMs(2500), true);
        Assertions.assertTrue(pendingMemberJoinResult.records.isEmpty());
        Assertions.assertTrue(pendingMemberJoinResult.joinFuture.isDone());
        Assertions.assertEquals(Errors.MEMBER_ID_REQUIRED.code(), pendingMemberJoinResult.joinFuture.get().errorCode());
        Assertions.assertEquals(1, group.numPendingJoinMembers());

        // Step 6: the follower rejoins; the rebalance stays open and the pending member remains.
        followerJoinResult = this.sendClassicGroupJoin(joinRequest.setMemberId(followerId).setSessionTimeoutMs(5000));
        Assertions.assertTrue(followerJoinResult.records.isEmpty());
        Assertions.assertFalse(followerJoinResult.joinFuture.isDone());
        Assertions.assertTrue(group.isInState(ClassicGroupState.PREPARING_REBALANCE));
        Assertions.assertEquals(2, group.size());
        Assertions.assertEquals(1, group.numPendingJoinMembers());
        return new PendingMemberGroupResult(leaderJoinResponse.memberId(), followerId, pendingMemberJoinResult.joinFuture.get());
    }

    /**
     * Advances the clock by {@code timeoutMs} and verifies that the heartbeat
     * timeout of every member expired, that the last expired timeout wrote a
     * group metadata record describing an empty group, and that the group ends
     * up EMPTY with no members.
     *
     * @param group the classic group whose members are expected to expire
     * @param timeoutMs how far to advance the clock, in ms
     */
    public void verifySessionExpiration(ClassicGroup group, int timeoutMs) {
        // Collect the expected keys before sleeping, while the members are still in the group.
        Set<String> expectedHeartbeatKeys = group.allMembers().stream()
            .map(member -> GroupMetadataManager.classicGroupHeartbeatKey(group.groupId(), member.memberId()))
            .collect(Collectors.toSet());
        List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> timeouts = this.sleep(timeoutMs);
        List<Record> expectedRecords = Collections.singletonList(GroupMetadataManagerTestContext.newGroupMetadataRecord(group.groupId(), new GroupMetadataValue().setMembers(Collections.emptyList()).setGeneration(group.generationId()).setLeader(null).setProtocolType("consumer").setProtocol(null).setCurrentStateTimestamp(this.time.milliseconds()), MetadataVersion.latestTesting()));
        Set<String> heartbeatKeys = timeouts.stream().map(timeout -> timeout.key).collect(Collectors.toSet());
        Assertions.assertEquals(expectedHeartbeatKeys, heartbeatKeys);

        // Fail with a clear assertion instead of an index error when nothing expired.
        Assertions.assertFalse(timeouts.isEmpty());
        int lastIndex = timeouts.size() - 1;
        // Only the last expiration is expected to write the record; the earlier ones must be empty.
        Assertions.assertEquals(expectedRecords, timeouts.get(lastIndex).result.records());
        GroupMetadataManagerTestContext.assertNoOrEmptyResult(timeouts.subList(0, lastIndex));
        Assertions.assertTrue(group.isInState(ClassicGroupState.EMPTY));
        Assertions.assertEquals(0, group.size());
    }

    /**
     * Sends a classic group heartbeat to the manager using a synthetic request
     * context at the latest Heartbeat version.
     *
     * @param request the heartbeat request
     * @return the heartbeat response returned by the manager
     */
    public HeartbeatResponseData sendClassicGroupHeartbeat(HeartbeatRequestData request) {
        RequestHeader header = new RequestHeader(ApiKeys.HEARTBEAT, ApiKeys.HEARTBEAT.latestVersion(), "client", 0);
        RequestContext requestContext = new RequestContext(header, "1", InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        HeartbeatResponseData response = this.groupMetadataManager.classicGroupHeartbeat(requestContext, request);
        return response;
    }

    /**
     * Lists groups as of the last committed offset, passing both filters to the
     * manager as sets.
     *
     * @param statesFilter the group states to filter on
     * @param typesFilter the group types to filter on
     * @return the groups returned by the manager
     */
    public List<ListGroupsResponseData.ListedGroup> sendListGroups(List<String> statesFilter, List<String> typesFilter) {
        Set<String> states = new HashSet<>(statesFilter);
        Set<String> types = new HashSet<>(typesFilter);
        long committedOffset = this.lastCommittedOffset;
        return this.groupMetadataManager.listGroups(states, types, committedOffset);
    }

    /**
     * Describes the given consumer groups as of the last committed offset.
     *
     * @param groupIds the ids of the groups to describe
     * @return the described groups returned by the manager
     */
    public List<ConsumerGroupDescribeResponseData.DescribedGroup> sendConsumerGroupDescribe(List<String> groupIds) {
        long committedOffset = this.lastCommittedOffset;
        return this.groupMetadataManager.consumerGroupDescribe(groupIds, committedOffset);
    }

    /**
     * Describes the given groups as of the last committed offset.
     *
     * @param groupIds the ids of the groups to describe
     * @return the described groups returned by the manager
     */
    public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(List<String> groupIds) {
        long committedOffset = this.lastCommittedOffset;
        return this.groupMetadataManager.describeGroups(groupIds, committedOffset);
    }

    /**
     * Sends a classic group heartbeat for the member and generation of
     * {@code joinResponse} and verifies the outcome. UNKNOWN_MEMBER_ID is
     * expected to surface as an {@link UnknownMemberIdException}; any other
     * expected error is compared with the response's error code.
     *
     * @param groupId the classic group id
     * @param joinResponse the join response supplying member id and generation id
     * @param expectedError the error the heartbeat is expected to produce
     */
    public void verifyHeartbeat(String groupId, JoinGroupResponseData joinResponse, Errors expectedError) {
        HeartbeatRequestData heartbeat = new HeartbeatRequestData()
            .setGroupId(groupId)
            .setMemberId(joinResponse.memberId())
            .setGenerationId(joinResponse.generationId());
        if (expectedError != Errors.UNKNOWN_MEMBER_ID) {
            HeartbeatResponseData heartbeatResponse = this.sendClassicGroupHeartbeat(heartbeat);
            Assertions.assertEquals(expectedError.code(), heartbeatResponse.errorCode());
            return;
        }
        Assertions.assertThrows(UnknownMemberIdException.class, () -> this.sendClassicGroupHeartbeat(heartbeat));
    }

    /**
     * Joins {@code numMembers} dynamic members to a newly created classic group
     * and waits until all joins complete, leaving the group in COMPLETING_REBALANCE.
     *
     * <p>Every member first joins with an empty member id to obtain one, then
     * rejoins with that id. The second joins must stay pending after the initial
     * rebalance delay and must all be complete after a further
     * {@code rebalanceTimeoutMs}.
     *
     * @param groupId the classic group id
     * @param numMembers the number of members to join
     * @param rebalanceTimeoutMs the rebalance timeout put in the join requests, in ms
     * @param sessionTimeoutMs the session timeout put in the join requests, in ms
     * @return the successful join responses, one per member
     */
    public List<JoinGroupResponseData> joinWithNMembers(String groupId, int numMembers, int rebalanceTimeoutMs, int sessionTimeoutMs) {
        ClassicGroup group = this.createClassicGroup(groupId);
        boolean requireKnownMemberId = true;
        JoinGroupRequestData request = new JoinGroupRequestBuilder()
            .withGroupId(groupId)
            .withMemberId("")
            .withDefaultProtocolTypeAndProtocols()
            .withRebalanceTimeoutMs(rebalanceTimeoutMs)
            .withSessionTimeoutMs(sessionTimeoutMs)
            .build();

        // First round: each join completes immediately and hands back a member id.
        List<String> memberIds = IntStream.range(0, numMembers).mapToObj(i -> {
            JoinResult joinResult = this.sendClassicGroupJoin(request, requireKnownMemberId);
            Assertions.assertTrue(joinResult.records.isEmpty());
            Assertions.assertTrue(joinResult.joinFuture.isDone());
            try {
                return joinResult.joinFuture.get().memberId();
            }
            catch (Exception e) {
                // Attach the cause so the original stack trace is kept in the test report.
                Assertions.fail("Unexpected exception: " + e.getMessage(), e);
                return null;
            }
        }).collect(Collectors.toList());

        // Second round: rejoin with the assigned ids; these joins stay pending.
        // The shared request object is reused with only the member id changed.
        List<CompletableFuture<JoinGroupResponseData>> secondJoinFutures = IntStream.range(0, numMembers).mapToObj(i -> {
            JoinResult joinResult = this.sendClassicGroupJoin(request.setMemberId(memberIds.get(i)), requireKnownMemberId);
            Assertions.assertTrue(joinResult.records.isEmpty());
            Assertions.assertFalse(joinResult.joinFuture.isDone());
            return joinResult.joinFuture;
        }).collect(Collectors.toList());

        // The initial rebalance delay alone must not complete the joins.
        GroupMetadataManagerTestContext.assertNoOrEmptyResult(this.sleep(this.classicGroupInitialRebalanceDelayMs));
        secondJoinFutures.forEach(future -> Assertions.assertFalse(future.isDone()));

        // After a further rebalance timeout every join must be complete.
        GroupMetadataManagerTestContext.assertNoOrEmptyResult(this.sleep(rebalanceTimeoutMs));
        List<JoinGroupResponseData> joinResponses = secondJoinFutures.stream().map(future -> {
            Assertions.assertTrue(future.isDone());
            try {
                Assertions.assertEquals(Errors.NONE.code(), future.get().errorCode());
                return future.get();
            }
            catch (Exception e) {
                Assertions.fail("Unexpected exception: " + e.getMessage(), e);
                return null;
            }
        }).collect(Collectors.toList());
        Assertions.assertEquals(numMembers, group.size());
        Assertions.assertTrue(group.isInState(ClassicGroupState.COMPLETING_REBALANCE));
        return joinResponses;
    }

    /**
     * Forwards a LeaveGroup request to the group metadata manager.
     *
     * <p>The request is wrapped in a {@link RequestContext} whose header targets
     * {@link ApiKeys#LEAVE_GROUP} at its latest version, using the loopback address,
     * the anonymous principal and the PLAINTEXT security protocol.
     *
     * @param request the leave-group request to submit
     * @return whatever {@code groupMetadataManager.classicGroupLeave} returns
     */
    public CoordinatorResult<LeaveGroupResponseData, Record> sendClassicGroupLeave(LeaveGroupRequestData request) {
        RequestHeader header = new RequestHeader(
            ApiKeys.LEAVE_GROUP,
            ApiKeys.LEAVE_GROUP.latestVersion(),
            "client",
            0
        );
        RequestContext leaveContext = new RequestContext(
            header,
            "1",
            InetAddress.getLoopbackAddress(),
            KafkaPrincipal.ANONYMOUS,
            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
            SecurityProtocol.PLAINTEXT,
            ClientInformation.EMPTY,
            null,
            false
        );
        return this.groupMetadataManager.classicGroupLeave(leaveContext, request);
    }

    /**
     * Asserts that describing {@code groupId} returns exactly one entry carrying that
     * group id and the {@code DEAD} state, with all other fields left at their defaults.
     *
     * @param groupId id of the group to describe
     */
    public void verifyDescribeGroupsReturnsDeadGroup(String groupId) {
        List<DescribeGroupsResponseData.DescribedGroup> describedGroups = this.describeGroups(Collections.singletonList(groupId));
        // The expected entry must use the requested id, not a fixed literal,
        // so the check holds for any group name a caller passes in.
        DescribeGroupsResponseData.DescribedGroup expectedGroup = new DescribeGroupsResponseData.DescribedGroup()
            .setGroupId(groupId)
            .setGroupState(ClassicGroupState.DEAD.toString());
        Assertions.assertEquals(Collections.singletonList(expectedGroup), describedGroups);
    }

    /**
     * Unwraps the message of {@code apiMessageAndVersion}, passing {@code null} through.
     *
     * @param apiMessageAndVersion wrapper to unwrap, may be {@code null}
     * @return the wrapped message, or {@code null} when the wrapper itself is {@code null}
     */
    private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
        return apiMessageAndVersion != null ? apiMessageAndVersion.message() : null;
    }

    /**
     * Replays a single record into the group metadata manager.
     *
     * <p>The key's version selects which {@code replay} overload is invoked; the value
     * message is passed as {@code null} when the record has no value. After a successful
     * dispatch, {@code lastWrittenOffset} is incremented by one and a snapshot is obtained
     * or created at the new offset.
     *
     * @param record the record to replay
     * @throws IllegalStateException if the record key is {@code null} or its version is not
     *                               one of the handled record types
     */
    public void replay(Record record) {
        ApiMessageAndVersion key = record.key();
        ApiMessageAndVersion value = record.value();
        if (key == null) {
            throw new IllegalStateException("Received a null key in " + record);
        }

        ApiMessage keyMessage = key.message();
        ApiMessage valueMessage = this.messageOrNull(value);
        short recordType = key.version();

        switch (recordType) {
            case 2:
                this.groupMetadataManager.replay((GroupMetadataKey) keyMessage, (GroupMetadataValue) valueMessage);
                break;
            case 3:
                this.groupMetadataManager.replay((ConsumerGroupMetadataKey) keyMessage, (ConsumerGroupMetadataValue) valueMessage);
                break;
            case 4:
                this.groupMetadataManager.replay((ConsumerGroupPartitionMetadataKey) keyMessage, (ConsumerGroupPartitionMetadataValue) valueMessage);
                break;
            case 5:
                this.groupMetadataManager.replay((ConsumerGroupMemberMetadataKey) keyMessage, (ConsumerGroupMemberMetadataValue) valueMessage);
                break;
            case 6:
                this.groupMetadataManager.replay((ConsumerGroupTargetAssignmentMetadataKey) keyMessage, (ConsumerGroupTargetAssignmentMetadataValue) valueMessage);
                break;
            case 7:
                this.groupMetadataManager.replay((ConsumerGroupTargetAssignmentMemberKey) keyMessage, (ConsumerGroupTargetAssignmentMemberValue) valueMessage);
                break;
            case 8:
                this.groupMetadataManager.replay((ConsumerGroupCurrentMemberAssignmentKey) keyMessage, (ConsumerGroupCurrentMemberAssignmentValue) valueMessage);
                break;
            default:
                throw new IllegalStateException("Received an unknown record type " + recordType + " in " + record);
        }

        // Advance the offset first, then snapshot at the new value.
        this.snapshotRegistry.getOrCreateSnapshot(++this.lastWrittenOffset);
    }

    public static class Builder {
        private final MockTime time = new MockTime();
        private final MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer((Time)this.time);
        private final LogContext logContext = new LogContext();
        private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(this.logContext);
        private MetadataImage metadataImage;
        private List<PartitionAssignor> consumerGroupAssignors = Collections.singletonList(new MockPartitionAssignor("range"));
        private final List<ConsumerGroupBuilder> consumerGroupBuilders = new ArrayList<ConsumerGroupBuilder>();
        private int consumerGroupMaxSize = Integer.MAX_VALUE;
        private int consumerGroupMetadataRefreshIntervalMs = Integer.MAX_VALUE;
        private int classicGroupMaxSize = Integer.MAX_VALUE;
        private int classicGroupInitialRebalanceDelayMs = 3000;
        private final int classicGroupNewMemberJoinTimeoutMs = 300000;
        private int classicGroupMinSessionTimeoutMs = 10;
        private int classicGroupMaxSessionTimeoutMs = 600000;
        private final GroupCoordinatorMetricsShard metrics = (GroupCoordinatorMetricsShard)Mockito.mock(GroupCoordinatorMetricsShard.class);

        public Builder withMetadataImage(MetadataImage metadataImage) {
            this.metadataImage = metadataImage;
            return this;
        }

        public Builder withAssignors(List<PartitionAssignor> assignors) {
            this.consumerGroupAssignors = assignors;
            return this;
        }

        public Builder withConsumerGroup(ConsumerGroupBuilder builder) {
            this.consumerGroupBuilders.add(builder);
            return this;
        }

        public Builder withConsumerGroupMaxSize(int consumerGroupMaxSize) {
            this.consumerGroupMaxSize = consumerGroupMaxSize;
            return this;
        }

        public Builder withConsumerGroupMetadataRefreshIntervalMs(int consumerGroupMetadataRefreshIntervalMs) {
            this.consumerGroupMetadataRefreshIntervalMs = consumerGroupMetadataRefreshIntervalMs;
            return this;
        }

        public Builder withClassicGroupMaxSize(int classicGroupMaxSize) {
            this.classicGroupMaxSize = classicGroupMaxSize;
            return this;
        }

        public Builder withClassicGroupInitialRebalanceDelayMs(int classicGroupInitialRebalanceDelayMs) {
            this.classicGroupInitialRebalanceDelayMs = classicGroupInitialRebalanceDelayMs;
            return this;
        }

        public Builder withClassicGroupMinSessionTimeoutMs(int classicGroupMinSessionTimeoutMs) {
            this.classicGroupMinSessionTimeoutMs = classicGroupMinSessionTimeoutMs;
            return this;
        }

        public Builder withClassicGroupMaxSessionTimeoutMs(int classicGroupMaxSessionTimeoutMs) {
            this.classicGroupMaxSessionTimeoutMs = classicGroupMaxSessionTimeoutMs;
            return this;
        }

        public GroupMetadataManagerTestContext build() {
            if (this.metadataImage == null) {
                this.metadataImage = MetadataImage.EMPTY;
            }
            if (this.consumerGroupAssignors == null) {
                this.consumerGroupAssignors = Collections.emptyList();
            }
            GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext(this.time, this.timer, this.snapshotRegistry, this.metrics, new GroupMetadataManager.Builder().withSnapshotRegistry(this.snapshotRegistry).withLogContext(this.logContext).withTime((Time)this.time).withTimer(this.timer).withMetadataImage(this.metadataImage).withConsumerGroupHeartbeatInterval(5000).withConsumerGroupSessionTimeout(45000).withConsumerGroupMaxSize(this.consumerGroupMaxSize).withConsumerGroupAssignors(this.consumerGroupAssignors).withConsumerGroupMetadataRefreshIntervalMs(this.consumerGroupMetadataRefreshIntervalMs).withClassicGroupMaxSize(this.classicGroupMaxSize).withClassicGroupMinSessionTimeoutMs(this.classicGroupMinSessionTimeoutMs).withClassicGroupMaxSessionTimeoutMs(this.classicGroupMaxSessionTimeoutMs).withClassicGroupInitialRebalanceDelayMs(this.classicGroupInitialRebalanceDelayMs).withClassicGroupNewMemberJoinTimeoutMs(300000).withGroupCoordinatorMetricsShard(this.metrics).build(), this.classicGroupInitialRebalanceDelayMs, 300000);
            this.consumerGroupBuilders.forEach(builder -> builder.build(this.metadataImage.topics()).forEach(context::replay));
            context.commit();
            return context;
        }
    }

    /**
     * Fluent builder for {@link SyncGroupRequestData}.
     *
     * <p>Ids default to {@code null}, the protocol type to {@code "consumer"}, the protocol
     * name to {@code "range"}, the generation id to 0 and the assignment to an empty list.
     */
    public static class SyncGroupRequestBuilder {
        String groupId;
        String groupInstanceId;
        String memberId;
        String protocolType = "consumer";
        String protocolName = "range";
        int generationId;
        List<SyncGroupRequestData.SyncGroupRequestAssignment> assignment = Collections.emptyList();

        /** Sets the group id. */
        SyncGroupRequestBuilder withGroupId(String groupId) {
            this.groupId = groupId;
            return this;
        }

        /** Sets the group instance id. */
        SyncGroupRequestBuilder withGroupInstanceId(String groupInstanceId) {
            this.groupInstanceId = groupInstanceId;
            return this;
        }

        /** Sets the member id. */
        SyncGroupRequestBuilder withMemberId(String memberId) {
            this.memberId = memberId;
            return this;
        }

        /** Sets the generation id. */
        SyncGroupRequestBuilder withGenerationId(int generationId) {
            this.generationId = generationId;
            return this;
        }

        /** Sets the protocol type. */
        SyncGroupRequestBuilder withProtocolType(String protocolType) {
            this.protocolType = protocolType;
            return this;
        }

        /** Sets the protocol name. */
        SyncGroupRequestBuilder withProtocolName(String protocolName) {
            this.protocolName = protocolName;
            return this;
        }

        /** Sets the assignments carried by the request. */
        SyncGroupRequestBuilder withAssignment(List<SyncGroupRequestData.SyncGroupRequestAssignment> assignment) {
            this.assignment = assignment;
            return this;
        }

        /** Creates a new request populated from the builder's current field values. */
        SyncGroupRequestData build() {
            SyncGroupRequestData syncRequest = new SyncGroupRequestData();
            return syncRequest
                .setGroupId(groupId)
                .setGroupInstanceId(groupInstanceId)
                .setMemberId(memberId)
                .setGenerationId(generationId)
                .setProtocolType(protocolType)
                .setProtocolName(protocolName)
                .setAssignments(assignment);
        }
    }

    /**
     * Fluent builder for {@link JoinGroupRequestData}.
     *
     * <p>Ids and reason default to {@code null}, the protocol type to {@code "consumer"},
     * the protocols to an empty collection, and both the session and rebalance timeouts
     * to 500 ms.
     */
    public static class JoinGroupRequestBuilder {
        String groupId;
        String groupInstanceId;
        String memberId;
        String protocolType = "consumer";
        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(0);
        int sessionTimeoutMs = 500;
        int rebalanceTimeoutMs = 500;
        String reason;

        /** Sets the group id. */
        JoinGroupRequestBuilder withGroupId(String groupId) {
            this.groupId = groupId;
            return this;
        }

        /** Sets the group instance id. */
        JoinGroupRequestBuilder withGroupInstanceId(String groupInstanceId) {
            this.groupInstanceId = groupInstanceId;
            return this;
        }

        /** Sets the member id. */
        JoinGroupRequestBuilder withMemberId(String memberId) {
            this.memberId = memberId;
            return this;
        }

        /** Replaces the protocols with the result of {@code toProtocols("range")}. */
        JoinGroupRequestBuilder withDefaultProtocolTypeAndProtocols() {
            protocols = GroupMetadataManagerTestContext.toProtocols("range");
            return this;
        }

        /** Replaces the protocols with the result of {@code toProtocols("range", "roundrobin")}. */
        JoinGroupRequestBuilder withProtocolSuperset() {
            protocols = GroupMetadataManagerTestContext.toProtocols("range", "roundrobin");
            return this;
        }

        /** Sets the protocol type. */
        JoinGroupRequestBuilder withProtocolType(String protocolType) {
            this.protocolType = protocolType;
            return this;
        }

        /** Sets the protocol collection. */
        JoinGroupRequestBuilder withProtocols(JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols) {
            this.protocols = protocols;
            return this;
        }

        /** Sets the rebalance timeout, in milliseconds. */
        JoinGroupRequestBuilder withRebalanceTimeoutMs(int rebalanceTimeoutMs) {
            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
            return this;
        }

        /** Sets the session timeout, in milliseconds. */
        JoinGroupRequestBuilder withSessionTimeoutMs(int sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
            return this;
        }

        /** Sets the join reason. */
        JoinGroupRequestBuilder withReason(String reason) {
            this.reason = reason;
            return this;
        }

        /** Creates a new request populated from the builder's current field values. */
        JoinGroupRequestData build() {
            JoinGroupRequestData joinRequest = new JoinGroupRequestData();
            return joinRequest
                .setGroupId(groupId)
                .setGroupInstanceId(groupInstanceId)
                .setMemberId(memberId)
                .setProtocolType(protocolType)
                .setProtocols(protocols)
                .setRebalanceTimeoutMs(rebalanceTimeoutMs)
                .setSessionTimeoutMs(sessionTimeoutMs)
                .setReason(reason);
        }
    }

    /**
     * Bundles the response future of a sync-group call with the records and append future
     * taken from the accompanying coordinator result.
     */
    public static class SyncResult {
        CompletableFuture<SyncGroupResponseData> syncFuture;
        List<Record> records;
        CompletableFuture<Void> appendFuture;

        /**
         * @param syncFuture        future that carries the sync-group response
         * @param coordinatorResult source of {@code records} and {@code appendFuture}
         */
        public SyncResult(CompletableFuture<SyncGroupResponseData> syncFuture, CoordinatorResult<Void, Record> coordinatorResult) {
            List<Record> resultRecords = coordinatorResult.records();
            CompletableFuture<Void> resultAppendFuture = coordinatorResult.appendFuture();

            this.syncFuture = syncFuture;
            this.records = resultRecords;
            this.appendFuture = resultAppendFuture;
        }
    }

    /**
     * Bundles the response future of a join-group call with the records and append future
     * taken from the accompanying coordinator result.
     */
    public static class JoinResult {
        CompletableFuture<JoinGroupResponseData> joinFuture;
        List<Record> records;
        CompletableFuture<Void> appendFuture;

        /**
         * @param joinFuture        future that carries the join-group response
         * @param coordinatorResult source of {@code records} and {@code appendFuture}
         */
        public JoinResult(CompletableFuture<JoinGroupResponseData> joinFuture, CoordinatorResult<Void, Record> coordinatorResult) {
            List<Record> resultRecords = coordinatorResult.records();
            CompletableFuture<Void> resultAppendFuture = coordinatorResult.appendFuture();

            this.joinFuture = joinFuture;
            this.records = resultRecords;
            this.appendFuture = resultAppendFuture;
        }
    }

    /**
     * Plain holder for a leader id, a follower id and the join response of a pending member.
     */
    public static class PendingMemberGroupResult {
        String leaderId;
        String followerId;
        JoinGroupResponseData pendingMemberResponse;

        /**
         * @param leaderId              value stored in {@code leaderId}
         * @param followerId            value stored in {@code followerId}
         * @param pendingMemberResponse value stored in {@code pendingMemberResponse}
         */
        public PendingMemberGroupResult(
            String leaderId,
            String followerId,
            JoinGroupResponseData pendingMemberResponse
        ) {
            this.pendingMemberResponse = pendingMemberResponse;
            this.followerId = followerId;
            this.leaderId = leaderId;
        }
    }

    /**
     * Plain holder for the outcome of a rebalance: the generation id plus the id and raw
     * assignment bytes of a leader and of a follower. The byte arrays are stored as given,
     * without copying.
     */
    public static class RebalanceResult {
        int generationId;
        String leaderId;
        byte[] leaderAssignment;
        String followerId;
        byte[] followerAssignment;

        /**
         * @param generationId       value stored in {@code generationId}
         * @param leaderId           value stored in {@code leaderId}
         * @param leaderAssignment   array stored in {@code leaderAssignment}
         * @param followerId         value stored in {@code followerId}
         * @param followerAssignment array stored in {@code followerAssignment}
         */
        RebalanceResult(
            int generationId,
            String leaderId,
            byte[] leaderAssignment,
            String followerId,
            byte[] followerAssignment
        ) {
            this.generationId = generationId;

            // Leader side.
            this.leaderId = leaderId;
            this.leaderAssignment = leaderAssignment;

            // Follower side.
            this.followerId = followerId;
            this.followerAssignment = followerAssignment;
        }
    }
}

