package org.apache.kafka.coordinator.group;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.OptionalLong;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.IllegalGenerationException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.annotation.ApiKeyVersionsSource;
import org.apache.kafka.coordinator.group.GroupMetadataManager;
import org.apache.kafka.coordinator.group.MockCoordinatorTimer;
import org.apache.kafka.coordinator.group.OffsetMetadataManager;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.consumer.ConsumerGroupMember;
import org.apache.kafka.coordinator.group.generated.OffsetCommitKey;
import org.apache.kafka.coordinator.group.generated.OffsetCommitValue;
import org.apache.kafka.coordinator.group.generic.GenericGroup;
import org.apache.kafka.coordinator.group.generic.GenericGroupMember;
import org.apache.kafka.coordinator.group.generic.GenericGroupState;
import org.apache.kafka.coordinator.group.runtime.CoordinatorResult;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;

/* loaded from: input_file:org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.class */
public class OffsetMetadataManagerTest {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/kafka/coordinator/group/OffsetMetadataManagerTest$OffsetMetadataManagerTestContext.class */
    public static class OffsetMetadataManagerTestContext {
        final MockTime time;
        final MockCoordinatorTimer<Void, Record> timer;
        final SnapshotRegistry snapshotRegistry;
        final GroupMetadataManager groupMetadataManager;
        final OffsetMetadataManager offsetMetadataManager;
        long lastCommittedOffset = 0;
        long lastWrittenOffset = 0;

        /* loaded from: input_file:org/apache/kafka/coordinator/group/OffsetMetadataManagerTest$OffsetMetadataManagerTestContext$Builder.class */
        public static class Builder {
            private final MockTime time = new MockTime();
            private final MockCoordinatorTimer<Void, Record> timer = new MockCoordinatorTimer<>(this.time);
            private final LogContext logContext = new LogContext();
            private final SnapshotRegistry snapshotRegistry = new SnapshotRegistry(this.logContext);
            private MetadataImage metadataImage = null;
            private int offsetMetadataMaxSize = 4096;

            Builder withOffsetMetadataMaxSize(int i) {
                this.offsetMetadataMaxSize = i;
                return this;
            }

            OffsetMetadataManagerTestContext build() {
                if (this.metadataImage == null) {
                    this.metadataImage = MetadataImage.EMPTY;
                }
                GroupMetadataManager build = new GroupMetadataManager.Builder().withTime(this.time).withTimer(this.timer).withSnapshotRegistry(this.snapshotRegistry).withLogContext(this.logContext).withMetadataImage(this.metadataImage).withConsumerGroupAssignors(Collections.singletonList(new RangeAssignor())).build();
                return new OffsetMetadataManagerTestContext(this.time, this.timer, this.snapshotRegistry, build, new OffsetMetadataManager.Builder().withTime(this.time).withLogContext(this.logContext).withSnapshotRegistry(this.snapshotRegistry).withMetadataImage(this.metadataImage).withGroupMetadataManager(build).withOffsetMetadataMaxSize(this.offsetMetadataMaxSize).build());
            }
        }

        OffsetMetadataManagerTestContext(MockTime mockTime, MockCoordinatorTimer<Void, Record> mockCoordinatorTimer, SnapshotRegistry snapshotRegistry, GroupMetadataManager groupMetadataManager, OffsetMetadataManager offsetMetadataManager) {
            this.time = mockTime;
            this.timer = mockCoordinatorTimer;
            this.snapshotRegistry = snapshotRegistry;
            this.groupMetadataManager = groupMetadataManager;
            this.offsetMetadataManager = offsetMetadataManager;
        }

        public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(OffsetCommitRequestData offsetCommitRequestData) {
            return commitOffset(ApiKeys.OFFSET_COMMIT.latestVersion(), offsetCommitRequestData);
        }

        public CoordinatorResult<OffsetCommitResponseData, Record> commitOffset(short s, OffsetCommitRequestData offsetCommitRequestData) {
            this.snapshotRegistry.getOrCreateSnapshot(this.lastCommittedOffset);
            CoordinatorResult<OffsetCommitResponseData, Record> commitOffset = this.offsetMetadataManager.commitOffset(new RequestContext(new RequestHeader(ApiKeys.OFFSET_COMMIT, s, "client", 0), "1", InetAddress.getLoopbackAddress(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false), offsetCommitRequestData);
            commitOffset.records().forEach(this::replay);
            return commitOffset;
        }

        public List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> sleep(long j) {
            this.time.sleep(j);
            List<MockCoordinatorTimer.ExpiredTimeout<Void, Record>> poll = this.timer.poll();
            poll.forEach(expiredTimeout -> {
                if (expiredTimeout.result.replayRecords()) {
                    expiredTimeout.result.records().forEach(this::replay);
                }
            });
            return poll;
        }

        private ApiMessage messageOrNull(ApiMessageAndVersion apiMessageAndVersion) {
            if (apiMessageAndVersion == null) {
                return null;
            }
            return apiMessageAndVersion.message();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void replay(Record record) {
            ApiMessageAndVersion key = record.key();
            ApiMessageAndVersion value = record.value();
            if (key == null) {
                throw new IllegalStateException("Received a null key in " + record);
            }
            switch (key.version()) {
                case OffsetCommitKey.HIGHEST_SUPPORTED_VERSION /* 1 */:
                    this.offsetMetadataManager.replay((OffsetCommitKey) key.message(), (OffsetCommitValue) messageOrNull(value));
                    this.lastWrittenOffset++;
                    return;
                default:
                    throw new IllegalStateException("Received an unknown record type " + ((int) key.version()) + " in " + record);
            }
        }
    }

    /**
     * Commits to group "foo", which this test never creates, at each OFFSET_COMMIT
     * version. Versions 9 and above must fail with {@link GroupIdNotFoundException},
     * older versions with {@link IllegalGenerationException}.
     */
    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
    @ParameterizedTest
    public void testOffsetCommitWithUnknownGroup(short s) {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        // The expected error type switches at request version 9.
        Class<? extends Throwable> expectedError;
        if (s >= 9) {
            expectedError = GroupIdNotFoundException.class;
        } else {
            expectedError = IllegalGenerationException.class;
        }

        Assertions.assertThrows(expectedError, () -> context.commitOffset(s, request));
    }

    /**
     * A commit against generic group "foo" after it has been moved to
     * {@code GenericGroupState.DEAD} must fail with {@link CoordinatorNotAvailableException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithDeadGroup() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true);
        group.transitionTo(GenericGroupState.DEAD);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(CoordinatorNotAvailableException.class, () -> context.commitOffset(request));
    }

    /**
     * Generic group "foo" is obtained but no member is added to it, so a commit
     * naming member "member" must fail with {@link UnknownMemberIdException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithUnknownMemberId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // Only the group is set up; it is left without members.
        context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitOffset(request));
    }

    /**
     * The group is at generation 1 while the request claims generation 10, so the
     * commit must fail with {@link IllegalGenerationException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithIllegalGeneration() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true);
        group.add(mkGenericMember("member", Optional.of("new-instance-id")));

        // Move the group to its first generation.
        group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
        group.initNextGeneration();
        Assertions.assertEquals(1, group.generationId());

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(IllegalGenerationException.class, () -> context.commitOffset(request));
    }

    /**
     * The member is registered without a group instance id, yet the request carries
     * instance id "instanceid"; the commit must fail with {@link UnknownMemberIdException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithUnknownInstanceId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true);
        group.add(mkGenericMember("member", Optional.empty()));

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGroupInstanceId("instanceid")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitOffset(request));
    }

    /**
     * The member is registered with instance id "new-instance-id" while the request
     * presents "old-instance-id"; the commit must fail with {@link UnknownMemberIdException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithFencedInstanceId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true);
        group.add(mkGenericMember("member", Optional.of("new-instance-id")));

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGroupInstanceId("old-instance-id")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitOffset(request));
    }

    /**
     * With the group at generation 1 and not moved to STABLE afterwards, a commit
     * with the matching generation must fail with {@link RebalanceInProgressException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWhileInCompletingRebalanceState() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true);
        group.add(mkGenericMember("member", Optional.of("new-instance-id")));

        // NOTE(review): the test name implies initNextGeneration() leaves the group in the
        // completing-rebalance state; that transition is not visible here.
        group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
        group.initNextGeneration();
        Assertions.assertEquals(1, group.generationId());

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(1)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(RebalanceInProgressException.class, () -> context.commitOffset(request));
    }

    /**
     * The group has a member and is at generation 1, but the request sets neither a
     * member id nor a generation; the commit must fail with {@link UnknownMemberIdException}.
     */
    @Test
    public void testGenericGroupOffsetCommitWithoutMemberIdAndGeneration() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true);
        group.add(mkGenericMember("member", Optional.of("new-instance-id")));

        group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
        group.initNextGeneration();
        Assertions.assertEquals(1, group.generationId());

        // Only the group id and the topics are populated on the request.
        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitOffset(request));
    }

    /**
     * A commit on a STABLE generic group with a retention time of 1234 ms must succeed
     * and yield one offset commit record whose final {@code OptionalLong} equals the
     * mock clock time plus 1234.
     */
    @Test
    public void testGenericGroupOffsetCommitWithRetentionTime() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true);
        group.add(mkGenericMember("member", Optional.of("new-instance-id")));

        // Reach generation 1, then mark the group stable.
        group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
        group.initNextGeneration();
        Assertions.assertEquals(1, group.generationId());
        group.transitionTo(GenericGroupState.STABLE);

        OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic requestTopic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(requestPartition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(1)
            .setRetentionTimeMs(1234L)
            .setTopics(Collections.singletonList(requestTopic));

        CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition responsePartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData.OffsetCommitResponseTopic responseTopic =
            new OffsetCommitResponseData.OffsetCommitResponseTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(responsePartition));
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(responseTopic));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.empty(),
            "",
            context.time.milliseconds(),
            OptionalLong.of(context.time.milliseconds() + 1234)
        );
        List<Record> expectedRecords = Collections.singletonList(RecordHelpers.newOffsetCommitRecord(
            "foo",
            "bar",
            0,
            expectedOffset,
            MetadataImage.EMPTY.features().metadataVersion()
        ));
        Assertions.assertEquals(expectedRecords, result.records());
    }

    /**
     * Checks the timer around a commit: after the member's heartbeat is scheduled,
     * nothing expires 2500 ms later, nothing expires 2500 ms after a commit, and exactly
     * one timeout expires after a further 2500 ms, at which point the member has left
     * the group.
     */
    @Test
    public void testGenericGroupOffsetCommitMaintainsSession() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", true);
        GenericGroupMember member = mkGenericMember("member", Optional.empty());
        group.add(member);

        // Reach generation 1, then mark the group stable.
        group.transitionTo(GenericGroupState.PREPARING_REBALANCE);
        group.initNextGeneration();
        Assertions.assertEquals(1, group.generationId());
        group.transitionTo(GenericGroupState.STABLE);

        context.groupMetadataManager.rescheduleGenericGroupMemberHeartbeat(group, member);

        // First 2500 ms: no timeout has fired yet.
        Assertions.assertEquals(Collections.emptyList(), context.sleep(2500L));

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(1)
            .setRetentionTimeMs(1234L)
            .setTopics(Collections.singletonList(topic));
        context.commitOffset(request);

        // 2500 ms after the commit: still nothing has fired.
        Assertions.assertEquals(Collections.emptyList(), context.sleep(2500L));

        // A further 2500 ms: exactly one timeout fires and the member is gone.
        Assertions.assertEquals(1, context.sleep(2500L).size());
        Assertions.assertFalse(group.hasMemberId(member.memberId()));
    }

    /**
     * A commit that carries only a group id and topics, against a group this test did
     * not set up beforehand, must succeed, produce one offset commit record, and leave
     * generic group "foo" retrievable afterwards.
     */
    @Test
    public void testSimpleGroupOffsetCommit() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic requestTopic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(requestPartition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setTopics(Collections.singletonList(requestTopic));

        CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition responsePartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData.OffsetCommitResponseTopic responseTopic =
            new OffsetCommitResponseData.OffsetCommitResponseTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(responsePartition));
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(responseTopic));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.empty(),
            "",
            context.time.milliseconds(),
            OptionalLong.empty()
        );
        List<Record> expectedRecords = Collections.singletonList(RecordHelpers.newOffsetCommitRecord(
            "foo",
            "bar",
            0,
            expectedOffset,
            MetadataImage.EMPTY.features().metadataVersion()
        ));
        Assertions.assertEquals(expectedRecords, result.records());

        // The lookup passes false as its second argument, unlike the set-up calls in other tests.
        GenericGroup group = context.groupMetadataManager.getOrMaybeCreateGenericGroup("foo", false);
        Assertions.assertNotNull(group);
        Assertions.assertEquals("foo", group.groupId());
    }

    /**
     * Same as the simple commit, but the request also carries group instance id
     * "instance-id"; the commit must still succeed and produce one offset commit record.
     */
    @Test
    public void testSimpleGroupOffsetCommitWithInstanceId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic requestTopic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(requestPartition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setGroupInstanceId("instance-id")
            .setTopics(Collections.singletonList(requestTopic));

        CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition responsePartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData.OffsetCommitResponseTopic responseTopic =
            new OffsetCommitResponseData.OffsetCommitResponseTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(responsePartition));
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(responseTopic));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.empty(),
            "",
            context.time.milliseconds(),
            OptionalLong.empty()
        );
        List<Record> expectedRecords = Collections.singletonList(RecordHelpers.newOffsetCommitRecord(
            "foo",
            "bar",
            0,
            expectedOffset,
            MetadataImage.EMPTY.features().metadataVersion()
        ));
        Assertions.assertEquals(expectedRecords, result.records());
    }

    /**
     * Consumer group "foo" is obtained but no member is added to it, so a commit naming
     * member "member" must fail with {@link UnknownMemberIdException}.
     */
    @Test
    public void testConsumerGroupOffsetCommitWithUnknownMemberId() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // Only the group is set up; it is left without members.
        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("foo", true);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnknownMemberIdException.class, () -> context.commitOffset(request));
    }

    /**
     * The member is at epoch 10; commits claiming epoch 9 and then epoch 11 must both
     * fail with {@link StaleMemberEpochException}.
     */
    @Test
    public void testConsumerGroupOffsetCommitWithStaleMemberEpoch() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setTargetMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .build();
        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("foo", true).updateMember(member);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(9)
            .setTopics(Collections.singletonList(topic));

        // Epoch below the member's epoch.
        Assertions.assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request));

        // Epoch above the member's epoch; the same request object is reused.
        request.setGenerationIdOrMemberEpoch(11);
        Assertions.assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(request));
    }

    /**
     * For OFFSET_COMMIT versions 1 through 8, a commit against a consumer group must
     * fail with {@link UnsupportedVersionException}. Version 0 and versions 9 and above
     * are not exercised: the method returns without asserting anything.
     */
    @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_COMMIT)
    @ParameterizedTest
    public void testConsumerGroupOffsetCommitWithVersionSmallerThanVersion9(short s) {
        // Only versions in [1, 8] are relevant here.
        if (s >= 9 || s == 0) {
            return;
        }

        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setTargetMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .build();
        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("foo", true).updateMember(member);

        OffsetCommitRequestData.OffsetCommitRequestPartition partition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic topic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(partition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(9)
            .setTopics(Collections.singletonList(topic));

        Assertions.assertThrows(UnsupportedVersionException.class, () -> context.commitOffset(s, request));
    }

    /**
     * A commit against consumer group "foo" that sets neither a member id nor an epoch
     * must succeed and produce one offset commit record.
     */
    @Test
    public void testConsumerGroupOffsetCommitFromAdminClient() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // The group is set up without members.
        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("foo", true);

        OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        OffsetCommitRequestData.OffsetCommitRequestTopic requestTopic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(requestPartition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setTopics(Collections.singletonList(requestTopic));

        CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition responsePartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData.OffsetCommitResponseTopic responseTopic =
            new OffsetCommitResponseData.OffsetCommitResponseTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(responsePartition));
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(responseTopic));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.empty(),
            "",
            context.time.milliseconds(),
            OptionalLong.empty()
        );
        List<Record> expectedRecords = Collections.singletonList(RecordHelpers.newOffsetCommitRecord(
            "foo",
            "bar",
            0,
            expectedOffset,
            MetadataImage.EMPTY.features().metadataVersion()
        ));
        Assertions.assertEquals(expectedRecords, result.records());
    }

    /**
     * A commit from a consumer group member whose epoch (10) matches the request must
     * succeed; the resulting record must carry the committed offset, leader epoch 10 and
     * metadata "metadata".
     */
    @Test
    public void testConsumerGroupOffsetCommit() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setTargetMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .build();
        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("foo", true).updateMember(member);

        OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("metadata")
                .setCommitTimestamp(context.time.milliseconds());
        OffsetCommitRequestData.OffsetCommitRequestTopic requestTopic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(requestPartition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(requestTopic));

        CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition responsePartition =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData.OffsetCommitResponseTopic responseTopic =
            new OffsetCommitResponseData.OffsetCommitResponseTopic()
                .setName("bar")
                .setPartitions(Collections.singletonList(responsePartition));
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(responseTopic));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.of(10),
            "metadata",
            context.time.milliseconds(),
            OptionalLong.empty()
        );
        List<Record> expectedRecords = Collections.singletonList(RecordHelpers.newOffsetCommitRecord(
            "foo",
            "bar",
            0,
            expectedOffset,
            MetadataImage.EMPTY.features().metadataVersion()
        ));
        Assertions.assertEquals(expectedRecords, result.records());
    }

    /**
     * With the maximum offset metadata size set to 5, a two-partition commit must reject
     * partition 0 (metadata "toolarge") with OFFSET_METADATA_TOO_LARGE, accept
     * partition 1 (metadata "small"), and emit a record for partition 1 only.
     */
    @Test
    public void testConsumerGroupOffsetCommitWithOffsetMetadataTooLarge() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder()
            .withOffsetMetadataMaxSize(5)
            .build();

        ConsumerGroupMember member = new ConsumerGroupMember.Builder("member")
            .setMemberEpoch(10)
            .setTargetMemberEpoch(10)
            .setPreviousMemberEpoch(10)
            .build();
        context.groupMetadataManager.getOrMaybeCreateConsumerGroup("foo", true).updateMember(member);

        // Partition 0 carries the oversized metadata, partition 1 fits within the limit.
        OffsetCommitRequestData.OffsetCommitRequestPartition oversizedPartition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("toolarge")
                .setCommitTimestamp(context.time.milliseconds());
        OffsetCommitRequestData.OffsetCommitRequestPartition acceptedPartition =
            new OffsetCommitRequestData.OffsetCommitRequestPartition()
                .setPartitionIndex(1)
                .setCommittedOffset(100L)
                .setCommittedLeaderEpoch(10)
                .setCommittedMetadata("small")
                .setCommitTimestamp(context.time.milliseconds());
        OffsetCommitRequestData.OffsetCommitRequestTopic requestTopic =
            new OffsetCommitRequestData.OffsetCommitRequestTopic()
                .setName("bar")
                .setPartitions(Arrays.asList(oversizedPartition, acceptedPartition));
        OffsetCommitRequestData request = new OffsetCommitRequestData()
            .setGroupId("foo")
            .setMemberId("member")
            .setGenerationIdOrMemberEpoch(10)
            .setTopics(Collections.singletonList(requestTopic));

        CoordinatorResult<OffsetCommitResponseData, Record> result = context.commitOffset(request);

        OffsetCommitResponseData.OffsetCommitResponsePartition rejectedResponse =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.OFFSET_METADATA_TOO_LARGE.code());
        OffsetCommitResponseData.OffsetCommitResponsePartition acceptedResponse =
            new OffsetCommitResponseData.OffsetCommitResponsePartition()
                .setPartitionIndex(1)
                .setErrorCode(Errors.NONE.code());
        OffsetCommitResponseData.OffsetCommitResponseTopic responseTopic =
            new OffsetCommitResponseData.OffsetCommitResponseTopic()
                .setName("bar")
                .setPartitions(Arrays.asList(rejectedResponse, acceptedResponse));
        OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
            .setTopics(Collections.singletonList(responseTopic));
        Assertions.assertEquals(expectedResponse, result.response());

        OffsetAndMetadata expectedOffset = new OffsetAndMetadata(
            100L,
            OptionalInt.of(10),
            "small",
            context.time.milliseconds(),
            OptionalLong.empty()
        );
        List<Record> expectedRecords = Collections.singletonList(RecordHelpers.newOffsetCommitRecord(
            "foo",
            "bar",
            1,
            expectedOffset,
            MetadataImage.EMPTY.features().metadataVersion()
        ));
        Assertions.assertEquals(expectedRecords, result.records());
    }

    /**
     * Replays four offset commit records for group "foo" / topic "bar" — two for
     * partition 0 followed by two for partition 1 — and checks after each replay that
     * the stored offset equals the value just replayed.
     */
    @Test
    public void testReplay() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // Partition 0: an initial value, then a replacement.
        OffsetAndMetadata firstForPartition0 = new OffsetAndMetadata(
            100L, OptionalInt.empty(), "small", context.time.milliseconds(), OptionalLong.empty());
        verifyReplay(context, "foo", "bar", 0, firstForPartition0);

        OffsetAndMetadata secondForPartition0 = new OffsetAndMetadata(
            200L, OptionalInt.of(10), "small", context.time.milliseconds(), OptionalLong.empty());
        verifyReplay(context, "foo", "bar", 0, secondForPartition0);

        // Partition 1: an initial value, then a replacement with the final OptionalLong set.
        OffsetAndMetadata firstForPartition1 = new OffsetAndMetadata(
            200L, OptionalInt.of(10), "small", context.time.milliseconds(), OptionalLong.empty());
        verifyReplay(context, "foo", "bar", 1, firstForPartition1);

        OffsetAndMetadata secondForPartition1 = new OffsetAndMetadata(
            300L, OptionalInt.of(10), "small", context.time.milliseconds(), OptionalLong.of(12345L));
        verifyReplay(context, "foo", "bar", 1, secondForPartition1);
    }

    /**
     * Replays an offset commit record and then its tombstone, and checks that no
     * offset is stored for the partition afterwards.
     */
    @Test
    public void testReplayWithTombstone() {
        OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();

        // Store an offset for foo / bar-0 first.
        OffsetAndMetadata committed = new OffsetAndMetadata(
            100L, OptionalInt.empty(), "small", context.time.milliseconds(), OptionalLong.empty());
        verifyReplay(context, "foo", "bar", 0, committed);

        // Replaying the tombstone must remove it.
        context.replay(RecordHelpers.newOffsetCommitTombstoneRecord("foo", "bar", 0));

        TopicPartition partition = new TopicPartition("bar", 0);
        Assertions.assertNull(context.offsetMetadataManager.offset("foo", partition));
    }

    /**
     * Replays an offset commit record built from the arguments and asserts that the
     * offset manager then returns the same {@link OffsetAndMetadata} for that partition.
     *
     * @param offsetMetadataManagerTestContext the harness to replay into
     * @param str the group id
     * @param str2 the topic name
     * @param i the partition index
     * @param offsetAndMetadata the value to replay and expect back
     */
    private void verifyReplay(OffsetMetadataManagerTestContext offsetMetadataManagerTestContext, String str, String str2, int i, OffsetAndMetadata offsetAndMetadata) {
        Record commitRecord = RecordHelpers.newOffsetCommitRecord(
            str,
            str2,
            i,
            offsetAndMetadata,
            MetadataImage.EMPTY.features().metadataVersion()
        );
        offsetMetadataManagerTestContext.replay(commitRecord);

        TopicPartition partition = new TopicPartition(str2, i);
        Assertions.assertEquals(
            offsetAndMetadata,
            offsetMetadataManagerTestContext.offsetMetadataManager.offset(str, partition)
        );
    }

    /**
     * Builds a generic group member with fixed client id "client-id", host "host",
     * protocol type "consumer" and a single "range" protocol with empty metadata.
     *
     * @param str the member id
     * @param optional the group instance id, if any
     * @return the new member
     */
    private GenericGroupMember mkGenericMember(String str, Optional<String> optional) {
        JoinGroupRequestData.JoinGroupRequestProtocol rangeProtocol =
            new JoinGroupRequestData.JoinGroupRequestProtocol()
                .setName("range")
                .setMetadata(new byte[0]);
        JoinGroupRequestData.JoinGroupRequestProtocolCollection protocols =
            new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
                Collections.singletonList(rangeProtocol).iterator());

        // NOTE(review): the two 5000 values are presumably the rebalance and session
        // timeouts in milliseconds; confirm against the GenericGroupMember constructor.
        return new GenericGroupMember(
            str,
            optional,
            "client-id",
            "host",
            5000,
            5000,
            "consumer",
            protocols
        );
    }
}
