package org.apache.kafka.coordinator.group;

import java.net.InetAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupMetadataManagerTestContext;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/coordinator/group/GroupCoordinatorServiceTest.class */
public class GroupCoordinatorServiceTest {
    /**
     * Creates a Mockito mock of {@link CoordinatorRuntime}.
     *
     * @return a fresh mock with no stubbing applied.
     */
    @SuppressWarnings("unchecked")
    private CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> mockRuntime() {
        // Mockito hands back the raw type, hence the unchecked assignment.
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = Mockito.mock(CoordinatorRuntime.class);
        return runtime;
    }

    /**
     * Builds the {@link GroupCoordinatorConfig} handed to every service instance created in this class.
     *
     * <p>The constructor is positional; the parameter names are not visible here, so the notes
     * below are assumptions derived from how other tests in this class use the same values.
     *
     * @return a new configuration instance.
     */
    private GroupCoordinatorConfig createConfig() {
        return new GroupCoordinatorConfig(
            1,
            10,
            45,
            5,
            Integer.MAX_VALUE,
            Collections.singletonList(new RangeAssignor()),
            1000, // NOTE(review): presumably the segment size asserted as "segment.bytes" in testGroupMetadataTopicConfigs - confirm
            4096,
            Integer.MAX_VALUE,
            3000,
            300000,
            120, // NOTE(review): presumably the minimum classic session timeout (119 is rejected in testJoinGroupInvalidSessionTimeout) - confirm
            50000, // NOTE(review): presumably the maximum classic session timeout (50001 is rejected) - confirm
            600000L,
            1440000L,
            5000, // NOTE(review): presumably the write timeout matched as Duration.ofMillis(5000L) in the stubs - confirm
            ConsumerGroupMigrationPolicy.DISABLED,
            CompressionType.NONE
        );
    }

    /**
     * Starts the service, shuts it down, and verifies that the runtime was closed exactly once.
     */
    @Test
    public void testStartupShutdown() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );

        service.startup(() -> 1);
        service.shutdown();

        Mockito.verify(runtime, Mockito.times(1)).close();
    }

    /**
     * A consumer group heartbeat sent to a service that was never started must complete with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testConsumerGroupHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );
        ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData().setGroupId("foo");

        // startup() is deliberately not called.
        CompletableFuture<ConsumerGroupHeartbeatResponseData> future = service.consumerGroupHeartbeat(
            TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
            request
        );

        ConsumerGroupHeartbeatResponseData expected = new ConsumerGroupHeartbeatResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, future.get());
    }

    /**
     * Happy path: the runtime's "consumer-group-heartbeat" write operation is stubbed to succeed and
     * the service is expected to return the response produced by that operation.
     */
    @Test
    public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData().setGroupId("foo");
        service.startup(() -> 1);

        // The raw cast on the last matcher keeps the stubbed return type raw.
        Mockito.when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("consumer-group-heartbeat"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(new ConsumerGroupHeartbeatResponseData()));

        CompletableFuture<ConsumerGroupHeartbeatResponseData> future = service.consumerGroupHeartbeat(
            TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
            request
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            new ConsumerGroupHeartbeatResponseData(),
            future.get(5L, TimeUnit.SECONDS)
        );
    }

    /**
     * Argument source for {@code testConsumerGroupHeartbeatWithException}.
     *
     * <p>Each entry is a triple of: the exception the runtime fails with, the error code expected
     * in the response, and the expected error message ({@code null} when none is expected).
     *
     * @return the stream of argument triples.
     */
    private static Stream<Arguments> testConsumerGroupHeartbeatWithExceptionSource() {
        // Pass the entries as plain varargs: wrapping them in an array cast to Object[] makes
        // Stream.of infer Stream<Object>, which does not match the declared return type.
        return Stream.of(
            Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
            Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
            Arguments.arguments(new org.apache.kafka.common.errors.TimeoutException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
            Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null),
            Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null),
            Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
            Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
            Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null),
            Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid")
        );
    }

    /**
     * Checks how a failed "consumer-group-heartbeat" write operation is translated into a response.
     *
     * @param th  the exception the stubbed runtime future fails with.
     * @param s   the error code expected in the response.
     * @param str the error message expected in the response; may be {@code null}.
     */
    @ParameterizedTest
    @MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
    public void testConsumerGroupHeartbeatWithException(Throwable th, short s, String str) throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        ConsumerGroupHeartbeatRequestData request = new ConsumerGroupHeartbeatRequestData().setGroupId("foo");
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("consumer-group-heartbeat"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(th));

        CompletableFuture<ConsumerGroupHeartbeatResponseData> future = service.consumerGroupHeartbeat(
            TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
            request
        );

        ConsumerGroupHeartbeatResponseData expected = new ConsumerGroupHeartbeatResponseData()
            .setErrorCode(s)
            .setErrorMessage(str);
        org.junit.jupiter.api.Assertions.assertEquals(expected, future.get(5L, TimeUnit.SECONDS));
    }

    /**
     * {@code partitionFor} must throw {@link CoordinatorNotAvailableException} before startup; once
     * started with 10 partitions it must return {@code abs(hash(groupId)) % 10}.
     */
    @Test
    public void testPartitionFor() {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );

        // Not started yet.
        org.junit.jupiter.api.Assertions.assertThrows(
            CoordinatorNotAvailableException.class,
            () -> service.partitionFor("foo")
        );

        service.startup(() -> 10);

        int expectedPartition = Utils.abs("foo".hashCode()) % 10;
        org.junit.jupiter.api.Assertions.assertEquals(expectedPartition, service.partitionFor("foo"));
    }

    /**
     * Verifies the topic-level properties returned by {@code groupMetadataTopicConfigs()} for the
     * configuration built by {@code createConfig()}.
     */
    @Test
    public void testGroupMetadataTopicConfigs() {
        Properties expectedConfigs = new Properties();
        expectedConfigs.put("cleanup.policy", "compact");
        expectedConfigs.put("compression.type", BrokerCompressionType.PRODUCER.name);
        expectedConfigs.put("segment.bytes", "1000");

        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );

        org.junit.jupiter.api.Assertions.assertEquals(expectedConfigs, service.groupMetadataTopicConfigs());
    }

    /**
     * {@code onElection} must throw before startup; after startup it must schedule exactly one load
     * operation on the runtime for partition 5 of {@code __consumer_offsets} with epoch 10.
     */
    @Test
    public void testOnElection() {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );

        // Not started yet.
        org.junit.jupiter.api.Assertions.assertThrows(
            CoordinatorNotAvailableException.class,
            () -> service.onElection(5, 10)
        );

        service.startup(() -> 1);
        service.onElection(5, 10);

        Mockito.verify(runtime, Mockito.times(1)).scheduleLoadOperation(
            new TopicPartition("__consumer_offsets", 5),
            10
        );
    }

    /**
     * {@code onResignation} must throw before startup; after startup it must schedule exactly one
     * unload operation on the runtime for partition 5 of {@code __consumer_offsets} with epoch 10.
     */
    @Test
    public void testOnResignation() {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );

        // Not started yet.
        org.junit.jupiter.api.Assertions.assertThrows(
            CoordinatorNotAvailableException.class,
            () -> service.onResignation(5, OptionalInt.of(10))
        );

        service.startup(() -> 1);
        service.onResignation(5, OptionalInt.of(10));

        Mockito.verify(runtime, Mockito.times(1)).scheduleUnloadOperation(
            new TopicPartition("__consumer_offsets", 5),
            OptionalInt.of(10)
        );
    }

    /**
     * Resigning with an empty leader epoch must still schedule exactly one unload operation, and the
     * empty epoch must be passed through to the runtime unchanged.
     */
    @Test
    public void testOnResignationWithEmptyLeaderEpoch() {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        service.onResignation(5, OptionalInt.empty());

        Mockito.verify(runtime, Mockito.times(1)).scheduleUnloadOperation(
            new TopicPartition("__consumer_offsets", 5),
            OptionalInt.empty()
        );
    }

    /**
     * Joins a classic group with the "classic-group-join" write operation stubbed to succeed.
     *
     * <p>The future returned by {@code joinGroup} is expected to remain incomplete even though the
     * stubbed runtime future is already completed.
     * NOTE(review): presumably the response future is completed from inside the write operation,
     * which the mock never executes - confirm against GroupCoordinatorService.
     */
    @Test
    public void testJoinGroup() {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        JoinGroupRequestData request = new JoinGroupRequestData()
            .setGroupId("foo")
            .setSessionTimeoutMs(1000);
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-join"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(new JoinGroupResponseData()));

        CompletableFuture<JoinGroupResponseData> responseFuture = service.joinGroup(
            TestUtil.requestContext(ApiKeys.JOIN_GROUP),
            request,
            BufferSupplier.NO_CACHING
        );

        org.junit.jupiter.api.Assertions.assertFalse(responseFuture.isDone());
    }

    /**
     * When the "classic-group-join" write operation fails with an {@link IllegalStateException},
     * the join response must carry {@code UNKNOWN_SERVER_ERROR}.
     */
    @Test
    public void testJoinGroupWithException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        JoinGroupRequestData request = new JoinGroupRequestData()
            .setGroupId("foo")
            .setSessionTimeoutMs(1000);
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-join"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new IllegalStateException()));

        CompletableFuture<JoinGroupResponseData> responseFuture = service.joinGroup(
            TestUtil.requestContext(ApiKeys.JOIN_GROUP),
            request,
            BufferSupplier.NO_CACHING
        );

        JoinGroupResponseData expected = new JoinGroupResponseData()
            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get(5L, TimeUnit.SECONDS));
    }

    /**
     * A join request with a {@code null} group id must complete immediately with
     * {@code INVALID_GROUP_ID} and an empty member id.
     */
    @Test
    public void testJoinGroupInvalidGroupId() throws Exception {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        // This test assembles its request context by hand rather than via TestUtil.
        RequestHeader header = new RequestHeader(
            ApiKeys.JOIN_GROUP,
            ApiKeys.JOIN_GROUP.latestVersion(),
            "client",
            0
        );
        RequestContext context = new RequestContext(
            header,
            "1",
            InetAddress.getLoopbackAddress(),
            KafkaPrincipal.ANONYMOUS,
            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
            SecurityProtocol.PLAINTEXT,
            ClientInformation.EMPTY,
            false
        );
        JoinGroupRequestData request = new JoinGroupRequestData()
            .setGroupId(null)
            .setMemberId("");

        CompletableFuture<JoinGroupResponseData> responseFuture = service.joinGroup(
            context,
            request,
            BufferSupplier.NO_CACHING
        );

        org.junit.jupiter.api.Assertions.assertTrue(responseFuture.isDone());
        JoinGroupResponseData expected = new JoinGroupResponseData()
            .setErrorCode(Errors.INVALID_GROUP_ID.code())
            .setMemberId("");
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get());
    }

    /**
     * A join request sent to a service that was never started must complete with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testJoinGroupWhenNotStarted() throws ExecutionException, InterruptedException {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );
        JoinGroupRequestData request = new JoinGroupRequestData().setGroupId("foo");

        // startup() is deliberately not called.
        CompletableFuture<JoinGroupResponseData> responseFuture = service.joinGroup(
            TestUtil.requestContext(ApiKeys.JOIN_GROUP),
            request,
            BufferSupplier.NO_CACHING
        );

        JoinGroupResponseData expected = new JoinGroupResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get());
    }

    /**
     * Join requests carrying the given session timeout must be rejected with
     * {@code INVALID_SESSION_TIMEOUT}.
     *
     * @param i the session timeout (ms) placed in the request; the supplied values are 119 and 50001.
     */
    @ParameterizedTest
    @ValueSource(ints = {119, 50001})
    public void testJoinGroupInvalidSessionTimeout(int i) throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        JoinGroupRequestData request = new GroupMetadataManagerTestContext.JoinGroupRequestBuilder()
            .withGroupId("group-id")
            .withMemberId("")
            .withSessionTimeoutMs(i)
            .build();

        CompletableFuture<JoinGroupResponseData> responseFuture = service.joinGroup(
            TestUtil.requestContext(ApiKeys.JOIN_GROUP),
            request,
            BufferSupplier.NO_CACHING
        );

        JoinGroupResponseData expected = new JoinGroupResponseData()
            .setErrorCode(Errors.INVALID_SESSION_TIMEOUT.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get());
    }

    /**
     * Syncs a classic group with the "classic-group-sync" write operation stubbed to succeed.
     *
     * <p>The future returned by {@code syncGroup} is expected to remain incomplete even though the
     * stubbed runtime future is already completed.
     * NOTE(review): presumably the response future is completed from inside the write operation,
     * which the mock never executes - confirm against GroupCoordinatorService.
     */
    @Test
    public void testSyncGroup() {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        SyncGroupRequestData request = new SyncGroupRequestData().setGroupId("foo");
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-sync"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(new SyncGroupResponseData()));

        CompletableFuture<SyncGroupResponseData> responseFuture = service.syncGroup(
            TestUtil.requestContext(ApiKeys.SYNC_GROUP),
            request,
            BufferSupplier.NO_CACHING
        );

        org.junit.jupiter.api.Assertions.assertFalse(responseFuture.isDone());
    }

    /**
     * When the "classic-group-sync" write operation fails with an {@link IllegalStateException},
     * the sync response must already be complete and carry {@code UNKNOWN_SERVER_ERROR}.
     */
    @Test
    public void testSyncGroupWithException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        SyncGroupRequestData request = new SyncGroupRequestData().setGroupId("foo");
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-sync"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new IllegalStateException()));

        CompletableFuture<SyncGroupResponseData> responseFuture = service.syncGroup(
            TestUtil.requestContext(ApiKeys.SYNC_GROUP),
            request,
            BufferSupplier.NO_CACHING
        );

        org.junit.jupiter.api.Assertions.assertTrue(responseFuture.isDone());
        SyncGroupResponseData expected = new SyncGroupResponseData()
            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get());
    }

    /**
     * A sync request with a {@code null} group id must complete immediately with
     * {@code INVALID_GROUP_ID}.
     */
    @Test
    public void testSyncGroupInvalidGroupId() throws Exception {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        SyncGroupRequestData request = new SyncGroupRequestData()
            .setGroupId(null)
            .setMemberId("");

        CompletableFuture<SyncGroupResponseData> responseFuture = service.syncGroup(
            TestUtil.requestContext(ApiKeys.SYNC_GROUP),
            request,
            BufferSupplier.NO_CACHING
        );

        org.junit.jupiter.api.Assertions.assertTrue(responseFuture.isDone());
        SyncGroupResponseData expected = new SyncGroupResponseData()
            .setErrorCode(Errors.INVALID_GROUP_ID.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get());
    }

    /**
     * A sync request sent to a service that was never started must complete with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testSyncGroupWhenNotStarted() throws ExecutionException, InterruptedException {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );
        SyncGroupRequestData request = new SyncGroupRequestData().setGroupId("foo");

        // startup() is deliberately not called.
        CompletableFuture<SyncGroupResponseData> responseFuture = service.syncGroup(
            TestUtil.requestContext(ApiKeys.SYNC_GROUP),
            request,
            BufferSupplier.NO_CACHING
        );

        SyncGroupResponseData expected = new SyncGroupResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get());
    }

    /**
     * Happy path for the classic heartbeat: the "classic-group-heartbeat" write operation is stubbed
     * to succeed and the returned future must already be complete with a default response.
     */
    @Test
    public void testHeartbeat() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        HeartbeatRequestData request = new HeartbeatRequestData().setGroupId("foo");
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-heartbeat"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(new HeartbeatResponseData()));

        CompletableFuture<HeartbeatResponseData> responseFuture = service.heartbeat(
            TestUtil.requestContext(ApiKeys.HEARTBEAT),
            request
        );

        org.junit.jupiter.api.Assertions.assertTrue(responseFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals(new HeartbeatResponseData(), responseFuture.get());
    }

    /**
     * When the "classic-group-heartbeat" write operation fails with
     * {@link CoordinatorLoadInProgressException}, the heartbeat must still complete with a default
     * (error-free) response.
     *
     * <p>Note that, despite the method name, the stubbed failure is a
     * {@code CoordinatorLoadInProgressException}.
     */
    @Test
    public void testHeartbeatCoordinatorNotAvailableException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        HeartbeatRequestData request = new HeartbeatRequestData().setGroupId("foo");
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-heartbeat"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new CoordinatorLoadInProgressException((String) null)));

        CompletableFuture<HeartbeatResponseData> responseFuture = service.heartbeat(
            TestUtil.requestContext(ApiKeys.HEARTBEAT),
            request
        );

        org.junit.jupiter.api.Assertions.assertTrue(responseFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals(new HeartbeatResponseData(), responseFuture.get());
    }

    /**
     * When the "classic-group-heartbeat" write operation fails with
     * {@link RebalanceInProgressException}, the heartbeat response must carry
     * {@code REBALANCE_IN_PROGRESS}.
     */
    @Test
    public void testHeartbeatCoordinatorException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        HeartbeatRequestData request = new HeartbeatRequestData().setGroupId("foo");
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-heartbeat"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new RebalanceInProgressException()));

        CompletableFuture<HeartbeatResponseData> responseFuture = service.heartbeat(
            TestUtil.requestContext(ApiKeys.HEARTBEAT),
            request
        );

        org.junit.jupiter.api.Assertions.assertTrue(responseFuture.isDone());
        HeartbeatResponseData expected = new HeartbeatResponseData()
            .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get());
    }

    /**
     * A classic heartbeat sent to a service that was never started must complete with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );
        HeartbeatRequestData request = new HeartbeatRequestData().setGroupId("foo");

        // startup() is deliberately not called. The context uses the classic HEARTBEAT API key,
        // matching the API under test and the other classic heartbeat tests in this class.
        CompletableFuture<HeartbeatResponseData> responseFuture = service.heartbeat(
            TestUtil.requestContext(ApiKeys.HEARTBEAT),
            request
        );

        HeartbeatResponseData expected = new HeartbeatResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get());
    }

    /**
     * With three partitions, each stubbed "list-groups" read future yields one group; the response
     * must contain all three groups in the order the futures were returned.
     */
    @Test
    public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 3);

        List<ListGroupsResponseData.ListedGroup> expectedGroups = Arrays.asList(
            new ListGroupsResponseData.ListedGroup()
                .setGroupId("group0")
                .setProtocolType("protocol1")
                .setGroupState("Stable")
                .setGroupType("classic"),
            new ListGroupsResponseData.ListedGroup()
                .setGroupId("group1")
                .setProtocolType("consumer")
                .setGroupState("Empty")
                .setGroupType("consumer"),
            new ListGroupsResponseData.ListedGroup()
                .setGroupId("group2")
                .setProtocolType("consumer")
                .setGroupState("Dead")
                .setGroupType("consumer")
        );

        Mockito.when(runtime.scheduleReadAllOperation(
            ArgumentMatchers.eq("list-groups"),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(Arrays.asList(
            CompletableFuture.completedFuture(Collections.singletonList(expectedGroups.get(0))),
            CompletableFuture.completedFuture(Collections.singletonList(expectedGroups.get(1))),
            CompletableFuture.completedFuture(Collections.singletonList(expectedGroups.get(2)))
        ));

        CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
            TestUtil.requestContext(ApiKeys.LIST_GROUPS),
            new ListGroupsRequestData()
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedGroups,
            responseFuture.get(5L, TimeUnit.SECONDS).groups()
        );
    }

    /**
     * When one of the three "list-groups" read futures fails with {@link NotCoordinatorException},
     * the response must still list the groups returned by the two successful futures.
     */
    @Test
    public void testListGroupsFailedWithNotCoordinatorException() throws InterruptedException, ExecutionException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 3);

        List<ListGroupsResponseData.ListedGroup> expectedGroups = Arrays.asList(
            new ListGroupsResponseData.ListedGroup()
                .setGroupId("group0")
                .setProtocolType("protocol1")
                .setGroupState("Stable")
                .setGroupType("classic"),
            new ListGroupsResponseData.ListedGroup()
                .setGroupId("group1")
                .setProtocolType("consumer")
                .setGroupState("Empty")
                .setGroupType("consumer")
        );

        Mockito.when(runtime.scheduleReadAllOperation(
            ArgumentMatchers.eq("list-groups"),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(Arrays.asList(
            CompletableFuture.completedFuture(Collections.singletonList(expectedGroups.get(0))),
            CompletableFuture.completedFuture(Collections.singletonList(expectedGroups.get(1))),
            FutureUtils.failedFuture(new NotCoordinatorException(""))
        ));

        CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
            TestUtil.requestContext(ApiKeys.LIST_GROUPS),
            new ListGroupsRequestData()
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedGroups,
            responseFuture.get(5L, TimeUnit.SECONDS).groups()
        );
    }

    /**
     * When one of the three "list-groups" read futures fails with
     * {@link CoordinatorLoadInProgressException}, the whole response must carry
     * {@code COORDINATOR_LOAD_IN_PROGRESS}.
     */
    @Test
    public void testListGroupsWithFailure() throws InterruptedException, ExecutionException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 3);

        Mockito.when(runtime.scheduleReadAllOperation(
            ArgumentMatchers.eq("list-groups"),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(Arrays.asList(
            CompletableFuture.completedFuture(Collections.emptyList()),
            CompletableFuture.completedFuture(Collections.emptyList()),
            FutureUtils.failedFuture(new CoordinatorLoadInProgressException(""))
        ));

        CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
            TestUtil.requestContext(ApiKeys.LIST_GROUPS),
            new ListGroupsRequestData()
        );

        ListGroupsResponseData expected = new ListGroupsResponseData()
            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get(5L, TimeUnit.SECONDS));
    }

    /**
     * When the service is started with a partition count of zero, listing groups must return a
     * default (empty) response.
     */
    @Test
    public void testListGroupsWithEmptyTopicPartitions() throws ExecutionException, InterruptedException {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 0);

        CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
            TestUtil.requestContext(ApiKeys.LIST_GROUPS),
            new ListGroupsRequestData()
        );

        org.junit.jupiter.api.Assertions.assertEquals(new ListGroupsResponseData(), responseFuture.get());
    }

    /**
     * Listing groups on a service that was never started must complete with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testListGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );

        // startup() is deliberately not called.
        CompletableFuture<ListGroupsResponseData> responseFuture = service.listGroups(
            TestUtil.requestContext(ApiKeys.LIST_GROUPS),
            new ListGroupsRequestData()
        );

        ListGroupsResponseData expected = new ListGroupsResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        org.junit.jupiter.api.Assertions.assertEquals(expected, responseFuture.get());
    }

    /**
     * Describes two groups with the service started on two partitions.
     *
     * <p>The "describe-groups" read for partition 0 is stubbed with a completed future and the one
     * for partition 1 with a pending future. The combined result must stay incomplete until the
     * pending future is completed, and must then contain both described groups.
     */
    @Test
    public void testDescribeGroups() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            Mockito.mock(GroupCoordinatorMetrics.class)
        );
        service.startup(() -> 2);

        DescribeGroupsResponseData.DescribedGroup firstGroup =
            new DescribeGroupsResponseData.DescribedGroup().setGroupId("group-id-1");
        DescribeGroupsResponseData.DescribedGroup secondGroup =
            new DescribeGroupsResponseData.DescribedGroup().setGroupId("group-id-2");
        List<DescribeGroupsResponseData.DescribedGroup> expectedGroups = Arrays.asList(firstGroup, secondGroup);

        // Partition 0 answers immediately.
        Mockito.when(runtime.scheduleReadOperation(
            ArgumentMatchers.eq("describe-groups"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(firstGroup)));

        // Partition 1 answers only once the test completes this future.
        CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> pendingRead = new CompletableFuture<>();
        Mockito.when(runtime.scheduleReadOperation(
            ArgumentMatchers.eq("describe-groups"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(pendingRead);

        CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> responseFuture = service.describeGroups(
            TestUtil.requestContext(ApiKeys.DESCRIBE_GROUPS),
            Arrays.asList("group-id-1", "group-id-2")
        );
        org.junit.jupiter.api.Assertions.assertFalse(responseFuture.isDone());

        pendingRead.complete(Collections.singletonList(secondGroup));
        org.junit.jupiter.api.Assertions.assertEquals(expectedGroups, responseFuture.get());
    }

    /**
     * Verifies the {@code describeGroups} result for a request containing an empty and a
     * {@code null} group id.
     *
     * <p>The expected list holds an entry with a {@code null} group id flagged
     * {@code INVALID_GROUP_ID}, followed by the entry that the stubbed partition-0 read returns
     * for the empty group id.
     */
    @Test
    public void testDescribeGroupsInvalidGroupId() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            (GroupCoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class)
        );
        final int partitionCount = 1;
        service.startup(() -> partitionCount);

        DescribeGroupsResponseData.DescribedGroup emptyIdGroup =
            new DescribeGroupsResponseData.DescribedGroup().setGroupId("");
        DescribeGroupsResponseData.DescribedGroup nullIdGroup = new DescribeGroupsResponseData.DescribedGroup()
            .setGroupId((String) null)
            .setErrorCode(Errors.INVALID_GROUP_ID.code());
        List<DescribeGroupsResponseData.DescribedGroup> expectedGroups = Arrays.asList(nullIdGroup, emptyIdGroup);

        Mockito.when(runtime.scheduleReadOperation(
            (String) ArgumentMatchers.eq("describe-groups"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(emptyIdGroup)));

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedGroups,
            service.describeGroups(TestUtil.requestContext(ApiKeys.DESCRIBE_GROUPS), Arrays.asList("", null)).get()
        );
    }

    /**
     * Verifies that a "describe-groups" read failing with
     * {@link CoordinatorLoadInProgressException} is reported by {@code describeGroups} as a
     * described group carrying {@code COORDINATOR_LOAD_IN_PROGRESS} for the requested id.
     */
    @Test
    public void testDescribeGroupCoordinatorLoadInProgress() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            (GroupCoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class)
        );
        final int partitionCount = 1;
        service.startup(() -> partitionCount);

        // The only partition fails its read with a load-in-progress exception.
        Mockito.when(runtime.scheduleReadOperation(
            (String) ArgumentMatchers.eq("describe-groups"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new CoordinatorLoadInProgressException((String) null)));

        List<DescribeGroupsResponseData.DescribedGroup> expectedGroups = Collections.singletonList(
            new DescribeGroupsResponseData.DescribedGroup()
                .setGroupId("group-id")
                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedGroups,
            service.describeGroups(
                TestUtil.requestContext(ApiKeys.DESCRIBE_GROUPS),
                Collections.singletonList("group-id")
            ).get()
        );
    }

    /**
     * Verifies that {@code describeGroups} on a service that was never started answers with
     * {@code COORDINATOR_NOT_AVAILABLE} for the requested group.
     */
    @Test
    public void testDescribeGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
        List<DescribeGroupsResponseData.DescribedGroup> expectedGroups = Collections.singletonList(
            new DescribeGroupsResponseData.DescribedGroup()
                .setGroupId("group-id")
                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
        );

        // startup() is intentionally never invoked on this instance.
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedGroups,
            service.describeGroups(
                TestUtil.requestContext(ApiKeys.DESCRIBE_GROUPS),
                Collections.singletonList("group-id")
            ).get()
        );
    }

    /**
     * Verifies that {@code fetchOffsets} hands back the group response produced by the runtime.
     *
     * @param z forwarded as the last argument of {@code fetchOffsets}; when {@code true} the
     *          test stubs the runtime's "fetch-offsets" write operation, otherwise its read
     *          operation (presumably the "require stable" flag - confirm against the service)
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFetchOffsets(boolean z) throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        OffsetFetchRequestData.OffsetFetchRequestGroup request = new OffsetFetchRequestData.OffsetFetchRequestGroup()
            .setGroupId("group")
            .setTopics(Collections.singletonList(
                new OffsetFetchRequestData.OffsetFetchRequestTopics()
                    .setName("foo")
                    .setPartitionIndexes(Collections.singletonList(0))
            ));

        OffsetFetchResponseData.OffsetFetchResponseGroup response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
            .setGroupId("group")
            .setTopics(Collections.singletonList(
                new OffsetFetchResponseData.OffsetFetchResponseTopics()
                    .setName("foo")
                    .setPartitions(Collections.singletonList(
                        new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                            .setPartitionIndex(0)
                            .setCommittedOffset(100L)
                    ))
            ));

        if (z) {
            // Write path, matched with a 5000 ms timeout.
            Mockito.when(runtime.scheduleWriteOperation(
                (String) ArgumentMatchers.eq("fetch-offsets"),
                (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
                (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
                (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
            )).thenReturn(CompletableFuture.completedFuture(response));
        } else {
            // Read path.
            Mockito.when(runtime.scheduleReadOperation(
                (String) ArgumentMatchers.eq("fetch-offsets"),
                (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
                (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
            )).thenReturn(CompletableFuture.completedFuture(response));
        }

        org.junit.jupiter.api.Assertions.assertEquals(
            response,
            service.fetchOffsets(TestUtil.requestContext(ApiKeys.OFFSET_FETCH), request, z).get(5L, TimeUnit.SECONDS)
        );
    }

    /**
     * Verifies that {@code fetchOffsets} on a service that was never started answers with
     * {@code COORDINATOR_NOT_AVAILABLE} for the requested group.
     *
     * @param z forwarded unchanged as the last argument of {@code fetchOffsets}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFetchOffsetsWhenNotStarted(boolean z) throws ExecutionException, InterruptedException {
        OffsetFetchResponseData.OffsetFetchResponseGroup expectedResponse =
            new OffsetFetchResponseData.OffsetFetchResponseGroup()
                .setGroupId("group")
                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());

        // startup() is intentionally never invoked on this instance.
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedResponse,
            service.fetchOffsets(
                TestUtil.requestContext(ApiKeys.OFFSET_FETCH),
                new OffsetFetchRequestData.OffsetFetchRequestGroup()
                    .setGroupId("group")
                    .setTopics(Collections.singletonList(
                        new OffsetFetchRequestData.OffsetFetchRequestTopics()
                            .setName("foo")
                            .setPartitionIndexes(Collections.singletonList(0))
                    )),
                z
            ).get()
        );
    }

    /**
     * Verifies that {@code fetchAllOffsets} hands back the group response produced by the
     * runtime.
     *
     * @param z forwarded as the last argument of {@code fetchAllOffsets}; when {@code true} the
     *          test stubs the runtime's "fetch-all-offsets" write operation, otherwise its read
     *          operation (presumably the "require stable" flag - confirm against the service)
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFetchAllOffsets(boolean z) throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        // The request names only the group; no topics are set on it.
        OffsetFetchRequestData.OffsetFetchRequestGroup request =
            new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId("group");

        OffsetFetchResponseData.OffsetFetchResponseGroup response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
            .setGroupId("group")
            .setTopics(Collections.singletonList(
                new OffsetFetchResponseData.OffsetFetchResponseTopics()
                    .setName("foo")
                    .setPartitions(Collections.singletonList(
                        new OffsetFetchResponseData.OffsetFetchResponsePartitions()
                            .setPartitionIndex(0)
                            .setCommittedOffset(100L)
                    ))
            ));

        if (z) {
            // Write path, matched with a 5000 ms timeout.
            Mockito.when(runtime.scheduleWriteOperation(
                (String) ArgumentMatchers.eq("fetch-all-offsets"),
                (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
                (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
                (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
            )).thenReturn(CompletableFuture.completedFuture(response));
        } else {
            // Read path.
            Mockito.when(runtime.scheduleReadOperation(
                (String) ArgumentMatchers.eq("fetch-all-offsets"),
                (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
                (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
            )).thenReturn(CompletableFuture.completedFuture(response));
        }

        org.junit.jupiter.api.Assertions.assertEquals(
            response,
            service.fetchAllOffsets(TestUtil.requestContext(ApiKeys.OFFSET_FETCH), request, z).get(5L, TimeUnit.SECONDS)
        );
    }

    /**
     * Verifies that {@code fetchAllOffsets} on a service that was never started answers with
     * {@code COORDINATOR_NOT_AVAILABLE} for the requested group.
     *
     * @param z forwarded unchanged as the last argument of {@code fetchAllOffsets}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testFetchAllOffsetsWhenNotStarted(boolean z) throws ExecutionException, InterruptedException {
        OffsetFetchResponseData.OffsetFetchResponseGroup expectedResponse =
            new OffsetFetchResponseData.OffsetFetchResponseGroup()
                .setGroupId("group")
                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());

        // startup() is intentionally never invoked on this instance.
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedResponse,
            service.fetchAllOffsets(
                TestUtil.requestContext(ApiKeys.OFFSET_FETCH),
                new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId("group"),
                z
            ).get()
        );
    }

    /**
     * Verifies that {@code leaveGroup} returns the response of the "classic-group-leave" write
     * operation and that the returned future is already done when that write is complete.
     */
    @Test
    public void testLeaveGroup() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        LeaveGroupRequestData request = new LeaveGroupRequestData().setGroupId("foo");
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleWriteOperation(
            (String) ArgumentMatchers.eq("classic-group-leave"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(new LeaveGroupResponseData()));

        CompletableFuture leaveFuture = service.leaveGroup(TestUtil.requestContext(ApiKeys.LEAVE_GROUP), request);

        org.junit.jupiter.api.Assertions.assertTrue(leaveFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals(new LeaveGroupResponseData(), leaveFuture.get());
    }

    /**
     * Verifies how {@code leaveGroup} reports a write that fails with
     * {@link UnknownMemberIdException}: the response's top-level error code is {@code NONE} and
     * every requested member is echoed back with {@code UNKNOWN_MEMBER_ID}.
     */
    @Test
    public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );

        LeaveGroupRequestData request = new LeaveGroupRequestData()
            .setGroupId("foo")
            .setMembers(Arrays.asList(
                new LeaveGroupRequestData.MemberIdentity()
                    .setMemberId("member-1")
                    .setGroupInstanceId("instance-1"),
                new LeaveGroupRequestData.MemberIdentity()
                    .setMemberId("member-2")
                    .setGroupInstanceId("instance-2")
            ));
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleWriteOperation(
            (String) ArgumentMatchers.eq("classic-group-leave"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new UnknownMemberIdException()));

        CompletableFuture leaveFuture = service.leaveGroup(TestUtil.requestContext(ApiKeys.LEAVE_GROUP), request);
        org.junit.jupiter.api.Assertions.assertTrue(leaveFuture.isDone());

        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
            .setErrorCode(Errors.NONE.code())
            .setMembers(Arrays.asList(
                new LeaveGroupResponseData.MemberResponse()
                    .setMemberId("member-1")
                    .setGroupInstanceId("instance-1")
                    .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()),
                new LeaveGroupResponseData.MemberResponse()
                    .setMemberId("member-2")
                    .setGroupInstanceId("instance-2")
                    .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
            ));
        org.junit.jupiter.api.Assertions.assertEquals(expectedResponse, leaveFuture.get());
    }

    /**
     * Verifies that {@code leaveGroup} on a service that was never started answers with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testLeaveGroupWhenNotStarted() throws ExecutionException, InterruptedException {
        LeaveGroupResponseData expectedResponse = new LeaveGroupResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());

        // startup() is intentionally never invoked on this instance.
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedResponse,
            service.leaveGroup(
                TestUtil.requestContext(ApiKeys.LEAVE_GROUP),
                new LeaveGroupRequestData().setGroupId("foo")
            ).get()
        );
    }

    /**
     * Verifies that {@code consumerGroupDescribe} merges the results of the per-partition
     * "consumer-group-describe" reads.
     *
     * <p>The read stubbed for partition 0 of {@code __consumer_offsets} completes immediately,
     * while the one for partition 1 is held pending. The aggregated future must not be done
     * until the pending read is completed, and must then yield both described groups.
     */
    @Test
    public void testConsumerGroupDescribe() throws InterruptedException, ExecutionException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        final int partitionCount = 2;
        service.startup(() -> partitionCount);

        ConsumerGroupDescribeResponseData.DescribedGroup firstGroup =
            new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId("group-id-1");
        ConsumerGroupDescribeResponseData.DescribedGroup secondGroup =
            new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId("group-id-2");
        List<ConsumerGroupDescribeResponseData.DescribedGroup> expectedGroups = Arrays.asList(firstGroup, secondGroup);

        // Partition 0: the read is already complete when it is scheduled.
        Mockito.when(runtime.scheduleReadOperation(
            (String) ArgumentMatchers.eq("consumer-group-describe"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(firstGroup)));

        // Partition 1: the read stays pending until the test completes it by hand.
        CompletableFuture pendingRead = new CompletableFuture();
        Mockito.when(runtime.scheduleReadOperation(
            (String) ArgumentMatchers.eq("consumer-group-describe"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(pendingRead);

        CompletableFuture describeFuture = service.consumerGroupDescribe(
            TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE),
            Arrays.asList("group-id-1", "group-id-2")
        );
        org.junit.jupiter.api.Assertions.assertFalse(describeFuture.isDone());

        pendingRead.complete(Collections.singletonList(secondGroup));
        org.junit.jupiter.api.Assertions.assertEquals(expectedGroups, describeFuture.get());
    }

    /**
     * Verifies the {@code consumerGroupDescribe} result for a request containing an empty and a
     * {@code null} group id: both expected entries carry a {@code null} group id and
     * {@code INVALID_GROUP_ID}, the second one being the instance returned by the stubbed
     * partition-0 read.
     */
    @Test
    public void testConsumerGroupDescribeInvalidGroupId() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        final int partitionCount = 1;
        service.startup(() -> partitionCount);

        // Entry handed back by the mocked runtime read.
        ConsumerGroupDescribeResponseData.DescribedGroup runtimeGroup =
            new ConsumerGroupDescribeResponseData.DescribedGroup()
                .setGroupId((String) null)
                .setErrorCode(Errors.INVALID_GROUP_ID.code());
        List<ConsumerGroupDescribeResponseData.DescribedGroup> expectedGroups = Arrays.asList(
            new ConsumerGroupDescribeResponseData.DescribedGroup()
                .setGroupId((String) null)
                .setErrorCode(Errors.INVALID_GROUP_ID.code()),
            runtimeGroup
        );

        Mockito.when(runtime.scheduleReadOperation(
            (String) ArgumentMatchers.eq("consumer-group-describe"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(runtimeGroup)));

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedGroups,
            service.consumerGroupDescribe(
                TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE),
                Arrays.asList("", null)
            ).get()
        );
    }

    /**
     * Verifies that a "consumer-group-describe" read failing with
     * {@link CoordinatorLoadInProgressException} is reported by {@code consumerGroupDescribe} as
     * a described group carrying {@code COORDINATOR_LOAD_IN_PROGRESS} for the requested id.
     */
    @Test
    public void testConsumerGroupDescribeCoordinatorLoadInProgress() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        final int partitionCount = 1;
        service.startup(() -> partitionCount);

        // The only partition fails its read with a load-in-progress exception.
        Mockito.when(runtime.scheduleReadOperation(
            (String) ArgumentMatchers.eq("consumer-group-describe"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new CoordinatorLoadInProgressException((String) null)));

        List<ConsumerGroupDescribeResponseData.DescribedGroup> expectedGroups = Collections.singletonList(
            new ConsumerGroupDescribeResponseData.DescribedGroup()
                .setGroupId("group-id")
                .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code())
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedGroups,
            service.consumerGroupDescribe(
                TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE),
                Collections.singletonList("group-id")
            ).get()
        );
    }

    /**
     * Verifies that {@code consumerGroupDescribe} answers with {@code COORDINATOR_NOT_AVAILABLE}
     * for the requested group when the service has not been started. The runtime read is
     * additionally stubbed to fail with the same error.
     */
    @Test
    public void testConsumerGroupDescribeCoordinatorNotActive() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        // Note: startup() is never invoked on this service.
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );

        Mockito.when(runtime.scheduleReadOperation(
            (String) ArgumentMatchers.eq("consumer-group-describe"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (CoordinatorRuntime.CoordinatorReadOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_NOT_AVAILABLE.exception()));

        List<ConsumerGroupDescribeResponseData.DescribedGroup> expectedGroups = Collections.singletonList(
            new ConsumerGroupDescribeResponseData.DescribedGroup()
                .setGroupId("group-id")
                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedGroups,
            service.consumerGroupDescribe(
                TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE),
                Collections.singletonList("group-id")
            ).get()
        );
    }

    /**
     * Verifies that {@code deleteOffsets} returns the response of the "delete-offsets" write
     * operation and that the returned future is already done when that write is complete.
     */
    @Test
    public void testDeleteOffsets() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            (GroupCoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class)
        );
        service.startup(() -> 1);

        // Request: group "group", topic "topic", partition 0.
        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
            .setGroupId("group")
            .setTopics(new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(
                Collections.singletonList(
                    new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
                        .setName("topic")
                        .setPartitions(Collections.singletonList(
                            new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
                        ))
                ).iterator()
            ));

        // Response handed back by the mocked runtime: one unnamed topic holding partition 0.
        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
            .setTopics(new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(
                Collections.singletonList(
                    new OffsetDeleteResponseData.OffsetDeleteResponseTopic()
                        .setPartitions(new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(
                            Collections.singletonList(
                                new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0)
                            ).iterator()
                        ))
                ).iterator()
            ));

        Mockito.when(runtime.scheduleWriteOperation(
            (String) ArgumentMatchers.eq("delete-offsets"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(response));

        CompletableFuture deleteFuture = service.deleteOffsets(
            TestUtil.requestContext(ApiKeys.OFFSET_DELETE),
            request,
            BufferSupplier.NO_CACHING
        );

        org.junit.jupiter.api.Assertions.assertTrue(deleteFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals(response, deleteFuture.get());
    }

    /**
     * Verifies that {@code deleteOffsets} with an empty group id completes immediately with a
     * response carrying {@code INVALID_GROUP_ID}. The runtime write is stubbed to return that
     * same response.
     */
    @Test
    public void testDeleteOffsetsInvalidGroupId() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            (GroupCoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class)
        );
        service.startup(() -> 1);

        // Same shape as a valid request, except for the empty group id.
        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
            .setGroupId("")
            .setTopics(new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(
                Collections.singletonList(
                    new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
                        .setName("topic")
                        .setPartitions(Collections.singletonList(
                            new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
                        ))
                ).iterator()
            ));

        OffsetDeleteResponseData response = new OffsetDeleteResponseData()
            .setErrorCode(Errors.INVALID_GROUP_ID.code());

        Mockito.when(runtime.scheduleWriteOperation(
            (String) ArgumentMatchers.eq("delete-offsets"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(response));

        CompletableFuture deleteFuture = service.deleteOffsets(
            TestUtil.requestContext(ApiKeys.OFFSET_DELETE),
            request,
            BufferSupplier.NO_CACHING
        );

        org.junit.jupiter.api.Assertions.assertTrue(deleteFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals(response, deleteFuture.get());
    }

    /**
     * Verifies that a failure of the "delete-offsets" write operation is turned into a response
     * carrying the expected error code.
     *
     * @param th the exception the mocked runtime write fails with
     * @param s  the error code the resulting response is expected to carry
     */
    @MethodSource({"testConsumerGroupHeartbeatWithExceptionSource"})
    @ParameterizedTest
    public void testDeleteOffsetsWithException(Throwable th, short s) throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            (GroupCoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class)
        );
        service.startup(() -> 1);

        OffsetDeleteRequestData request = new OffsetDeleteRequestData()
            .setGroupId("group")
            .setTopics(new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(
                Collections.singletonList(
                    new OffsetDeleteRequestData.OffsetDeleteRequestTopic()
                        .setName("topic")
                        .setPartitions(Collections.singletonList(
                            new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0)
                        ))
                ).iterator()
            ));

        OffsetDeleteResponseData expectedResponse = new OffsetDeleteResponseData().setErrorCode(s);

        // The runtime write fails with the parameterized exception.
        Mockito.when(runtime.scheduleWriteOperation(
            (String) ArgumentMatchers.eq("delete-offsets"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(th));

        CompletableFuture deleteFuture = service.deleteOffsets(
            TestUtil.requestContext(ApiKeys.OFFSET_DELETE),
            request,
            BufferSupplier.NO_CACHING
        );

        org.junit.jupiter.api.Assertions.assertTrue(deleteFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals(expectedResponse, deleteFuture.get());
    }

    /**
     * Verifies that {@code deleteOffsets} on a service that was never started answers with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testDeleteOffsetsWhenNotStarted() throws ExecutionException, InterruptedException {
        OffsetDeleteResponseData expectedResponse = new OffsetDeleteResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());

        // startup() is intentionally never invoked on this instance.
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedResponse,
            service.deleteOffsets(
                TestUtil.requestContext(ApiKeys.OFFSET_DELETE),
                new OffsetDeleteRequestData().setGroupId("foo"),
                BufferSupplier.NO_CACHING
            ).get()
        );
    }

    /**
     * Verifies that {@code deleteGroups} aggregates the per-partition "delete-groups" writes.
     *
     * <p>With three partitions, the write stubbed for partition 2 completes immediately with
     * the group-id-1 result, the one for partition 0 is held pending and later completed with
     * the group-id-2 result, and the one for partition 1 fails with
     * {@code COORDINATOR_LOAD_IN_PROGRESS}. The request also contains a {@code null} group id,
     * expected back as {@code INVALID_GROUP_ID}. The aggregated future must only be done once
     * the pending write completes.
     */
    @Test
    public void testDeleteGroups() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            (GroupCoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class)
        );
        service.startup(() -> 3);

        // Result collection returned by the partition-2 write.
        DeleteGroupsResponseData.DeletableGroupResultCollection firstResults =
            new DeleteGroupsResponseData.DeletableGroupResultCollection();
        DeleteGroupsResponseData.DeletableGroupResult firstResult =
            new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1");
        firstResults.add(firstResult);

        // Result collection used later to complete the pending partition-0 write.
        DeleteGroupsResponseData.DeletableGroupResultCollection secondResults =
            new DeleteGroupsResponseData.DeletableGroupResultCollection();
        DeleteGroupsResponseData.DeletableGroupResult secondResult =
            new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2");
        secondResults.add(secondResult);

        DeleteGroupsResponseData.DeletableGroupResult loadingResult = new DeleteGroupsResponseData.DeletableGroupResult()
            .setGroupId("group-id-3")
            .setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());

        // Expected aggregate, built from duplicates of the individual results.
        DeleteGroupsResponseData.DeletableGroupResultCollection expectedResults =
            new DeleteGroupsResponseData.DeletableGroupResultCollection();
        expectedResults.addAll(Arrays.asList(
            new DeleteGroupsResponseData.DeletableGroupResult()
                .setGroupId((String) null)
                .setErrorCode(Errors.INVALID_GROUP_ID.code()),
            secondResult.duplicate(),
            loadingResult.duplicate(),
            firstResult.duplicate()
        ));

        Mockito.when(runtime.scheduleWriteOperation(
            (String) ArgumentMatchers.eq("delete-groups"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(firstResults));

        CompletableFuture pendingWrite = new CompletableFuture();
        Mockito.when(runtime.scheduleWriteOperation(
            (String) ArgumentMatchers.eq("delete-groups"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(pendingWrite);

        Mockito.when(runtime.scheduleWriteOperation(
            (String) ArgumentMatchers.eq("delete-groups"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 1)),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));

        CompletableFuture deleteFuture = service.deleteGroups(
            TestUtil.requestContext(ApiKeys.DELETE_GROUPS),
            Arrays.asList("group-id-1", "group-id-2", "group-id-3", null),
            BufferSupplier.NO_CACHING
        );
        org.junit.jupiter.api.Assertions.assertFalse(deleteFuture.isDone());

        pendingWrite.complete(secondResults);
        org.junit.jupiter.api.Assertions.assertTrue(deleteFuture.isDone());
        org.junit.jupiter.api.Assertions.assertEquals(expectedResults, deleteFuture.get());
    }

    /**
     * Verifies that a failure of the "delete-groups" write operation is turned into a result
     * for the requested group carrying the expected error code.
     *
     * @param th the exception the mocked runtime write fails with
     * @param s  the error code the resulting group entry is expected to carry
     */
    @MethodSource({"testConsumerGroupHeartbeatWithExceptionSource"})
    @ParameterizedTest
    public void testDeleteGroupsWithException(Throwable th, short s) throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            (GroupCoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class)
        );
        service.startup(() -> 1);

        // The runtime write fails with the parameterized exception.
        Mockito.when(runtime.scheduleWriteOperation(
            (String) ArgumentMatchers.eq("delete-groups"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(th));

        DeleteGroupsResponseData.DeletableGroupResultCollection expectedResults =
            new DeleteGroupsResponseData.DeletableGroupResultCollection(
                Collections.singletonList(
                    new DeleteGroupsResponseData.DeletableGroupResult()
                        .setGroupId("group-id")
                        .setErrorCode(s)
                ).iterator()
            );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedResults,
            service.deleteGroups(
                TestUtil.requestContext(ApiKeys.DELETE_GROUPS),
                Collections.singletonList("group-id"),
                BufferSupplier.NO_CACHING
            ).get()
        );
    }

    /**
     * Verifies that {@code deleteGroups} on a service that was never started answers with
     * {@code COORDINATOR_NOT_AVAILABLE} for the requested group.
     */
    @Test
    public void testDeleteGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
        DeleteGroupsResponseData.DeletableGroupResultCollection expectedResults =
            new DeleteGroupsResponseData.DeletableGroupResultCollection(
                Collections.singletonList(
                    new DeleteGroupsResponseData.DeletableGroupResult()
                        .setGroupId("foo")
                        .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                ).iterator()
            );

        // startup() is intentionally never invoked on this instance.
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            (GroupCoordinatorMetrics) Mockito.mock(GroupCoordinatorMetrics.class)
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedResults,
            service.deleteGroups(
                TestUtil.requestContext(ApiKeys.DELETE_GROUPS),
                Collections.singletonList("foo"),
                BufferSupplier.NO_CACHING
            ).get()
        );
    }

    /**
     * Verifies that {@code commitTransactionalOffsets} on a service that was never started
     * answers with {@code COORDINATOR_NOT_AVAILABLE} on the requested topic partition.
     */
    @Test
    public void testCommitTransactionalOffsetsWhenNotStarted() throws ExecutionException, InterruptedException {
        TxnOffsetCommitResponseData expectedResponse = new TxnOffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                    .setName("topic")
                    .setPartitions(Collections.singletonList(
                        new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                            .setPartitionIndex(0)
                            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code())
                    ))
            ));

        // startup() is intentionally never invoked on this instance.
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedResponse,
            service.commitTransactionalOffsets(
                TestUtil.requestContext(ApiKeys.TXN_OFFSET_COMMIT),
                new TxnOffsetCommitRequestData()
                    .setGroupId("foo")
                    .setTransactionalId("transactional-id")
                    .setMemberId("member-id")
                    .setGenerationId(10)
                    .setTopics(Collections.singletonList(
                        new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                            .setName("topic")
                            .setPartitions(Collections.singletonList(
                                new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                                    .setPartitionIndex(0)
                                    .setCommittedOffset(100L)
                            ))
                    )),
                BufferSupplier.NO_CACHING
            ).get()
        );
    }

    /**
     * Verifies that {@code commitTransactionalOffsets} on a started service answers with
     * {@code INVALID_GROUP_ID} on the requested topic partition when the group id is empty or
     * {@code null}.
     *
     * @param str the group id placed in the request; supplied as the empty string and as
     *            {@code null}
     */
    @ValueSource(strings = {""})
    @ParameterizedTest
    @NullSource
    public void testCommitTransactionalOffsetsWithInvalidGroupId(String str) throws ExecutionException, InterruptedException {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        TxnOffsetCommitResponseData expectedResponse = new TxnOffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                    .setName("topic")
                    .setPartitions(Collections.singletonList(
                        new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                            .setPartitionIndex(0)
                            .setErrorCode(Errors.INVALID_GROUP_ID.code())
                    ))
            ));

        org.junit.jupiter.api.Assertions.assertEquals(
            expectedResponse,
            service.commitTransactionalOffsets(
                TestUtil.requestContext(ApiKeys.TXN_OFFSET_COMMIT),
                new TxnOffsetCommitRequestData()
                    .setGroupId(str)
                    .setTransactionalId("transactional-id")
                    .setMemberId("member-id")
                    .setGenerationId(10)
                    .setTopics(Collections.singletonList(
                        new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                            .setName("topic")
                            .setPartitions(Collections.singletonList(
                                new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                                    .setPartitionIndex(0)
                                    .setCommittedOffset(100L)
                            ))
                    )),
                BufferSupplier.NO_CACHING
            ).get()
        );
    }

    /**
     * Happy path for transactional offset commits: the runtime is stubbed to complete the
     * "txn-commit-offset" write with a {@code NONE} response, and the service is expected
     * to return that same response.
     */
    @Test
    public void testCommitTransactionalOffsets() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setTransactionalId("transactional-id")
            .setProducerId(10L)
            .setProducerEpoch((short) 5)
            .setMemberId("member-id")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(
                new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                    .setName("topic")
                    .setPartitions(Collections.singletonList(
                        new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                            .setPartitionIndex(0)
                            .setCommittedOffset(100L)))));

        TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                    .setName("topic")
                    .setPartitions(Collections.singletonList(
                        new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                            .setPartitionIndex(0)
                            .setErrorCode(Errors.NONE.code())))));

        // Stub the runtime call with the exact arguments derived from the request above.
        Mockito.when(runtime.scheduleTransactionalWriteOperation(
            (String) ArgumentMatchers.eq("txn-commit-offset"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (String) ArgumentMatchers.eq("transactional-id"),
            ArgumentMatchers.eq(10L),
            ArgumentMatchers.eq((short) 5),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any(),
            (Short) ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(response));

        org.junit.jupiter.api.Assertions.assertEquals(
            response,
            service.commitTransactionalOffsets(
                TestUtil.requestContext(ApiKeys.TXN_OFFSET_COMMIT),
                request,
                BufferSupplier.NO_CACHING
            ).get()
        );
    }

    /**
     * Checks error translation when the runtime future fails with a {@link CompletionException}
     * wrapping the exception of {@code errors}: the response returned by the service must carry
     * the code of {@code errors2} on the requested partition.
     *
     * @param errors  the error whose exception is wrapped and used to fail the runtime future
     * @param errors2 the error expected in the partition-level response
     */
    @ParameterizedTest
    @CsvSource({"NOT_ENOUGH_REPLICAS, COORDINATOR_NOT_AVAILABLE", "NETWORK_EXCEPTION, COORDINATOR_LOAD_IN_PROGRESS"})
    public void testCommitTransactionalOffsetsWithWrappedError(Errors errors, Errors errors2) throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setTransactionalId("transactional-id")
            .setProducerId(10L)
            .setProducerEpoch((short) 5)
            .setMemberId("member-id")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(
                new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                    .setName("topic")
                    .setPartitions(Collections.singletonList(
                        new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                            .setPartitionIndex(0)
                            .setCommittedOffset(100L)))));

        TxnOffsetCommitResponseData expected = new TxnOffsetCommitResponseData()
            .setTopics(Collections.singletonList(
                new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                    .setName("topic")
                    .setPartitions(Collections.singletonList(
                        new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                            .setPartitionIndex(0)
                            .setErrorCode(errors2.code())))));

        // The runtime fails with the input error wrapped in a CompletionException.
        Throwable cause = errors.exception();
        Mockito.when(runtime.scheduleTransactionalWriteOperation(
            (String) ArgumentMatchers.eq("txn-commit-offset"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            (String) ArgumentMatchers.eq("transactional-id"),
            ArgumentMatchers.eq(10L),
            ArgumentMatchers.eq((short) 5),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any(),
            (Short) ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new CompletionException(cause)));

        org.junit.jupiter.api.Assertions.assertEquals(
            expected,
            service.commitTransactionalOffsets(
                TestUtil.requestContext(ApiKeys.TXN_OFFSET_COMMIT),
                request,
                BufferSupplier.NO_CACHING
            ).get()
        );
    }

    /**
     * Checks that completing a transaction on a {@code __consumer_offsets} partition schedules
     * a "write-txn-marker" completion on the runtime with the same arguments and that the
     * returned future yields {@code null}.
     */
    @Test
    public void testCompleteTransaction() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        Mockito.when(runtime.scheduleTransactionCompletion(
            (String) ArgumentMatchers.eq("write-txn-marker"),
            (TopicPartition) ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(100L),
            ArgumentMatchers.eq((short) 5),
            ArgumentMatchers.eq(10),
            (TransactionResult) ArgumentMatchers.eq(TransactionResult.COMMIT),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(100L))
        )).thenReturn(CompletableFuture.completedFuture(null));

        TopicPartition offsetsPartition = new TopicPartition("__consumer_offsets", 0);
        Duration timeout = Duration.ofMillis(100L);
        org.junit.jupiter.api.Assertions.assertNull(
            service.completeTransaction(
                offsetsPartition,
                100L,
                (short) 5,
                10,
                TransactionResult.COMMIT,
                timeout
            ).get()
        );
    }

    /**
     * Checks that {@code completeTransaction} on a service that was never started returns a
     * future failing with {@link CoordinatorNotAvailableException}.
     */
    @Test
    public void testCompleteTransactionWhenNotCoordinatorServiceStarted() {
        // Deliberately no startup() call here.
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );
        TopicPartition partition = new TopicPartition("foo", 0);

        TestUtils.assertFutureThrows(
            service.completeTransaction(
                partition,
                100L,
                (short) 5,
                10,
                TransactionResult.COMMIT,
                Duration.ofMillis(100L)
            ),
            CoordinatorNotAvailableException.class
        );
    }

    /**
     * Checks that {@code completeTransaction} on a started service, invoked for partition
     * {@code foo-0}, returns a future failing with {@link IllegalStateException}.
     */
    @Test
    public void testCompleteTransactionWithUnexpectedPartition() {
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 1);

        TopicPartition unexpectedPartition = new TopicPartition("foo", 0);
        TestUtils.assertFutureThrows(
            service.completeTransaction(
                unexpectedPartition,
                100L,
                (short) 5,
                10,
                TransactionResult.COMMIT,
                Duration.ofMillis(100L)
            ),
            IllegalStateException.class
        );
    }

    /**
     * Checks that {@code onPartitionsDeleted} does not throw when the runtime's
     * "on-partition-deleted" write-all operation returns three futures, the last of which
     * has failed with {@code COORDINATOR_LOAD_IN_PROGRESS}.
     */
    @Test
    public void testOnPartitionsDeleted() {
        CoordinatorRuntime<GroupCoordinatorShard, CoordinatorRecord> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        service.startup(() -> 3);

        // Two successful results followed by one failed result.
        Mockito.when(runtime.scheduleWriteAllOperation(
            (String) ArgumentMatchers.eq("on-partition-deleted"),
            (Duration) ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            (CoordinatorRuntime.CoordinatorWriteOperation) ArgumentMatchers.any()
        )).thenReturn(Arrays.asList(
            CompletableFuture.completedFuture(null),
            CompletableFuture.completedFuture(null),
            FutureUtils.failedFuture(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception())
        ));

        // Block-bodied lambda keeps the call bound to the Executable overload.
        org.junit.jupiter.api.Assertions.assertDoesNotThrow(() -> {
            service.onPartitionsDeleted(
                Collections.singletonList(new TopicPartition("foo", 0)),
                BufferSupplier.NO_CACHING
            );
        });
    }

    /**
     * Checks that {@code onPartitionsDeleted} throws {@link CoordinatorNotAvailableException}
     * when invoked on a service that was never started.
     */
    @Test
    public void testOnPartitionsDeletedWhenServiceIsNotStarted() {
        // Deliberately no startup() call here.
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            mockRuntime(),
            new GroupCoordinatorMetrics()
        );

        org.junit.jupiter.api.Assertions.assertThrows(
            CoordinatorNotAvailableException.class,
            () -> service.onPartitionsDeleted(
                Collections.singletonList(new TopicPartition("foo", 0)),
                BufferSupplier.NO_CACHING
            )
        );
    }
}
