/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.ShareGroupDescribeResponseData;
import org.apache.kafka.common.message.ShareGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ShareGroupHeartbeatResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
import org.apache.kafka.coordinator.group.Assertions;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.GroupConfigManagerTest;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.GroupMetadataManager;
import org.apache.kafka.coordinator.group.MockPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.api.assignor.ShareGroupPartitionAssignor;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupRegularExpressionValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.group.generated.GroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.GroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupCurrentMemberAssignmentValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupPartitionMetadataValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMemberValue;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataKey;
import org.apache.kafka.coordinator.group.generated.ShareGroupTargetAssignmentMetadataValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.MemberState;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupBuilder;
import org.apache.kafka.coordinator.group.modern.share.ShareGroup;
import org.apache.kafka.coordinator.group.modern.share.ShareGroupBuilder;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.mockito.Mockito;

public class GroupMetadataManagerTestContext {
    static final String DEFAULT_CLIENT_ID = "client";
    static final InetAddress DEFAULT_CLIENT_ADDRESS = InetAddress.getLoopbackAddress();
    final MockTime time;
    final MockCoordinatorTimer<Void, CoordinatorRecord> timer;
    final MockCoordinatorExecutor<CoordinatorRecord> executor;
    final SnapshotRegistry snapshotRegistry;
    final GroupCoordinatorMetricsShard metrics;
    final GroupMetadataManager groupMetadataManager;
    final GroupConfigManager groupConfigManager;
    final int classicGroupInitialRebalanceDelayMs;
    final int classicGroupNewMemberJoinTimeoutMs;
    long lastCommittedOffset = 0L;
    long lastWrittenOffset = 0L;

    public static void assertNoOrEmptyResult(List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts) {
        org.junit.jupiter.api.Assertions.assertTrue((timeouts.size() <= 1 ? 1 : 0) != 0);
        timeouts.forEach(timeout -> org.junit.jupiter.api.Assertions.assertEquals((Object)GroupMetadataManager.EMPTY_RESULT, (Object)timeout.result));
    }

    public static JoinGroupRequestData.JoinGroupRequestProtocolCollection toProtocols(String ... protocolNames) {
        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(0);
        List<String> topicNames = Arrays.asList("foo", "bar", "baz");
        for (int i = 0; i < protocolNames.length; ++i) {
            protocols.add((ImplicitLinkedHashCollection.Element)new JoinGroupRequestData.JoinGroupRequestProtocol().setName(protocolNames[i]).setMetadata(ConsumerProtocol.serializeSubscription((ConsumerPartitionAssignor.Subscription)new ConsumerPartitionAssignor.Subscription(List.of(topicNames.get(i % topicNames.size())))).array()));
        }
        return protocols;
    }

    public static JoinGroupRequestData.JoinGroupRequestProtocolCollection toConsumerProtocol(List<String> topicNames, List<TopicPartition> ownedPartitions) {
        return GroupMetadataManagerTestContext.toConsumerProtocol(topicNames, ownedPartitions, (short)3);
    }

    public static JoinGroupRequestData.JoinGroupRequestProtocolCollection toConsumerProtocol(List<String> topicNames, List<TopicPartition> ownedPartitions, short version) {
        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(0);
        protocols.add((ImplicitLinkedHashCollection.Element)new JoinGroupRequestData.JoinGroupRequestProtocol().setName("range").setMetadata(ConsumerProtocol.serializeSubscription((ConsumerPartitionAssignor.Subscription)new ConsumerPartitionAssignor.Subscription(topicNames, null, ownedPartitions), (short)version).array()));
        return protocols;
    }

    public static CoordinatorRecord newGroupMetadataRecord(String groupId, GroupMetadataValue value) {
        return CoordinatorRecord.record((ApiMessage)new GroupMetadataKey().setGroup(groupId), (ApiMessageAndVersion)new ApiMessageAndVersion((ApiMessage)value, 3));
    }

    public GroupMetadataManagerTestContext(MockTime time, MockCoordinatorTimer<Void, CoordinatorRecord> timer, MockCoordinatorExecutor<CoordinatorRecord> executor, SnapshotRegistry snapshotRegistry, GroupCoordinatorMetricsShard metrics, GroupCoordinatorConfig config, GroupMetadataManager groupMetadataManager, GroupConfigManager groupConfigManager) {
        this.time = time;
        this.timer = timer;
        this.executor = executor;
        this.snapshotRegistry = snapshotRegistry;
        this.metrics = metrics;
        this.groupMetadataManager = groupMetadataManager;
        this.groupConfigManager = groupConfigManager;
        this.classicGroupInitialRebalanceDelayMs = config.classicGroupInitialRebalanceDelayMs();
        this.classicGroupNewMemberJoinTimeoutMs = config.classicGroupNewMemberJoinTimeoutMs();
        snapshotRegistry.idempotentCreateSnapshot(this.lastWrittenOffset);
    }

    public void commit() {
        long lastCommittedOffset = this.lastCommittedOffset;
        this.lastCommittedOffset = this.lastWrittenOffset;
        this.snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset);
    }

    public void rollback() {
        this.lastWrittenOffset = this.lastCommittedOffset;
        this.snapshotRegistry.revertToSnapshot(this.lastCommittedOffset);
    }

    public ConsumerGroup.ConsumerGroupState consumerGroupState(String groupId) {
        return this.groupMetadataManager.consumerGroup(groupId).state();
    }

    public ShareGroup.ShareGroupState shareGroupState(String groupId) {
        return this.groupMetadataManager.shareGroup(groupId).state();
    }

    public MemberState consumerGroupMemberState(String groupId, String memberId) {
        return this.groupMetadataManager.consumerGroup(groupId).getOrMaybeCreateMember(memberId, false).state();
    }

    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(ConsumerGroupHeartbeatRequestData request) {
        return this.consumerGroupHeartbeat(request, ApiKeys.CONSUMER_GROUP_HEARTBEAT.latestVersion());
    }

    public CoordinatorResult<ConsumerGroupHeartbeatResponseData, CoordinatorRecord> consumerGroupHeartbeat(ConsumerGroupHeartbeatRequestData request, short apiVersion) {
        RequestContext context = new RequestContext(new RequestHeader(ApiKeys.CONSUMER_GROUP_HEARTBEAT, apiVersion, DEFAULT_CLIENT_ID, 0), "1", DEFAULT_CLIENT_ADDRESS, KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        CoordinatorResult result = this.groupMetadataManager.consumerGroupHeartbeat(context, request);
        if (result.replayRecords()) {
            result.records().forEach(this::replay);
        }
        return result;
    }

    public CoordinatorResult<ShareGroupHeartbeatResponseData, CoordinatorRecord> shareGroupHeartbeat(ShareGroupHeartbeatRequestData request) {
        RequestContext context = new RequestContext(new RequestHeader(ApiKeys.SHARE_GROUP_HEARTBEAT, ApiKeys.SHARE_GROUP_HEARTBEAT.latestVersion(), DEFAULT_CLIENT_ID, 0), "1", DEFAULT_CLIENT_ADDRESS, KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        CoordinatorResult result = this.groupMetadataManager.shareGroupHeartbeat(context, request);
        result.records().forEach(this::replay);
        return result;
    }

    public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> sleep(long ms) {
        this.time.sleep(ms);
        List timeouts = this.timer.poll();
        timeouts.forEach(timeout -> {
            if (timeout.result.replayRecords()) {
                timeout.result.records().forEach(this::replay);
            }
        });
        return timeouts;
    }

    public List<MockCoordinatorExecutor.ExecutorResult<CoordinatorRecord>> processTasks() {
        List results = this.executor.poll();
        results.forEach(taskResult -> {
            if (taskResult.result.replayRecords()) {
                taskResult.result.records().forEach(this::replay);
            }
        });
        return results;
    }

    public void assertSessionTimeout(String groupId, String memberId, long delayMs) {
        MockCoordinatorTimer.ScheduledTimeout timeout = this.timer.timeout(GroupMetadataManager.groupSessionTimeoutKey((String)groupId, (String)memberId));
        org.junit.jupiter.api.Assertions.assertNotNull((Object)timeout);
        org.junit.jupiter.api.Assertions.assertEquals((long)(this.time.milliseconds() + delayMs), (long)timeout.deadlineMs);
    }

    public void assertNoSessionTimeout(String groupId, String memberId) {
        MockCoordinatorTimer.ScheduledTimeout timeout = this.timer.timeout(GroupMetadataManager.groupSessionTimeoutKey((String)groupId, (String)memberId));
        org.junit.jupiter.api.Assertions.assertNull((Object)timeout);
    }

    public MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> assertRebalanceTimeout(String groupId, String memberId, long delayMs) {
        MockCoordinatorTimer.ScheduledTimeout timeout = this.timer.timeout(GroupMetadataManager.consumerGroupRebalanceTimeoutKey((String)groupId, (String)memberId));
        org.junit.jupiter.api.Assertions.assertNotNull((Object)timeout);
        org.junit.jupiter.api.Assertions.assertEquals((long)(this.time.milliseconds() + delayMs), (long)timeout.deadlineMs);
        return timeout;
    }

    public void assertNoRebalanceTimeout(String groupId, String memberId) {
        MockCoordinatorTimer.ScheduledTimeout timeout = this.timer.timeout(GroupMetadataManager.consumerGroupRebalanceTimeoutKey((String)groupId, (String)memberId));
        org.junit.jupiter.api.Assertions.assertNull((Object)timeout);
    }

    public MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> assertJoinTimeout(String groupId, String memberId, long delayMs) {
        MockCoordinatorTimer.ScheduledTimeout timeout = this.timer.timeout(GroupMetadataManager.consumerGroupJoinKey((String)groupId, (String)memberId));
        org.junit.jupiter.api.Assertions.assertNotNull((Object)timeout);
        org.junit.jupiter.api.Assertions.assertEquals((long)(this.time.milliseconds() + delayMs), (long)timeout.deadlineMs);
        return timeout;
    }

    public void assertNoJoinTimeout(String groupId, String memberId) {
        MockCoordinatorTimer.ScheduledTimeout timeout = this.timer.timeout(GroupMetadataManager.consumerGroupJoinKey((String)groupId, (String)memberId));
        org.junit.jupiter.api.Assertions.assertNull((Object)timeout);
    }

    public MockCoordinatorTimer.ScheduledTimeout<Void, CoordinatorRecord> assertSyncTimeout(String groupId, String memberId, long delayMs) {
        MockCoordinatorTimer.ScheduledTimeout timeout = this.timer.timeout(GroupMetadataManager.consumerGroupSyncKey((String)groupId, (String)memberId));
        org.junit.jupiter.api.Assertions.assertNotNull((Object)timeout);
        org.junit.jupiter.api.Assertions.assertEquals((long)(this.time.milliseconds() + delayMs), (long)timeout.deadlineMs);
        return timeout;
    }

    public void assertNoSyncTimeout(String groupId, String memberId) {
        MockCoordinatorTimer.ScheduledTimeout timeout = this.timer.timeout(GroupMetadataManager.consumerGroupSyncKey((String)groupId, (String)memberId));
        org.junit.jupiter.api.Assertions.assertNull((Object)timeout);
    }

    ClassicGroup createClassicGroup(String groupId) {
        return this.groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, true);
    }

    public JoinResult sendClassicGroupJoin(JoinGroupRequestData request) {
        return this.sendClassicGroupJoin(request, false);
    }

    public JoinResult sendClassicGroupJoin(JoinGroupRequestData request, boolean requireKnownMemberId) {
        return this.sendClassicGroupJoin(request, requireKnownMemberId, false);
    }

    public JoinResult sendClassicGroupJoin(JoinGroupRequestData request, boolean requireKnownMemberId, boolean supportSkippingAssignment) {
        CompletableFuture<JoinGroupResponseData> responseFuture;
        RequestContext context;
        CoordinatorResult coordinatorResult;
        short joinGroupVersion = 3;
        if (requireKnownMemberId) {
            joinGroupVersion = 4;
            if (supportSkippingAssignment) {
                joinGroupVersion = ApiKeys.JOIN_GROUP.latestVersion();
            }
        }
        if ((coordinatorResult = this.groupMetadataManager.classicGroupJoin(context = new RequestContext(new RequestHeader(ApiKeys.JOIN_GROUP, joinGroupVersion, DEFAULT_CLIENT_ID, 0), "1", DEFAULT_CLIENT_ADDRESS, KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false), request, responseFuture = new CompletableFuture<JoinGroupResponseData>())).replayRecords()) {
            coordinatorResult.records().forEach(this::replay);
        }
        return new JoinResult(responseFuture, (CoordinatorResult<Void, CoordinatorRecord>)coordinatorResult);
    }

    public JoinGroupResponseData joinClassicGroupAsDynamicMemberAndCompleteRebalance(String groupId) throws Exception {
        ClassicGroup group = this.createClassicGroup(groupId);
        JoinGroupResponseData leaderJoinResponse = this.joinClassicGroupAsDynamicMemberAndCompleteJoin(new JoinGroupRequestBuilder().withGroupId(groupId).withMemberId("").withDefaultProtocolTypeAndProtocols().withRebalanceTimeoutMs(10000).withSessionTimeoutMs(5000).build());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)leaderJoinResponse.generationId());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.COMPLETING_REBALANCE));
        SyncResult syncResult = this.sendClassicGroupSync(new SyncGroupRequestBuilder().withGroupId(groupId).withMemberId(leaderJoinResponse.memberId()).withGenerationId(leaderJoinResponse.generationId()).build());
        org.junit.jupiter.api.Assertions.assertEquals(List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord((ClassicGroup)group, (Map)group.groupAssignment())), syncResult.records);
        syncResult.appendFuture.complete(null);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)syncResult.syncFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)syncResult.syncFuture.get().errorCode());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.STABLE));
        return leaderJoinResponse;
    }

    public JoinGroupResponseData joinClassicGroupAsDynamicMemberAndCompleteJoin(JoinGroupRequestData request) throws ExecutionException, InterruptedException {
        boolean requireKnownMemberId = true;
        String newMemberId = request.memberId();
        if (request.memberId().equals("")) {
            JoinResult firstJoinResult = this.sendClassicGroupJoin(request, requireKnownMemberId);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)firstJoinResult.records.isEmpty());
            org.junit.jupiter.api.Assertions.assertTrue((boolean)firstJoinResult.joinFuture.isDone());
            org.junit.jupiter.api.Assertions.assertEquals((short)Errors.MEMBER_ID_REQUIRED.code(), (short)firstJoinResult.joinFuture.get().errorCode());
            newMemberId = firstJoinResult.joinFuture.get().memberId();
        }
        JoinGroupRequestData secondRequest = new JoinGroupRequestData().setGroupId(request.groupId()).setMemberId(newMemberId).setProtocolType(request.protocolType()).setProtocols(request.protocols()).setSessionTimeoutMs(request.sessionTimeoutMs()).setRebalanceTimeoutMs(request.rebalanceTimeoutMs()).setReason(request.reason());
        JoinResult secondJoinResult = this.sendClassicGroupJoin(secondRequest, requireKnownMemberId);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)secondJoinResult.records.isEmpty());
        List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = this.sleep(this.classicGroupInitialRebalanceDelayMs);
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)timeouts.size());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)secondJoinResult.joinFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)secondJoinResult.joinFuture.get().errorCode());
        return secondJoinResult.joinFuture.get();
    }

    public JoinGroupResponseData joinClassicGroupAndCompleteJoin(JoinGroupRequestData request, boolean requireKnownMemberId, boolean supportSkippingAssignment) throws ExecutionException, InterruptedException {
        return this.joinClassicGroupAndCompleteJoin(request, requireKnownMemberId, supportSkippingAssignment, this.classicGroupInitialRebalanceDelayMs);
    }

    public JoinGroupResponseData joinClassicGroupAndCompleteJoin(JoinGroupRequestData request, boolean requireKnownMemberId, boolean supportSkippingAssignment, int advanceClockMs) throws ExecutionException, InterruptedException {
        if (requireKnownMemberId && request.groupInstanceId().isEmpty()) {
            return this.joinClassicGroupAsDynamicMemberAndCompleteJoin(request);
        }
        try {
            JoinResult joinResult = this.sendClassicGroupJoin(request, requireKnownMemberId, supportSkippingAssignment);
            this.sleep(advanceClockMs);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)joinResult.joinFuture.isDone());
            org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)joinResult.joinFuture.get().errorCode());
            return joinResult.joinFuture.get();
        }
        catch (Exception e) {
            org.junit.jupiter.api.Assertions.fail((String)("Failed to due: " + e.getMessage()));
            return null;
        }
    }

    public SyncResult sendClassicGroupSync(SyncGroupRequestData request) {
        CompletableFuture<SyncGroupResponseData> responseFuture;
        RequestContext context = new RequestContext(new RequestHeader(ApiKeys.SYNC_GROUP, ApiKeys.SYNC_GROUP.latestVersion(), DEFAULT_CLIENT_ID, 0), "1", DEFAULT_CLIENT_ADDRESS, KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        CoordinatorResult coordinatorResult = this.groupMetadataManager.classicGroupSync(context, request, responseFuture = new CompletableFuture<SyncGroupResponseData>());
        if (coordinatorResult.replayRecords()) {
            coordinatorResult.records().forEach(this::replay);
        }
        return new SyncResult(responseFuture, (CoordinatorResult<Void, CoordinatorRecord>)coordinatorResult);
    }

    public RebalanceResult staticMembersJoinAndRebalance(String groupId, String leaderInstanceId, String followerInstanceId) throws Exception {
        return this.staticMembersJoinAndRebalance(groupId, leaderInstanceId, followerInstanceId, 10000, 5000);
    }

    public RebalanceResult staticMembersJoinAndRebalance(String groupId, String leaderInstanceId, String followerInstanceId, int rebalanceTimeoutMs, int sessionTimeoutMs) throws Exception {
        ClassicGroup group = this.createClassicGroup(groupId);
        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder().withGroupId(groupId).withGroupInstanceId(leaderInstanceId).withMemberId("").withProtocolType("consumer").withProtocolSuperset().withRebalanceTimeoutMs(rebalanceTimeoutMs).withSessionTimeoutMs(sessionTimeoutMs).build();
        JoinResult leaderJoinResult = this.sendClassicGroupJoin(joinRequest);
        JoinResult followerJoinResult = this.sendClassicGroupJoin(joinRequest.setGroupInstanceId(followerInstanceId));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)leaderJoinResult.records.isEmpty());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)followerJoinResult.records.isEmpty());
        org.junit.jupiter.api.Assertions.assertFalse((boolean)leaderJoinResult.joinFuture.isDone());
        org.junit.jupiter.api.Assertions.assertFalse((boolean)followerJoinResult.joinFuture.isDone());
        GroupMetadataManagerTestContext.assertNoOrEmptyResult(this.sleep(this.classicGroupInitialRebalanceDelayMs));
        GroupMetadataManagerTestContext.assertNoOrEmptyResult(this.sleep(this.classicGroupInitialRebalanceDelayMs));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)leaderJoinResult.joinFuture.isDone());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)followerJoinResult.joinFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)leaderJoinResult.joinFuture.get().errorCode());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)followerJoinResult.joinFuture.get().errorCode());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)leaderJoinResult.joinFuture.get().generationId());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)followerJoinResult.joinFuture.get().generationId());
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)group.numMembers());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)group.generationId());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.COMPLETING_REBALANCE));
        String leaderId = leaderJoinResult.joinFuture.get().memberId();
        String followerId = followerJoinResult.joinFuture.get().memberId();
        ArrayList<SyncGroupRequestData.SyncGroupRequestAssignment> assignment = new ArrayList<SyncGroupRequestData.SyncGroupRequestAssignment>();
        assignment.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(leaderId).setAssignment(new byte[]{1}));
        assignment.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(followerId).setAssignment(new byte[]{2}));
        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder().withGroupId(groupId).withGroupInstanceId(leaderInstanceId).withMemberId(leaderId).withGenerationId(1).withAssignment(assignment).build();
        SyncResult leaderSyncResult = this.sendClassicGroupSync(syncRequest);
        Map<String, byte[]> groupAssignment = assignment.stream().collect(Collectors.toMap(SyncGroupRequestData.SyncGroupRequestAssignment::memberId, SyncGroupRequestData.SyncGroupRequestAssignment::assignment));
        org.junit.jupiter.api.Assertions.assertEquals(List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord((ClassicGroup)group, groupAssignment)), leaderSyncResult.records);
        leaderSyncResult.appendFuture.complete(null);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)leaderSyncResult.syncFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)leaderSyncResult.syncFuture.get().errorCode());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.STABLE));
        SyncResult followerSyncResult = this.sendClassicGroupSync(syncRequest.setGroupInstanceId(followerInstanceId).setMemberId(followerId).setAssignments(Collections.emptyList()));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)followerSyncResult.records.isEmpty());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)followerSyncResult.syncFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)followerSyncResult.syncFuture.get().errorCode());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.STABLE));
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)group.numMembers());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)group.generationId());
        return new RebalanceResult(1, leaderId, leaderSyncResult.syncFuture.get().assignment(), followerId, followerSyncResult.syncFuture.get().assignment());
    }

    public PendingMemberGroupResult setupGroupWithPendingMember(ClassicGroup group) throws Exception {
        JoinGroupRequestData joinRequest = new JoinGroupRequestBuilder().withGroupId(group.groupId()).withMemberId("").withDefaultProtocolTypeAndProtocols().withRebalanceTimeoutMs(10000).withSessionTimeoutMs(5000).build();
        JoinGroupResponseData leaderJoinResponse = this.joinClassicGroupAsDynamicMemberAndCompleteJoin(joinRequest);
        ArrayList<SyncGroupRequestData.SyncGroupRequestAssignment> assignment = new ArrayList<SyncGroupRequestData.SyncGroupRequestAssignment>();
        assignment.add(new SyncGroupRequestData.SyncGroupRequestAssignment().setMemberId(leaderJoinResponse.memberId()));
        SyncGroupRequestData syncRequest = new SyncGroupRequestBuilder().withGroupId(group.groupId()).withMemberId(leaderJoinResponse.memberId()).withGenerationId(leaderJoinResponse.generationId()).withAssignment(assignment).build();
        SyncResult syncResult = this.sendClassicGroupSync(syncRequest);
        org.junit.jupiter.api.Assertions.assertEquals(List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord((ClassicGroup)group, (Map)group.groupAssignment())), syncResult.records);
        syncResult.appendFuture.complete(null);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)syncResult.syncFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)syncResult.syncFuture.get().errorCode());
        JoinResult followerJoinResult = this.sendClassicGroupJoin(joinRequest.setMemberId(""));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)followerJoinResult.records.isEmpty());
        org.junit.jupiter.api.Assertions.assertFalse((boolean)followerJoinResult.joinFuture.isDone());
        JoinResult leaderJoinResult = this.sendClassicGroupJoin(joinRequest.setMemberId(leaderJoinResponse.memberId()));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)leaderJoinResult.records.isEmpty());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.COMPLETING_REBALANCE));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)leaderJoinResult.joinFuture.isDone());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)followerJoinResult.joinFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)leaderJoinResult.joinFuture.get().errorCode());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)followerJoinResult.joinFuture.get().errorCode());
        org.junit.jupiter.api.Assertions.assertEquals((int)leaderJoinResult.joinFuture.get().generationId(), (int)followerJoinResult.joinFuture.get().generationId());
        org.junit.jupiter.api.Assertions.assertEquals((Object)leaderJoinResponse.memberId(), (Object)leaderJoinResult.joinFuture.get().leader());
        org.junit.jupiter.api.Assertions.assertEquals((Object)leaderJoinResponse.memberId(), (Object)followerJoinResult.joinFuture.get().leader());
        int nextGenerationId = leaderJoinResult.joinFuture.get().generationId();
        String followerId = followerJoinResult.joinFuture.get().memberId();
        syncResult = this.sendClassicGroupSync(syncRequest.setGenerationId(nextGenerationId));
        org.junit.jupiter.api.Assertions.assertEquals(List.of(GroupCoordinatorRecordHelpers.newGroupMetadataRecord((ClassicGroup)group, (Map)group.groupAssignment())), syncResult.records);
        syncResult.appendFuture.complete(null);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)syncResult.syncFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)syncResult.syncFuture.get().errorCode());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.STABLE));
        leaderJoinResult = this.sendClassicGroupJoin(joinRequest.setMemberId(leaderJoinResponse.memberId()));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)leaderJoinResult.records.isEmpty());
        org.junit.jupiter.api.Assertions.assertFalse((boolean)leaderJoinResult.joinFuture.isDone());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.PREPARING_REBALANCE));
        JoinResult pendingMemberJoinResult = this.sendClassicGroupJoin(joinRequest.setMemberId("").setSessionTimeoutMs(2500), true);
        org.junit.jupiter.api.Assertions.assertTrue((boolean)pendingMemberJoinResult.records.isEmpty());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)pendingMemberJoinResult.joinFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals((short)Errors.MEMBER_ID_REQUIRED.code(), (short)pendingMemberJoinResult.joinFuture.get().errorCode());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)group.numPendingJoinMembers());
        followerJoinResult = this.sendClassicGroupJoin(joinRequest.setMemberId(followerId).setSessionTimeoutMs(5000));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)followerJoinResult.records.isEmpty());
        org.junit.jupiter.api.Assertions.assertFalse((boolean)followerJoinResult.joinFuture.isDone());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.PREPARING_REBALANCE));
        org.junit.jupiter.api.Assertions.assertEquals((int)2, (int)group.numMembers());
        org.junit.jupiter.api.Assertions.assertEquals((int)1, (int)group.numPendingJoinMembers());
        return new PendingMemberGroupResult(leaderJoinResponse.memberId(), followerId, pendingMemberJoinResult.joinFuture.get());
    }

    public void verifySessionExpiration(ClassicGroup group, int timeoutMs) {
        Set expectedHeartbeatKeys = group.allMembers().stream().map(member -> GroupMetadataManager.classicGroupHeartbeatKey((String)group.groupId(), (String)member.memberId())).collect(Collectors.toSet());
        List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = this.sleep(timeoutMs);
        List<CoordinatorRecord> expectedRecords = List.of(GroupMetadataManagerTestContext.newGroupMetadataRecord(group.groupId(), new GroupMetadataValue().setMembers(Collections.emptyList()).setGeneration(group.generationId()).setLeader(null).setProtocolType("consumer").setProtocol(null).setCurrentStateTimestamp(this.time.milliseconds())));
        Set heartbeatKeys = timeouts.stream().map(timeout -> timeout.key).collect(Collectors.toSet());
        org.junit.jupiter.api.Assertions.assertEquals(expectedHeartbeatKeys, heartbeatKeys);
        int timeoutsSize = timeouts.size();
        org.junit.jupiter.api.Assertions.assertEquals(expectedRecords, (Object)timeouts.get((int)(timeoutsSize - 1)).result.records());
        GroupMetadataManagerTestContext.assertNoOrEmptyResult(timeouts.subList(0, timeoutsSize - 1));
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.EMPTY));
        org.junit.jupiter.api.Assertions.assertEquals((int)0, (int)group.numMembers());
    }

    public CoordinatorResult<HeartbeatResponseData, CoordinatorRecord> sendClassicGroupHeartbeat(HeartbeatRequestData request) {
        RequestContext context = new RequestContext(new RequestHeader(ApiKeys.HEARTBEAT, ApiKeys.HEARTBEAT.latestVersion(), DEFAULT_CLIENT_ID, 0), "1", DEFAULT_CLIENT_ADDRESS, KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        return this.groupMetadataManager.classicGroupHeartbeat(context, request);
    }

    public List<ListGroupsResponseData.ListedGroup> sendListGroups(List<String> statesFilter, List<String> typesFilter) {
        HashSet<String> statesFilterSet = new HashSet<String>(statesFilter);
        HashSet<String> typesFilterSet = new HashSet<String>(typesFilter);
        return this.groupMetadataManager.listGroups(statesFilterSet, typesFilterSet, this.lastCommittedOffset);
    }

    public List<ConsumerGroupDescribeResponseData.DescribedGroup> sendConsumerGroupDescribe(List<String> groupIds) {
        return this.groupMetadataManager.consumerGroupDescribe(groupIds, this.lastCommittedOffset);
    }

    public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(List<String> groupIds) {
        RequestContext context = new RequestContext(new RequestHeader(ApiKeys.DESCRIBE_GROUPS, ApiKeys.DESCRIBE_GROUPS.latestVersion(), DEFAULT_CLIENT_ID, 0), "1", DEFAULT_CLIENT_ADDRESS, KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        return this.groupMetadataManager.describeGroups(context, groupIds, this.lastCommittedOffset);
    }

    public List<DescribeGroupsResponseData.DescribedGroup> describeGroups(List<String> groupIds, short apiVersion) {
        RequestContext context = new RequestContext(new RequestHeader(ApiKeys.DESCRIBE_GROUPS, apiVersion, DEFAULT_CLIENT_ID, 0), "1", DEFAULT_CLIENT_ADDRESS, KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        return this.groupMetadataManager.describeGroups(context, groupIds, this.lastCommittedOffset);
    }

    public List<ShareGroupDescribeResponseData.DescribedGroup> sendShareGroupDescribe(List<String> groupIds) {
        return this.groupMetadataManager.shareGroupDescribe(groupIds, this.lastCommittedOffset);
    }

    public void verifyHeartbeat(String groupId, JoinGroupResponseData joinResponse, Errors expectedError) {
        HeartbeatRequestData request = new HeartbeatRequestData().setGroupId(groupId).setMemberId(joinResponse.memberId()).setGenerationId(joinResponse.generationId());
        if (expectedError == Errors.UNKNOWN_MEMBER_ID) {
            org.junit.jupiter.api.Assertions.assertThrows(UnknownMemberIdException.class, () -> this.sendClassicGroupHeartbeat(request));
        } else {
            HeartbeatResponseData response = (HeartbeatResponseData)this.sendClassicGroupHeartbeat(request).response();
            org.junit.jupiter.api.Assertions.assertEquals((short)expectedError.code(), (short)response.errorCode());
        }
    }

    public List<JoinGroupResponseData> joinWithNMembers(String groupId, int numMembers, int rebalanceTimeoutMs, int sessionTimeoutMs) {
        ClassicGroup group = this.createClassicGroup(groupId);
        boolean requireKnownMemberId = true;
        JoinGroupRequestData request = new JoinGroupRequestBuilder().withGroupId(groupId).withMemberId("").withDefaultProtocolTypeAndProtocols().withRebalanceTimeoutMs(rebalanceTimeoutMs).withSessionTimeoutMs(sessionTimeoutMs).build();
        List memberIds = IntStream.range(0, numMembers).mapToObj(i -> {
            JoinResult joinResult = this.sendClassicGroupJoin(request, requireKnownMemberId);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)joinResult.records.isEmpty());
            org.junit.jupiter.api.Assertions.assertTrue((boolean)joinResult.joinFuture.isDone());
            try {
                return joinResult.joinFuture.get().memberId();
            }
            catch (Exception e) {
                org.junit.jupiter.api.Assertions.fail((String)("Unexpected exception: " + e.getMessage()));
                return null;
            }
        }).collect(Collectors.toList());
        List<CompletableFuture> secondJoinFutures = IntStream.range(0, numMembers).mapToObj(i -> {
            JoinResult joinResult = this.sendClassicGroupJoin(request.setMemberId((String)memberIds.get(i)), requireKnownMemberId);
            org.junit.jupiter.api.Assertions.assertTrue((boolean)joinResult.records.isEmpty());
            org.junit.jupiter.api.Assertions.assertFalse((boolean)joinResult.joinFuture.isDone());
            return joinResult.joinFuture;
        }).collect(Collectors.toList());
        GroupMetadataManagerTestContext.assertNoOrEmptyResult(this.sleep(this.classicGroupInitialRebalanceDelayMs));
        secondJoinFutures.forEach(future -> org.junit.jupiter.api.Assertions.assertFalse((boolean)future.isDone()));
        GroupMetadataManagerTestContext.assertNoOrEmptyResult(this.sleep(rebalanceTimeoutMs));
        List<JoinGroupResponseData> joinResponses = secondJoinFutures.stream().map(future -> {
            org.junit.jupiter.api.Assertions.assertTrue((boolean)future.isDone());
            try {
                org.junit.jupiter.api.Assertions.assertEquals((short)Errors.NONE.code(), (short)((JoinGroupResponseData)future.get()).errorCode());
                return (JoinGroupResponseData)future.get();
            }
            catch (Exception e) {
                org.junit.jupiter.api.Assertions.fail((String)("Unexpected exception: " + e.getMessage()));
                return null;
            }
        }).collect(Collectors.toList());
        org.junit.jupiter.api.Assertions.assertEquals((int)numMembers, (int)group.numMembers());
        org.junit.jupiter.api.Assertions.assertTrue((boolean)group.isInState(ClassicGroupState.COMPLETING_REBALANCE));
        return joinResponses;
    }

    public CoordinatorResult<LeaveGroupResponseData, CoordinatorRecord> sendClassicGroupLeave(LeaveGroupRequestData request) {
        RequestContext context = new RequestContext(new RequestHeader(ApiKeys.LEAVE_GROUP, ApiKeys.LEAVE_GROUP.latestVersion(), DEFAULT_CLIENT_ID, 0), "1", DEFAULT_CLIENT_ADDRESS, KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, null, false);
        return this.groupMetadataManager.classicGroupLeave(context, request);
    }

    public void verifyDescribeGroupsReturnsDeadGroup(String groupId) {
        List<DescribeGroupsResponseData.DescribedGroup> describedGroups = this.describeGroups(List.of(groupId));
        org.junit.jupiter.api.Assertions.assertEquals(List.of(new DescribeGroupsResponseData.DescribedGroup().setGroupId(groupId).setGroupState(ClassicGroupState.DEAD.toString()).setErrorCode(Errors.GROUP_ID_NOT_FOUND.code()).setErrorMessage("Group " + groupId + " not found.")), describedGroups);
    }

    public void verifyDescribeGroupsBeforeV6ReturnsDeadGroup(String groupId) {
        List<DescribeGroupsResponseData.DescribedGroup> describedGroups = this.describeGroups(Collections.singletonList(groupId), (short)5);
        org.junit.jupiter.api.Assertions.assertEquals(Collections.singletonList(new DescribeGroupsResponseData.DescribedGroup().setGroupId(groupId).setGroupState(ClassicGroupState.DEAD.toString())), describedGroups);
    }

    public void verifyClassicGroupSyncToConsumerGroup(String groupId, String memberId, int generationId, String protocolName, String protocolType, List<TopicPartition> topicPartitionList, short version) throws Exception {
        SyncResult syncResult = this.sendClassicGroupSync(new SyncGroupRequestBuilder().withGroupId(groupId).withMemberId(memberId).withGenerationId(generationId).withProtocolName(protocolName).withProtocolType(protocolType).build());
        org.junit.jupiter.api.Assertions.assertEquals(Collections.emptyList(), syncResult.records);
        org.junit.jupiter.api.Assertions.assertFalse((boolean)syncResult.syncFuture.isDone());
        syncResult.appendFuture.complete(null);
        Assertions.assertResponseEquals((ApiMessage)new SyncGroupResponseData().setProtocolType(protocolType).setProtocolName(protocolName).setAssignment(ConsumerProtocol.serializeAssignment((ConsumerPartitionAssignor.Assignment)new ConsumerPartitionAssignor.Assignment(topicPartitionList), (short)version).array()), (ApiMessage)syncResult.syncFuture.get());
        this.assertSessionTimeout(groupId, memberId, 5000L);
        this.assertNoSyncTimeout(groupId, memberId);
    }

    public void verifyClassicGroupSyncToConsumerGroup(String groupId, String memberId, int generationId, String protocolName, String protocolType, List<TopicPartition> topicPartitionList) throws Exception {
        this.verifyClassicGroupSyncToConsumerGroup(groupId, memberId, generationId, protocolName, protocolType, topicPartitionList, (short)3);
    }

    private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
        if (apiMessageAndVersion == null) {
            return null;
        }
        return apiMessageAndVersion.message();
    }

    public void replay(CoordinatorRecord record) {
        ApiMessage key = record.key();
        ApiMessageAndVersion value = record.value();
        if (key == null) {
            throw new IllegalStateException("Received a null key in " + String.valueOf(record));
        }
        switch (CoordinatorRecordType.fromId((short)record.key().apiKey())) {
            case GROUP_METADATA: {
                this.groupMetadataManager.replay((GroupMetadataKey)key, (GroupMetadataValue)this.messageOrNull(value));
                break;
            }
            case CONSUMER_GROUP_MEMBER_METADATA: {
                this.groupMetadataManager.replay((ConsumerGroupMemberMetadataKey)key, (ConsumerGroupMemberMetadataValue)this.messageOrNull(value));
                break;
            }
            case CONSUMER_GROUP_METADATA: {
                this.groupMetadataManager.replay((ConsumerGroupMetadataKey)key, (ConsumerGroupMetadataValue)this.messageOrNull(value));
                break;
            }
            case CONSUMER_GROUP_PARTITION_METADATA: {
                this.groupMetadataManager.replay((ConsumerGroupPartitionMetadataKey)key, (ConsumerGroupPartitionMetadataValue)this.messageOrNull(value));
                break;
            }
            case CONSUMER_GROUP_TARGET_ASSIGNMENT_MEMBER: {
                this.groupMetadataManager.replay((ConsumerGroupTargetAssignmentMemberKey)key, (ConsumerGroupTargetAssignmentMemberValue)this.messageOrNull(value));
                break;
            }
            case CONSUMER_GROUP_TARGET_ASSIGNMENT_METADATA: {
                this.groupMetadataManager.replay((ConsumerGroupTargetAssignmentMetadataKey)key, (ConsumerGroupTargetAssignmentMetadataValue)this.messageOrNull(value));
                break;
            }
            case CONSUMER_GROUP_CURRENT_MEMBER_ASSIGNMENT: {
                this.groupMetadataManager.replay((ConsumerGroupCurrentMemberAssignmentKey)key, (ConsumerGroupCurrentMemberAssignmentValue)this.messageOrNull(value));
                break;
            }
            case SHARE_GROUP_MEMBER_METADATA: {
                this.groupMetadataManager.replay((ShareGroupMemberMetadataKey)key, (ShareGroupMemberMetadataValue)this.messageOrNull(value));
                break;
            }
            case SHARE_GROUP_METADATA: {
                this.groupMetadataManager.replay((ShareGroupMetadataKey)key, (ShareGroupMetadataValue)this.messageOrNull(value));
                break;
            }
            case SHARE_GROUP_PARTITION_METADATA: {
                this.groupMetadataManager.replay((ShareGroupPartitionMetadataKey)key, (ShareGroupPartitionMetadataValue)this.messageOrNull(value));
                break;
            }
            case SHARE_GROUP_TARGET_ASSIGNMENT_MEMBER: {
                this.groupMetadataManager.replay((ShareGroupTargetAssignmentMemberKey)key, (ShareGroupTargetAssignmentMemberValue)this.messageOrNull(value));
                break;
            }
            case SHARE_GROUP_TARGET_ASSIGNMENT_METADATA: {
                this.groupMetadataManager.replay((ShareGroupTargetAssignmentMetadataKey)key, (ShareGroupTargetAssignmentMetadataValue)this.messageOrNull(value));
                break;
            }
            case SHARE_GROUP_CURRENT_MEMBER_ASSIGNMENT: {
                this.groupMetadataManager.replay((ShareGroupCurrentMemberAssignmentKey)key, (ShareGroupCurrentMemberAssignmentValue)this.messageOrNull(value));
                break;
            }
            case CONSUMER_GROUP_REGULAR_EXPRESSION: {
                this.groupMetadataManager.replay((ConsumerGroupRegularExpressionKey)key, (ConsumerGroupRegularExpressionValue)this.messageOrNull(value));
                break;
            }
            default: {
                throw new IllegalStateException("Received an unknown record type " + record.key().apiKey() + " in " + String.valueOf(record));
            }
        }
        ++this.lastWrittenOffset;
        this.snapshotRegistry.idempotentCreateSnapshot(this.lastWrittenOffset);
    }

    void onLoaded() {
        this.groupMetadataManager.onLoaded();
    }

    void onUnloaded() {
        this.groupMetadataManager.onUnloaded();
    }

    public void updateGroupConfig(String groupId, Properties newGroupConfig) {
        this.groupConfigManager.updateGroupConfig(groupId, newGroupConfig);
    }

    public static class JoinResult {
        CompletableFuture<JoinGroupResponseData> joinFuture;
        List<CoordinatorRecord> records;
        CompletableFuture<Void> appendFuture;

        public JoinResult(CompletableFuture<JoinGroupResponseData> joinFuture, CoordinatorResult<Void, CoordinatorRecord> coordinatorResult) {
            this.joinFuture = joinFuture;
            this.records = coordinatorResult.records();
            this.appendFuture = coordinatorResult.appendFuture();
        }
    }

    public static class JoinGroupRequestBuilder {
        String groupId = null;
        String groupInstanceId = null;
        String memberId = null;
        String protocolType = "consumer";
        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols = new JoinGroupRequestData.JoinGroupRequestProtocolCollection(0);
        int sessionTimeoutMs = 500;
        int rebalanceTimeoutMs = 500;
        String reason = null;

        JoinGroupRequestBuilder withGroupId(String groupId) {
            this.groupId = groupId;
            return this;
        }

        JoinGroupRequestBuilder withGroupInstanceId(String groupInstanceId) {
            this.groupInstanceId = groupInstanceId;
            return this;
        }

        JoinGroupRequestBuilder withMemberId(String memberId) {
            this.memberId = memberId;
            return this;
        }

        JoinGroupRequestBuilder withDefaultProtocolTypeAndProtocols() {
            this.protocols = GroupMetadataManagerTestContext.toProtocols("range");
            return this;
        }

        JoinGroupRequestBuilder withProtocolSuperset() {
            this.protocols = GroupMetadataManagerTestContext.toProtocols("range", "roundrobin");
            return this;
        }

        JoinGroupRequestBuilder withProtocolType(String protocolType) {
            this.protocolType = protocolType;
            return this;
        }

        JoinGroupRequestBuilder withProtocols(JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols) {
            this.protocols = protocols;
            return this;
        }

        JoinGroupRequestBuilder withRebalanceTimeoutMs(int rebalanceTimeoutMs) {
            this.rebalanceTimeoutMs = rebalanceTimeoutMs;
            return this;
        }

        JoinGroupRequestBuilder withSessionTimeoutMs(int sessionTimeoutMs) {
            this.sessionTimeoutMs = sessionTimeoutMs;
            return this;
        }

        JoinGroupRequestBuilder withReason(String reason) {
            this.reason = reason;
            return this;
        }

        JoinGroupRequestData build() {
            return new JoinGroupRequestData().setGroupId(this.groupId).setGroupInstanceId(this.groupInstanceId).setMemberId(this.memberId).setProtocolType(this.protocolType).setProtocols(this.protocols).setRebalanceTimeoutMs(this.rebalanceTimeoutMs).setSessionTimeoutMs(this.sessionTimeoutMs).setReason(this.reason);
        }
    }

    public static class SyncGroupRequestBuilder {
        String groupId = null;
        String groupInstanceId = null;
        String memberId = null;
        String protocolType = "consumer";
        String protocolName = "range";
        int generationId = 0;
        List<SyncGroupRequestData.SyncGroupRequestAssignment> assignment = Collections.emptyList();

        SyncGroupRequestBuilder withGroupId(String groupId) {
            this.groupId = groupId;
            return this;
        }

        SyncGroupRequestBuilder withGroupInstanceId(String groupInstanceId) {
            this.groupInstanceId = groupInstanceId;
            return this;
        }

        SyncGroupRequestBuilder withMemberId(String memberId) {
            this.memberId = memberId;
            return this;
        }

        SyncGroupRequestBuilder withGenerationId(int generationId) {
            this.generationId = generationId;
            return this;
        }

        SyncGroupRequestBuilder withProtocolType(String protocolType) {
            this.protocolType = protocolType;
            return this;
        }

        SyncGroupRequestBuilder withProtocolName(String protocolName) {
            this.protocolName = protocolName;
            return this;
        }

        SyncGroupRequestBuilder withAssignment(List<SyncGroupRequestData.SyncGroupRequestAssignment> assignment) {
            this.assignment = assignment;
            return this;
        }

        SyncGroupRequestData build() {
            return new SyncGroupRequestData().setGroupId(this.groupId).setGroupInstanceId(this.groupInstanceId).setMemberId(this.memberId).setGenerationId(this.generationId).setProtocolType(this.protocolType).setProtocolName(this.protocolName).setAssignments(this.assignment);
        }
    }

    public static class SyncResult {
        CompletableFuture<SyncGroupResponseData> syncFuture;
        List<CoordinatorRecord> records;
        CompletableFuture<Void> appendFuture;

        public SyncResult(CompletableFuture<SyncGroupResponseData> syncFuture, CoordinatorResult<Void, CoordinatorRecord> coordinatorResult) {
            this.syncFuture = syncFuture;
            this.records = coordinatorResult.records();
            this.appendFuture = coordinatorResult.appendFuture();
        }
    }

    public static class RebalanceResult {
        int generationId;
        String leaderId;
        byte[] leaderAssignment;
        String followerId;
        byte[] followerAssignment;

        RebalanceResult(int generationId, String leaderId, byte[] leaderAssignment, String followerId, byte[] followerAssignment) {
            this.generationId = generationId;
            this.leaderId = leaderId;
            this.leaderAssignment = leaderAssignment;
            this.followerId = followerId;
            this.followerAssignment = followerAssignment;
        }
    }

    public static class PendingMemberGroupResult {
        String leaderId;
        String followerId;
        JoinGroupResponseData pendingMemberResponse;

        public PendingMemberGroupResult(String leaderId, String followerId, JoinGroupResponseData pendingMemberResponse) {
            this.leaderId = leaderId;
            this.followerId = followerId;
            this.pendingMemberResponse = pendingMemberResponse;
        }
    }

    public static class Builder {
        private MockTime time = new MockTime(0L, 0L, 0L);
        private final MockCoordinatorTimer<Void, CoordinatorRecord> timer = new MockCoordinatorTimer((Time)this.time);
        private final MockCoordinatorExecutor<CoordinatorRecord> executor = new MockCoordinatorExecutor();
        private final LogContext logContext = new LogContext();
        private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(this.logContext);
        private MetadataImage metadataImage;
        private GroupConfigManager groupConfigManager;
        private final List<ConsumerGroupBuilder> consumerGroupBuilders = new ArrayList<ConsumerGroupBuilder>();
        private final GroupCoordinatorMetricsShard metrics = (GroupCoordinatorMetricsShard)Mockito.mock(GroupCoordinatorMetricsShard.class);
        private ShareGroupPartitionAssignor shareGroupAssignor = new MockPartitionAssignor("share");
        private final List<ShareGroupBuilder> shareGroupBuilders = new ArrayList<ShareGroupBuilder>();
        private final Map<String, Object> config = new HashMap<String, Object>();
        private Optional<Authorizer> authorizer = Optional.empty();

        public Builder withConfig(String key, Object value) {
            this.config.put(key, value);
            return this;
        }

        public Builder withMetadataImage(MetadataImage metadataImage) {
            this.metadataImage = metadataImage;
            return this;
        }

        public Builder withConsumerGroup(ConsumerGroupBuilder builder) {
            this.consumerGroupBuilders.add(builder);
            return this;
        }

        public Builder withShareGroup(ShareGroupBuilder builder) {
            this.shareGroupBuilders.add(builder);
            return this;
        }

        public Builder withShareGroupAssignor(ShareGroupPartitionAssignor shareGroupAssignor) {
            this.shareGroupAssignor = shareGroupAssignor;
            return this;
        }

        public Builder withTime(MockTime time) {
            this.time = time;
            return this;
        }

        public Builder withAuthorizer(Authorizer authorizer) {
            this.authorizer = Optional.of(authorizer);
            return this;
        }

        public GroupMetadataManagerTestContext build() {
            if (this.metadataImage == null) {
                this.metadataImage = MetadataImage.EMPTY;
            }
            if (this.groupConfigManager == null) {
                this.groupConfigManager = GroupConfigManagerTest.createConfigManager();
            }
            this.config.putIfAbsent("group.consumer.assignors", List.of(new MockPartitionAssignor("range")));
            GroupCoordinatorConfig groupCoordinatorConfig = GroupCoordinatorConfigContext.fromProps(this.config);
            GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext(this.time, this.timer, this.executor, this.snapshotRegistry, this.metrics, groupCoordinatorConfig, new GroupMetadataManager.Builder().withSnapshotRegistry(this.snapshotRegistry).withLogContext(this.logContext).withTime((Time)this.time).withTimer(this.timer).withExecutor(this.executor).withConfig(groupCoordinatorConfig).withMetadataImage(this.metadataImage).withGroupCoordinatorMetricsShard(this.metrics).withShareGroupAssignor(this.shareGroupAssignor).withGroupConfigManager(this.groupConfigManager).withAuthorizer(this.authorizer).build(), this.groupConfigManager);
            this.consumerGroupBuilders.forEach(builder -> builder.build(this.metadataImage.topics()).forEach(context::replay));
            this.shareGroupBuilders.forEach(builder -> builder.build(this.metadataImage.topics()).forEach(context::replay));
            context.commit();
            return context;
        }
    }

    private static class GroupCoordinatorConfigContext
    extends GroupCoordinatorConfig {
        GroupCoordinatorConfigContext(AbstractConfig config) {
            super(config);
        }

        public static GroupCoordinatorConfig fromProps(Map<?, ?> props) {
            return new GroupCoordinatorConfigContext(new AbstractConfig(Utils.mergeConfigs(List.of(GroupCoordinatorConfig.CLASSIC_GROUP_CONFIG_DEF, GroupCoordinatorConfig.GROUP_COORDINATOR_CONFIG_DEF, GroupCoordinatorConfig.OFFSET_MANAGEMENT_CONFIG_DEF, GroupCoordinatorConfig.CONSUMER_GROUP_CONFIG_DEF, GroupCoordinatorConfig.SHARE_GROUP_CONFIG_DEF)), props));
        }

        protected List<ConsumerGroupPartitionAssignor> consumerGroupAssignors(AbstractConfig config) {
            List classes = config.getList("group.consumer.assignors");
            if (classes.stream().allMatch(o -> o instanceof ConsumerGroupPartitionAssignor)) {
                return Collections.unmodifiableList(classes);
            }
            return super.consumerGroupAssignors(config);
        }
    }
}

