/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group;

import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorExecutor;
import org.apache.kafka.coordinator.common.runtime.MockCoordinatorTimer;
import org.apache.kafka.coordinator.group.Group;
import org.apache.kafka.coordinator.group.GroupConfigManager;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfigTest;
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordHelpers;
import org.apache.kafka.coordinator.group.GroupMetadataManager;
import org.apache.kafka.coordinator.group.MetadataImageBuilder;
import org.apache.kafka.coordinator.group.OffsetAndMetadata;
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
import org.apache.kafka.coordinator.group.OffsetMetadataManager;
import org.apache.kafka.coordinator.group.classic.ClassicGroup;
import org.apache.kafka.coordinator.group.classic.ClassicGroupMember;
import org.apache.kafka.coordinator.group.classic.ClassicGroupState;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMemberMetadataValue;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetricsShard;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroup;
import org.apache.kafka.coordinator.group.modern.consumer.ConsumerGroupMember;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class OffsetMetadataManagerTest {
    /**
     * Committing offsets with a member id and generation to a group that was never created
     * must be rejected. The expected exception depends on the request version:
     * {@link GroupIdNotFoundException} from version 9 onwards and
     * {@link IllegalGenerationException} for older versions.
     *
     * @param version the OffsetCommit request version under test.
     */
    @ParameterizedTest
    @ApiKeyVersionsSource(apiKey=ApiKeys.OFFSET_COMMIT)
    public void testOffsetCommitWithUnknownGroup(short version) {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // Use a bounded class token instead of a raw Class so that the assertThrows
        // call below is type-checked rather than an unchecked invocation.
        Class<? extends Throwable> expectedType = version >= 9
            ? GroupIdNotFoundException.class
            : IllegalGenerationException.class;

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        // No group named "foo" has been created in this context.
        Assertions.assertThrows(expectedType, () -> context.commitOffset(version, request));
    }

    /**
     * A commit against a classic group that has been transitioned to
     * {@link ClassicGroupState#DEAD} is rejected with {@link CoordinatorNotAvailableException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithDeadGroup() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // Create the group and mark it dead before any commit is attempted.
        ClassicGroup deadGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);
        deadGroup.transitionTo(ClassicGroupState.DEAD);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(CoordinatorNotAvailableException.class, () -> context.commitOffset(request));
    }

    /**
     * A commit naming a member id that was never added to the classic group is rejected
     * with {@link UnknownMemberIdException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithUnknownMemberId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // The group is requested through the manager but no member is added to it.
        context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitOffset(request));
    }

    /**
     * A commit carrying generation 10 while the classic group is at generation 1 is
     * rejected with {@link IllegalGenerationException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithIllegalGeneration() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        // Register the committing member.
        classicGroup.add(mkGenericMember("member", Optional.of("new-instance-id")));

        // Bump the group to its next generation and confirm it is 1.
        classicGroup.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
        classicGroup.initNextGeneration();
        Assertions.assertEquals(1, classicGroup.generationId());

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        // The generation in the request (10) does not match the group's generation (1).
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request));
    }

    /**
     * A commit that supplies a group instance id for a member that was added without one
     * is rejected with {@link UnknownMemberIdException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithUnknownInstanceId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        // The member is registered with an empty instance id.
        classicGroup.add(mkGenericMember("member", Optional.empty()));

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGroupInstanceId("instanceid")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitOffset(request));
    }

    /**
     * A commit whose group instance id ("old-instance-id") differs from the one the member
     * was registered with ("new-instance-id") is rejected.
     *
     * NOTE(review): despite the test name, the exception asserted here is
     * {@link UnknownMemberIdException}, not a fenced-instance-id error.
     */
    @Test
    public void testGenericGroupOffsetCommitWithFencedInstanceId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        // The member is registered with a static instance id.
        classicGroup.add(mkGenericMember("member", Optional.of("new-instance-id")));

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGroupInstanceId("old-instance-id")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitOffset(request));
    }

    /**
     * A commit with the correct member id and generation (1) is still rejected with
     * {@link RebalanceInProgressException} when the group has only been moved to its next
     * generation and not to {@code STABLE}.
     *
     * NOTE(review): per the test name, the group is presumably in the completing-rebalance
     * state after {@code initNextGeneration()}; that state is not asserted here.
     */
    @Test
    public void testGenericGroupOffsetCommitWhileInCompletingRebalanceState() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        classicGroup.add(mkGenericMember("member", Optional.of("new-instance-id")));

        // Move to the next generation without transitioning to STABLE afterwards.
        classicGroup.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
        classicGroup.initNextGeneration();
        Assertions.assertEquals(1, classicGroup.generationId());

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(1)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(RebalanceInProgressException.class, () -> context.commitOffset(request));
    }

    /**
     * A commit that sets neither a member id nor a generation, sent to a classic group
     * that has a member and is at generation 1, is rejected with
     * {@link UnknownMemberIdException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithoutMemberIdAndGeneration() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        classicGroup.add(mkGenericMember("member", Optional.of("new-instance-id")));

        classicGroup.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
        classicGroup.initNextGeneration();
        Assertions.assertEquals(1, classicGroup.generationId());

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        // Only the group id and the topics are set on the request.
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitOffset(request));
    }

    /**
     * A commit to a stable classic group that carries a retention time of 1234 ms succeeds
     * and yields a single offset commit record whose expire timestamp is the current mock
     * time plus 1234 ms.
     */
    @Test
    public void testGenericGroupOffsetCommitWithRetentionTime() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        classicGroup.add(mkGenericMember("member", Optional.of("new-instance-id")));

        // Advance to generation 1 and make the group stable.
        classicGroup.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
        classicGroup.initNextGeneration();
        Assertions.assertEquals(1, classicGroup.generationId());
        classicGroup.transitionTo(ClassicGroupState.STABLE);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(1)
            .setRetentionTimeMs(1234L)
            .setTopics(Collections.singletonList(topic));

        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(request);

        // The response reports no error for the single partition.
        OffsetCommitResponseData.OffsetCommitResponsePartition expectedPartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new OffsetCommitResponseData.OffsetCommitResponseTopic()
                    .setName("bar")
                    .setPartitions(Collections.singletonList(expectedPartition))));
        Assertions.assertEquals(expectedResponse, result.response());

        // The record carries the requested retention as an explicit expire timestamp.
        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.empty(),
            "",
            context.time.milliseconds(),
            OptionalLong.of(context.time.milliseconds() + 1234L));
        Assertions.assertEquals(
            Collections.singletonList(
                GroupCoordinatorRecordHelpers.newOffsetCommitRecord("foo", "bar", 0, expectedOffset)),
            result.records());
    }

    /**
     * Checks how an offset commit interacts with the member's heartbeat timer: no timeout
     * fires during the 2500 ms before the commit or the 2500 ms after it, and exactly one
     * timeout fires after a further 2500 ms, after which the member is no longer in the group.
     *
     * NOTE(review): 2500 ms is presumably half of the member's session timeout as configured
     * by {@code mkGenericMember} - confirm against that helper.
     */
    @Test
    public void testGenericGroupOffsetCommitMaintainsSession() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        ClassicGroupMember genericMember = mkGenericMember("member", Optional.empty());
        classicGroup.add(genericMember);

        // Advance to generation 1 and make the group stable.
        classicGroup.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
        classicGroup.initNextGeneration();
        Assertions.assertEquals(1, classicGroup.generationId());
        classicGroup.transitionTo(ClassicGroupState.STABLE);

        // Arm the heartbeat timer for the member explicitly.
        context.groupMetadataManager.rescheduleClassicGroupMemberHeartbeat(classicGroup, genericMember);

        // First interval: nothing expires.
        Assertions.assertEquals(Collections.emptyList(), context.sleep(2500L));

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(1)
            .setRetentionTimeMs(1234L)
            .setTopics(Collections.singletonList(topic));
        context.commitOffset(request);

        // Second interval, right after the commit: still nothing expires.
        Assertions.assertEquals(Collections.emptyList(), context.sleep(2500L));

        // Third interval: one timeout fires and the member is gone.
        List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> expired = context.sleep(2500L);
        Assertions.assertEquals(1, expired.size());
        Assertions.assertFalse(classicGroup.hasMember(genericMember.memberId()));
    }

    /**
     * A commit that sets only the group id and topics, sent when no group was created
     * beforehand, succeeds and produces one offset commit record. Afterwards the classic
     * group "foo" can be looked up through the group metadata manager.
     */
    @Test
    public void testSimpleGroupOffsetCommit() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setTopics(Collections.singletonList(topic));

        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition expectedPartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new OffsetCommitResponseData.OffsetCommitResponseTopic()
                    .setName("bar")
                    .setPartitions(Collections.singletonList(expectedPartition))));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.empty(),
            "",
            context.time.milliseconds(),
            OptionalLong.empty());
        Assertions.assertEquals(
            Collections.singletonList(
                GroupCoordinatorRecordHelpers.newOffsetCommitRecord("foo", "bar", 0, expectedOffset)),
            result.records());

        // The group must now be retrievable under the committed group id.
        ClassicGroup createdGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", false);
        Assertions.assertNotNull(createdGroup);
        Assertions.assertEquals("foo", createdGroup.groupId());
    }

    /**
     * Same as the simple-group commit, but the request additionally carries a group
     * instance id; the commit still succeeds and produces one offset commit record.
     */
    @Test
    public void testSimpleGroupOffsetCommitWithInstanceId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setGroupInstanceId("instance-id")
            .setTopics(Collections.singletonList(topic));

        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition expectedPartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new OffsetCommitResponseData.OffsetCommitResponseTopic()
                    .setName("bar")
                    .setPartitions(Collections.singletonList(expectedPartition))));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.empty(),
            "",
            context.time.milliseconds(),
            OptionalLong.empty());
        Assertions.assertEquals(
            Collections.singletonList(
                GroupCoordinatorRecordHelpers.newOffsetCommitRecord("foo", "bar", 0, expectedOffset)),
            result.records());
    }

    /**
     * A commit naming a member id that was never added to the consumer group is rejected
     * with {@link UnknownMemberIdException}.
     */
    @Test
    public void testConsumerGroupOffsetCommitWithUnknownMemberId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // The consumer group is requested through the manager but no member is added.
        context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitOffset(request));
    }

    /**
     * With the member at epoch 10, commits carrying epoch 9 and epoch 11 are both rejected
     * with {@link StaleMemberEpochException}.
     */
    @Test
    public void testConsumerGroupOffsetCommitWithStaleMemberEpoch() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ConsumerGroup consumerGroup = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);

        ConsumerGroupMember memberAtEpoch10 = new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .build();
        consumerGroup.updateMember(memberAtEpoch10);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(9)
            .setTopics(Collections.singletonList(topic));

        // Epoch below the member's epoch.
        Assertions.assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request));

        // Epoch above the member's epoch; the same request object is reused.
        request.setGenerationIdOrMemberEpoch(11);
        Assertions.assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request));
    }

    /**
     * With a consumer group member at epoch 10 that carries classic member metadata,
     * commits with generation 9 and generation 11 are both rejected with
     * {@link IllegalGenerationException}.
     */
    @Test
    public void testConsumerGroupOffsetCommitWithIllegalGenerationId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ConsumerGroup consumerGroup = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);

        // The only difference to the stale-epoch setup is the classic member metadata.
        ConsumerGroupMember classicProtocolMember = new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata())
            .build();
        consumerGroup.updateMember(classicProtocolMember);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(9)
            .setTopics(Collections.singletonList(topic));

        // Generation below the member's epoch.
        Assertions.assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request));

        // Generation above the member's epoch; the same request object is reused.
        request.setGenerationIdOrMemberEpoch(11);
        Assertions.assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request));
    }

    /**
     * A commit to a consumer group that sets neither a member id nor an epoch succeeds
     * and produces one offset commit record.
     *
     * NOTE(review): per the test name, this request shape presumably models an admin
     * client commit - confirm.
     */
    @Test
    public void testConsumerGroupOffsetCommitFromAdminClient() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setTopics(Collections.singletonList(topic));

        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition expectedPartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new OffsetCommitResponseData.OffsetCommitResponseTopic()
                    .setName("bar")
                    .setPartitions(Collections.singletonList(expectedPartition))));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.empty(),
            "",
            context.time.milliseconds(),
            OptionalLong.empty());
        Assertions.assertEquals(
            Collections.singletonList(
                GroupCoordinatorRecordHelpers.newOffsetCommitRecord("foo", "bar", 0, expectedOffset)),
            result.records());
    }

    /**
     * A commit from a known consumer group member with the matching epoch (10) succeeds.
     * The resulting record carries the committed offset, leader epoch and metadata string
     * from the request.
     */
    @Test
    public void testConsumerGroupOffsetCommit() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ConsumerGroup consumerGroup = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);

        ConsumerGroupMember memberAtEpoch10 = new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .build();
        consumerGroup.updateMember(memberAtEpoch10);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("metadata");
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition expectedPartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new OffsetCommitResponseData.OffsetCommitResponseTopic()
                    .setName("bar")
                    .setPartitions(Collections.singletonList(expectedPartition))));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.of(10),
            "metadata",
            context.time.milliseconds(),
            OptionalLong.empty());
        Assertions.assertEquals(
            Collections.singletonList(
                GroupCoordinatorRecordHelpers.newOffsetCommitRecord("foo", "bar", 0, expectedOffset)),
            result.records());
    }

    /**
     * With the offset metadata max size set to 5, a commit of two partitions is handled
     * per partition: partition 0 (metadata "toolarge") gets
     * {@code OFFSET_METADATA_TOO_LARGE}, partition 1 (metadata "small") succeeds, and only
     * partition 1 yields an offset commit record.
     */
    @Test
    public void testConsumerGroupOffsetCommitWithOffsetMetadataTooLarge() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
            .withOffsetMetadataMaxSize(5)
            .build();
        ConsumerGroup consumerGroup = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);

        ConsumerGroupMember memberAtEpoch10 = new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .build();
        consumerGroup.updateMember(memberAtEpoch10);

        // Partition 0 carries metadata longer than the limit, partition 1 fits exactly.
        OffsetCommitRequestData.OffsetCommitRequestPartition oversized =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("toolarge");
        OffsetCommitRequestData.OffsetCommitRequestPartition acceptable =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(1)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("small");
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Arrays.asList(oversized, acceptable));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition rejectedPartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code());
        OffsetCommitResponseData.OffsetCommitResponsePartition acceptedPartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(1)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new OffsetCommitResponseData.OffsetCommitResponseTopic()
                    .setName("bar")
                    .setPartitions(Arrays.asList(rejectedPartition, acceptedPartition))));
        Assertions.assertEquals(expectedResponse, result.response());

        // Only the accepted partition is persisted.
        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.of(10),
            "small",
            context.time.milliseconds(),
            OptionalLong.empty());
        Assertions.assertEquals(
            Collections.singletonList(
                GroupCoordinatorRecordHelpers.newOffsetCommitRecord("foo", "bar", 1, expectedOffset)),
            result.records());
    }

    /**
     * A transactional commit from a known consumer group member with the matching
     * generation (10) succeeds and yields one offset commit record carrying the committed
     * offset, leader epoch and metadata.
     */
    @Test
    public void testConsumerGroupTransactionalOffsetCommit() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ConsumerGroup consumerGroup = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);

        ConsumerGroupMember memberAtEpoch10 = new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .build();
        consumerGroup.updateMember(memberAtEpoch10);

        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("metadata");
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic topic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(topic));

        CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> result =
            context.commitTransactionalOffset(request);

        TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition expectedPartition =
            new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        TxnOffsetCommitResponseData expectedResponse = new TxnOffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                    .setName("bar")
                    .setPartitions(Collections.singletonList(expectedPartition))));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.of(10),
            "metadata",
            context.time.milliseconds(),
            OptionalLong.empty());
        Assertions.assertEquals(
            Collections.singletonList(
                GroupCoordinatorRecordHelpers.newOffsetCommitRecord("foo", "bar", 0, expectedOffset)),
            result.records());
    }

    /**
     * A transactional commit with a member id and generation, sent when no group was
     * created beforehand, is rejected with {@link IllegalGenerationException}.
     */
    @Test
    public void testConsumerGroupTransactionalOffsetCommitWithUnknownGroupId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("metadata");
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic topic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(IllegalGenerationException.class, () -> context.commitTransactionalOffset(request));
    }

    /**
     * A transactional commit naming a member id that was never added to the consumer
     * group is rejected with {@link UnknownMemberIdException}.
     */
    @Test
    public void testConsumerGroupTransactionalOffsetCommitWithUnknownMemberId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // The consumer group is requested through the manager but no member is added.
        context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);

        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("metadata");
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic topic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitTransactionalOffset(request));
    }

    /**
     * With the consumer group member at epoch 10, a transactional commit carrying
     * generation 100 is rejected with {@link IllegalGenerationException}.
     */
    @Test
    public void testConsumerGroupTransactionalOffsetCommitWithStaleMemberEpoch() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ConsumerGroup consumerGroup = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);

        ConsumerGroupMember memberAtEpoch10 = new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .build();
        consumerGroup.updateMember(memberAtEpoch10);

        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("metadata");
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic topic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        // Generation 100 does not match the member's epoch of 10.
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationId(100)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(IllegalGenerationException.class, () -> context.commitTransactionalOffset(request));
    }

    /**
     * A transactional commit from a known member of a stable classic group, with the
     * matching generation (1), succeeds and yields one offset commit record carrying the
     * committed offset, leader epoch and metadata.
     */
    @Test
    public void testGenericGroupTransactionalOffsetCommit() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
        ClassicGroup classicGroup = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        ClassicGroupMember genericMember = mkGenericMember("member", Optional.empty());
        classicGroup.add(genericMember);

        // Advance to generation 1 and make the group stable.
        classicGroup.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
        classicGroup.initNextGeneration();
        Assertions.assertEquals(1, classicGroup.generationId());
        classicGroup.transitionTo(ClassicGroupState.STABLE);

        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("metadata");
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic topic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationId(1)
            .setTopics(Collections.singletonList(topic));

        CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> result =
            context.commitTransactionalOffset(request);

        TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition expectedPartition =
            new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        TxnOffsetCommitResponseData expectedResponse = new TxnOffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                    .setName("bar")
                    .setPartitions(Collections.singletonList(expectedPartition))));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.of(10),
            "metadata",
            context.time.milliseconds(),
            OptionalLong.empty());
        Assertions.assertEquals(
            Collections.singletonList(
                GroupCoordinatorRecordHelpers.newOffsetCommitRecord("foo", "bar", 0, expectedOffset)),
            result.records());
    }

    /**
     * A transactional commit with a member id and generation, sent when no group was
     * created beforehand, is rejected with {@link IllegalGenerationException}.
     */
    @Test
    public void testGenericGroupTransactionalOffsetCommitWithUnknownGroupId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("metadata");
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic topic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(IllegalGenerationException.class, () -> context.commitTransactionalOffset(request));
    }

    /**
     * A transactional offset commit naming member "member" against classic group "foo", which
     * exists but has no members added, must fail with {@link UnknownMemberIdException}.
     */
    @Test
    public void testGenericGroupTransactionalOffsetCommitWithUnknownMemberId() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        // Create the group only; no member is ever added to it.
        ctx.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("metadata");
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic topic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(
            UnknownMemberIdException.class,
            () -> ctx.commitTransactionalOffset(request)
        );
    }

    /**
     * A transactional offset commit carrying generation id 100 against a stable classic group
     * whose generation id is 1 must fail with {@link IllegalGenerationException}.
     */
    @Test
    public void testGenericGroupTransactionalOffsetCommitWithIllegalGenerationId() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        // Build a classic group with one member and walk it to generation 1 / STABLE.
        ClassicGroup classicGroup = ctx.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);
        ClassicGroupMember onlyMember = mkGenericMember("member", Optional.empty());
        classicGroup.add(onlyMember);
        classicGroup.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
        classicGroup.initNextGeneration();
        Assertions.assertEquals(1, classicGroup.generationId());
        classicGroup.transitionTo(ClassicGroupState.STABLE);

        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition partition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("metadata");
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic topic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        // Generation 100 does not match the group's generation asserted above.
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationId(100)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(
            IllegalGenerationException.class,
            () -> ctx.commitTransactionalOffset(request)
        );
    }

    /**
     * Fetching offsets for specific partitions of a classic group that has been moved to
     * {@code DEAD} must return, for every requested partition, the response built by
     * {@code mkInvalidOffsetPartitionResponse}.
     */
    @Test
    public void testGenericGroupFetchOffsetsWithDeadGroup() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        ClassicGroup deadGroup = ctx.groupMetadataManager.getOrMaybeCreateClassicGroup("group", true);
        deadGroup.transitionTo(ClassicGroupState.DEAD);

        List<OffsetFetchRequestData.OffsetFetchRequestTopics> requestedTopics = Arrays.asList(
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("foo")
                .setPartitionIndexes(Arrays.asList(0, 1)),
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("bar")
                .setPartitionIndexes(Collections.singletonList(0))
        );

        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expected = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(0),
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(1)
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Collections.singletonList(
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(0)
                ))
        );

        Assertions.assertEquals(expected, ctx.fetchOffsets("group", requestedTopics, Long.MAX_VALUE));
    }

    /**
     * Fetching offsets for specific partitions of a group that was never created in the test
     * context must return, for every requested partition, the response built by
     * {@code mkInvalidOffsetPartitionResponse}.
     */
    @Test
    public void testFetchOffsetsWithUnknownGroup() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        List<OffsetFetchRequestData.OffsetFetchRequestTopics> requestedTopics = Arrays.asList(
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("foo")
                .setPartitionIndexes(Arrays.asList(0, 1)),
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("bar")
                .setPartitionIndexes(Collections.singletonList(0))
        );

        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expected = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(0),
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(1)
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Collections.singletonList(
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(0)
                ))
        );

        Assertions.assertEquals(expected, ctx.fetchOffsets("group", requestedTopics, Long.MAX_VALUE));
    }

    /**
     * Commits five offsets one at a time (checking {@code lastWrittenOffset} advances by one
     * after each) and then fetches partitions 0 and 1 of topics "foo" and "bar" while passing
     * 0, 1, 2, 3, 4, 5 and {@code Long.MAX_VALUE} as the last argument of {@code fetchOffsets}.
     * Each fetch is expected to reflect only the commits written up to that value; partitions
     * without a visible commit are expected as {@code mkInvalidOffsetPartitionResponse}.
     */
    @Test
    public void testFetchOffsetsAtDifferentCommittedOffset() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();
        ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);

        Assertions.assertEquals(0L, ctx.lastWrittenOffset);
        ctx.commitOffset("group", "foo", 0, 100L, 1);
        Assertions.assertEquals(1L, ctx.lastWrittenOffset);
        ctx.commitOffset("group", "foo", 1, 110L, 1);
        Assertions.assertEquals(2L, ctx.lastWrittenOffset);
        ctx.commitOffset("group", "bar", 0, 200L, 1);
        Assertions.assertEquals(3L, ctx.lastWrittenOffset);
        ctx.commitOffset("group", "foo", 1, 111L, 2);
        Assertions.assertEquals(4L, ctx.lastWrittenOffset);
        ctx.commitOffset("group", "bar", 1, 210L, 2);
        Assertions.assertEquals(5L, ctx.lastWrittenOffset);

        List<OffsetFetchRequestData.OffsetFetchRequestTopics> requestedTopics = Arrays.asList(
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("foo")
                .setPartitionIndexes(Arrays.asList(0, 1)),
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("bar")
                .setPartitionIndexes(Arrays.asList(0, 1))
        );

        // Fetch at 0: nothing is visible.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAt0 = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(0),
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(1)
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(0),
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(1)
                ))
        );
        Assertions.assertEquals(expectedAt0, ctx.fetchOffsets("group", requestedTopics, 0L));

        // Fetch at 1: foo-0 is visible.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAt1 = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(1)
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(0),
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(1)
                ))
        );
        Assertions.assertEquals(expectedAt1, ctx.fetchOffsets("group", requestedTopics, 1L));

        // Fetch at 2: foo-1 becomes visible with its first commit.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAt2 = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 110L, 1, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(0),
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(1)
                ))
        );
        Assertions.assertEquals(expectedAt2, ctx.fetchOffsets("group", requestedTopics, 2L));

        // Fetch at 3: bar-0 becomes visible.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAt3 = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 110L, 1, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(1)
                ))
        );
        Assertions.assertEquals(expectedAt3, ctx.fetchOffsets("group", requestedTopics, 3L));

        // Fetch at 4: foo-1 reflects its second commit.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAt4 = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 111L, 2, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(1)
                ))
        );
        Assertions.assertEquals(expectedAt4, ctx.fetchOffsets("group", requestedTopics, 4L));

        // Fetch at 5: bar-1 becomes visible.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAt5 = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 111L, 2, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 210L, 2, "metadata")
                ))
        );
        Assertions.assertEquals(expectedAt5, ctx.fetchOffsets("group", requestedTopics, 5L));

        // Fetch at Long.MAX_VALUE: same view as at 5.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAtMax = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 111L, 2, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 210L, 2, "metadata")
                ))
        );
        Assertions.assertEquals(expectedAtMax, ctx.fetchOffsets("group", requestedTopics, Long.MAX_VALUE));
    }

    /**
     * Commits three regular offsets, calls {@code commit()} on the context, then writes three
     * more offsets through the {@code commitOffset} overload that takes 10L as its first
     * argument. While those are pending, a fetch at {@code Long.MAX_VALUE} is expected to
     * report {@code UNSTABLE_OFFSET_COMMIT} for the affected partitions and a fetch at
     * {@code lastCommittedOffset} is expected to show only the regular commits. After
     * {@code replayEndTransactionMarker(10L, COMMIT)}, the later offsets are expected.
     */
    @Test
    public void testFetchOffsetsWithPendingTransactionalOffsets() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();
        ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);

        // Regular commits, followed by a commit of the context.
        ctx.commitOffset("group", "foo", 0, 100L, 1);
        ctx.commitOffset("group", "foo", 1, 110L, 1);
        ctx.commitOffset("group", "bar", 0, 200L, 1);
        ctx.commit();
        Assertions.assertEquals(3L, ctx.lastWrittenOffset);
        Assertions.assertEquals(3L, ctx.lastCommittedOffset);

        // Offsets written with 10L as the leading argument; not yet completed by an end marker.
        ctx.commitOffset(10L, "group", "foo", 1, 111L, 1, ctx.time.milliseconds());
        ctx.commitOffset(10L, "group", "bar", 0, 201L, 1, ctx.time.milliseconds());
        ctx.commitOffset(10L, "group", "bar", 1, 211L, 1, ctx.time.milliseconds());

        List<OffsetFetchRequestData.OffsetFetchRequestTopics> requestedTopics = Arrays.asList(
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("foo")
                .setPartitionIndexes(Arrays.asList(0, 1)),
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("bar")
                .setPartitionIndexes(Arrays.asList(0, 1))
        );

        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedWhilePending = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, Errors.UNSTABLE_OFFSET_COMMIT)
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, Errors.UNSTABLE_OFFSET_COMMIT),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, Errors.UNSTABLE_OFFSET_COMMIT)
                ))
        );
        Assertions.assertEquals(
            expectedWhilePending,
            ctx.fetchOffsets("group", requestedTopics, Long.MAX_VALUE)
        );

        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAtLastCommitted = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 110L, 1, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkInvalidOffsetPartitionResponse(1)
                ))
        );
        Assertions.assertEquals(
            expectedAtLastCommitted,
            ctx.fetchOffsets("group", requestedTopics, ctx.lastCommittedOffset)
        );

        ctx.replayEndTransactionMarker(10L, TransactionResult.COMMIT);

        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAfterMarker = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 111L, 1, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 201L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 211L, 1, "metadata")
                ))
        );
        Assertions.assertEquals(
            expectedAfterMarker,
            ctx.fetchOffsets("group", requestedTopics, Long.MAX_VALUE)
        );
    }

    /**
     * Fetching all offsets of a classic group that has been moved to {@code DEAD} must yield
     * an empty list.
     */
    @Test
    public void testGenericGroupFetchAllOffsetsWithDeadGroup() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        ClassicGroup deadGroup = ctx.groupMetadataManager.getOrMaybeCreateClassicGroup("group", true);
        deadGroup.transitionTo(ClassicGroupState.DEAD);

        Assertions.assertEquals(
            Collections.emptyList(),
            ctx.fetchAllOffsets("group", Long.MAX_VALUE)
        );
    }

    /**
     * Fetching all offsets of a group that was never created in the test context must yield
     * an empty list.
     */
    @Test
    public void testFetchAllOffsetsWithUnknownGroup() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        Assertions.assertEquals(
            Collections.emptyList(),
            ctx.fetchAllOffsets("group", Long.MAX_VALUE)
        );
    }

    /**
     * Commits five offsets one at a time (checking {@code lastWrittenOffset} advances by one
     * after each) and then calls {@code fetchAllOffsets} with 0, 1, 2, 3, 4 and
     * {@code Long.MAX_VALUE} as the last argument. Each result is expected to contain only the
     * commits written up to that value, with topic "bar" listed before topic "foo".
     */
    @Test
    public void testFetchAllOffsetsAtDifferentCommittedOffset() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();
        ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);

        Assertions.assertEquals(0L, ctx.lastWrittenOffset);
        ctx.commitOffset("group", "foo", 0, 100L, 1);
        Assertions.assertEquals(1L, ctx.lastWrittenOffset);
        ctx.commitOffset("group", "foo", 1, 110L, 1);
        Assertions.assertEquals(2L, ctx.lastWrittenOffset);
        ctx.commitOffset("group", "bar", 0, 200L, 1);
        Assertions.assertEquals(3L, ctx.lastWrittenOffset);
        ctx.commitOffset("group", "foo", 1, 111L, 2);
        Assertions.assertEquals(4L, ctx.lastWrittenOffset);
        ctx.commitOffset("group", "bar", 1, 210L, 2);
        Assertions.assertEquals(5L, ctx.lastWrittenOffset);

        // Fetch at 0: nothing is visible.
        Assertions.assertEquals(Collections.emptyList(), ctx.fetchAllOffsets("group", 0L));

        // Fetch at 1: only foo-0.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAt1 = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata")
                ))
        );
        Assertions.assertEquals(expectedAt1, ctx.fetchAllOffsets("group", 1L));

        // Fetch at 2: foo-0 and foo-1.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAt2 = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 110L, 1, "metadata")
                ))
        );
        Assertions.assertEquals(expectedAt2, ctx.fetchAllOffsets("group", 2L));

        // Fetch at 3: bar-0 appears.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAt3 = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 200L, 1, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 110L, 1, "metadata")
                ))
        );
        Assertions.assertEquals(expectedAt3, ctx.fetchAllOffsets("group", 3L));

        // Fetch at 4: foo-1 reflects its second commit.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAt4 = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 200L, 1, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 111L, 2, "metadata")
                ))
        );
        Assertions.assertEquals(expectedAt4, ctx.fetchAllOffsets("group", 4L));

        // Fetch at Long.MAX_VALUE: every commit is visible.
        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAtMax = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 200L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 210L, 2, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 111L, 2, "metadata")
                ))
        );
        Assertions.assertEquals(expectedAtMax, ctx.fetchAllOffsets("group", Long.MAX_VALUE));
    }

    /**
     * Same scenario as the per-partition pending-transaction fetch test, but through
     * {@code fetchAllOffsets}: three regular commits, a {@code commit()} of the context, then
     * three offsets written with 10L as the leading {@code commitOffset} argument. Checks the
     * result at {@code Long.MAX_VALUE} while pending, at {@code lastCommittedOffset}, and at
     * {@code Long.MAX_VALUE} after {@code replayEndTransactionMarker(10L, COMMIT)}.
     */
    @Test
    public void testFetchAllOffsetsWithPendingTransactionalOffsets() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();
        ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);

        // Regular commits, followed by a commit of the context.
        ctx.commitOffset("group", "foo", 0, 100L, 1);
        ctx.commitOffset("group", "foo", 1, 110L, 1);
        ctx.commitOffset("group", "bar", 0, 200L, 1);
        ctx.commit();
        Assertions.assertEquals(3L, ctx.lastWrittenOffset);
        Assertions.assertEquals(3L, ctx.lastCommittedOffset);

        // Offsets written with 10L as the leading argument; not yet completed by an end marker.
        ctx.commitOffset(10L, "group", "foo", 1, 111L, 1, ctx.time.milliseconds());
        ctx.commitOffset(10L, "group", "bar", 0, 201L, 1, ctx.time.milliseconds());
        ctx.commitOffset(10L, "group", "bar", 1, 211L, 1, ctx.time.milliseconds());

        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedWhilePending = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, Errors.UNSTABLE_OFFSET_COMMIT)
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, Errors.UNSTABLE_OFFSET_COMMIT)
                ))
        );
        Assertions.assertEquals(expectedWhilePending, ctx.fetchAllOffsets("group", Long.MAX_VALUE));

        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAtLastCommitted = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 200L, 1, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 110L, 1, "metadata")
                ))
        );
        Assertions.assertEquals(
            expectedAtLastCommitted,
            ctx.fetchAllOffsets("group", ctx.lastCommittedOffset)
        );

        ctx.replayEndTransactionMarker(10L, TransactionResult.COMMIT);

        List<OffsetFetchResponseData.OffsetFetchResponseTopics> expectedAfterMarker = Arrays.asList(
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 201L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 211L, 1, "metadata")
                )),
            new OffsetFetchResponseData.OffsetFetchResponseTopics()
                .setName("foo")
                .setPartitions(Arrays.asList(
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata"),
                    OffsetMetadataManagerTest.mkOffsetPartitionResponse(1, 111L, 1, "metadata")
                ))
        );
        Assertions.assertEquals(expectedAfterMarker, ctx.fetchAllOffsets("group", Long.MAX_VALUE));
    }

    /**
     * With member "member" registered in consumer group "group", both {@code fetchOffsets} and
     * {@code fetchAllOffsets} called with member id "member" and epoch 0 must return the
     * committed offset for foo-0.
     */
    @Test
    public void testConsumerGroupOffsetFetchWithMemberIdAndEpoch() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        ConsumerGroup consumerGroup = ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
        consumerGroup.updateMember(new ConsumerGroupMember.Builder("member").build());
        ctx.commitOffset("group", "foo", 0, 100L, 1);

        List<OffsetFetchRequestData.OffsetFetchRequestTopics> requestedTopics = Collections.singletonList(
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("foo")
                .setPartitionIndexes(Collections.singletonList(0))
        );

        // Targeted fetch.
        Assertions.assertEquals(
            Collections.singletonList(
                new OffsetFetchResponseData.OffsetFetchResponseTopics()
                    .setName("foo")
                    .setPartitions(Collections.singletonList(
                        OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata")
                    ))
            ),
            ctx.fetchOffsets("group", "member", 0, requestedTopics, Long.MAX_VALUE)
        );

        // Fetch-all.
        Assertions.assertEquals(
            Collections.singletonList(
                new OffsetFetchResponseData.OffsetFetchResponseTopics()
                    .setName("foo")
                    .setPartitions(Collections.singletonList(
                        OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata")
                    ))
            ),
            ctx.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE)
        );
    }

    /**
     * Uses the {@code fetchOffsets} / {@code fetchAllOffsets} overloads that take no member id
     * or epoch (per the test name, the admin-client path) against a consumer group that has a
     * member, and expects the committed offset for foo-0 from both.
     */
    @Test
    public void testConsumerGroupOffsetFetchFromAdminClient() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        ConsumerGroup consumerGroup = ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
        consumerGroup.getOrMaybeCreateMember("member", true);
        ctx.commitOffset("group", "foo", 0, 100L, 1);

        List<OffsetFetchRequestData.OffsetFetchRequestTopics> requestedTopics = Collections.singletonList(
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("foo")
                .setPartitionIndexes(Collections.singletonList(0))
        );

        // Targeted fetch.
        Assertions.assertEquals(
            Collections.singletonList(
                new OffsetFetchResponseData.OffsetFetchResponseTopics()
                    .setName("foo")
                    .setPartitions(Collections.singletonList(
                        OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata")
                    ))
            ),
            ctx.fetchOffsets("group", requestedTopics, Long.MAX_VALUE)
        );

        // Fetch-all.
        Assertions.assertEquals(
            Collections.singletonList(
                new OffsetFetchResponseData.OffsetFetchResponseTopics()
                    .setName("foo")
                    .setPartitions(Collections.singletonList(
                        OffsetMetadataManagerTest.mkOffsetPartitionResponse(0, 100L, 1, "metadata")
                    ))
            ),
            ctx.fetchAllOffsets("group", Long.MAX_VALUE)
        );
    }

    /**
     * For a consumer group with no members, fetches that pass member id "" or "member" with
     * epoch 0 must fail with {@link UnknownMemberIdException}, for both {@code fetchOffsets}
     * and {@code fetchAllOffsets}.
     */
    @Test
    public void testConsumerGroupOffsetFetchWithUnknownMemberId() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();
        ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);

        List<OffsetFetchRequestData.OffsetFetchRequestTopics> requestedTopics = Collections.singletonList(
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("foo")
                .setPartitionIndexes(Collections.singletonList(0))
        );

        // Targeted fetch: empty member id, then an id that is not registered.
        Assertions.assertThrows(
            UnknownMemberIdException.class,
            () -> ctx.fetchOffsets("group", "", 0, requestedTopics, Long.MAX_VALUE)
        );
        Assertions.assertThrows(
            UnknownMemberIdException.class,
            () -> ctx.fetchOffsets("group", "member", 0, requestedTopics, Long.MAX_VALUE)
        );

        // Fetch-all: same two member ids.
        Assertions.assertThrows(
            UnknownMemberIdException.class,
            () -> ctx.fetchAllOffsets("group", "", 0, Long.MAX_VALUE)
        );
        Assertions.assertThrows(
            UnknownMemberIdException.class,
            () -> ctx.fetchAllOffsets("group", "member", 0, Long.MAX_VALUE)
        );
    }

    /**
     * For a member built with {@code ConsumerGroupMember.Builder("member").build()}, fetches
     * that pass epoch 10 must fail with {@link StaleMemberEpochException}, for both
     * {@code fetchOffsets} and {@code fetchAllOffsets}.
     */
    @Test
    public void testConsumerGroupOffsetFetchWithStaleMemberEpoch() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        ConsumerGroup consumerGroup = ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
        consumerGroup.updateMember(new ConsumerGroupMember.Builder("member").build());

        List<OffsetFetchRequestData.OffsetFetchRequestTopics> requestedTopics = Collections.singletonList(
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("foo")
                .setPartitionIndexes(Collections.singletonList(0))
        );

        Assertions.assertThrows(
            StaleMemberEpochException.class,
            () -> ctx.fetchOffsets("group", "member", 10, requestedTopics, Long.MAX_VALUE)
        );
        Assertions.assertThrows(
            StaleMemberEpochException.class,
            () -> ctx.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE)
        );
    }

    /**
     * For a consumer-group member that carries {@code ClassicMemberMetadata}, fetches that
     * pass 10 as the epoch/generation argument must fail with
     * {@link IllegalGenerationException} rather than a stale-epoch error, for both
     * {@code fetchOffsets} and {@code fetchAllOffsets}.
     */
    @Test
    public void testConsumerGroupOffsetFetchWithIllegalGenerationId() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        ConsumerGroup consumerGroup = ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("group", true);
        ConsumerGroupMember classicProtocolMember = new ConsumerGroupMember.Builder("member")
            .setClassicMemberMetadata(new ConsumerGroupMemberMetadataValue.ClassicMemberMetadata())
            .build();
        consumerGroup.updateMember(classicProtocolMember);

        List<OffsetFetchRequestData.OffsetFetchRequestTopics> requestedTopics = Collections.singletonList(
            new OffsetFetchRequestData.OffsetFetchRequestTopics()
                .setName("foo")
                .setPartitionIndexes(Collections.singletonList(0))
        );

        Assertions.assertThrows(
            IllegalGenerationException.class,
            () -> ctx.fetchOffsets("group", "member", 10, requestedTopics, Long.MAX_VALUE)
        );
        Assertions.assertThrows(
            IllegalGenerationException.class,
            () -> ctx.fetchAllOffsets("group", "member", 10, Long.MAX_VALUE)
        );
    }

    /**
     * Deleting the offset of bar-0 for classic group "foo", whose subscribed-topic set is
     * empty, must succeed with {@code Errors.NONE} and leave no offset behind.
     */
    @Test
    public void testGenericGroupOffsetDelete() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        ClassicGroup classicGroup = ctx.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);
        ctx.commitOffset("foo", "bar", 0, 100L, 0);
        // The group subscribes to nothing.
        classicGroup.setSubscribedTopics(Optional.of(Collections.emptySet()));

        ctx.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);

        Assertions.assertFalse(ctx.hasOffset("foo", "bar", 0));
    }

    /**
     * For classic group "foo" subscribed to topic "bar": deleting an offset of "bar1" is
     * expected to report {@code Errors.NONE}, while deleting the offset of "bar" is expected
     * to report {@code GROUP_SUBSCRIBED_TO_TOPIC}.
     */
    @Test
    public void testGenericGroupOffsetDeleteWithErrors() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        ClassicGroup classicGroup = ctx.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);
        classicGroup.setSubscribedTopics(Optional.of(Collections.singleton("bar")));
        ctx.commitOffset("foo", "bar", 0, 100L, 0);

        // Topic the group is not subscribed to.
        ctx.testOffsetDeleteWith("foo", "bar1", 0, Errors.NONE);
        // Topic the group is subscribed to.
        ctx.testOffsetDeleteWith("foo", "bar", 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
    }

    /**
     * The only offset for bar-0 in classic group "foo" is written through the
     * {@code commitOffset} overload taking 10L as its first argument and no end marker is
     * replayed. Deleting it must still succeed with {@code Errors.NONE} and leave no offset
     * behind.
     */
    @Test
    public void testGenericGroupOffsetDeleteWithPendingTransactionalOffsets() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        ClassicGroup classicGroup = ctx.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);
        ctx.commitOffset(10L, "foo", "bar", 0, 100L, 0, ctx.time.milliseconds());
        // The group subscribes to nothing.
        classicGroup.setSubscribedTopics(Optional.of(Collections.emptySet()));

        ctx.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);

        Assertions.assertFalse(ctx.hasOffset("foo", "bar", 0));
    }

    /**
     * Deleting the offset of bar-0 for consumer group "foo", which is not subscribed to
     * "bar", must succeed with {@code Errors.NONE} and leave no offset behind.
     *
     * <p>The final {@code hasOffset} check matches the classic-group variant of this test and
     * the pending-transactional consumer-group variant, which both verify that the offset is
     * gone after the delete.
     */
    @Test
    public void testConsumerGroupOffsetDelete() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        ConsumerGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);
        context.commitOffset("foo", "bar", 0, 100L, 0);
        // Precondition: no member subscribes to "bar", so the delete is allowed.
        Assertions.assertFalse(group.isSubscribedToTopic("bar"));

        context.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);

        // The offset must no longer be present once the delete has been applied.
        Assertions.assertFalse(context.hasOffset("foo", "bar", 0));
    }

    /**
     * For consumer group "foo" with a member subscribed to topic "bar": deleting an offset of
     * "bar1" is expected to report {@code Errors.NONE}, while deleting the offset of "bar" is
     * expected to report {@code GROUP_SUBSCRIBED_TO_TOPIC}.
     */
    @Test
    public void testConsumerGroupOffsetDeleteWithErrors() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();
        ConsumerGroup consumerGroup = ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);

        MetadataImage metadataImage = new MetadataImageBuilder()
            .addTopic(Uuid.randomUuid(), "foo", 1)
            .addRacks()
            .build();
        ConsumerGroupMember subscriber = new ConsumerGroupMember.Builder("member1")
            .setSubscribedTopicNames(Collections.singletonList("bar"))
            .build();

        // The return value of computeSubscriptionMetadata is not used here.
        consumerGroup.computeSubscriptionMetadata(
            consumerGroup.computeSubscribedTopicNames(null, subscriber),
            metadataImage.topics(),
            metadataImage.cluster()
        );
        consumerGroup.updateMember(subscriber);
        ctx.commitOffset("foo", "bar", 0, 100L, 0);
        Assertions.assertTrue(consumerGroup.isSubscribedToTopic("bar"));

        // Topic the group is not subscribed to.
        ctx.testOffsetDeleteWith("foo", "bar1", 0, Errors.NONE);
        // Topic the group is subscribed to.
        ctx.testOffsetDeleteWith("foo", "bar", 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
    }

    /**
     * The only offset for bar-0 in consumer group "foo" is written through the
     * {@code commitOffset} overload taking 10L as its first argument and no end marker is
     * replayed. Deleting it must still succeed with {@code Errors.NONE} and leave no offset
     * behind.
     */
    @Test
    public void testConsumerGroupOffsetDeleteWithPendingTransactionalOffsets() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        ConsumerGroup consumerGroup = ctx.groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup("foo", true);
        ctx.commitOffset(10L, "foo", "bar", 0, 100L, 0, ctx.time.milliseconds());
        // Precondition: the group is not subscribed to "bar".
        Assertions.assertFalse(consumerGroup.isSubscribedToTopic("bar"));

        ctx.testOffsetDeleteWith("foo", "bar", 0, Errors.NONE);

        Assertions.assertFalse(ctx.hasOffset("foo", "bar", 0));
    }

    /**
     * For both the CLASSIC and CONSUMER group types: after three offsets are committed for
     * group "foo", {@code deleteAllOffsets} must append one tombstone record per offset, in
     * the order asserted below, and return 3.
     *
     * @param groupType the type of group created for the test
     */
    @ParameterizedTest
    @EnumSource(value=Group.GroupType.class, names={"CLASSIC", "CONSUMER"})
    public void testDeleteGroupAllOffsets(Group.GroupType groupType) {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();
        ctx.getOrMaybeCreateGroup(groupType, "foo");

        ctx.commitOffset("foo", "bar-0", 0, 100L, 0);
        ctx.commitOffset("foo", "bar-0", 1, 100L, 0);
        ctx.commitOffset("foo", "bar-1", 0, 100L, 0);

        List<CoordinatorRecord> expectedTombstones = Arrays.asList(
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-1", 0),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 0),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 1)
        );

        List<CoordinatorRecord> actualRecords = new ArrayList<>();
        int deletedCount = ctx.deleteAllOffsets("foo", actualRecords);

        Assertions.assertEquals(expectedTombstones, actualRecords);
        Assertions.assertEquals(3, deletedCount);
    }

    /**
     * For both the CLASSIC and CONSUMER group types: with three regular offsets plus two
     * offsets written through the {@code commitOffset} overload taking 10L as its first
     * argument (one of them for the otherwise unseen topic "bar-2"),
     * {@code deleteAllOffsets} must append four tombstone records in the order asserted
     * below, return 4, and leave none of the four partitions with an offset.
     *
     * @param groupType the type of group created for the test
     */
    @ParameterizedTest
    @EnumSource(value=Group.GroupType.class, names={"CLASSIC", "CONSUMER"})
    public void testDeleteGroupAllOffsetsWithPendingTransactionalOffsets(Group.GroupType groupType) {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();
        ctx.getOrMaybeCreateGroup(groupType, "foo");

        // Regular commits.
        ctx.commitOffset("foo", "bar-0", 0, 100L, 0);
        ctx.commitOffset("foo", "bar-0", 1, 100L, 0);
        ctx.commitOffset("foo", "bar-1", 0, 100L, 0);

        // Commits written with 10L as the leading argument; no end marker is replayed.
        ctx.commitOffset(10L, "foo", "bar-1", 0, 101L, 0, ctx.time.milliseconds());
        ctx.commitOffset(10L, "foo", "bar-2", 0, 100L, 0, ctx.time.milliseconds());

        List<CoordinatorRecord> expectedTombstones = Arrays.asList(
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-1", 0),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 0),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-0", 1),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar-2", 0)
        );

        List<CoordinatorRecord> actualRecords = new ArrayList<>();
        int deletedCount = ctx.deleteAllOffsets("foo", actualRecords);

        Assertions.assertEquals(expectedTombstones, actualRecords);
        Assertions.assertEquals(4, deletedCount);

        Assertions.assertFalse(ctx.hasOffset("foo", "bar-0", 0));
        Assertions.assertFalse(ctx.hasOffset("foo", "bar-0", 1));
        Assertions.assertFalse(ctx.hasOffset("foo", "bar-1", 0));
        Assertions.assertFalse(ctx.hasOffset("foo", "bar-2", 0));
    }

    /**
     * Calling {@code cleanupExpiredOffsets} for a group id with no committed offsets must
     * return {@code true} and append no records.
     */
    @Test
    public void testCleanupExpiredOffsetsGroupHasNoOffsets() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        List<CoordinatorRecord> producedRecords = new ArrayList<>();
        boolean result = ctx.cleanupExpiredOffsets("unknown-group-id", producedRecords);

        Assertions.assertTrue(result);
        Assertions.assertEquals(Collections.emptyList(), producedRecords);
    }

    /**
     * When the (mocked) group metadata manager throws GroupIdNotFoundException for
     * a group that still has a committed offset, the cleanup call is expected to
     * propagate that exception.
     */
    @Test
    public void testCleanupExpiredOffsetsGroupDoesNotExist() {
        GroupMetadataManager mockedGroupManager = Mockito.mock(GroupMetadataManager.class);
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder()
            .withGroupMetadataManager(mockedGroupManager)
            .build();
        Mockito.when(mockedGroupManager.group("unknown-group-id"))
            .thenThrow(GroupIdNotFoundException.class);

        ctx.commitOffset("unknown-group-id", "topic", 0, 100L, 0);

        Assertions.assertThrows(
            GroupIdNotFoundException.class,
            () -> ctx.cleanupExpiredOffsets("unknown-group-id", new ArrayList<>())
        );
    }

    /**
     * When the group reports no offset expiration condition, the cleanup call is
     * expected to return false and to produce no records.
     */
    @Test
    public void testCleanupExpiredOffsetsEmptyOffsetExpirationCondition() {
        GroupMetadataManager mockedGroupManager = Mockito.mock(GroupMetadataManager.class);
        Group mockedGroup = Mockito.mock(Group.class);
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder()
            .withGroupMetadataManager(mockedGroupManager)
            .build();

        ctx.commitOffset("group-id", "topic", 0, 100L, 0);

        Mockito.when(mockedGroupManager.group("group-id")).thenReturn(mockedGroup);
        Mockito.when(mockedGroup.offsetExpirationCondition()).thenReturn(Optional.empty());

        List<CoordinatorRecord> generated = new ArrayList<>();
        Assertions.assertFalse(ctx.cleanupExpiredOffsets("group-id", generated));
        Assertions.assertEquals(Collections.emptyList(), generated);
    }

    /**
     * Runs three cleanup passes against a mocked group with a one minute retention:
     * <ol>
     *   <li>after one minute, only the tombstone for ("secondTopic", 0) is expected
     *       while "firstTopic" is reported as subscribed;</li>
     *   <li>after another 500 ms, the tombstone for ("secondTopic", 1) is expected;</li>
     *   <li>once "firstTopic" is reported as unsubscribed and two more offsets are
     *       committed, all three remaining offsets are expected to be tombstoned and
     *       the cleanup call is expected to return true.</li>
     * </ol>
     */
    @Test
    public void testCleanupExpiredOffsets() {
        GroupMetadataManager groupMetadataManager = Mockito.mock(GroupMetadataManager.class);
        Group group = Mockito.mock(Group.class);
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
            .withGroupMetadataManager(groupMetadataManager)
            .withOffsetsRetentionMinutes(1)
            .build();

        long commitTimestamp = context.time.milliseconds();
        context.commitOffset("group-id", "firstTopic", 0, 100L, 0, commitTimestamp);
        context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp);
        context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500L);

        context.time.sleep(Duration.ofMinutes(1L).toMillis());

        // Pass 1: "firstTopic" is subscribed, "secondTopic" is not.
        List<CoordinatorRecord> expectedRecords = Collections.singletonList(
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0)
        );
        Mockito.when(groupMetadataManager.group("group-id")).thenReturn(group);
        Mockito.when(group.offsetExpirationCondition()).thenReturn(Optional.of(
            new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
        Mockito.when(group.isSubscribedToTopic("firstTopic")).thenReturn(true);
        Mockito.when(group.isSubscribedToTopic("secondTopic")).thenReturn(false);

        // The list must be typed as List<CoordinatorRecord> to match the helper's signature.
        List<CoordinatorRecord> records = new ArrayList<>();
        Assertions.assertFalse(context.cleanupExpiredOffsets("group-id", records));
        Assertions.assertEquals(expectedRecords, records);

        // Pass 2: 500 ms later, the offset committed at commitTimestamp + 500 is expected to go.
        context.time.sleep(500L);
        expectedRecords = Collections.singletonList(
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 1)
        );
        records = new ArrayList<>();
        Assertions.assertFalse(context.cleanupExpiredOffsets("group-id", records));
        Assertions.assertEquals(expectedRecords, records);

        // Pass 3: "firstTopic" is no longer subscribed and two more offsets are added.
        Mockito.when(group.isSubscribedToTopic("firstTopic")).thenReturn(false);
        context.commitOffset("group-id", "firstTopic", 1, 100L, 0, commitTimestamp + 500L);
        context.commitOffset("group-id", "secondTopic", 0, 101L, 0, commitTimestamp + 500L);
        expectedRecords = Arrays.asList(
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "firstTopic", 0),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "firstTopic", 1),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0)
        );
        records = new ArrayList<>();
        Assertions.assertTrue(context.cleanupExpiredOffsets("group-id", records));
        Assertions.assertEquals(expectedRecords, records);
    }

    /**
     * A partition ("foo", 0) has both an offset written without a producer id and
     * one written under producer id 10. After the retention period, the cleanup
     * call is expected to return false and to produce no tombstones.
     */
    @Test
    public void testCleanupExpiredOffsetsWithPendingTransactionalOffsets() {
        GroupMetadataManager mockedGroupManager = Mockito.mock(GroupMetadataManager.class);
        Group mockedGroup = Mockito.mock(Group.class);
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder()
            .withGroupMetadataManager(mockedGroupManager)
            .withOffsetsRetentionMinutes(1)
            .build();

        long baseTimestamp = ctx.time.milliseconds();
        ctx.commitOffset("group-id", "foo", 0, 100L, 0, baseTimestamp);
        ctx.commitOffset(10L, "group-id", "foo", 0, 101L, 0, baseTimestamp + 500L);

        ctx.time.sleep(Duration.ofMinutes(1L).toMillis());

        Mockito.when(mockedGroupManager.group("group-id")).thenReturn(mockedGroup);
        Mockito.when(mockedGroup.offsetExpirationCondition()).thenReturn(Optional.of(
            new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
        Mockito.when(mockedGroup.isSubscribedToTopic("foo")).thenReturn(false);

        List<CoordinatorRecord> generated = new ArrayList<>();
        Assertions.assertFalse(ctx.cleanupExpiredOffsets("group-id", generated));
        Assertions.assertEquals(Collections.emptyList(), generated);
    }

    /**
     * ("foo", 0) has an offset written without a producer id; ("foo", 1) only has an
     * offset written under producer id 10. After the retention period the first
     * cleanup is expected to tombstone ("foo", 0) and return false; a second cleanup
     * is expected to produce nothing and still return false.
     */
    @Test
    public void testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly() {
        GroupMetadataManager groupMetadataManager = Mockito.mock(GroupMetadataManager.class);
        Group group = Mockito.mock(Group.class);
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
            .withGroupMetadataManager(groupMetadataManager)
            .withOffsetsRetentionMinutes(1)
            .build();

        long commitTimestamp = context.time.milliseconds();
        context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
        context.commitOffset(10L, "group-id", "foo", 1, 101L, 0, commitTimestamp + 500L);

        context.time.sleep(Duration.ofMinutes(1L).toMillis());

        Mockito.when(groupMetadataManager.group("group-id")).thenReturn(group);
        Mockito.when(group.offsetExpirationCondition()).thenReturn(Optional.of(
            new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
        Mockito.when(group.isSubscribedToTopic("foo")).thenReturn(false);

        List<CoordinatorRecord> expectedRecords = List.of(
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 0)
        );

        // The list must be typed as List<CoordinatorRecord> to match the helper's signature.
        List<CoordinatorRecord> records = new ArrayList<>();
        Assertions.assertFalse(context.cleanupExpiredOffsets("group-id", records));
        Assertions.assertEquals(expectedRecords, records);

        // Second pass: nothing further is expected to be tombstoned.
        records = new ArrayList<>();
        Assertions.assertFalse(context.cleanupExpiredOffsets("group-id", records));
        Assertions.assertEquals(List.of(), records);
    }

    /**
     * Builds an offset fetch partition response carrying the given committed
     * offset, leader epoch and metadata for {@code partition}.
     */
    private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(int partition, long offset, int leaderEpoch, String metadata) {
        OffsetFetchResponseData.OffsetFetchResponsePartitions response =
            new OffsetFetchResponseData.OffsetFetchResponsePartitions();
        return response
            .setPartitionIndex(partition)
            .setCommittedOffset(offset)
            .setCommittedLeaderEpoch(leaderEpoch)
            .setMetadata(metadata);
    }

    /**
     * Builds an offset fetch partition response for {@code partition} with a
     * committed offset of -1, a leader epoch of -1 and empty metadata.
     */
    private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkInvalidOffsetPartitionResponse(int partition) {
        OffsetFetchResponseData.OffsetFetchResponsePartitions response =
            new OffsetFetchResponseData.OffsetFetchResponsePartitions();
        return response
            .setPartitionIndex(partition)
            .setCommittedOffset(-1L)
            .setCommittedLeaderEpoch(-1)
            .setMetadata("");
    }

    /**
     * Builds an offset fetch partition response for {@code partition} that carries
     * the code of {@code error}, with offset -1, leader epoch -1 and empty metadata.
     */
    private static OffsetFetchResponseData.OffsetFetchResponsePartitions mkOffsetPartitionResponse(int partition, Errors error) {
        OffsetFetchResponseData.OffsetFetchResponsePartitions response =
            new OffsetFetchResponseData.OffsetFetchResponsePartitions();
        return response
            .setPartitionIndex(partition)
            .setErrorCode(error.code())
            .setCommittedOffset(-1L)
            .setCommittedLeaderEpoch(-1)
            .setMetadata("");
    }

    /**
     * Replays four offset commit records (two per partition of "bar") and checks,
     * through {@code verifyReplay}, that each one becomes the stored offset.
     */
    @Test
    public void testReplay() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        // Partition 0: first without, then with a leader epoch.
        verifyReplay(ctx, "foo", "bar", 0,
            new OffsetAndMetadata(0L, 100L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyReplay(ctx, "foo", "bar", 0,
            new OffsetAndMetadata(1L, 200L, OptionalInt.of(10), "small", ctx.time.milliseconds(), OptionalLong.empty()));

        // Partition 1: the second record also carries the optional long value 12345.
        verifyReplay(ctx, "foo", "bar", 1,
            new OffsetAndMetadata(2L, 200L, OptionalInt.of(10), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyReplay(ctx, "foo", "bar", 1,
            new OffsetAndMetadata(3L, 300L, OptionalInt.of(10), "small", ctx.time.milliseconds(), OptionalLong.of(12345L)));
    }

    /**
     * Replays offset commit records under producer ids 5 and 6 for groups "foo" and
     * "bar" and checks, through {@code verifyTransactionalReplay}, that each one is
     * retrievable as a pending transactional offset.
     */
    @Test
    public void testTransactionalReplay() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        // Producer 5, group "foo".
        verifyTransactionalReplay(ctx, 5L, "foo", "bar", 0,
            new OffsetAndMetadata(0L, 100L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyTransactionalReplay(ctx, 5L, "foo", "bar", 1,
            new OffsetAndMetadata(1L, 101L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));

        // Producer 5, group "bar".
        verifyTransactionalReplay(ctx, 5L, "bar", "zar", 0,
            new OffsetAndMetadata(2L, 100L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyTransactionalReplay(ctx, 5L, "bar", "zar", 1,
            new OffsetAndMetadata(3L, 101L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));

        // Producer 6, group "foo".
        verifyTransactionalReplay(ctx, 6L, "foo", "bar", 2,
            new OffsetAndMetadata(4L, 102L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyTransactionalReplay(ctx, 6L, "foo", "bar", 3,
            new OffsetAndMetadata(5L, 102L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
    }

    /**
     * Replays one regular and two producer-id-10 commits, then tombstones for both
     * partitions, and expects no offset to be reported for either partition.
     */
    @Test
    public void testReplayWithTombstoneAndPendingTransactionalOffsets() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        verifyReplay(ctx, "foo", "bar", 0,
            new OffsetAndMetadata(0L, 100L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyTransactionalReplay(ctx, 10L, "foo", "bar", 0,
            new OffsetAndMetadata(1L, 100L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyTransactionalReplay(ctx, 10L, "foo", "bar", 1,
            new OffsetAndMetadata(2L, 100L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));

        // Tombstone both partitions.
        ctx.replay(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar", 0));
        ctx.replay(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar", 1));

        Assertions.assertFalse(ctx.hasOffset("foo", "bar", 0));
        Assertions.assertFalse(ctx.hasOffset("foo", "bar", 1));
    }

    /**
     * Replays end-transaction markers and checks their effect: a COMMIT marker for
     * producer 5 is expected to clear its pending offset and make it the stored
     * offset of ("bar", 0); an ABORT marker for producer 6 is expected to clear its
     * pending offset and leave ("bar", 1) without a stored offset.
     */
    @Test
    public void testReplayTransactionEndMarkerWithCommit() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        verifyReplay(ctx, "foo", "bar", 0,
            new OffsetAndMetadata(0L, 99L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyTransactionalReplay(ctx, 5L, "foo", "bar", 0,
            new OffsetAndMetadata(1L, 100L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyTransactionalReplay(ctx, 6L, "foo", "bar", 1,
            new OffsetAndMetadata(2L, 200L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));

        // Producer 1 never replayed any offset in this test; its marker is replayed first.
        ctx.replayEndTransactionMarker(1L, TransactionResult.COMMIT);

        ctx.replayEndTransactionMarker(5L, TransactionResult.COMMIT);
        Assertions.assertNull(ctx.offsetMetadataManager.pendingTransactionalOffset(5L, "foo", "bar", 0));
        OffsetAndMetadata expectedCommitted =
            new OffsetAndMetadata(1L, 100L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty());
        Assertions.assertEquals(expectedCommitted, ctx.offsetMetadataManager.offset("foo", "bar", 0));

        ctx.replayEndTransactionMarker(6L, TransactionResult.ABORT);
        Assertions.assertNull(ctx.offsetMetadataManager.pendingTransactionalOffset(6L, "foo", "bar", 1));
        Assertions.assertNull(ctx.offsetMetadataManager.offset("foo", "bar", 1));
    }

    /**
     * A producer-id-5 commit is replayed before a regular commit for the same
     * partition. After the COMMIT marker for producer 5, the pending offset is
     * expected to be gone and the stored offset is expected to still be the
     * regular commit that was replayed later.
     */
    @Test
    public void testReplayTransactionEndMarkerKeepsTheMostRecentCommittedOffset() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        verifyTransactionalReplay(ctx, 5L, "foo", "bar", 0,
            new OffsetAndMetadata(0L, 100L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyReplay(ctx, "foo", "bar", 0,
            new OffsetAndMetadata(1L, 101L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));

        ctx.replayEndTransactionMarker(5L, TransactionResult.COMMIT);

        Assertions.assertNull(ctx.offsetMetadataManager.pendingTransactionalOffset(5L, "foo", "bar", 0));
        OffsetAndMetadata expectedCommitted =
            new OffsetAndMetadata(1L, 101L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty());
        Assertions.assertEquals(expectedCommitted, ctx.offsetMetadataManager.offset("foo", "bar", 0));
    }

    /**
     * Three producers replay offsets covering two distinct partitions; after all
     * three COMMIT markers, {@code incrementNumOffsets()} is expected to have been
     * called exactly twice on the metrics shard.
     */
    @Test
    public void testOffsetCommitsNumberMetricWithTransactionalOffsets() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        // Producers 4 and 5 both target ("bar", 0); producer 6 targets ("bar", 1).
        verifyTransactionalReplay(ctx, 4L, "foo", "bar", 0,
            new OffsetAndMetadata(0L, 100L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyTransactionalReplay(ctx, 5L, "foo", "bar", 0,
            new OffsetAndMetadata(1L, 101L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));
        verifyTransactionalReplay(ctx, 6L, "foo", "bar", 1,
            new OffsetAndMetadata(2L, 200L, OptionalInt.empty(), "small", ctx.time.milliseconds(), OptionalLong.empty()));

        ctx.replayEndTransactionMarker(4L, TransactionResult.COMMIT);
        ctx.replayEndTransactionMarker(5L, TransactionResult.COMMIT);
        ctx.replayEndTransactionMarker(6L, TransactionResult.COMMIT);

        Mockito.verify(ctx.metrics, Mockito.times(2)).incrementNumOffsets();
    }

    /**
     * Commits offsets for two partitions through a stable classic group and expects
     * the "OffsetCommits" metric to be recorded once with the value 2.0.
     */
    @Test
    public void testOffsetCommitsSensor() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // Bring the classic group to generation 1 in the STABLE state with one member.
        ClassicGroup group = context.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);
        group.add(this.mkGenericMember("member", Optional.of("new-instance-id")));
        group.transitionTo(ClassicGroupState.PREPARING_REBALANCE);
        group.initNextGeneration();
        Assertions.assertEquals(1, group.generationId());
        group.transitionTo(ClassicGroupState.STABLE);

        // The returned result is not inspected; only the metric side effect is checked.
        context.commitOffset(new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(1)
            .setRetentionTimeMs(1234L)
            .setTopics(Collections.singletonList(new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Arrays.asList(
                    new OffsetCommitRequestData.OffsetCommitRequestPartition().setPartitionIndex(0).setCommittedOffset(100L),
                    new OffsetCommitRequestData.OffsetCommitRequestPartition().setPartitionIndex(1).setCommittedOffset(150L))))));

        Mockito.verify(context.metrics).record("OffsetCommits", 2.0);
    }

    /**
     * Runs three cleanup passes against a mocked group and checks the
     * "OffsetExpired" metric: it is expected to be recorded with 1.0 twice (once
     * per single-offset pass) and then once with 3.0 for the final pass.
     */
    @Test
    public void testOffsetsExpiredSensor() {
        GroupMetadataManager groupMetadataManager = Mockito.mock(GroupMetadataManager.class);
        Group group = Mockito.mock(Group.class);
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
            .withGroupMetadataManager(groupMetadataManager)
            .withOffsetsRetentionMinutes(1)
            .build();

        long commitTimestamp = context.time.milliseconds();
        context.commitOffset("group-id", "firstTopic", 0, 100L, 0, commitTimestamp);
        context.commitOffset("group-id", "secondTopic", 0, 100L, 0, commitTimestamp);
        context.commitOffset("group-id", "secondTopic", 1, 100L, 0, commitTimestamp + 500L);

        context.time.sleep(Duration.ofMinutes(1L).toMillis());

        List<CoordinatorRecord> expectedRecords = Collections.singletonList(
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "secondTopic", 0)
        );
        Mockito.when(groupMetadataManager.group("group-id")).thenReturn(group);
        Mockito.when(group.offsetExpirationCondition()).thenReturn(Optional.of(
            new OffsetExpirationConditionImpl(offsetAndMetadata -> offsetAndMetadata.commitTimestampMs)));
        Mockito.when(group.isSubscribedToTopic("firstTopic")).thenReturn(true);
        Mockito.when(group.isSubscribedToTopic("secondTopic")).thenReturn(false);

        // The list must be typed as List<CoordinatorRecord> to match the helper's signature.
        List<CoordinatorRecord> records = new ArrayList<>();
        Assertions.assertFalse(context.cleanupExpiredOffsets("group-id", records));
        Assertions.assertEquals(expectedRecords, records);

        context.time.sleep(500L);
        records = new ArrayList<>();
        Assertions.assertFalse(context.cleanupExpiredOffsets("group-id", records));

        // Two passes so far, each expected to have recorded a single expired offset.
        Mockito.verify(context.metrics, Mockito.times(2)).record("OffsetExpired", 1.0);

        Mockito.when(group.isSubscribedToTopic("firstTopic")).thenReturn(false);
        context.commitOffset("group-id", "firstTopic", 1, 100L, 0, commitTimestamp + 500L);
        context.commitOffset("group-id", "secondTopic", 0, 101L, 0, commitTimestamp + 500L);

        records = new ArrayList<>();
        Assertions.assertTrue(context.cleanupExpiredOffsets("group-id", records));
        Mockito.verify(context.metrics).record("OffsetExpired", 3.0);
    }

    /**
     * Deletes the offsets of two partitions of "bar" from a classic group whose
     * subscribed topics are set to an empty set, and expects the "OffsetDeletions"
     * metric to be recorded once with the value 2.0.
     */
    @Test
    public void testOffsetDeletionsSensor() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();
        ClassicGroup classicGroup = ctx.groupMetadataManager.getOrMaybeCreateClassicGroup("foo", true);

        ctx.commitOffset("foo", "bar", 0, 100L, 0);
        ctx.commitOffset("foo", "bar", 1, 150L, 0);
        classicGroup.setSubscribedTopics(Optional.of(Collections.emptySet()));

        OffsetDeleteRequestData.OffsetDeleteRequestTopic barTopic = new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
            .setName("bar")
            .setPartitions(Arrays.asList(
                new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0),
                new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(1)));
        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection topicsToDelete =
            new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(barTopic).iterator());

        ctx.deleteOffsets(new OffsetDeleteRequestData().setGroupId("foo").setTopics(topicsToDelete));

        Mockito.verify(ctx.metrics).record("OffsetDeletions", 2.0);
    }

    /**
     * Commits offsets for three groups (the third one under producer id 100),
     * deletes four topic partitions, and expects a tombstone (in any order) for
     * every offset on a deleted partition, after which none of those offsets may
     * be reported any more.
     */
    @Test
    public void testOnPartitionsDeleted() {
        OffsetMetadataManagerTestContext ctx = new OffsetMetadataManagerTestContext.Builder().build();

        // grp-0 on "foo".
        ctx.commitOffset("grp-0", "foo", 1, 100L, 1, ctx.time.milliseconds());
        ctx.commitOffset("grp-0", "foo", 2, 200L, 1, ctx.time.milliseconds());
        ctx.commitOffset("grp-0", "foo", 3, 300L, 1, ctx.time.milliseconds());

        // grp-1 on "bar".
        ctx.commitOffset("grp-1", "bar", 1, 100L, 1, ctx.time.milliseconds());
        ctx.commitOffset("grp-1", "bar", 2, 200L, 1, ctx.time.milliseconds());
        ctx.commitOffset("grp-1", "bar", 3, 300L, 1, ctx.time.milliseconds());

        // grp-2 on "foo", written under producer id 100.
        ctx.commitOffset(100L, "grp-2", "foo", 1, 100L, 1, ctx.time.milliseconds());
        ctx.commitOffset(100L, "grp-2", "foo", 2, 200L, 1, ctx.time.milliseconds());
        ctx.commitOffset(100L, "grp-2", "foo", 3, 300L, 1, ctx.time.milliseconds());

        List<TopicPartition> deletedPartitions = Arrays.asList(
            new TopicPartition("foo", 1),
            new TopicPartition("foo", 2),
            new TopicPartition("foo", 3),
            new TopicPartition("bar", 1)
        );
        List<CoordinatorRecord> actualTombstones = ctx.deletePartitions(deletedPartitions);

        List<CoordinatorRecord> expectedTombstones = Arrays.asList(
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("grp-0", "foo", 1),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("grp-0", "foo", 2),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("grp-0", "foo", 3),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("grp-1", "bar", 1),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("grp-2", "foo", 1),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("grp-2", "foo", 2),
            GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("grp-2", "foo", 3)
        );

        // Compared as sets: the order of the produced tombstones is not asserted.
        Assertions.assertEquals(new HashSet<>(expectedTombstones), new HashSet<>(actualTombstones));

        Assertions.assertFalse(ctx.hasOffset("grp-0", "foo", 1));
        Assertions.assertFalse(ctx.hasOffset("grp-0", "foo", 2));
        Assertions.assertFalse(ctx.hasOffset("grp-0", "foo", 3));
        Assertions.assertFalse(ctx.hasOffset("grp-1", "bar", 1));
        Assertions.assertFalse(ctx.hasOffset("grp-2", "foo", 1));
        Assertions.assertFalse(ctx.hasOffset("grp-2", "foo", 2));
        Assertions.assertFalse(ctx.hasOffset("grp-2", "foo", 3));
    }

    /**
     * Replays an offset commit record built from the arguments and asserts that
     * the manager then reports {@code offsetAndMetadata} as the offset of the
     * given group, topic and partition.
     */
    private void verifyReplay(OffsetMetadataManagerTestContext context, String groupId, String topic, int partition, OffsetAndMetadata offsetAndMetadata) {
        CoordinatorRecord commitRecord =
            GroupCoordinatorRecordHelpers.newOffsetCommitRecord(groupId, topic, partition, offsetAndMetadata);
        context.replay(commitRecord);

        OffsetAndMetadata stored = context.offsetMetadataManager.offset(groupId, topic, partition);
        Assertions.assertEquals(offsetAndMetadata, stored);
    }

    /**
     * Replays an offset commit record under {@code producerId} and asserts that the
     * manager then reports {@code offsetAndMetadata} as the pending transactional
     * offset of that producer for the given group, topic and partition.
     */
    private void verifyTransactionalReplay(OffsetMetadataManagerTestContext context, long producerId, String groupId, String topic, int partition, OffsetAndMetadata offsetAndMetadata) {
        CoordinatorRecord commitRecord =
            GroupCoordinatorRecordHelpers.newOffsetCommitRecord(groupId, topic, partition, offsetAndMetadata);
        context.replay(producerId, commitRecord);

        OffsetAndMetadata pending =
            context.offsetMetadataManager.pendingTransactionalOffset(producerId, groupId, topic, partition);
        Assertions.assertEquals(offsetAndMetadata, pending);
    }

    /**
     * Creates a classic group member with fixed client attributes and a single
     * "range" protocol whose metadata is an empty byte array.
     */
    private ClassicGroupMember mkGenericMember(String memberId, Optional<String> groupInstanceId) {
        JoinGroupRequestData.JoinGroupRequestProtocol rangeProtocol = new JoinGroupRequestData.JoinGroupRequestProtocol()
            .setName("range")
            .setMetadata(new byte[0]);
        JoinGroupRequestData.JoinGroupRequestProtocolCollection supportedProtocols =
            new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
                Collections.singletonList(rangeProtocol).iterator());

        return new ClassicGroupMember(
            memberId,
            groupInstanceId,
            "client-id",
            "host",
            5000,
            5000,
            "consumer",
            supportedProtocols
        );
    }

    static class OffsetMetadataManagerTestContext {
        final MockTime time;
        final MockCoordinatorTimer<Void, CoordinatorRecord> timer;
        final SnapshotRegistry snapshotRegistry;
        final GroupCoordinatorMetricsShard metrics;
        final GroupMetadataManager groupMetadataManager;
        final OffsetMetadataManager offsetMetadataManager;
        long lastCommittedOffset = 0L;
        long lastWrittenOffset = 0L;

        /**
         * Stores the collaborators used by the test context. The two offset
         * counters keep their field initializers (0).
         */
        OffsetMetadataManagerTestContext(
            MockTime time,
            MockCoordinatorTimer<Void, CoordinatorRecord> timer,
            SnapshotRegistry snapshotRegistry,
            GroupCoordinatorMetricsShard metrics,
            GroupMetadataManager groupMetadataManager,
            OffsetMetadataManager offsetMetadataManager
        ) {
            // Clock and timer.
            this.time = time;
            this.timer = timer;

            // State tracking and metrics.
            this.snapshotRegistry = snapshotRegistry;
            this.metrics = metrics;

            // Managers under test.
            this.groupMetadataManager = groupMetadataManager;
            this.offsetMetadataManager = offsetMetadataManager;
        }

        /**
         * Gets or creates a group of the requested type through the group metadata
         * manager.
         *
         * @throws IllegalArgumentException if the type is neither CLASSIC nor CONSUMER
         */
        public Group getOrMaybeCreateGroup(Group.GroupType groupType, String groupId) {
            switch (groupType) {
                case CLASSIC:
                    return groupMetadataManager.getOrMaybeCreateClassicGroup(groupId, true);
                case CONSUMER:
                    return groupMetadataManager.getOrMaybeCreatePersistedConsumerGroup(groupId, true);
                default:
                    throw new IllegalArgumentException("Invalid group type: " + groupType);
            }
        }

        /**
         * Advances the last committed offset to the last written offset and asks the
         * snapshot registry to delete snapshots up to the previously committed offset.
         */
        public void commit() {
            long previouslyCommitted = lastCommittedOffset;
            lastCommittedOffset = lastWrittenOffset;
            snapshotRegistry.deleteSnapshotsUpTo(previouslyCommitted);
        }

        /**
         * Commits offsets using the latest version of the OFFSET_COMMIT API.
         */
        public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffset(OffsetCommitRequestData request) {
            short latestVersion = ApiKeys.OFFSET_COMMIT.latestVersion();
            return commitOffset(latestVersion, request);
        }

        /**
         * Sends an offset commit request of the given API version to the offset
         * metadata manager and replays every record of the result.
         *
         * @param version the OFFSET_COMMIT API version put into the request header
         * @param request the offset commit request
         * @return the result returned by the offset metadata manager
         */
        public CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> commitOffset(short version, OffsetCommitRequestData request) {
            RequestContext context = new RequestContext(
                new RequestHeader(ApiKeys.OFFSET_COMMIT, version, "client", 0),
                "1",
                InetAddress.getLoopbackAddress(),
                KafkaPrincipal.ANONYMOUS,
                ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
                SecurityProtocol.PLAINTEXT,
                ClientInformation.EMPTY,
                false
            );

            // Typed (not raw) so that records() is a List<CoordinatorRecord> and
            // this::replay resolves to replay(CoordinatorRecord).
            CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result =
                this.offsetMetadataManager.commitOffset(context, request);
            result.records().forEach(this::replay);
            return result;
        }

        /**
         * Sends a transactional offset commit request (latest TXN_OFFSET_COMMIT
         * version) to the offset metadata manager and replays every record of the
         * result under the request's producer id.
         *
         * @param request the transactional offset commit request
         * @return the result returned by the offset metadata manager
         */
        public CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> commitTransactionalOffset(TxnOffsetCommitRequestData request) {
            RequestContext context = new RequestContext(
                new RequestHeader(ApiKeys.TXN_OFFSET_COMMIT, ApiKeys.TXN_OFFSET_COMMIT.latestVersion(), "client", 0),
                "1",
                InetAddress.getLoopbackAddress(),
                KafkaPrincipal.ANONYMOUS,
                ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
                SecurityProtocol.PLAINTEXT,
                ClientInformation.EMPTY,
                false
            );

            // Typed (not raw) so the lambda parameter is a CoordinatorRecord without a cast.
            CoordinatorResult<TxnOffsetCommitResponseData, CoordinatorRecord> result =
                this.offsetMetadataManager.commitTransactionalOffset(context, request);
            result.records().forEach(record -> this.replay(request.producerId(), record));
            return result;
        }

        /**
         * Notifies the offset metadata manager that the given partitions were
         * deleted and replays every record it returns.
         *
         * @param topicPartitions the deleted partitions
         * @return the records returned by the offset metadata manager
         */
        public List<CoordinatorRecord> deletePartitions(List<TopicPartition> topicPartitions) {
            // Typed (not raw) so that this::replay resolves to replay(CoordinatorRecord).
            List<CoordinatorRecord> records = this.offsetMetadataManager.onPartitionsDeleted(topicPartitions);
            records.forEach(this::replay);
            return records;
        }

        /**
         * Sends an offset delete request to the offset metadata manager and replays
         * every record of the result.
         *
         * @param request the offset delete request
         * @return the result returned by the offset metadata manager
         */
        public CoordinatorResult<OffsetDeleteResponseData, CoordinatorRecord> deleteOffsets(OffsetDeleteRequestData request) {
            // Typed (not raw) so that this::replay resolves to replay(CoordinatorRecord).
            CoordinatorResult<OffsetDeleteResponseData, CoordinatorRecord> result =
                this.offsetMetadataManager.deleteOffsets(request);
            result.records().forEach(this::replay);
            return result;
        }

        /**
         * Asks the offset metadata manager to delete all offsets of a group, replays
         * the records it produced, and appends them to {@code records}.
         *
         * @param groupId the group whose offsets are deleted
         * @param records the list the produced records are appended to
         * @return the number returned by the offset metadata manager
         */
        public int deleteAllOffsets(String groupId, List<CoordinatorRecord> records) {
            // Collected separately so only the records produced by this call are replayed.
            List<CoordinatorRecord> addedRecords = new ArrayList<>();
            int numDeletedOffsets = this.offsetMetadataManager.deleteAllOffsets(groupId, addedRecords);
            addedRecords.forEach(this::replay);
            records.addAll(addedRecords);
            return numDeletedOffsets;
        }

        /**
         * Asks the offset metadata manager to clean up the expired offsets of a
         * group, replays the records it produced, and appends them to
         * {@code records}.
         *
         * @param groupId the group to clean up
         * @param records the list the produced records are appended to
         * @return the boolean returned by the offset metadata manager
         */
        public boolean cleanupExpiredOffsets(String groupId, List<CoordinatorRecord> records) {
            // Collected separately so only the records produced by this call are replayed.
            List<CoordinatorRecord> addedRecords = new ArrayList<>();
            boolean isOffsetsEmptyForGroup = this.offsetMetadataManager.cleanupExpiredOffsets(groupId, addedRecords);
            addedRecords.forEach(this::replay);
            records.addAll(addedRecords);
            return isOffsetsEmptyForGroup;
        }

        /**
         * Fetches offsets for the given topics with a null member id and a member
         * epoch of -1.
         */
        public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(String groupId, List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics, long committedOffset) {
            String noMemberId = null;
            int noMemberEpoch = -1;
            return fetchOffsets(groupId, noMemberId, noMemberEpoch, topics, committedOffset);
        }

        /**
         * Fetches offsets for the given topics from the offset metadata manager,
         * asserts that the response carries the requested group id, and returns the
         * response topics.
         */
        public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchOffsets(String groupId, String memberId, int memberEpoch, List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics, long committedOffset) {
            OffsetFetchRequestData.OffsetFetchRequestGroup request = new OffsetFetchRequestData.OffsetFetchRequestGroup()
                .setGroupId(groupId)
                .setMemberId(memberId)
                .setMemberEpoch(memberEpoch)
                .setTopics(topics);

            OffsetFetchResponseData.OffsetFetchResponseGroup response =
                offsetMetadataManager.fetchOffsets(request, committedOffset);

            Assertions.assertEquals(groupId, response.groupId());
            return response.topics();
        }

        /**
         * Fetches all offsets of a group with a null member id and a member epoch
         * of -1.
         */
        public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(String groupId, long committedOffset) {
            String noMemberId = null;
            int noMemberEpoch = -1;
            return fetchAllOffsets(groupId, noMemberId, noMemberEpoch, committedOffset);
        }

        /**
         * Fetches all offsets of a group from the offset metadata manager, asserts
         * that the response carries the requested group id, and returns the
         * response topics.
         */
        public List<OffsetFetchResponseData.OffsetFetchResponseTopics> fetchAllOffsets(String groupId, String memberId, int memberEpoch, long committedOffset) {
            OffsetFetchRequestData.OffsetFetchRequestGroup request = new OffsetFetchRequestData.OffsetFetchRequestGroup()
                .setGroupId(groupId)
                .setMemberId(memberId)
                .setMemberEpoch(memberEpoch);

            OffsetFetchResponseData.OffsetFetchResponseGroup response =
                offsetMetadataManager.fetchAllOffsets(request, committedOffset);

            Assertions.assertEquals(groupId, response.groupId());
            return response.topics();
        }

        /**
         * Advances the mock clock by {@code ms}, polls the timer, and replays the
         * records of every expired timeout whose result asks for its records to be
         * replayed.
         *
         * @param ms the number of milliseconds to advance the clock by
         * @return the timeouts returned by the timer
         */
        public List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> sleep(long ms) {
            this.time.sleep(ms);
            // Typed (not raw) so that timeout.result and its records are resolvable.
            List<MockCoordinatorTimer.ExpiredTimeout<Void, CoordinatorRecord>> timeouts = this.timer.poll();
            timeouts.forEach(timeout -> {
                if (timeout.result.replayRecords()) {
                    timeout.result.records().forEach(this::replay);
                }
            });
            return timeouts;
        }

        /**
         * Commits an offset using the current mock time as the commit timestamp.
         */
        public void commitOffset(String groupId, String topic, int partition, long offset, int leaderEpoch) {
            long commitTimestamp = time.milliseconds();
            commitOffset(groupId, topic, partition, offset, leaderEpoch, commitTimestamp);
        }

        /**
         * Commits an offset with an explicit commit timestamp, using -1 as the
         * producer id.
         */
        public void commitOffset(String groupId, String topic, int partition, long offset, int leaderEpoch, long commitTimestamp) {
            long noProducerId = -1L;
            commitOffset(noProducerId, groupId, topic, partition, offset, leaderEpoch, commitTimestamp);
        }

        /**
         * Builds an offset commit record and replays it under the given producer id.
         * The {@code OffsetAndMetadata} carries the offset, the leader epoch, the
         * fixed metadata string {@code "metadata"}, the commit timestamp and an
         * empty {@code OptionalLong} as its last argument.
         *
         * @param producerId      the producer id handed to {@code replay}
         * @param groupId         the group id
         * @param topic           the topic name
         * @param partition       the partition index
         * @param offset          the offset to commit
         * @param leaderEpoch     the leader epoch stored with the offset
         * @param commitTimestamp the commit timestamp stored with the offset
         */
        public void commitOffset(long producerId, String groupId, String topic, int partition, long offset, int leaderEpoch, long commitTimestamp) {
            OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(
                offset,
                OptionalInt.of(leaderEpoch),
                "metadata",
                commitTimestamp,
                OptionalLong.empty()
            );
            CoordinatorRecord commitRecord = GroupCoordinatorRecordHelpers.newOffsetCommitRecord(groupId, topic, partition, offsetAndMetadata);
            replay(producerId, commitRecord);
        }

        /**
         * Builds an offset commit tombstone record for the given partition and
         * replays it.
         *
         * @param groupId   the group id
         * @param topic     the topic name
         * @param partition the partition index
         */
        public void deleteOffset(String groupId, String topic, int partition) {
            CoordinatorRecord tombstone = GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition);
            replay(tombstone);
        }

        /**
         * Null-safe accessor for the message of an {@code ApiMessageAndVersion}.
         *
         * @param apiMessageAndVersion the wrapper, possibly {@code null}
         * @return {@code null} when the wrapper is {@code null}, otherwise its {@code message()}
         */
        private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
            return apiMessageAndVersion != null ? apiMessageAndVersion.message() : null;
        }

        /**
         * Replays a record with a producer id of {@code -1L}.
         *
         * @param record the record to replay
         */
        private void replay(CoordinatorRecord record) {
            long producerId = -1L;
            replay(producerId, record);
        }

        /**
         * Replays a single record into the {@code offsetMetadataManager} at
         * {@code lastWrittenOffset} and then advances {@code lastWrittenOffset} by one.
         * A snapshot is created (idempotently) at {@code lastWrittenOffset} before the
         * record is inspected. Only keys with version {@code 1} are accepted; they are
         * treated as {@code OffsetCommitKey}/{@code OffsetCommitValue} pairs, where a
         * {@code null} value is forwarded as {@code null}.
         *
         * @param producerId the producer id forwarded to the manager
         * @param record     the record to replay
         * @throws IllegalStateException if the record key is {@code null} or its version is not {@code 1};
         *                               in both cases {@code lastWrittenOffset} is left unchanged
         */
        private void replay(long producerId, CoordinatorRecord record) {
            snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
            ApiMessageAndVersion recordKey = record.key();
            ApiMessageAndVersion recordValue = record.value();
            if (recordKey == null) {
                throw new IllegalStateException("Received a null key in " + record);
            }
            int recordType = recordKey.version();
            if (recordType != 1) {
                throw new IllegalStateException("Received an unknown record type " + recordType + " in " + record);
            }
            offsetMetadataManager.replay(
                lastWrittenOffset,
                producerId,
                (OffsetCommitKey) recordKey.message(),
                (OffsetCommitValue) messageOrNull(recordValue)
            );
            lastWrittenOffset++;
        }

        /**
         * Replays an end-transaction marker into the {@code offsetMetadataManager}.
         * A snapshot is created (idempotently) at {@code lastWrittenOffset} first, and
         * {@code lastWrittenOffset} is advanced by one afterwards.
         *
         * @param producerId the producer id forwarded to the manager
         * @param result     the transaction result forwarded to the manager
         */
        private void replayEndTransactionMarker(long producerId, TransactionResult result) {
            snapshotRegistry.idempotentCreateSnapshot(lastWrittenOffset);
            offsetMetadataManager.replayEndTransactionMarker(producerId, result);
            lastWrittenOffset++;
        }

        /**
         * Issues an offset delete request for a single topic partition and asserts
         * both the response and the generated records.
         * <p>
         * The expected response contains the one topic with the one partition and
         * {@code expectedError}'s code. The expected records are a single offset commit
         * tombstone when {@link #hasOffset} reports an offset for the partition
         * <em>before</em> the delete is issued and {@code expectedError} is
         * {@code Errors.NONE}; otherwise no records are expected.
         *
         * @param groupId       the group id of the request
         * @param topic         the topic name of the request
         * @param partition     the partition index of the request
         * @param expectedError the error expected for the partition in the response
         */
        public void testOffsetDeleteWith(String groupId, String topic, int partition, Errors expectedError) {
            OffsetDeleteRequestData.OffsetDeleteRequestPartition requestPartition =
                new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(partition);
            OffsetDeleteRequestData.OffsetDeleteRequestTopic requestTopic =
                new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
                    .setName(topic)
                    .setPartitions(Collections.singletonList(requestPartition));
            OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopicCollection =
                new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(requestTopic).iterator());

            OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection expectedResponsePartitionCollection =
                new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection();
            // Add the partition with its own type: an upcast to ImplicitLinkedHashCollection.Element
            // does not match the element type expected by the typed collection's add().
            expectedResponsePartitionCollection.add(
                new OffsetDeleteResponseData.OffsetDeleteResponsePartition()
                    .setPartitionIndex(partition)
                    .setErrorCode(expectedError.code())
            );
            OffsetDeleteResponseData.OffsetDeleteResponseTopic expectedResponseTopic =
                new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
                    .setName(topic)
                    .setPartitions(expectedResponsePartitionCollection);
            OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection expectedResponseTopicCollection =
                new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(expectedResponseTopic).iterator());

            // The expectation must be computed before the delete is issued.
            List<CoordinatorRecord> expectedRecords = Collections.emptyList();
            if (hasOffset(groupId, topic, partition) && expectedError == Errors.NONE) {
                expectedRecords = Collections.singletonList(
                    GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition)
                );
            }

            CoordinatorResult<OffsetDeleteResponseData, CoordinatorRecord> coordinatorResult = deleteOffsets(
                new OffsetDeleteRequestData().setGroupId(groupId).setTopics(requestTopicCollection)
            );

            Assertions.assertEquals(new OffsetDeleteResponseData().setTopics(expectedResponseTopicCollection), coordinatorResult.response());
            Assertions.assertEquals(expectedRecords, coordinatorResult.records());
        }

        /**
         * Reports whether the {@code offsetMetadataManager} knows an offset for the
         * given partition, either committed or pending in a transaction. The pending
         * check is only made when no committed offset is found.
         *
         * @param groupId   the group id
         * @param topic     the topic name
         * @param partition the partition index
         * @return {@code true} if a committed offset or pending transactional offsets exist
         */
        public boolean hasOffset(String groupId, String topic, int partition) {
            if (offsetMetadataManager.hasCommittedOffset(groupId, topic, partition)) {
                return true;
            }
            return offsetMetadataManager.hasPendingTransactionalOffsets(groupId, topic, partition);
        }

        public static class Builder {
            private final MockTime time = new MockTime();
            private final MockCoordinatorTimer<Void, CoordinatorRecord> timer = new MockCoordinatorTimer((Time)this.time);
            private final MockCoordinatorExecutor<CoordinatorRecord> executor = new MockCoordinatorExecutor();
            private final LogContext logContext = new LogContext();
            private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(this.logContext);
            private final GroupCoordinatorMetricsShard metrics = (GroupCoordinatorMetricsShard)Mockito.mock(GroupCoordinatorMetricsShard.class);
            private final GroupConfigManager configManager = (GroupConfigManager)Mockito.mock(GroupConfigManager.class);
            private GroupMetadataManager groupMetadataManager = null;
            private MetadataImage metadataImage = null;
            private GroupCoordinatorConfig config = null;

            Builder withOffsetMetadataMaxSize(int offsetMetadataMaxSize) {
                this.config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(offsetMetadataMaxSize, 60000L, 1440);
                return this;
            }

            Builder withOffsetsRetentionMinutes(int offsetsRetentionMinutes) {
                this.config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, offsetsRetentionMinutes);
                return this;
            }

            Builder withGroupMetadataManager(GroupMetadataManager groupMetadataManager) {
                this.groupMetadataManager = groupMetadataManager;
                return this;
            }

            OffsetMetadataManagerTestContext build() {
                if (this.metadataImage == null) {
                    this.metadataImage = MetadataImage.EMPTY;
                }
                if (this.config == null) {
                    this.config = GroupCoordinatorConfigTest.createGroupCoordinatorConfig(4096, 60000L, 24);
                }
                if (this.groupMetadataManager == null) {
                    this.groupMetadataManager = new GroupMetadataManager.Builder().withTime((Time)this.time).withTimer(this.timer).withExecutor(this.executor).withSnapshotRegistry(this.snapshotRegistry).withLogContext(this.logContext).withMetadataImage(this.metadataImage).withGroupCoordinatorMetricsShard(this.metrics).withGroupConfigManager(this.configManager).withConfig(GroupCoordinatorConfig.fromProps(Collections.emptyMap())).build();
                }
                OffsetMetadataManager offsetMetadataManager = new OffsetMetadataManager.Builder().withTime((Time)this.time).withLogContext(this.logContext).withSnapshotRegistry(this.snapshotRegistry).withMetadataImage(this.metadataImage).withGroupMetadataManager(this.groupMetadataManager).withGroupCoordinatorConfig(this.config).withGroupCoordinatorMetricsShard(this.metrics).build();
                return new OffsetMetadataManagerTestContext(this.time, this.timer, this.snapshotRegistry, this.metrics, this.groupMetadataManager, offsetMetadataManager);
            }
        }
    }
}

