/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.coordinator.group;

import java.net.InetAddress;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalInt;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.CoordinatorLoadInProgressException;
import org.apache.kafka.common.errors.CoordinatorNotAvailableException;
import org.apache.kafka.common.errors.InvalidFetchSizeException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.KafkaStorageException;
import org.apache.kafka.common.errors.NotCoordinatorException;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.UnknownMemberIdException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.message.ConsumerGroupDescribeResponseData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatRequestData;
import org.apache.kafka.common.message.ConsumerGroupHeartbeatResponseData;
import org.apache.kafka.common.message.DeleteGroupsResponseData;
import org.apache.kafka.common.message.DescribeGroupsResponseData;
import org.apache.kafka.common.message.HeartbeatRequestData;
import org.apache.kafka.common.message.HeartbeatResponseData;
import org.apache.kafka.common.message.JoinGroupRequestData;
import org.apache.kafka.common.message.JoinGroupResponseData;
import org.apache.kafka.common.message.LeaveGroupRequestData;
import org.apache.kafka.common.message.LeaveGroupResponseData;
import org.apache.kafka.common.message.ListGroupsRequestData;
import org.apache.kafka.common.message.ListGroupsResponseData;
import org.apache.kafka.common.message.OffsetDeleteRequestData;
import org.apache.kafka.common.message.OffsetDeleteResponseData;
import org.apache.kafka.common.message.OffsetFetchRequestData;
import org.apache.kafka.common.message.OffsetFetchResponseData;
import org.apache.kafka.common.message.SyncGroupRequestData;
import org.apache.kafka.common.message.SyncGroupResponseData;
import org.apache.kafka.common.message.TxnOffsetCommitRequestData;
import org.apache.kafka.common.message.TxnOffsetCommitResponseData;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.BufferSupplier;
import org.apache.kafka.common.utils.ImplicitLinkedHashCollection;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
import org.apache.kafka.coordinator.group.GroupCoordinatorService;
import org.apache.kafka.coordinator.group.GroupCoordinatorShard;
import org.apache.kafka.coordinator.group.Record;
import org.apache.kafka.coordinator.group.TestUtil;
import org.apache.kafka.coordinator.group.assignor.RangeAssignor;
import org.apache.kafka.coordinator.group.metrics.GroupCoordinatorMetrics;
import org.apache.kafka.coordinator.group.runtime.CoordinatorRuntime;
import org.apache.kafka.server.record.BrokerCompressionType;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
import org.mockito.verification.VerificationMode;

public class GroupCoordinatorServiceTest {
    /**
     * Creates a Mockito mock of the coordinator runtime for the service under test.
     *
     * @return a mocked {@link CoordinatorRuntime}
     */
    @SuppressWarnings("unchecked")
    private CoordinatorRuntime<GroupCoordinatorShard, Record> mockRuntime() {
        // Mockito.mock(Class) yields the raw type; narrow it to the parameterized type here.
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtimeMock =
            (CoordinatorRuntime<GroupCoordinatorShard, Record>) Mockito.mock(CoordinatorRuntime.class);
        return runtimeMock;
    }

    /**
     * Builds the fixed {@link GroupCoordinatorConfig} shared by all tests in this class.
     *
     * <p>NOTE(review): the constructor is positional. The values 1000, 86400000, 0.5 and 5000
     * match expectations elsewhere in this class (topic config properties and the
     * {@code Duration.ofMillis(5000L)} write timeout), but which position carries which
     * setting must be confirmed against the constructor declaration.
     *
     * @return the configuration used to construct the service under test
     */
    private GroupCoordinatorConfig createConfig() {
        GroupCoordinatorConfig config = new GroupCoordinatorConfig(
            1,
            45,
            5,
            Integer.MAX_VALUE,
            Collections.singletonList(new RangeAssignor()),
            1000,
            4096,
            Integer.MAX_VALUE,
            3000,
            300000,
            120,
            50000,
            86400000L,
            0.5,
            86400000L,
            600000L,
            1440000L,
            5000
        );
        return config;
    }

    /**
     * Starts the service, shuts it down, and verifies that the runtime was closed exactly once.
     */
    @Test
    public void testStartupShutdown() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );

        coordinatorService.startup(() -> 1);
        coordinatorService.shutdown();

        Mockito.verify(coordinatorRuntime, Mockito.times(1)).close();
    }

    /**
     * A consumer group heartbeat issued before {@code startup} must complete with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testConsumerGroupHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        ConsumerGroupHeartbeatRequestData heartbeatRequest = new ConsumerGroupHeartbeatRequestData()
            .setGroupId("foo");

        CompletableFuture<ConsumerGroupHeartbeatResponseData> result = coordinatorService.consumerGroupHeartbeat(
            TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
            heartbeatRequest
        );

        ConsumerGroupHeartbeatResponseData expected = new ConsumerGroupHeartbeatResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals(expected, result.get());
    }

    /**
     * With the runtime's "consumer-group-heartbeat" write operation stubbed to succeed, the
     * service must hand back the stubbed (default) response.
     */
    @Test
    public void testConsumerGroupHeartbeat() throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        ConsumerGroupHeartbeatRequestData heartbeatRequest = new ConsumerGroupHeartbeatRequestData()
            .setGroupId("foo");

        coordinatorService.startup(() -> 1);

        // Matchers stay inline: Mockito requires them to be created inside the stubbed call.
        Mockito.when(coordinatorRuntime.scheduleWriteOperation(
            ArgumentMatchers.eq("consumer-group-heartbeat"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(new ConsumerGroupHeartbeatResponseData()));

        CompletableFuture<ConsumerGroupHeartbeatResponseData> result = coordinatorService.consumerGroupHeartbeat(
            TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
            heartbeatRequest
        );

        Assertions.assertEquals(new ConsumerGroupHeartbeatResponseData(), result.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Supplies the cases for {@code testConsumerGroupHeartbeatWithException}: the exception the
     * runtime fails with, the error code expected in the response, and the expected error
     * message ({@code null} when none is expected).
     *
     * @return one {@link Arguments} triple per exception under test
     */
    private static Stream<Arguments> testConsumerGroupHeartbeatWithExceptionSource() {
        return Stream.of(
            Arguments.arguments(new UnknownTopicOrPartitionException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
            Arguments.arguments(new NotEnoughReplicasException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
            // Fully qualified: java.util.concurrent.TimeoutException is the one imported by this file.
            Arguments.arguments(new org.apache.kafka.common.errors.TimeoutException(), Errors.COORDINATOR_NOT_AVAILABLE.code(), null),
            Arguments.arguments(new NotLeaderOrFollowerException(), Errors.NOT_COORDINATOR.code(), null),
            Arguments.arguments(new KafkaStorageException(), Errors.NOT_COORDINATOR.code(), null),
            Arguments.arguments(new RecordTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
            Arguments.arguments(new RecordBatchTooLargeException(), Errors.UNKNOWN_SERVER_ERROR.code(), null),
            Arguments.arguments(new InvalidFetchSizeException(""), Errors.UNKNOWN_SERVER_ERROR.code(), null),
            Arguments.arguments(new InvalidRequestException("Invalid"), Errors.INVALID_REQUEST.code(), "Invalid")
        );
    }

    /**
     * Fails the runtime's "consumer-group-heartbeat" write operation with the given exception and
     * checks the error code and message the service puts in the response.
     *
     * @param exception            the failure the stubbed runtime future completes with
     * @param expectedErrorCode    the error code expected in the response
     * @param expectedErrorMessage the error message expected in the response, possibly {@code null}
     */
    @ParameterizedTest
    @MethodSource(value={"testConsumerGroupHeartbeatWithExceptionSource"})
    public void testConsumerGroupHeartbeatWithException(Throwable exception, short expectedErrorCode, String expectedErrorMessage) throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        ConsumerGroupHeartbeatRequestData heartbeatRequest = new ConsumerGroupHeartbeatRequestData()
            .setGroupId("foo");

        coordinatorService.startup(() -> 1);

        Mockito.when(coordinatorRuntime.scheduleWriteOperation(
            ArgumentMatchers.eq("consumer-group-heartbeat"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(exception));

        CompletableFuture<ConsumerGroupHeartbeatResponseData> result = coordinatorService.consumerGroupHeartbeat(
            TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_HEARTBEAT),
            heartbeatRequest
        );

        ConsumerGroupHeartbeatResponseData expected = new ConsumerGroupHeartbeatResponseData()
            .setErrorCode(expectedErrorCode)
            .setErrorMessage(expectedErrorMessage);
        Assertions.assertEquals(expected, result.get(5L, TimeUnit.SECONDS));
    }

    /**
     * {@code partitionFor} must throw {@link CoordinatorNotAvailableException} before startup and,
     * once started with 10 partitions, return {@code abs(hash("foo")) % 10}.
     */
    @Test
    public void testPartitionFor() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );

        Assertions.assertThrows(
            CoordinatorNotAvailableException.class,
            () -> coordinatorService.partitionFor("foo")
        );

        coordinatorService.startup(() -> 10);

        int expectedPartition = Utils.abs("foo".hashCode()) % 10;
        Assertions.assertEquals(expectedPartition, coordinatorService.partitionFor("foo"));
    }

    /**
     * Checks the topic properties returned by {@code groupMetadataTopicConfigs} against the
     * expected values for the configuration built by {@code createConfig}.
     */
    @Test
    public void testGroupMetadataTopicConfigs() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );

        Properties expected = new Properties();
        expected.put("cleanup.policy", "compact");
        expected.put("compression.type", BrokerCompressionType.PRODUCER.name);
        expected.put("segment.bytes", "1000");
        expected.put("delete.retention.ms", "86400000");
        expected.put("min.cleanable.dirty.ratio", "0.5");
        expected.put("max.compaction.lag.ms", "86400000");

        Properties actual = coordinatorService.groupMetadataTopicConfigs();
        Assertions.assertEquals(expected, actual);
    }

    /**
     * {@code onElection} must throw before startup; after startup it must schedule exactly one
     * load operation for partition 5 of {@code __consumer_offsets} with the value 10.
     */
    @Test
    public void testOnElection() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );

        Assertions.assertThrows(
            CoordinatorNotAvailableException.class,
            () -> coordinatorService.onElection(5, 10)
        );

        coordinatorService.startup(() -> 1);
        coordinatorService.onElection(5, 10);

        Mockito.verify(coordinatorRuntime, Mockito.times(1)).scheduleLoadOperation(
            new TopicPartition("__consumer_offsets", 5),
            10
        );
    }

    /**
     * {@code onResignation} must throw before startup; after startup it must schedule exactly one
     * unload operation for partition 5 of {@code __consumer_offsets} with {@code OptionalInt.of(10)}.
     */
    @Test
    public void testOnResignation() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );

        Assertions.assertThrows(
            CoordinatorNotAvailableException.class,
            () -> coordinatorService.onResignation(5, OptionalInt.of(10))
        );

        coordinatorService.startup(() -> 1);
        coordinatorService.onResignation(5, OptionalInt.of(10));

        Mockito.verify(coordinatorRuntime, Mockito.times(1)).scheduleUnloadOperation(
            new TopicPartition("__consumer_offsets", 5),
            OptionalInt.of(10)
        );
    }

    /**
     * Resigning with an empty {@link OptionalInt} must still schedule exactly one unload
     * operation for partition 5, passing the empty value through.
     */
    @Test
    public void testOnResignationWithEmptyLeaderEpoch() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );

        coordinatorService.startup(() -> 1);
        coordinatorService.onResignation(5, OptionalInt.empty());

        Mockito.verify(coordinatorRuntime, Mockito.times(1)).scheduleUnloadOperation(
            new TopicPartition("__consumer_offsets", 5),
            OptionalInt.empty()
        );
    }

    /**
     * Stubs the runtime's "classic-group-join" write operation with an already-completed future
     * and asserts that the future returned by {@code joinGroup} is not done.
     *
     * <p>NOTE(review): presumably the response future is completed by the write operation itself,
     * which the mocked runtime never executes — confirm against GroupCoordinatorService.
     */
    @Test
    public void testJoinGroup() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        JoinGroupRequestData joinRequest = new JoinGroupRequestData().setGroupId("foo");

        coordinatorService.startup(() -> 1);

        Mockito.when(coordinatorRuntime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-join"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(new JoinGroupResponseData()));

        CompletableFuture<JoinGroupResponseData> result = coordinatorService.joinGroup(
            TestUtil.requestContext(ApiKeys.JOIN_GROUP),
            joinRequest,
            BufferSupplier.NO_CACHING
        );

        Assertions.assertFalse(result.isDone());
    }

    /**
     * When the "classic-group-join" write operation fails with an {@link IllegalStateException},
     * the join response must carry {@code UNKNOWN_SERVER_ERROR}.
     */
    @Test
    public void testJoinGroupWithException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        JoinGroupRequestData joinRequest = new JoinGroupRequestData().setGroupId("foo");

        coordinatorService.startup(() -> 1);

        Mockito.when(coordinatorRuntime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-join"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new IllegalStateException()));

        CompletableFuture<JoinGroupResponseData> result = coordinatorService.joinGroup(
            TestUtil.requestContext(ApiKeys.JOIN_GROUP),
            joinRequest,
            BufferSupplier.NO_CACHING
        );

        JoinGroupResponseData expected = new JoinGroupResponseData()
            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
        Assertions.assertEquals(expected, result.get(5L, TimeUnit.SECONDS));
    }

    /**
     * A join request with a {@code null} group id must complete immediately with
     * {@code INVALID_GROUP_ID} and an empty member id.
     */
    @Test
    public void testJoinGroupInvalidGroupId() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        coordinatorService.startup(() -> 1);

        JoinGroupRequestData joinRequest = new JoinGroupRequestData()
            .setGroupId(null)
            .setMemberId("");

        // This test builds its request context by hand instead of going through TestUtil.
        RequestHeader header = new RequestHeader(
            ApiKeys.JOIN_GROUP,
            ApiKeys.JOIN_GROUP.latestVersion(),
            "client",
            0
        );
        RequestContext context = new RequestContext(
            header,
            "1",
            InetAddress.getLoopbackAddress(),
            KafkaPrincipal.ANONYMOUS,
            ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT),
            SecurityProtocol.PLAINTEXT,
            ClientInformation.EMPTY,
            null,
            false
        );

        CompletableFuture<JoinGroupResponseData> result =
            coordinatorService.joinGroup(context, joinRequest, BufferSupplier.NO_CACHING);

        Assertions.assertTrue(result.isDone());
        JoinGroupResponseData expected = new JoinGroupResponseData()
            .setErrorCode(Errors.INVALID_GROUP_ID.code())
            .setMemberId("");
        Assertions.assertEquals(expected, result.get());
    }

    /**
     * A join request issued before {@code startup} must complete with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testJoinGroupWhenNotStarted() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        JoinGroupRequestData joinRequest = new JoinGroupRequestData().setGroupId("foo");

        CompletableFuture<JoinGroupResponseData> result = coordinatorService.joinGroup(
            TestUtil.requestContext(ApiKeys.JOIN_GROUP),
            joinRequest,
            BufferSupplier.NO_CACHING
        );

        JoinGroupResponseData expected = new JoinGroupResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals(expected, result.get());
    }

    /**
     * Stubs the runtime's "classic-group-sync" write operation with an already-completed future
     * and asserts that the future returned by {@code syncGroup} is not done.
     *
     * <p>NOTE(review): presumably the response future is completed by the write operation itself,
     * which the mocked runtime never executes — confirm against GroupCoordinatorService.
     */
    @Test
    public void testSyncGroup() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        SyncGroupRequestData syncRequest = new SyncGroupRequestData().setGroupId("foo");

        coordinatorService.startup(() -> 1);

        Mockito.when(coordinatorRuntime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-sync"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(new SyncGroupResponseData()));

        CompletableFuture<SyncGroupResponseData> result = coordinatorService.syncGroup(
            TestUtil.requestContext(ApiKeys.SYNC_GROUP),
            syncRequest,
            BufferSupplier.NO_CACHING
        );

        Assertions.assertFalse(result.isDone());
    }

    /**
     * When the "classic-group-sync" write operation fails with an {@link IllegalStateException},
     * the sync response must already be complete and carry {@code UNKNOWN_SERVER_ERROR}.
     */
    @Test
    public void testSyncGroupWithException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        SyncGroupRequestData syncRequest = new SyncGroupRequestData().setGroupId("foo");

        coordinatorService.startup(() -> 1);

        Mockito.when(coordinatorRuntime.scheduleWriteOperation(
            ArgumentMatchers.eq("classic-group-sync"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new IllegalStateException()));

        CompletableFuture<SyncGroupResponseData> result = coordinatorService.syncGroup(
            TestUtil.requestContext(ApiKeys.SYNC_GROUP),
            syncRequest,
            BufferSupplier.NO_CACHING
        );

        Assertions.assertTrue(result.isDone());
        SyncGroupResponseData expected = new SyncGroupResponseData()
            .setErrorCode(Errors.UNKNOWN_SERVER_ERROR.code());
        Assertions.assertEquals(expected, result.get());
    }

    /**
     * A sync request with a {@code null} group id must complete immediately with
     * {@code INVALID_GROUP_ID}.
     */
    @Test
    public void testSyncGroupInvalidGroupId() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        coordinatorService.startup(() -> 1);

        SyncGroupRequestData syncRequest = new SyncGroupRequestData()
            .setGroupId(null)
            .setMemberId("");

        CompletableFuture<SyncGroupResponseData> result = coordinatorService.syncGroup(
            TestUtil.requestContext(ApiKeys.SYNC_GROUP),
            syncRequest,
            BufferSupplier.NO_CACHING
        );

        Assertions.assertTrue(result.isDone());
        SyncGroupResponseData expected = new SyncGroupResponseData()
            .setErrorCode(Errors.INVALID_GROUP_ID.code());
        Assertions.assertEquals(expected, result.get());
    }

    /**
     * A sync request issued before {@code startup} must complete with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testSyncGroupWhenNotStarted() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        SyncGroupRequestData syncRequest = new SyncGroupRequestData().setGroupId("foo");

        CompletableFuture<SyncGroupResponseData> result = coordinatorService.syncGroup(
            TestUtil.requestContext(ApiKeys.SYNC_GROUP),
            syncRequest,
            BufferSupplier.NO_CACHING
        );

        SyncGroupResponseData expected = new SyncGroupResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals(expected, result.get());
    }

    /**
     * With the runtime's "classic-group-heartbeat" read operation stubbed to succeed, the
     * heartbeat future must already be complete and hold the default response.
     */
    @Test
    public void testHeartbeat() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData().setGroupId("foo");

        coordinatorService.startup(() -> 1);

        Mockito.when(coordinatorRuntime.scheduleReadOperation(
            ArgumentMatchers.eq("classic-group-heartbeat"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(new HeartbeatResponseData()));

        CompletableFuture<HeartbeatResponseData> result = coordinatorService.heartbeat(
            TestUtil.requestContext(ApiKeys.HEARTBEAT),
            heartbeatRequest
        );

        Assertions.assertTrue(result.isDone());
        Assertions.assertEquals(new HeartbeatResponseData(), result.get());
    }

    /**
     * When the "classic-group-heartbeat" read operation fails with a
     * {@link CoordinatorLoadInProgressException}, the heartbeat must still complete with the
     * default (error-free) response.
     *
     * <p>Note that, despite the method name, the injected failure is a load-in-progress
     * exception rather than a coordinator-not-available one.
     */
    @Test
    public void testHeartbeatCoordinatorNotAvailableException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData().setGroupId("foo");

        coordinatorService.startup(() -> 1);

        Mockito.when(coordinatorRuntime.scheduleReadOperation(
            ArgumentMatchers.eq("classic-group-heartbeat"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new CoordinatorLoadInProgressException(null)));

        CompletableFuture<HeartbeatResponseData> result = coordinatorService.heartbeat(
            TestUtil.requestContext(ApiKeys.HEARTBEAT),
            heartbeatRequest
        );

        Assertions.assertTrue(result.isDone());
        Assertions.assertEquals(new HeartbeatResponseData(), result.get());
    }

    /**
     * When the "classic-group-heartbeat" read operation fails with a
     * {@link RebalanceInProgressException}, the heartbeat response must carry
     * {@code REBALANCE_IN_PROGRESS}.
     */
    @Test
    public void testHeartbeatCoordinatorException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        HeartbeatRequestData heartbeatRequest = new HeartbeatRequestData().setGroupId("foo");

        coordinatorService.startup(() -> 1);

        Mockito.when(coordinatorRuntime.scheduleReadOperation(
            ArgumentMatchers.eq("classic-group-heartbeat"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new RebalanceInProgressException()));

        CompletableFuture<HeartbeatResponseData> result = coordinatorService.heartbeat(
            TestUtil.requestContext(ApiKeys.HEARTBEAT),
            heartbeatRequest
        );

        Assertions.assertTrue(result.isDone());
        HeartbeatResponseData expected = new HeartbeatResponseData()
            .setErrorCode(Errors.REBALANCE_IN_PROGRESS.code());
        Assertions.assertEquals(expected, result.get());
    }

    /**
     * A classic heartbeat issued before {@code startup} must complete with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testHeartbeatWhenNotStarted() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            runtime,
            new GroupCoordinatorMetrics()
        );
        HeartbeatRequestData request = new HeartbeatRequestData().setGroupId("foo");

        // The request context uses the classic HEARTBEAT API key, matching the API being invoked
        // and the other classic heartbeat tests in this class.
        CompletableFuture<HeartbeatResponseData> future = service.heartbeat(
            TestUtil.requestContext(ApiKeys.HEARTBEAT),
            request
        );

        HeartbeatResponseData expected = new HeartbeatResponseData()
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals(expected, future.get());
    }

    /**
     * With three partitions each returning one listed group from the "list-groups" read
     * operation, the response must contain all three groups in the expected order.
     */
    @Test
    public void testListGroups() throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        int partitionCount = 3;
        coordinatorService.startup(() -> partitionCount);

        ListGroupsRequestData listRequest = new ListGroupsRequestData();

        List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
            new ListGroupsResponseData.ListedGroup()
                .setGroupId("group0")
                .setProtocolType("protocol1")
                .setGroupState("Stable")
                .setGroupType("classic"),
            new ListGroupsResponseData.ListedGroup()
                .setGroupId("group1")
                .setProtocolType("consumer")
                .setGroupState("Empty")
                .setGroupType("consumer"),
            new ListGroupsResponseData.ListedGroup()
                .setGroupId("group2")
                .setProtocolType("consumer")
                .setGroupState("Dead")
                .setGroupType("consumer")
        );

        Mockito.when(coordinatorRuntime.partitions()).thenReturn(Sets.newSet(
            new TopicPartition("__consumer_offsets", 0),
            new TopicPartition("__consumer_offsets", 1),
            new TopicPartition("__consumer_offsets", 2)
        ));

        // Partition p answers with the p-th expected group.
        for (int partition = 0; partition < partitionCount; partition++) {
            Mockito.when(coordinatorRuntime.scheduleReadOperation(
                ArgumentMatchers.eq("list-groups"),
                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", partition)),
                ArgumentMatchers.any()
            )).thenReturn(CompletableFuture.completedFuture(
                Collections.singletonList(expectedResults.get(partition))
            ));
        }

        CompletableFuture<ListGroupsResponseData> result = coordinatorService.listGroups(
            TestUtil.requestContext(ApiKeys.LIST_GROUPS),
            listRequest
        );

        List<ListGroupsResponseData.ListedGroup> actualResults = result.get(5L, TimeUnit.SECONDS).groups();
        Assertions.assertEquals(expectedResults, actualResults);
    }

    /**
     * Partitions 0 and 1 each return one group while partition 2 fails with a
     * {@link NotCoordinatorException}; the response must still list the two groups that
     * were returned.
     */
    @Test
    public void testListGroupsFailedWithNotCoordinatorException() throws InterruptedException, ExecutionException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        int partitionCount = 3;
        coordinatorService.startup(() -> partitionCount);

        List<ListGroupsResponseData.ListedGroup> expectedResults = Arrays.asList(
            new ListGroupsResponseData.ListedGroup()
                .setGroupId("group0")
                .setProtocolType("protocol1")
                .setGroupState("Stable")
                .setGroupType("classic"),
            new ListGroupsResponseData.ListedGroup()
                .setGroupId("group1")
                .setProtocolType("consumer")
                .setGroupState("Empty")
                .setGroupType("consumer")
        );

        ListGroupsRequestData listRequest = new ListGroupsRequestData();

        Mockito.when(coordinatorRuntime.partitions()).thenReturn(Sets.newSet(
            new TopicPartition("__consumer_offsets", 0),
            new TopicPartition("__consumer_offsets", 1),
            new TopicPartition("__consumer_offsets", 2)
        ));

        // The first two partitions succeed...
        for (int partition = 0; partition < 2; partition++) {
            Mockito.when(coordinatorRuntime.scheduleReadOperation(
                ArgumentMatchers.eq("list-groups"),
                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", partition)),
                ArgumentMatchers.any()
            )).thenReturn(CompletableFuture.completedFuture(
                Collections.singletonList(expectedResults.get(partition))
            ));
        }

        // ...and the third one fails.
        Mockito.when(coordinatorRuntime.scheduleReadOperation(
            ArgumentMatchers.eq("list-groups"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
            ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new NotCoordinatorException("")));

        CompletableFuture<ListGroupsResponseData> result = coordinatorService.listGroups(
            TestUtil.requestContext(ApiKeys.LIST_GROUPS),
            listRequest
        );

        List<ListGroupsResponseData.ListedGroup> actualResults = result.get(5L, TimeUnit.SECONDS).groups();
        Assertions.assertEquals(expectedResults, actualResults);
    }

    /**
     * Partitions 0 and 1 return empty lists while partition 2 fails with a
     * {@link CoordinatorLoadInProgressException}; the response must carry
     * {@code COORDINATOR_LOAD_IN_PROGRESS} and no groups.
     */
    @Test
    public void testListGroupsFailedImmediately() throws InterruptedException, ExecutionException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        int partitionCount = 3;
        coordinatorService.startup(() -> partitionCount);

        ListGroupsRequestData listRequest = new ListGroupsRequestData();

        Mockito.when(coordinatorRuntime.partitions()).thenReturn(Sets.newSet(
            new TopicPartition("__consumer_offsets", 0),
            new TopicPartition("__consumer_offsets", 1),
            new TopicPartition("__consumer_offsets", 2)
        ));

        for (int partition = 0; partition < 2; partition++) {
            Mockito.when(coordinatorRuntime.scheduleReadOperation(
                ArgumentMatchers.eq("list-groups"),
                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", partition)),
                ArgumentMatchers.any()
            )).thenReturn(CompletableFuture.completedFuture(Collections.emptyList()));
        }

        Mockito.when(coordinatorRuntime.scheduleReadOperation(
            ArgumentMatchers.eq("list-groups"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 2)),
            ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(new CoordinatorLoadInProgressException("")));

        CompletableFuture<ListGroupsResponseData> result = coordinatorService.listGroups(
            TestUtil.requestContext(ApiKeys.LIST_GROUPS),
            listRequest
        );

        ListGroupsResponseData response = result.get(5L, TimeUnit.SECONDS);
        Assertions.assertEquals(Errors.COORDINATOR_LOAD_IN_PROGRESS.code(), response.errorCode());
        Assertions.assertEquals(Collections.emptyList(), response.groups());
    }

    /**
     * When the service is started with a partition count of zero and the runtime is left
     * unstubbed, listing groups must yield a default {@link ListGroupsResponseData}.
     */
    @Test
    public void testListGroupsWithEmptyTopicPartitions() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> coordinatorRuntime = mockRuntime();
        GroupCoordinatorService coordinatorService = new GroupCoordinatorService(
            new LogContext(),
            createConfig(),
            coordinatorRuntime,
            new GroupCoordinatorMetrics()
        );
        int partitionCount = 0;
        coordinatorService.startup(() -> partitionCount);

        ListGroupsRequestData listRequest = new ListGroupsRequestData();
        CompletableFuture<ListGroupsResponseData> result = coordinatorService.listGroups(
            TestUtil.requestContext(ApiKeys.LIST_GROUPS),
            listRequest
        );

        Assertions.assertEquals(new ListGroupsResponseData(), result.get());
    }

    /**
     * Calls {@code listGroups} on a service whose {@code startup} was never
     * invoked and expects a response carrying the
     * {@code COORDINATOR_NOT_AVAILABLE} error code.
     */
    @Test
    public void testListGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());

        CompletableFuture<?> listing = coordinator.listGroups(TestUtil.requestContext(ApiKeys.LIST_GROUPS), new ListGroupsRequestData());

        ListGroupsResponseData notAvailable = new ListGroupsResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals((Object)notAvailable, listing.get());
    }

    /**
     * Describes two groups on a service started with two partitions. The
     * "describe-groups" read for partition 0 is stubbed with an already
     * completed future and the read for partition 1 with a pending one, so the
     * combined future must remain incomplete until the test completes the
     * pending read; afterwards both described groups are expected in order.
     */
    @Test
    public void testDescribeGroups() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, (GroupCoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class));
        coordinator.startup(() -> 2);

        DescribeGroupsResponseData.DescribedGroup firstGroup = new DescribeGroupsResponseData.DescribedGroup().setGroupId("group-id-1");
        DescribeGroupsResponseData.DescribedGroup secondGroup = new DescribeGroupsResponseData.DescribedGroup().setGroupId("group-id-2");

        // Partition 0 answers immediately with the first group.
        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"describe-groups"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(firstGroup)));

        // Partition 1 stays pending until the test completes it further down.
        CompletableFuture<List<DescribeGroupsResponseData.DescribedGroup>> pendingRead = new CompletableFuture<>();
        TopicPartition partitionOne = new TopicPartition("__consumer_offsets", 1);
        Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"describe-groups"), (TopicPartition)ArgumentMatchers.eq((Object)partitionOne), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn(pendingRead);

        CompletableFuture<?> described = coordinator.describeGroups(TestUtil.requestContext(ApiKeys.DESCRIBE_GROUPS), Arrays.asList("group-id-1", "group-id-2"));
        Assertions.assertFalse((boolean)described.isDone());

        pendingRead.complete(Collections.singletonList(secondGroup));
        List<DescribeGroupsResponseData.DescribedGroup> expected = Arrays.asList(firstGroup, secondGroup);
        Assertions.assertEquals(expected, described.get());
    }

    /**
     * Requests a description for an empty group id and a {@code null} group id
     * on a single-partition service. The expected result lists an
     * {@code INVALID_GROUP_ID} entry with a {@code null} id first, followed by
     * the entry that the stubbed "describe-groups" read returns for the empty
     * id.
     */
    @Test
    public void testDescribeGroupsInvalidGroupId() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, (GroupCoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class));
        coordinator.startup(() -> 1);

        DescribeGroupsResponseData.DescribedGroup emptyIdGroup = new DescribeGroupsResponseData.DescribedGroup().setGroupId("");
        DescribeGroupsResponseData.DescribedGroup nullIdGroup = new DescribeGroupsResponseData.DescribedGroup().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code());
        List<DescribeGroupsResponseData.DescribedGroup> expected = Arrays.asList(nullIdGroup, emptyIdGroup);

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"describe-groups"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(emptyIdGroup)));

        CompletableFuture<?> described = coordinator.describeGroups(TestUtil.requestContext(ApiKeys.DESCRIBE_GROUPS), Arrays.asList("", null));
        Assertions.assertEquals(expected, described.get());
    }

    /**
     * Stubs the "describe-groups" read on partition 0 with a future failed by
     * a {@code CoordinatorLoadInProgressException} and expects the described
     * group to carry the {@code COORDINATOR_LOAD_IN_PROGRESS} error code.
     */
    @Test
    public void testDescribeGroupCoordinatorLoadInProgress() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, (GroupCoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class));
        coordinator.startup(() -> 1);

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"describe-groups"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn((Object)FutureUtils.failedFuture((Throwable)new CoordinatorLoadInProgressException(null)));

        CompletableFuture<?> described = coordinator.describeGroups(TestUtil.requestContext(ApiKeys.DESCRIBE_GROUPS), Collections.singletonList("group-id"));

        DescribeGroupsResponseData.DescribedGroup loadingGroup = new DescribeGroupsResponseData.DescribedGroup().setGroupId("group-id").setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
        Assertions.assertEquals(Collections.singletonList(loadingGroup), described.get());
    }

    /**
     * Calls {@code describeGroups} without starting the service first and
     * expects the single requested group to be reported with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testDescribeGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());

        CompletableFuture<?> described = coordinator.describeGroups(TestUtil.requestContext(ApiKeys.DESCRIBE_GROUPS), Collections.singletonList("group-id"));

        DescribeGroupsResponseData.DescribedGroup unavailableGroup = new DescribeGroupsResponseData.DescribedGroup().setGroupId("group-id").setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals(Collections.singletonList(unavailableGroup), described.get());
    }

    /**
     * Fetches the offset of partition 0 of topic "foo" for group "group".
     * When {@code requireStable} is set the "fetch-offsets" write operation is
     * stubbed, otherwise the "fetch-offsets" read operation is; in both cases
     * the stubbed response must be what {@code fetchOffsets} completes with.
     *
     * @param requireStable flag forwarded to {@code fetchOffsets}; also selects
     *                      which runtime operation is stubbed
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testFetchOffsets(boolean requireStable) throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());
        coordinator.startup(() -> 1);

        OffsetFetchRequestData.OffsetFetchRequestTopics requestedTopic = new OffsetFetchRequestData.OffsetFetchRequestTopics().setName("foo").setPartitionIndexes(Collections.singletonList(0));
        OffsetFetchRequestData.OffsetFetchRequestGroup fetchRequest = new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId("group").setTopics(Collections.singletonList(requestedTopic));

        OffsetFetchResponseData.OffsetFetchResponsePartitions committedPartition = new OffsetFetchResponseData.OffsetFetchResponsePartitions().setPartitionIndex(0).setCommittedOffset(100L);
        OffsetFetchResponseData.OffsetFetchResponseTopics committedTopic = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName("foo").setPartitions(Collections.singletonList(committedPartition));
        OffsetFetchResponseData.OffsetFetchResponseGroup stubbedResponse = new OffsetFetchResponseData.OffsetFetchResponseGroup().setGroupId("group").setTopics(Collections.singletonList(committedTopic));

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        if (!requireStable) {
            Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"fetch-offsets"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(stubbedResponse));
        } else {
            Duration writeTimeout = Duration.ofMillis(5000L);
            Mockito.when((Object)mockedRuntime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)"fetch-offsets"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (Duration)ArgumentMatchers.eq((Object)writeTimeout), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(stubbedResponse));
        }

        CompletableFuture<?> fetched = coordinator.fetchOffsets(TestUtil.requestContext(ApiKeys.OFFSET_FETCH), fetchRequest, requireStable);
        Assertions.assertEquals((Object)stubbedResponse, fetched.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Calls {@code fetchOffsets} on a service that was never started and
     * expects a group response for "group" carrying
     * {@code COORDINATOR_NOT_AVAILABLE}, for either value of
     * {@code requireStable}.
     *
     * @param requireStable flag forwarded unchanged to {@code fetchOffsets}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testFetchOffsetsWhenNotStarted(boolean requireStable) throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());

        OffsetFetchRequestData.OffsetFetchRequestTopics requestedTopic = new OffsetFetchRequestData.OffsetFetchRequestTopics().setName("foo").setPartitionIndexes(Collections.singletonList(0));
        OffsetFetchRequestData.OffsetFetchRequestGroup fetchRequest = new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId("group").setTopics(Collections.singletonList(requestedTopic));

        CompletableFuture<?> fetched = coordinator.fetchOffsets(TestUtil.requestContext(ApiKeys.OFFSET_FETCH), fetchRequest, requireStable);

        OffsetFetchResponseData.OffsetFetchResponseGroup notAvailable = new OffsetFetchResponseData.OffsetFetchResponseGroup().setGroupId("group").setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals((Object)notAvailable, fetched.get());
    }

    /**
     * Fetches all offsets for group "group" (the request names no topics).
     * When {@code requireStable} is set the "fetch-all-offsets" write
     * operation is stubbed, otherwise the "fetch-all-offsets" read operation
     * is; the stubbed response must be what {@code fetchAllOffsets} completes
     * with.
     *
     * @param requireStable flag forwarded to {@code fetchAllOffsets}; also
     *                      selects which runtime operation is stubbed
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testFetchAllOffsets(boolean requireStable) throws ExecutionException, InterruptedException, TimeoutException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());
        coordinator.startup(() -> 1);

        OffsetFetchRequestData.OffsetFetchRequestGroup fetchRequest = new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId("group");

        OffsetFetchResponseData.OffsetFetchResponsePartitions committedPartition = new OffsetFetchResponseData.OffsetFetchResponsePartitions().setPartitionIndex(0).setCommittedOffset(100L);
        OffsetFetchResponseData.OffsetFetchResponseTopics committedTopic = new OffsetFetchResponseData.OffsetFetchResponseTopics().setName("foo").setPartitions(Collections.singletonList(committedPartition));
        OffsetFetchResponseData.OffsetFetchResponseGroup stubbedResponse = new OffsetFetchResponseData.OffsetFetchResponseGroup().setGroupId("group").setTopics(Collections.singletonList(committedTopic));

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        if (!requireStable) {
            Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"fetch-all-offsets"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(stubbedResponse));
        } else {
            Duration writeTimeout = Duration.ofMillis(5000L);
            Mockito.when((Object)mockedRuntime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)"fetch-all-offsets"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (Duration)ArgumentMatchers.eq((Object)writeTimeout), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(stubbedResponse));
        }

        CompletableFuture<?> fetched = coordinator.fetchAllOffsets(TestUtil.requestContext(ApiKeys.OFFSET_FETCH), fetchRequest, requireStable);
        Assertions.assertEquals((Object)stubbedResponse, fetched.get(5L, TimeUnit.SECONDS));
    }

    /**
     * Calls {@code fetchAllOffsets} on a service that was never started and
     * expects a group response for "group" carrying
     * {@code COORDINATOR_NOT_AVAILABLE}, for either value of
     * {@code requireStable}.
     *
     * @param requireStable flag forwarded unchanged to {@code fetchAllOffsets}
     */
    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testFetchAllOffsetsWhenNotStarted(boolean requireStable) throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());

        OffsetFetchRequestData.OffsetFetchRequestGroup fetchRequest = new OffsetFetchRequestData.OffsetFetchRequestGroup().setGroupId("group");
        CompletableFuture<?> fetched = coordinator.fetchAllOffsets(TestUtil.requestContext(ApiKeys.OFFSET_FETCH), fetchRequest, requireStable);

        OffsetFetchResponseData.OffsetFetchResponseGroup notAvailable = new OffsetFetchResponseData.OffsetFetchResponseGroup().setGroupId("group").setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals((Object)notAvailable, fetched.get());
    }

    /**
     * Stubs the "classic-group-leave" write on partition 0 with a completed
     * default {@code LeaveGroupResponseData} and checks that
     * {@code leaveGroup} returns an already finished future holding an equal
     * response.
     */
    @Test
    public void testLeaveGroup() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());
        coordinator.startup(() -> 1);

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Duration writeTimeout = Duration.ofMillis(5000L);
        Mockito.when((Object)mockedRuntime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)"classic-group-leave"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (Duration)ArgumentMatchers.eq((Object)writeTimeout), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(new LeaveGroupResponseData()));

        LeaveGroupRequestData leaveRequest = new LeaveGroupRequestData().setGroupId("foo");
        CompletableFuture<?> left = coordinator.leaveGroup(TestUtil.requestContext(ApiKeys.LEAVE_GROUP), leaveRequest);

        Assertions.assertTrue((boolean)left.isDone());
        Assertions.assertEquals((Object)new LeaveGroupResponseData(), left.get());
    }

    /**
     * Stubs the "classic-group-leave" write on partition 0 with a future
     * failed by {@code UnknownMemberIdException}. The expected response has a
     * top-level error code of {@code NONE} and reports
     * {@code UNKNOWN_MEMBER_ID} for each of the two members named in the
     * request.
     */
    @Test
    public void testLeaveGroupThrowsUnknownMemberIdException() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());
        coordinator.startup(() -> 1);

        LeaveGroupRequestData.MemberIdentity firstLeaver = new LeaveGroupRequestData.MemberIdentity().setMemberId("member-1").setGroupInstanceId("instance-1");
        LeaveGroupRequestData.MemberIdentity secondLeaver = new LeaveGroupRequestData.MemberIdentity().setMemberId("member-2").setGroupInstanceId("instance-2");
        LeaveGroupRequestData leaveRequest = new LeaveGroupRequestData().setGroupId("foo").setMembers(Arrays.asList(firstLeaver, secondLeaver));

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Duration writeTimeout = Duration.ofMillis(5000L);
        Mockito.when((Object)mockedRuntime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)"classic-group-leave"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (Duration)ArgumentMatchers.eq((Object)writeTimeout), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn((Object)FutureUtils.failedFuture((Throwable)new UnknownMemberIdException()));

        CompletableFuture<?> left = coordinator.leaveGroup(TestUtil.requestContext(ApiKeys.LEAVE_GROUP), leaveRequest);
        Assertions.assertTrue((boolean)left.isDone());

        LeaveGroupResponseData.MemberResponse firstUnknown = new LeaveGroupResponseData.MemberResponse().setMemberId("member-1").setGroupInstanceId("instance-1").setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
        LeaveGroupResponseData.MemberResponse secondUnknown = new LeaveGroupResponseData.MemberResponse().setMemberId("member-2").setGroupInstanceId("instance-2").setErrorCode(Errors.UNKNOWN_MEMBER_ID.code());
        LeaveGroupResponseData expected = new LeaveGroupResponseData().setErrorCode(Errors.NONE.code()).setMembers(Arrays.asList(firstUnknown, secondUnknown));
        Assertions.assertEquals((Object)expected, left.get());
    }

    /**
     * Calls {@code leaveGroup} on a service that was never started and expects
     * a response carrying {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testLeaveGroupWhenNotStarted() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());

        LeaveGroupRequestData leaveRequest = new LeaveGroupRequestData().setGroupId("foo");
        CompletableFuture<?> left = coordinator.leaveGroup(TestUtil.requestContext(ApiKeys.LEAVE_GROUP), leaveRequest);

        LeaveGroupResponseData notAvailable = new LeaveGroupResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals((Object)notAvailable, left.get());
    }

    /**
     * Describes two consumer groups on a service started with two partitions.
     * The "consumer-group-describe" read for partition 0 is stubbed with an
     * already completed future and the read for partition 1 with a pending
     * one, so the combined future must remain incomplete until the test
     * completes the pending read; afterwards both groups are expected in
     * order.
     */
    @Test
    public void testConsumerGroupDescribe() throws InterruptedException, ExecutionException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());
        coordinator.startup(() -> 2);

        ConsumerGroupDescribeResponseData.DescribedGroup firstGroup = new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId("group-id-1");
        ConsumerGroupDescribeResponseData.DescribedGroup secondGroup = new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId("group-id-2");

        // Partition 0 answers immediately with the first group.
        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"consumer-group-describe"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(firstGroup)));

        // Partition 1 stays pending until the test completes it further down.
        CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> pendingRead = new CompletableFuture<>();
        TopicPartition partitionOne = new TopicPartition("__consumer_offsets", 1);
        Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"consumer-group-describe"), (TopicPartition)ArgumentMatchers.eq((Object)partitionOne), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn(pendingRead);

        CompletableFuture<?> described = coordinator.consumerGroupDescribe(TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE), Arrays.asList("group-id-1", "group-id-2"));
        Assertions.assertFalse((boolean)described.isDone());

        pendingRead.complete(Collections.singletonList(secondGroup));
        List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Arrays.asList(firstGroup, secondGroup);
        Assertions.assertEquals(expected, described.get());
    }

    /**
     * Requests a consumer-group description for an empty group id and a
     * {@code null} group id on a single-partition service. The stubbed
     * "consumer-group-describe" read returns an {@code INVALID_GROUP_ID} entry
     * with a {@code null} id, and the expected result is two such entries.
     */
    @Test
    public void testConsumerGroupDescribeInvalidGroupId() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());
        coordinator.startup(() -> 1);

        ConsumerGroupDescribeResponseData.DescribedGroup stubbedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code());
        ConsumerGroupDescribeResponseData.DescribedGroup invalidIdGroup = new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code());
        List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Arrays.asList(invalidIdGroup, stubbedGroup);

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"consumer-group-describe"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(Collections.singletonList(stubbedGroup)));

        CompletableFuture<?> described = coordinator.consumerGroupDescribe(TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE), Arrays.asList("", null));
        Assertions.assertEquals(expected, described.get());
    }

    /**
     * Stubs the "consumer-group-describe" read on partition 0 with a future
     * failed by a {@code CoordinatorLoadInProgressException} and expects the
     * described group to carry the {@code COORDINATOR_LOAD_IN_PROGRESS} error
     * code.
     */
    @Test
    public void testConsumerGroupDescribeCoordinatorLoadInProgress() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());
        coordinator.startup(() -> 1);

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"consumer-group-describe"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn((Object)FutureUtils.failedFuture((Throwable)new CoordinatorLoadInProgressException(null)));

        CompletableFuture<?> described = coordinator.consumerGroupDescribe(TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE), Collections.singletonList("group-id"));

        ConsumerGroupDescribeResponseData.DescribedGroup loadingGroup = new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId("group-id").setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());
        Assertions.assertEquals(Collections.singletonList(loadingGroup), described.get());
    }

    /**
     * Calls {@code consumerGroupDescribe} without starting the service and
     * expects the requested group to be reported with
     * {@code COORDINATOR_NOT_AVAILABLE}. A failed "consumer-group-describe"
     * read is also stubbed for partition 0.
     * NOTE(review): {@code startup} is never called here, so it is not evident
     * from this test alone whether the stub is ever consulted - confirm
     * against the service implementation.
     */
    @Test
    public void testConsumerGroupDescribeCoordinatorNotActive() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Mockito.when((Object)mockedRuntime.scheduleReadOperation((String)ArgumentMatchers.eq((Object)"consumer-group-describe"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (CoordinatorRuntime.CoordinatorReadOperation)ArgumentMatchers.any())).thenReturn((Object)FutureUtils.failedFuture((Throwable)Errors.COORDINATOR_NOT_AVAILABLE.exception()));

        CompletableFuture<?> described = coordinator.consumerGroupDescribe(TestUtil.requestContext(ApiKeys.CONSUMER_GROUP_DESCRIBE), Collections.singletonList("group-id"));

        ConsumerGroupDescribeResponseData.DescribedGroup unavailableGroup = new ConsumerGroupDescribeResponseData.DescribedGroup().setGroupId("group-id").setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals(Collections.singletonList(unavailableGroup), described.get());
    }

    /**
     * Deletes the offset of partition 0 of topic "topic" for group "group".
     * The "delete-offsets" write on partition 0 is stubbed with a completed
     * response, and {@code deleteOffsets} must return an already finished
     * future holding that same response.
     */
    @Test
    public void testDeleteOffsets() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, (GroupCoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class));
        coordinator.startup(() -> 1);

        // Request: one topic with a single partition index.
        OffsetDeleteRequestData.OffsetDeleteRequestPartition partitionToDelete = new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0);
        OffsetDeleteRequestData.OffsetDeleteRequestTopic topicToDelete = new OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName("topic").setPartitions(Collections.singletonList(partitionToDelete));
        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopics = new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(topicToDelete).iterator());
        OffsetDeleteRequestData deleteRequest = new OffsetDeleteRequestData().setGroupId("group").setTopics(requestTopics);

        // Stubbed response: one (unnamed) topic with a single partition entry.
        OffsetDeleteResponseData.OffsetDeleteResponsePartition deletedPartition = new OffsetDeleteResponseData.OffsetDeleteResponsePartition().setPartitionIndex(0);
        OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection responsePartitions = new OffsetDeleteResponseData.OffsetDeleteResponsePartitionCollection(Collections.singletonList(deletedPartition).iterator());
        OffsetDeleteResponseData.OffsetDeleteResponseTopic deletedTopic = new OffsetDeleteResponseData.OffsetDeleteResponseTopic().setPartitions(responsePartitions);
        OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection responseTopics = new OffsetDeleteResponseData.OffsetDeleteResponseTopicCollection(Collections.singletonList(deletedTopic).iterator());
        OffsetDeleteResponseData stubbedResponse = new OffsetDeleteResponseData().setTopics(responseTopics);

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Duration writeTimeout = Duration.ofMillis(5000L);
        Mockito.when((Object)mockedRuntime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)"delete-offsets"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (Duration)ArgumentMatchers.eq((Object)writeTimeout), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(stubbedResponse));

        CompletableFuture<?> deleted = coordinator.deleteOffsets(TestUtil.requestContext(ApiKeys.OFFSET_DELETE), deleteRequest, BufferSupplier.NO_CACHING);
        Assertions.assertTrue((boolean)deleted.isDone());
        Assertions.assertEquals((Object)stubbedResponse, deleted.get());
    }

    /**
     * Sends an offset-delete request whose group id is the empty string and
     * expects an already finished future holding a response with the
     * {@code INVALID_GROUP_ID} error code. The "delete-offsets" write on
     * partition 0 is stubbed with that same response.
     */
    @Test
    public void testDeleteOffsetsInvalidGroupId() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, (GroupCoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class));
        coordinator.startup(() -> 1);

        OffsetDeleteRequestData.OffsetDeleteRequestPartition partitionToDelete = new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0);
        OffsetDeleteRequestData.OffsetDeleteRequestTopic topicToDelete = new OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName("topic").setPartitions(Collections.singletonList(partitionToDelete));
        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopics = new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(topicToDelete).iterator());
        OffsetDeleteRequestData deleteRequest = new OffsetDeleteRequestData().setGroupId("").setTopics(requestTopics);

        OffsetDeleteResponseData invalidGroupResponse = new OffsetDeleteResponseData().setErrorCode(Errors.INVALID_GROUP_ID.code());

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Duration writeTimeout = Duration.ofMillis(5000L);
        Mockito.when((Object)mockedRuntime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)"delete-offsets"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (Duration)ArgumentMatchers.eq((Object)writeTimeout), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(invalidGroupResponse));

        CompletableFuture<?> deleted = coordinator.deleteOffsets(TestUtil.requestContext(ApiKeys.OFFSET_DELETE), deleteRequest, BufferSupplier.NO_CACHING);
        Assertions.assertTrue((boolean)deleted.isDone());
        Assertions.assertEquals((Object)invalidGroupResponse, deleted.get());
    }

    /**
     * Stubs the "delete-offsets" write on partition 0 with a future failed by
     * {@code exception} and expects {@code deleteOffsets} to return an already
     * finished future whose response carries {@code expectedErrorCode}.
     *
     * @param exception         failure used to complete the stubbed write
     * @param expectedErrorCode error code expected in the resulting response
     */
    @ParameterizedTest
    @MethodSource(value={"testConsumerGroupHeartbeatWithExceptionSource"})
    public void testDeleteOffsetsWithException(Throwable exception, short expectedErrorCode) throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, (GroupCoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class));
        coordinator.startup(() -> 1);

        OffsetDeleteRequestData.OffsetDeleteRequestPartition partitionToDelete = new OffsetDeleteRequestData.OffsetDeleteRequestPartition().setPartitionIndex(0);
        OffsetDeleteRequestData.OffsetDeleteRequestTopic topicToDelete = new OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName("topic").setPartitions(Collections.singletonList(partitionToDelete));
        OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection requestTopics = new OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection(Collections.singletonList(topicToDelete).iterator());
        OffsetDeleteRequestData deleteRequest = new OffsetDeleteRequestData().setGroupId("group").setTopics(requestTopics);

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        Duration writeTimeout = Duration.ofMillis(5000L);
        Mockito.when((Object)mockedRuntime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)"delete-offsets"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (Duration)ArgumentMatchers.eq((Object)writeTimeout), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn((Object)FutureUtils.failedFuture((Throwable)exception));

        CompletableFuture<?> deleted = coordinator.deleteOffsets(TestUtil.requestContext(ApiKeys.OFFSET_DELETE), deleteRequest, BufferSupplier.NO_CACHING);
        Assertions.assertTrue((boolean)deleted.isDone());

        OffsetDeleteResponseData expected = new OffsetDeleteResponseData().setErrorCode(expectedErrorCode);
        Assertions.assertEquals((Object)expected, deleted.get());
    }

    /**
     * Calls {@code deleteOffsets} on a service that was never started and
     * expects a response carrying {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testDeleteOffsetsWhenNotStarted() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, new GroupCoordinatorMetrics());

        OffsetDeleteRequestData deleteRequest = new OffsetDeleteRequestData().setGroupId("foo");
        CompletableFuture<?> deleted = coordinator.deleteOffsets(TestUtil.requestContext(ApiKeys.OFFSET_DELETE), deleteRequest, BufferSupplier.NO_CACHING);

        OffsetDeleteResponseData notAvailable = new OffsetDeleteResponseData().setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        Assertions.assertEquals((Object)notAvailable, deleted.get());
    }

    /**
     * Deletes "group-id-1", "group-id-2", "group-id-3" and a {@code null} id
     * on a service started with three partitions. The "delete-groups" write is
     * stubbed per partition: partition 2 completes immediately with the
     * "group-id-1" result, partition 0 stays pending until the test completes
     * it with the "group-id-2" result, and partition 1 fails with
     * {@code COORDINATOR_LOAD_IN_PROGRESS}. The combined future must be
     * incomplete before the pending write is completed and done right after;
     * its value is compared against a collection holding, in order, the
     * {@code INVALID_GROUP_ID} entry for the null id and the results for
     * groups 2, 3 and 1.
     */
    @Test
    public void testDeleteGroups() throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> mockedRuntime = this.mockRuntime();
        GroupCoordinatorService coordinator = new GroupCoordinatorService(new LogContext(), this.createConfig(), mockedRuntime, (GroupCoordinatorMetrics)Mockito.mock(GroupCoordinatorMetrics.class));
        coordinator.startup(() -> 3);

        TopicPartition partitionZero = new TopicPartition("__consumer_offsets", 0);
        TopicPartition partitionOne = new TopicPartition("__consumer_offsets", 1);
        TopicPartition partitionTwo = new TopicPartition("__consumer_offsets", 2);
        Duration writeTimeout = Duration.ofMillis(5000L);

        // Per-partition results handed back by the stubbed writes.
        DeleteGroupsResponseData.DeletableGroupResult firstResult = new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-1");
        DeleteGroupsResponseData.DeletableGroupResultCollection firstBatch = new DeleteGroupsResponseData.DeletableGroupResultCollection();
        firstBatch.add((ImplicitLinkedHashCollection.Element)firstResult);
        DeleteGroupsResponseData.DeletableGroupResult secondResult = new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-2");
        DeleteGroupsResponseData.DeletableGroupResultCollection secondBatch = new DeleteGroupsResponseData.DeletableGroupResultCollection();
        secondBatch.add((ImplicitLinkedHashCollection.Element)secondResult);
        DeleteGroupsResponseData.DeletableGroupResult thirdResult = new DeleteGroupsResponseData.DeletableGroupResult().setGroupId("group-id-3").setErrorCode(Errors.COORDINATOR_LOAD_IN_PROGRESS.code());

        // The expected collection is assembled from duplicates of the results above.
        DeleteGroupsResponseData.DeletableGroupResult invalidIdResult = new DeleteGroupsResponseData.DeletableGroupResult().setGroupId(null).setErrorCode(Errors.INVALID_GROUP_ID.code());
        DeleteGroupsResponseData.DeletableGroupResultCollection expected = new DeleteGroupsResponseData.DeletableGroupResultCollection();
        expected.addAll(Arrays.asList(invalidIdResult, secondResult.duplicate(), thirdResult.duplicate(), firstResult.duplicate()));

        Mockito.when((Object)mockedRuntime.partitions()).thenReturn((Object)Sets.newSet((Object[])new TopicPartition[]{partitionZero, partitionOne, partitionTwo}));

        // Partition 2: completes right away.
        Mockito.when((Object)mockedRuntime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)"delete-groups"), (TopicPartition)ArgumentMatchers.eq((Object)partitionTwo), (Duration)ArgumentMatchers.eq((Object)writeTimeout), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn(CompletableFuture.completedFuture(firstBatch));
        // Partition 0: pending until completed by the test.
        CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> pendingWrite = new CompletableFuture<>();
        Mockito.when((Object)mockedRuntime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)"delete-groups"), (TopicPartition)ArgumentMatchers.eq((Object)partitionZero), (Duration)ArgumentMatchers.eq((Object)writeTimeout), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn(pendingWrite);
        // Partition 1: fails with COORDINATOR_LOAD_IN_PROGRESS.
        Mockito.when((Object)mockedRuntime.scheduleWriteOperation((String)ArgumentMatchers.eq((Object)"delete-groups"), (TopicPartition)ArgumentMatchers.eq((Object)partitionOne), (Duration)ArgumentMatchers.eq((Object)writeTimeout), (CoordinatorRuntime.CoordinatorWriteOperation)ArgumentMatchers.any())).thenReturn((Object)FutureUtils.failedFuture((Throwable)Errors.COORDINATOR_LOAD_IN_PROGRESS.exception()));

        List<String> idsToDelete = Arrays.asList("group-id-1", "group-id-2", "group-id-3", null);
        CompletableFuture<?> deleted = coordinator.deleteGroups(TestUtil.requestContext(ApiKeys.DELETE_GROUPS), idsToDelete, BufferSupplier.NO_CACHING);
        Assertions.assertFalse((boolean)deleted.isDone());

        pendingWrite.complete(secondBatch);
        Assertions.assertTrue((boolean)deleted.isDone());
        Assertions.assertEquals((Object)expected, deleted.get());
    }

    /**
     * Verifies that when the runtime fails the "delete-groups" write operation, the
     * response produced by {@code deleteGroups} carries the expected error code for
     * the requested group.
     *
     * @param exception         the exception the mocked runtime completes the write with
     * @param expectedErrorCode the error code the response entry is expected to contain
     */
    @ParameterizedTest
    @MethodSource("testConsumerGroupHeartbeatWithExceptionSource")
    public void testDeleteGroupsWithException(Throwable exception, short expectedErrorCode) throws Exception {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        GroupCoordinatorMetrics metrics = Mockito.mock(GroupCoordinatorMetrics.class);
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, metrics);
        service.startup(() -> 1);

        // The write operation scheduled against partition 0 fails with the parameterized exception.
        TopicPartition offsetsPartition = new TopicPartition("__consumer_offsets", 0);
        Mockito.when(runtime.scheduleWriteOperation(
            ArgumentMatchers.eq("delete-groups"),
            ArgumentMatchers.eq(offsetsPartition),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(exception));

        CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = service.deleteGroups(
            TestUtil.requestContext(ApiKeys.DELETE_GROUPS),
            Collections.singletonList("group-id"),
            BufferSupplier.NO_CACHING
        );

        DeleteGroupsResponseData.DeletableGroupResult expectedResult = new DeleteGroupsResponseData.DeletableGroupResult()
            .setGroupId("group-id")
            .setErrorCode(expectedErrorCode);
        DeleteGroupsResponseData.DeletableGroupResultCollection expectedCollection =
            new DeleteGroupsResponseData.DeletableGroupResultCollection(Collections.singletonList(expectedResult).iterator());
        Assertions.assertEquals(expectedCollection, future.get());
    }

    /**
     * Verifies that calling {@code deleteGroups} on a service whose {@code startup}
     * was never invoked yields a {@code COORDINATOR_NOT_AVAILABLE} entry for the
     * requested group.
     */
    @Test
    public void testDeleteGroupsWhenNotStarted() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        GroupCoordinatorMetrics metrics = Mockito.mock(GroupCoordinatorMetrics.class);
        // Note: startup() is intentionally not called on this service.
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, metrics);

        CompletableFuture<DeleteGroupsResponseData.DeletableGroupResultCollection> future = service.deleteGroups(
            TestUtil.requestContext(ApiKeys.DELETE_GROUPS),
            Collections.singletonList("foo"),
            BufferSupplier.NO_CACHING
        );

        DeleteGroupsResponseData.DeletableGroupResult expectedResult = new DeleteGroupsResponseData.DeletableGroupResult()
            .setGroupId("foo")
            .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        DeleteGroupsResponseData.DeletableGroupResultCollection expectedCollection =
            new DeleteGroupsResponseData.DeletableGroupResultCollection(Collections.singletonList(expectedResult).iterator());
        Assertions.assertEquals(expectedCollection, future.get());
    }

    /**
     * Verifies that {@code commitTransactionalOffsets} on a service whose
     * {@code startup} was never invoked answers every requested partition with
     * {@code COORDINATOR_NOT_AVAILABLE}.
     */
    @Test
    public void testCommitTransactionalOffsetsWhenNotStarted() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        // Note: startup() is intentionally not called on this service.
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, new GroupCoordinatorMetrics());

        // Request: one topic ("topic") with a single partition 0 committed at offset 100.
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition requestPartition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic requestTopic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("topic")
                .setPartitions(Collections.singletonList(requestPartition));
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setTransactionalId("transactional-id")
            .setMemberId("member-id")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(requestTopic));

        CompletableFuture<TxnOffsetCommitResponseData> future = service.commitTransactionalOffsets(
            TestUtil.requestContext(ApiKeys.TXN_OFFSET_COMMIT),
            request,
            BufferSupplier.NO_CACHING
        );

        // Expected response mirrors the request layout with the error code set per partition.
        TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition expectedPartition =
            new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code());
        TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic expectedTopic =
            new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                .setName("topic")
                .setPartitions(Collections.singletonList(expectedPartition));
        TxnOffsetCommitResponseData expectedResponse = new TxnOffsetCommitResponseData()
            .setTopics(Collections.singletonList(expectedTopic));
        Assertions.assertEquals(expectedResponse, future.get());
    }

    /**
     * Verifies that {@code commitTransactionalOffsets} answers every requested
     * partition with {@code INVALID_GROUP_ID} when the group id is {@code null} or
     * the empty string.
     *
     * @param groupId the invalid group id under test ({@code null} or {@code ""})
     */
    @ParameterizedTest
    @NullSource
    @ValueSource(strings = "")
    public void testCommitTransactionalOffsetsWithInvalidGroupId(String groupId) throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, new GroupCoordinatorMetrics());
        service.startup(() -> 1);

        // Request: one topic ("topic") with a single partition 0 committed at offset 100.
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition requestPartition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic requestTopic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("topic")
                .setPartitions(Collections.singletonList(requestPartition));
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId(groupId)
            .setTransactionalId("transactional-id")
            .setMemberId("member-id")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(requestTopic));

        CompletableFuture<TxnOffsetCommitResponseData> future = service.commitTransactionalOffsets(
            TestUtil.requestContext(ApiKeys.TXN_OFFSET_COMMIT),
            request,
            BufferSupplier.NO_CACHING
        );

        TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition expectedPartition =
            new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.INVALID_GROUP_ID.code());
        TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic expectedTopic =
            new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                .setName("topic")
                .setPartitions(Collections.singletonList(expectedPartition));
        TxnOffsetCommitResponseData expectedResponse = new TxnOffsetCommitResponseData()
            .setTopics(Collections.singletonList(expectedTopic));
        Assertions.assertEquals(expectedResponse, future.get());
    }

    /**
     * Verifies the success path of {@code commitTransactionalOffsets}: the response
     * returned by the runtime's "txn-commit-offset" transactional write operation is
     * handed back to the caller unchanged.
     */
    @Test
    public void testCommitTransactionalOffsets() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, new GroupCoordinatorMetrics());
        service.startup(() -> 1);

        // Request: producer 10 / epoch 5 commits offset 100 for partition 0 of "topic".
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition requestPartition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic requestTopic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("topic")
                .setPartitions(Collections.singletonList(requestPartition));
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setTransactionalId("transactional-id")
            .setProducerId(10L)
            .setProducerEpoch((short) 5)
            .setMemberId("member-id")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(requestTopic));

        TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition responsePartition =
            new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(Errors.NONE.code());
        TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic responseTopic =
            new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                .setName("topic")
                .setPartitions(Collections.singletonList(responsePartition));
        TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData()
            .setTopics(Collections.singletonList(responseTopic));

        // The stub only matches when the transactional id, producer id and epoch from the request are forwarded.
        Mockito.when(runtime.scheduleTransactionalWriteOperation(
            ArgumentMatchers.eq("txn-commit-offset"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq("transactional-id"),
            ArgumentMatchers.eq(10L),
            ArgumentMatchers.eq((short) 5),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            ArgumentMatchers.any()
        )).thenReturn(CompletableFuture.completedFuture(response));

        CompletableFuture<TxnOffsetCommitResponseData> future = service.commitTransactionalOffsets(
            TestUtil.requestContext(ApiKeys.TXN_OFFSET_COMMIT),
            request,
            BufferSupplier.NO_CACHING
        );
        Assertions.assertEquals(response, future.get());
    }

    /**
     * Verifies that when the "txn-commit-offset" write fails with an error wrapped
     * in a {@link CompletionException}, {@code commitTransactionalOffsets} reports
     * the expected error code on each requested partition.
     *
     * @param error         the error whose exception is wrapped and used to fail the write
     * @param expectedError the error expected on the partition in the response
     */
    @ParameterizedTest
    @CsvSource({"NOT_ENOUGH_REPLICAS, COORDINATOR_NOT_AVAILABLE", "NETWORK_EXCEPTION, COORDINATOR_LOAD_IN_PROGRESS"})
    public void testCommitTransactionalOffsetsWithWrappedError(Errors error, Errors expectedError) throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, new GroupCoordinatorMetrics());
        service.startup(() -> 1);

        // Request: producer 10 / epoch 5 commits offset 100 for partition 0 of "topic".
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition requestPartition =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestPartition()
                .setPartitionIndex(0)
                .setCommittedOffset(100L);
        TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic requestTopic =
            new TxnOffsetCommitRequestData.TxnOffsetCommitRequestTopic()
                .setName("topic")
                .setPartitions(Collections.singletonList(requestPartition));
        TxnOffsetCommitRequestData request = new TxnOffsetCommitRequestData()
            .setGroupId("foo")
            .setTransactionalId("transactional-id")
            .setProducerId(10L)
            .setProducerEpoch((short) 5)
            .setMemberId("member-id")
            .setGenerationId(10)
            .setTopics(Collections.singletonList(requestTopic));

        TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition expectedPartition =
            new TxnOffsetCommitResponseData.TxnOffsetCommitResponsePartition()
                .setPartitionIndex(0)
                .setErrorCode(expectedError.code());
        TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic expectedTopic =
            new TxnOffsetCommitResponseData.TxnOffsetCommitResponseTopic()
                .setName("topic")
                .setPartitions(Collections.singletonList(expectedPartition));
        TxnOffsetCommitResponseData response = new TxnOffsetCommitResponseData()
            .setTopics(Collections.singletonList(expectedTopic));

        // Fail the write with the error's exception wrapped in a CompletionException.
        CompletionException wrappedFailure = new CompletionException(error.exception());
        Mockito.when(runtime.scheduleTransactionalWriteOperation(
            ArgumentMatchers.eq("txn-commit-offset"),
            ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", 0)),
            ArgumentMatchers.eq("transactional-id"),
            ArgumentMatchers.eq(10L),
            ArgumentMatchers.eq((short) 5),
            ArgumentMatchers.eq(Duration.ofMillis(5000L)),
            ArgumentMatchers.any()
        )).thenReturn(FutureUtils.failedFuture(wrappedFailure));

        CompletableFuture<TxnOffsetCommitResponseData> future = service.commitTransactionalOffsets(
            TestUtil.requestContext(ApiKeys.TXN_OFFSET_COMMIT),
            request,
            BufferSupplier.NO_CACHING
        );
        Assertions.assertEquals(response, future.get());
    }

    /**
     * Verifies that {@code completeTransaction} forwards its arguments to the
     * runtime's "write-txn-marker" transaction completion and returns the runtime's
     * (null-valued) result.
     */
    @Test
    public void testCompleteTransaction() throws ExecutionException, InterruptedException {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, new GroupCoordinatorMetrics());
        service.startup(() -> 1);

        TopicPartition offsetsPartition = new TopicPartition("__consumer_offsets", 0);
        Duration timeout = Duration.ofMillis(100L);

        // The stub matches only if every argument is passed through as given below.
        Mockito.when(runtime.scheduleTransactionCompletion(
            ArgumentMatchers.eq("write-txn-marker"),
            ArgumentMatchers.eq(offsetsPartition),
            ArgumentMatchers.eq(100L),
            ArgumentMatchers.eq((short) 5),
            ArgumentMatchers.eq(10),
            ArgumentMatchers.eq(TransactionResult.COMMIT),
            ArgumentMatchers.eq(timeout)
        )).thenReturn(CompletableFuture.completedFuture(null));

        CompletableFuture<Void> future = service.completeTransaction(
            offsetsPartition,
            100L,
            (short) 5,
            10,
            TransactionResult.COMMIT,
            timeout
        );
        Assertions.assertNull(future.get());
    }

    /**
     * Verifies that {@code completeTransaction} on a service whose {@code startup}
     * was never invoked returns a future failing with
     * {@link CoordinatorNotAvailableException}.
     */
    @Test
    public void testCompleteTransactionWhenNotCoordinatorServiceStarted() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        // Note: startup() is intentionally not called on this service.
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, new GroupCoordinatorMetrics());

        TopicPartition partition = new TopicPartition("foo", 0);
        CompletableFuture<Void> future = service.completeTransaction(
            partition,
            100L,
            (short) 5,
            10,
            TransactionResult.COMMIT,
            Duration.ofMillis(100L)
        );

        TestUtils.assertFutureThrows(future, CoordinatorNotAvailableException.class);
    }

    /**
     * Verifies that {@code completeTransaction} on a started service returns a
     * future failing with {@link IllegalStateException} when invoked for the
     * partition {@code foo-0}.
     */
    @Test
    public void testCompleteTransactionWithUnexpectedPartition() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, new GroupCoordinatorMetrics());
        service.startup(() -> 1);

        // "foo" is not the "__consumer_offsets" topic used by the other tests in this class.
        TopicPartition unexpectedPartition = new TopicPartition("foo", 0);
        CompletableFuture<Void> future = service.completeTransaction(
            unexpectedPartition,
            100L,
            (short) 5,
            10,
            TransactionResult.COMMIT,
            Duration.ofMillis(100L)
        );

        TestUtils.assertFutureThrows(future, IllegalStateException.class);
    }

    /**
     * Verifies that {@code onPartitionsDeleted} does not throw when one of the
     * per-partition "on-partition-deleted" write operations fails while the others
     * succeed.
     */
    @Test
    public void testOnPartitionsDeleted() {
        int partitionCount = 3;
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, new GroupCoordinatorMetrics());
        service.startup(() -> partitionCount);

        // The runtime reports partitions 0..partitionCount-1 of __consumer_offsets.
        Mockito.when(runtime.partitions()).thenReturn(
            IntStream.range(0, partitionCount)
                .mapToObj(index -> new TopicPartition("__consumer_offsets", index))
                .collect(Collectors.toSet())
        );

        // One future per partition; each is handed out by the stubbed write operation for that partition.
        List<CompletableFuture<Void>> writeFutures = Stream
            .generate(() -> new CompletableFuture<Void>())
            .limit(partitionCount)
            .collect(Collectors.toList());
        for (int partition = 0; partition < partitionCount; partition++) {
            CompletableFuture<Void> writeFuture = writeFutures.get(partition);
            Mockito.when(runtime.scheduleWriteOperation(
                ArgumentMatchers.eq("on-partition-deleted"),
                ArgumentMatchers.eq(new TopicPartition("__consumer_offsets", partition)),
                ArgumentMatchers.eq(Duration.ofMillis(5000L)),
                ArgumentMatchers.any()
            )).thenAnswer(invocation -> writeFuture);
        }

        // All writes succeed except the one for the last partition.
        writeFutures.subList(0, partitionCount - 1).forEach(succeeded -> succeeded.complete(null));
        writeFutures.get(partitionCount - 1).completeExceptionally(Errors.COORDINATOR_LOAD_IN_PROGRESS.exception());

        // The failed write must not surface as an exception to the caller.
        List<TopicPartition> deletedPartitions = Collections.singletonList(new TopicPartition("foo", 0));
        Assertions.assertDoesNotThrow(() -> service.onPartitionsDeleted(deletedPartitions, BufferSupplier.NO_CACHING));
    }

    /**
     * Verifies that {@code onPartitionsDeleted} throws
     * {@link CoordinatorNotAvailableException} when invoked on a service whose
     * {@code startup} was never called.
     */
    @Test
    public void testOnPartitionsDeletedWhenServiceIsNotStarted() {
        CoordinatorRuntime<GroupCoordinatorShard, Record> runtime = this.mockRuntime();
        // Note: startup() is intentionally not called on this service.
        GroupCoordinatorService service = new GroupCoordinatorService(new LogContext(), this.createConfig(), runtime, new GroupCoordinatorMetrics());

        List<TopicPartition> deletedPartitions = Collections.singletonList(new TopicPartition("foo", 0));
        Assertions.assertThrows(
            CoordinatorNotAvailableException.class,
            () -> service.onPartitionsDeleted(deletedPartitions, BufferSupplier.NO_CACHING)
        );
    }
}

