/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.consumer.group;

import java.lang.invoke.CallSite;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import joptsimple.OptionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RoundRobinAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.GroupIdNotFoundException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommand;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommandOptions;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTestUtils;
import org.apache.kafka.tools.consumer.group.GroupInformation;
import org.apache.kafka.tools.consumer.group.MemberAssignmentState;
import org.apache.kafka.tools.consumer.group.PartitionAssignmentState;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@ClusterTestDefaults(types={Type.CO_KRAFT}, serverProperties={@ClusterConfigProperty(key="offsets.topic.num.partitions", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1"), @ClusterConfigProperty(key="group.initial.rebalance.delay.ms", value="1000"), @ClusterConfigProperty(key="group.consumer.heartbeat.interval.ms", value="500"), @ClusterConfigProperty(key="group.consumer.min.heartbeat.interval.ms", value="500")})
public class DescribeConsumerGroupTest {
    private static final String TOPIC_PREFIX = "test.topic.";
    private static final String GROUP_PREFIX = "test.group.";
    private static final List<List<String>> DESCRIBE_TYPE_OFFSETS = List.of(List.of(""), List.of("--offsets"), List.of("--offsets", "--verbose"));
    private static final List<List<String>> DESCRIBE_TYPE_MEMBERS = List.of(List.of("--members"), List.of("--members", "--verbose"));
    private static final List<List<String>> DESCRIBE_TYPE_STATE = List.of(List.of("--state"), List.of("--state", "--verbose"));
    private static final List<List<String>> DESCRIBE_TYPES = Stream.of(DESCRIBE_TYPE_OFFSETS, DESCRIBE_TYPE_MEMBERS, DESCRIBE_TYPE_STATE).flatMap(Collection::stream).toList();
    private ClusterInstance clusterInstance;

    @ClusterTest
    public void testDescribeNonExistingGroup(ClusterInstance clusterInstance) {
        String missingGroup = "missing.group";
        for (List<String> describeType : DESCRIBE_TYPES) {
            ArrayList<String> cgcArgs = new ArrayList<String>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup));
            cgcArgs.addAll(describeType);
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));
                try {
                    service.describeGroups();
                    Assertions.fail((String)("Expected error was not detected for describe option '" + String.join((CharSequence)" ", describeType) + "'"));
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            catch (ExecutionException ee) {
                Assertions.assertInstanceOf(GroupIdNotFoundException.class, (Object)ee.getCause());
                Assertions.assertEquals((Object)("Group " + missingGroup + " not found."), (Object)ee.getCause().getMessage());
            }
            catch (Exception e) {
                Assertions.fail((String)("Expected error was not detected for describe option '" + String.join((CharSequence)" ", describeType) + "'"));
            }
        }
    }

    @ClusterTest
    public void testDescribeOffsetsOfNonExistingGroup(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        String missingGroup = "missing.group";
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            try {
                AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
                try {
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup});
                    try {
                        service.collectGroupOffsets(missingGroup);
                        Assertions.fail((String)("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"));
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
                finally {
                    if (protocolConsumerGroupExecutor == null) continue;
                    protocolConsumerGroupExecutor.close();
                }
            }
            catch (ExecutionException ee) {
                Assertions.assertInstanceOf(GroupIdNotFoundException.class, (Object)ee.getCause(), (String)("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"));
            }
        }
    }

    @ClusterTest
    public void testDescribeMembersOfNonExistingGroup(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        String missingGroup = "missing.group";
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            try {
                AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
                try {
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup});
                    try {
                        service.collectGroupMembers(missingGroup);
                        Assertions.fail((String)("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"));
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
                finally {
                    if (protocolConsumerGroupExecutor == null) continue;
                    protocolConsumerGroupExecutor.close();
                }
            }
            catch (ExecutionException ee) {
                Assertions.assertInstanceOf(GroupIdNotFoundException.class, (Object)ee.getCause(), (String)("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"));
            }
        }
    }

    @ClusterTest
    public void testDescribeStateOfNonExistingGroup(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        String missingGroup = "missing.group";
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            try {
                AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
                try {
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", missingGroup});
                    try {
                        service.collectGroupState(missingGroup);
                        Assertions.fail((String)("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"));
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
                finally {
                    if (protocolConsumerGroupExecutor == null) continue;
                    protocolConsumerGroupExecutor.close();
                }
            }
            catch (ExecutionException ee) {
                Assertions.assertInstanceOf(GroupIdNotFoundException.class, (Object)ee.getCause(), (String)("Expected the group '" + missingGroup + "' to throw GroupIdNotFoundException"));
            }
        }
    }

    @ClusterTest
    public void testDescribeGroupOffsets(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            clusterInstance.createTopic(topic, 1, (short)1);
            this.sendRecords(topic, 0, 1);
            for (List<String> describeType : DESCRIBE_TYPE_OFFSETS) {
                String group = GROUP_PREFIX + groupProtocol.name() + "." + String.join((CharSequence)"", describeType);
                ArrayList<CallSite> cgcArgs = new ArrayList<CallSite>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group));
                cgcArgs.addAll(describeType);
                AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Map.of());
                try {
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));
                    try {
                        Admin admin = clusterInstance.admin();
                        try {
                            TestUtils.waitForCondition(() -> {
                                Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(this.describeGroups(service));
                                String[] lines = res.getKey().trim().split("\n");
                                if (lines.length != 2 && !res.getValue().isEmpty()) {
                                    return false;
                                }
                                ConsumerGroupDescription consumerGroupDescription = (ConsumerGroupDescription)((KafkaFuture)admin.describeConsumerGroups(Set.of(group)).describedGroups().get(group)).get();
                                MemberDescription memberDescription = (MemberDescription)consumerGroupDescription.members().iterator().next();
                                List<String> expectedValues = describeType.contains("--verbose") ? List.of(group, topic, "0", "-", "1", "1", "0", memberDescription.consumerId(), memberDescription.host(), memberDescription.clientId()) : List.of(group, topic, "0", "1", "1", "0", memberDescription.consumerId(), memberDescription.host(), memberDescription.clientId());
                                return this.checkArgsHeaderOutput(cgcArgs, lines[0]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues);
                            }, (String)("Expected a data row and no error in describe results with describe type " + String.join((CharSequence)" ", describeType) + "."));
                        }
                        finally {
                            if (admin == null) continue;
                            admin.close();
                        }
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
                finally {
                    if (protocolConsumerGroupExecutor == null) continue;
                    protocolConsumerGroupExecutor.close();
                }
            }
        }
    }

    @ClusterTest
    public void testDescribeGroupMembers(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            boolean isConsumer = groupProtocol.equals((Object)GroupProtocol.CONSUMER);
            String topic1 = TOPIC_PREFIX + groupProtocol.name() + "1";
            String topic2 = TOPIC_PREFIX + groupProtocol.name() + "2";
            clusterInstance.createTopic(topic1, 2, (short)1);
            clusterInstance.createTopic(topic2, 1, (short)1);
            for (List<String> describeType : DESCRIBE_TYPE_MEMBERS) {
                String group = GROUP_PREFIX + groupProtocol.name() + "." + String.join((CharSequence)"", describeType);
                ArrayList<CallSite> cgcArgs = new ArrayList<CallSite>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group));
                cgcArgs.addAll(describeType);
                AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, Set.of(topic1, topic2), Map.of(), 1);
                try {
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));
                    try {
                        Admin admin = clusterInstance.admin();
                        try {
                            TestUtils.waitForCondition(() -> {
                                Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(this.describeGroups(service));
                                String[] lines = res.getKey().trim().split("\n");
                                if (lines.length != 2 && !res.getValue().isEmpty()) {
                                    return false;
                                }
                                ConsumerGroupDescription consumerGroupDescription = (ConsumerGroupDescription)((KafkaFuture)admin.describeConsumerGroups(Set.of(group)).describedGroups().get(group)).get();
                                MemberDescription memberDescription = (MemberDescription)consumerGroupDescription.members().iterator().next();
                                String topicAssignment = topic1 + ":0,1;" + topic2 + ":0";
                                List<Object> expectedValues = describeType.contains("--verbose") ? List.of(group, memberDescription.consumerId(), memberDescription.host(), memberDescription.clientId(), "3", isConsumer ? ((Integer)memberDescription.memberEpoch().get()).toString() : "-", topicAssignment, isConsumer ? ((Integer)consumerGroupDescription.targetAssignmentEpoch().get()).toString() : "-", isConsumer ? topicAssignment : "-") : List.of(group, memberDescription.consumerId(), memberDescription.host(), memberDescription.clientId(), "3");
                                return this.checkArgsHeaderOutput(cgcArgs, lines[0]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues);
                            }, (String)("Expected a data row and no error in describe results with describe type " + String.join((CharSequence)" ", describeType) + "."));
                        }
                        finally {
                            if (admin == null) continue;
                            admin.close();
                        }
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
                finally {
                    if (protocolConsumerGroupExecutor == null) continue;
                    protocolConsumerGroupExecutor.close();
                }
            }
        }
    }

    @ClusterTest
    public void testDescribeGroupMemberWithMigration(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        String topic = "test.topic.migration";
        String group = "test.group.migration";
        String classicClientId = "classic";
        String consumerClientId = "consumer";
        clusterInstance.createTopic(topic, 2, (short)1);
        List<String> cgcArgs = List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--members", "--verbose");
        try (AutoCloseable classicConsumer = this.consumerGroupClosable(GroupProtocol.CLASSIC, group, topic, Map.of("client.id", classicClientId));
             Admin admin = clusterInstance.admin();){
            TestUtils.waitForCondition(() -> {
                ConsumerGroupDescription consumerGroupDescription = (ConsumerGroupDescription)((KafkaFuture)admin.describeConsumerGroups(Set.of(group)).describedGroups().get(group)).get();
                MemberDescription memberDescription = (MemberDescription)consumerGroupDescription.members().iterator().next();
                return !memberDescription.assignment().topicPartitions().isEmpty();
            }, (String)"Expected the classic consumer to join the group.");
            try (AutoCloseable consumerConsumer = this.consumerGroupClosable(GroupProtocol.CONSUMER, group, topic, Map.of("client.id", consumerClientId));
                 ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));){
                TestUtils.waitForCondition(() -> {
                    Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(this.describeGroups(service));
                    String[] lines = res.getKey().trim().split("\n");
                    if (lines.length != 3 && !res.getValue().isEmpty()) {
                        return false;
                    }
                    String[] header = lines[0].trim().split("\\s+");
                    Assertions.assertEquals((Object)"UPGRADED", (Object)header[header.length - 1]);
                    List<String> line1 = Arrays.stream(lines[1].trim().split("\\s+")).toList();
                    List<String> line2 = Arrays.stream(lines[2].trim().split("\\s+")).toList();
                    if (line1.contains(classicClientId)) {
                        Assertions.assertEquals((Object)"false", (Object)line1.get(line1.size() - 1));
                        Assertions.assertEquals((Object)"true", (Object)line2.get(line2.size() - 1));
                    } else {
                        Assertions.assertEquals((Object)"false", (Object)line2.get(line2.size() - 1));
                        Assertions.assertEquals((Object)"true", (Object)line1.get(line1.size() - 1));
                    }
                    return true;
                }, (String)"Expected a data row and no error in describe results with describe type \"--members --verbose\"");
            }
        }
    }

    @ClusterTest
    public void testDescribeGroupState(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            boolean isConsumer = groupProtocol.equals((Object)GroupProtocol.CONSUMER);
            String topic = TOPIC_PREFIX + groupProtocol.name();
            clusterInstance.createTopic(topic, 1, (short)1);
            for (List<String> describeType : DESCRIBE_TYPE_STATE) {
                String group = GROUP_PREFIX + groupProtocol.name() + "." + String.join((CharSequence)"", describeType);
                ArrayList<CallSite> cgcArgs = new ArrayList<CallSite>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group));
                cgcArgs.addAll(describeType);
                AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Map.of());
                try {
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));
                    try {
                        Admin admin = clusterInstance.admin();
                        try {
                            TestUtils.waitForCondition(() -> {
                                Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(this.describeGroups(service));
                                String[] lines = res.getKey().trim().split("\n");
                                if (lines.length != 2 && !res.getValue().isEmpty()) {
                                    return false;
                                }
                                ConsumerGroupDescription consumerGroupDescription = (ConsumerGroupDescription)((KafkaFuture)admin.describeConsumerGroups(Set.of(group)).describedGroups().get(group)).get();
                                String coordinatorAddress = consumerGroupDescription.coordinator().host() + ":" + consumerGroupDescription.coordinator().port();
                                String coordinatorId = "(" + consumerGroupDescription.coordinator().idString() + ")";
                                List<String> expectedValues = describeType.contains("--verbose") ? List.of(group, coordinatorAddress, coordinatorId, consumerGroupDescription.partitionAssignor(), GroupState.STABLE.toString(), isConsumer ? ((Integer)consumerGroupDescription.groupEpoch().get()).toString() : "-", isConsumer ? ((Integer)consumerGroupDescription.targetAssignmentEpoch().get()).toString() : "-", "1") : List.of(group, coordinatorAddress, coordinatorId, consumerGroupDescription.partitionAssignor(), GroupState.STABLE.toString(), "1");
                                return this.checkArgsHeaderOutput(cgcArgs, lines[0]) && Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues);
                            }, (String)("Expected two data rows and no error in describe results with describe type " + String.join((CharSequence)" ", describeType) + "."));
                        }
                        finally {
                            if (admin == null) continue;
                            admin.close();
                        }
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
                finally {
                    if (protocolConsumerGroupExecutor == null) continue;
                    protocolConsumerGroupExecutor.close();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ClusterTest
    public void testDescribeExistingGroups(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            ArrayList<AutoCloseable> protocolConsumerGroupExecutors = new ArrayList<AutoCloseable>();
            try {
                ArrayList<String> groups = new ArrayList<String>();
                for (List<String> describeType : DESCRIBE_TYPES) {
                    String group = GROUP_PREFIX + groupProtocol.name() + "." + String.join((CharSequence)"", describeType);
                    groups.addAll(Arrays.asList("--group", group));
                    protocolConsumerGroupExecutors.add(this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()));
                }
                int expectedNumLines = DESCRIBE_TYPES.size() * 2;
                for (List<String> describeType : DESCRIBE_TYPES) {
                    ArrayList<String> cgcArgs = new ArrayList<String>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe"));
                    cgcArgs.addAll(groups);
                    cgcArgs.addAll(describeType);
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));
                    try {
                        TestUtils.waitForCondition(() -> {
                            Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(this.describeGroups(service));
                            long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count();
                            return numLines == (long)expectedNumLines && res.getValue().isEmpty() && this.checkArgsHeaderOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
                        }, (String)("Expected a data row and no error in describe results with describe type " + String.join((CharSequence)" ", describeType) + "."));
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
            }
            finally {
                for (AutoCloseable protocolConsumerGroupExecutor : protocolConsumerGroupExecutors) {
                    protocolConsumerGroupExecutor.close();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ClusterTest
    public void testDescribeAllExistingGroups(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            ArrayList<AutoCloseable> protocolConsumerGroupExecutors = new ArrayList<AutoCloseable>();
            ArrayList<String> groups = new ArrayList<String>();
            try {
                for (List<String> list : DESCRIBE_TYPES) {
                    String group = GROUP_PREFIX + groupProtocol.name() + "." + String.join((CharSequence)"", list);
                    groups.add(group);
                    protocolConsumerGroupExecutors.add(this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap()));
                }
                int expectedNumLines = DESCRIBE_TYPES.size() * 2;
                for (List<String> describeType3 : DESCRIBE_TYPES) {
                    ArrayList<String> cgcArgs = new ArrayList<String>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--all-groups"));
                    cgcArgs.addAll(describeType3);
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));
                    try {
                        TestUtils.waitForCondition(() -> {
                            Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(this.describeGroups(service));
                            long numLines = Arrays.stream(res.getKey().trim().split("\n")).filter(line -> !line.isEmpty()).count();
                            return numLines == (long)expectedNumLines && res.getValue().isEmpty() && this.checkArgsHeaderOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
                        }, (String)("Expected a data row and no error in describe results with describe type " + String.join((CharSequence)" ", describeType3) + "."));
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
            }
            finally {
                for (AutoCloseable autoCloseable : protocolConsumerGroupExecutors) {
                    autoCloseable.close();
                }
                this.deleteConsumerGroups(groups);
                this.deleteTopic(topic);
            }
        }
    }

    @ClusterTest
    public void testDescribeOffsetsOfExistingGroup(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        boolean res;
                        Map.Entry groupOffsets = service.collectGroupOffsets(group);
                        Optional state = (Optional)groupOffsets.getKey();
                        Optional assignments = (Optional)groupOffsets.getValue();
                        Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, group);
                        boolean bl = res = state.map(s -> s.equals((Object)GroupState.STABLE)).orElse(false) != false && assignments.isPresent() && ((Collection)assignments.get()).stream().filter(isGrp).count() == 1L;
                        if (!res) {
                            return false;
                        }
                        Optional<PartitionAssignmentState> maybePartitionState = ((Collection)assignments.get()).stream().filter(isGrp).findFirst();
                        if (maybePartitionState.isEmpty()) {
                            return false;
                        }
                        PartitionAssignmentState partitionState = maybePartitionState.get();
                        return partitionState.consumerId.map(s0 -> s0.trim().equals("-")).orElse(false) == false && partitionState.clientId.map(s0 -> s0.trim().equals("-")).orElse(false) == false && partitionState.host.map(h -> h.trim().equals("-")).orElse(false) == false;
                    }, (String)("Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for group " + group + "."));
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeMembersOfExistingGroup(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", clusterInstance.bootstrapServers()));
                    try {
                        TestUtils.waitForCondition(() -> {
                            ConsumerGroupDescription consumerGroupDescription = (ConsumerGroupDescription)((KafkaFuture)admin.describeConsumerGroups(Collections.singleton(group)).describedGroups().get(group)).get();
                            return consumerGroupDescription.members().size() == 1 && ((MemberDescription)consumerGroupDescription.members().iterator().next()).assignment().topicPartitions().size() == 1;
                        }, (String)("Expected a 'Stable' group status, rows and valid member information for group " + group + "."));
                        Map.Entry res = service.collectGroupMembers(group);
                        Assertions.assertTrue((boolean)((Optional)res.getValue()).isPresent());
                        Assertions.assertTrue((((Collection)((Optional)res.getValue()).get()).size() == 1 && ((MemberAssignmentState)((Collection)((Optional)res.getValue()).get()).iterator().next()).assignment.size() == 1 ? 1 : 0) != 0, (String)("Expected a topic partition assigned to the single group member for group " + group));
                    }
                    finally {
                        if (admin == null) continue;
                        admin.close();
                    }
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeStateOfExistingGroup(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.singletonMap("group.remote.assignor", groupProtocol == GroupProtocol.CONSUMER ? "range" : ""));
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        GroupInformation state = service.collectGroupState(group);
                        return Objects.equals(state.groupState, GroupState.STABLE) && state.numMembers == 1 && state.coordinator != null && clusterInstance.brokerIds().contains(state.coordinator.id());
                    }, (String)("Expected a 'Stable' group status, with one member for group " + group + "."));
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @ClusterTest
    public void testDescribeStateOfExistingGroupWithNonDefaultAssignor(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = null;
            try {
                String expectedName;
                if (groupProtocol.equals((Object)GroupProtocol.CONSUMER)) {
                    protocolConsumerGroupExecutor = this.consumerGroupClosable(GroupProtocol.CONSUMER, group, topic, Collections.singletonMap("group.remote.assignor", "range"));
                    expectedName = "range";
                } else {
                    protocolConsumerGroupExecutor = this.consumerGroupClosable(GroupProtocol.CLASSIC, group, topic, Collections.singletonMap("partition.assignment.strategy", RoundRobinAssignor.class.getName()));
                    expectedName = "roundrobin";
                }
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        GroupInformation state = service.collectGroupState(group);
                        return Objects.equals(state.groupState, GroupState.STABLE) && state.numMembers == 1 && Objects.equals(state.assignmentStrategy, expectedName) && state.coordinator != null && clusterInstance.brokerIds().contains(state.coordinator.id());
                    }, (String)("Expected a 'Stable' group status, with one member and " + expectedName + " assignment strategy for group " + group + "."));
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeExistingGroupWithNoMembers(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            for (List<String> describeType : DESCRIBE_TYPES) {
                String group = GROUP_PREFIX + groupProtocol.name() + String.join((CharSequence)"", describeType);
                ArrayList<CallSite> cgcArgs = new ArrayList<CallSite>(List.of("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group));
                cgcArgs.addAll(describeType);
                AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
                try {
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));
                    try {
                        TestUtils.waitForCondition(() -> {
                            Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(this.describeGroups(service));
                            return res.getKey().trim().split("\n").length == 2 && res.getValue().isEmpty() && this.checkArgsHeaderOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
                        }, (String)("Expected describe group results with one data row for describe type '" + String.join((CharSequence)" ", describeType) + "'"));
                        protocolConsumerGroupExecutor.close();
                        TestUtils.waitForCondition(() -> ToolsTestUtils.grabConsoleError(this.describeGroups(service)).contains("Consumer group '" + group + "' has no active members."), (String)("Expected no active member in describe group results with describe type " + String.join((CharSequence)" ", describeType)));
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
                finally {
                    if (protocolConsumerGroupExecutor == null) continue;
                    protocolConsumerGroupExecutor.close();
                }
            }
        }
    }

    @ClusterTest
    public void testDescribeOffsetsOfExistingGroupWithNoMembers(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        Map.Entry res = service.collectGroupOffsets(group);
                        return ((Optional)res.getKey()).map(s -> s.equals((Object)GroupState.STABLE)).orElse(false) != false && ((Optional)res.getValue()).map(c -> c.stream().anyMatch(assignment -> Objects.equals(assignment.group, group) && assignment.offset.isPresent())).orElse(false) != false;
                    }, (String)"Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
                    protocolConsumerGroupExecutor.close();
                    TestUtils.waitForCondition(() -> {
                        Map.Entry offsets = service.collectGroupOffsets(group);
                        Optional state = (Optional)offsets.getKey();
                        Optional assignments = (Optional)offsets.getValue();
                        List testGroupAssignments = ((Collection)assignments.get()).stream().filter(a -> Objects.equals(a.group, group)).collect(Collectors.toList());
                        PartitionAssignmentState assignment = (PartitionAssignmentState)testGroupAssignments.get(0);
                        return state.map(s -> s.equals((Object)GroupState.EMPTY)).orElse(false) != false && testGroupAssignments.size() == 1 && assignment.consumerId.map(c -> c.trim().equals("-")).orElse(false) != false && assignment.clientId.map(c -> c.trim().equals("-")).orElse(false) != false && assignment.host.map(c -> c.trim().equals("-")).orElse(false) != false;
                    }, (String)"failed to collect group offsets");
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeMembersOfExistingGroupWithNoMembers(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        Map.Entry res = service.collectGroupMembers(group);
                        return ((Optional)res.getKey()).map(s -> s.equals((Object)GroupState.STABLE)).orElse(false) != false && ((Optional)res.getValue()).map(c -> c.stream().anyMatch(m -> Objects.equals(m.group, group))).orElse(false) != false;
                    }, (String)"Expected the group to initially become stable, and to find group in assignments after initial offset commit.");
                    protocolConsumerGroupExecutor.close();
                    TestUtils.waitForCondition(() -> {
                        Map.Entry res = service.collectGroupMembers(group);
                        return ((Optional)res.getKey()).map(s -> s.equals((Object)GroupState.EMPTY)).orElse(false) != false && ((Optional)res.getValue()).isPresent() && ((Collection)((Optional)res.getValue()).get()).isEmpty();
                    }, (String)("Expected no member in describe group members results for group '" + group + "'"));
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeStateOfExistingGroupWithNoMembers(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        GroupInformation state = service.collectGroupState(group);
                        return Objects.equals(state.groupState, GroupState.STABLE) && state.numMembers == 1 && state.coordinator != null && clusterInstance.brokerIds().contains(state.coordinator.id());
                    }, (String)"Expected the group to initially become stable, and have a single member.");
                    protocolConsumerGroupExecutor.close();
                    TestUtils.waitForCondition(() -> {
                        GroupInformation state = service.collectGroupState(group);
                        return Objects.equals(state.groupState, GroupState.EMPTY) && state.numMembers == 0;
                    }, (String)"Expected the group to become empty after the only member leaving.");
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeWithConsumersWithoutAssignedPartitions(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            for (List<String> describeType : DESCRIBE_TYPES) {
                String group = GROUP_PREFIX + groupProtocol.name() + String.join((CharSequence)"", describeType);
                ArrayList<String> cgcArgs = new ArrayList<String>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group));
                cgcArgs.addAll(describeType);
                AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2);
                try {
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));
                    try {
                        TestUtils.waitForCondition(() -> {
                            Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(this.describeGroups(service));
                            int expectedNumRows = DESCRIBE_TYPE_MEMBERS.contains(describeType) ? 3 : 2;
                            return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows && this.checkArgsHeaderOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
                        }, (String)("Expected a single data row in describe group result with describe type '" + String.join((CharSequence)" ", describeType) + "'"));
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
                finally {
                    if (protocolConsumerGroupExecutor == null) continue;
                    protocolConsumerGroupExecutor.close();
                }
            }
        }
    }

    @ClusterTest
    public void testDescribeOffsetsWithConsumersWithoutAssignedPartitions(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2);
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        Map.Entry res = service.collectGroupOffsets(group);
                        return ((Optional)res.getKey()).map(s -> s.equals((Object)GroupState.STABLE)).isPresent() && ((Optional)res.getValue()).isPresent() && ((Collection)((Optional)res.getValue()).get()).stream().filter(s -> Objects.equals(s.group, group)).count() == 1L && ((Collection)((Optional)res.getValue()).get()).stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 1L;
                    }, (String)"Expected rows for consumers with no assigned partitions in describe group results");
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeMembersWithConsumersWithoutAssignedPartitions(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2);
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        Map.Entry res = service.collectGroupMembers(group);
                        return ((Optional)res.getKey()).map(s -> s.equals((Object)GroupState.STABLE)).orElse(false) != false && ((Optional)res.getValue()).isPresent() && ((Collection)((Optional)res.getValue()).get()).stream().filter(s -> Objects.equals(s.group, group)).count() == 2L && ((Collection)((Optional)res.getValue()).get()).stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 1).count() == 1L && ((Collection)((Optional)res.getValue()).get()).stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 0).count() == 1L && ((Collection)((Optional)res.getValue()).get()).stream().anyMatch(s -> !s.assignment.isEmpty());
                    }, (String)"Expected rows for consumers with no assigned partitions in describe group results");
                    Map.Entry res = service.collectGroupMembers(group);
                    Assertions.assertTrue((((Optional)res.getKey()).map(s -> s.equals((Object)GroupState.STABLE)).orElse(false) != false && ((Optional)res.getValue()).map(c -> c.stream().anyMatch(s -> !s.assignment.isEmpty())).orElse(false) != false ? 1 : 0) != 0, (String)"Expected additional columns in verbose version of describe members");
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeStateWithConsumersWithoutAssignedPartitions(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2);
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        GroupInformation state = service.collectGroupState(group);
                        return Objects.equals(state.groupState, GroupState.STABLE) && state.numMembers == 2;
                    }, (String)"Expected two consumers in describe group results");
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            this.createTopic(topic, 2);
            for (List<String> describeType : DESCRIBE_TYPES) {
                String group = GROUP_PREFIX + groupProtocol.name() + String.join((CharSequence)"", describeType);
                ArrayList<String> cgcArgs = new ArrayList<String>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group));
                cgcArgs.addAll(describeType);
                AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2);
                try {
                    ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));
                    try {
                        TestUtils.waitForCondition(() -> {
                            Map.Entry<String, String> res = ToolsTestUtils.grabConsoleOutputAndError(this.describeGroups(service));
                            int expectedNumRows = DESCRIBE_TYPE_STATE.contains(describeType) ? 2 : 3;
                            return res.getValue().isEmpty() && res.getKey().trim().split("\n").length == expectedNumRows && this.checkArgsHeaderOutput(cgcArgs, res.getKey().trim().split("\n")[0]);
                        }, (String)("Expected a single data row in describe group result with describe type '" + String.join((CharSequence)" ", describeType) + "'"));
                    }
                    finally {
                        if (service == null) continue;
                        service.close();
                    }
                }
                finally {
                    if (protocolConsumerGroupExecutor == null) continue;
                    protocolConsumerGroupExecutor.close();
                }
            }
        }
    }

    @ClusterTest
    public void testDescribeOffsetsWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic, 2);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2);
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        Map.Entry res = service.collectGroupOffsets(group);
                        return ((Optional)res.getKey()).map(s -> s.equals((Object)GroupState.STABLE)).orElse(false) != false && ((Optional)res.getValue()).isPresent() && ((Collection)((Optional)res.getValue()).get()).stream().filter(s -> Objects.equals(s.group, group)).count() == 2L && ((Collection)((Optional)res.getValue()).get()).stream().filter(x -> Objects.equals(x.group, group) && x.partition.isPresent()).count() == 2L && ((Collection)((Optional)res.getValue()).get()).stream().noneMatch(x -> Objects.equals(x.group, group) && x.partition.isEmpty());
                    }, (String)"Expected two rows (one row per consumer) in describe group results.");
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeMembersWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic, 2);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2);
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        Map.Entry res = service.collectGroupMembers(group);
                        return ((Optional)res.getKey()).map(s -> s.equals((Object)GroupState.STABLE)).orElse(false) != false && ((Optional)res.getValue()).isPresent() && ((Collection)((Optional)res.getValue()).get()).stream().filter(s -> Objects.equals(s.group, group)).count() == 2L && ((Collection)((Optional)res.getValue()).get()).stream().filter(x -> Objects.equals(x.group, group) && x.numPartitions == 1).count() == 2L && ((Collection)((Optional)res.getValue()).get()).stream().noneMatch(x -> Objects.equals(x.group, group) && x.numPartitions == 0);
                    }, (String)"Expected two rows (one row per consumer) in describe group members results.");
                    Map.Entry res = service.collectGroupMembers(group);
                    Assertions.assertTrue((((Optional)res.getKey()).map(s -> s.equals((Object)GroupState.STABLE)).orElse(false) != false && ((Optional)res.getValue()).map(s -> s.stream().filter(x -> x.assignment.isEmpty()).count()).orElse(0L) == 0L ? 1 : 0) != 0, (String)"Expected additional columns in verbose version of describe members");
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeStateWithMultiPartitionTopicAndMultipleConsumers(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic, 2);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap(), 2);
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        GroupInformation state = service.collectGroupState(group);
                        return Objects.equals(state.groupState, GroupState.STABLE) && Objects.equals(state.group, group) && state.numMembers == 2;
                    }, (String)"Expected a stable group with two members in describe group state result.");
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeSimpleConsumerGroup(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic, 2);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(GroupProtocol.CLASSIC, group, new HashSet<TopicPartition>(Arrays.asList(new TopicPartition(topic, 0), new TopicPartition(topic, 1))), Collections.emptyMap());
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        Map.Entry res = service.collectGroupOffsets(group);
                        return ((Optional)res.getKey()).map(s -> s.equals((Object)GroupState.EMPTY)).orElse(false) != false && ((Optional)res.getValue()).isPresent() && ((Collection)((Optional)res.getValue()).get()).stream().filter(s -> Objects.equals(s.group, group)).count() == 2L;
                    }, (String)"Expected a stable group with two members in describe group state result.");
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeGroupWithShortInitializationTimeout(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            List<String> describeType = DESCRIBE_TYPES.get(TestUtils.RANDOM.nextInt(DESCRIBE_TYPES.size()));
            String group = GROUP_PREFIX + groupProtocol.name() + String.join((CharSequence)"", describeType);
            ArrayList<String> cgcArgs = new ArrayList<String>(Arrays.asList("--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--timeout", "1", "--group", group));
            cgcArgs.addAll(describeType);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(cgcArgs.toArray(new String[0]));
                try {
                    ExecutionException e = (ExecutionException)Assertions.assertThrows(ExecutionException.class, () -> ((ConsumerGroupCommand.ConsumerGroupService)service).describeGroups());
                    Assertions.assertInstanceOf(TimeoutException.class, (Object)e.getCause());
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeGroupOffsetsWithShortInitializationTimeout(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--timeout", "1"});
                try {
                    Throwable e = Assertions.assertThrows(ExecutionException.class, () -> service.collectGroupOffsets(group));
                    Assertions.assertEquals(TimeoutException.class, e.getCause().getClass());
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeGroupMembersWithShortInitializationTimeout(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--timeout", "1"});
                try {
                    Throwable e = Assertions.assertThrows(ExecutionException.class, () -> service.collectGroupMembers(group));
                    Assertions.assertEquals(TimeoutException.class, e.getCause().getClass());
                    e = Assertions.assertThrows(ExecutionException.class, () -> service.collectGroupMembers(group));
                    Assertions.assertEquals(TimeoutException.class, e.getCause().getClass());
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeGroupStateWithShortInitializationTimeout(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.emptyMap());
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group, "--timeout", "1"});
                try {
                    Throwable e = Assertions.assertThrows(ExecutionException.class, () -> service.collectGroupState(group));
                    Assertions.assertEquals(TimeoutException.class, e.getCause().getClass());
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @ClusterTest
    public void testDescribeNonOffsetCommitGroup(ClusterInstance clusterInstance) throws Exception {
        this.clusterInstance = clusterInstance;
        for (GroupProtocol groupProtocol : clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, group, topic, Collections.singletonMap("enable.auto.commit", "false"));
            try {
                ConsumerGroupCommand.ConsumerGroupService service = DescribeConsumerGroupTest.consumerGroupService(new String[]{"--bootstrap-server", clusterInstance.bootstrapServers(), "--describe", "--group", group});
                try {
                    TestUtils.waitForCondition(() -> {
                        boolean res;
                        Map.Entry groupOffsets = service.collectGroupOffsets(group);
                        Predicate<PartitionAssignmentState> isGrp = s -> Objects.equals(s.group, group);
                        boolean bl = res = ((Optional)groupOffsets.getKey()).map(s -> s.equals((Object)GroupState.STABLE)).orElse(false) != false && ((Optional)groupOffsets.getValue()).isPresent() && ((Collection)((Optional)groupOffsets.getValue()).get()).stream().filter(isGrp).count() == 1L;
                        if (!res) {
                            return false;
                        }
                        Optional<PartitionAssignmentState> maybeAssignmentState = ((Collection)((Optional)groupOffsets.getValue()).get()).stream().filter(isGrp).findFirst();
                        if (maybeAssignmentState.isEmpty()) {
                            return false;
                        }
                        PartitionAssignmentState assignmentState = maybeAssignmentState.get();
                        return assignmentState.consumerId.map(c -> !c.trim().equals("-")).orElse(false) != false && assignmentState.clientId.map(c -> !c.trim().equals("-")).orElse(false) != false && assignmentState.host.map(h -> !h.trim().equals("-")).orElse(false) != false;
                    }, (String)("Expected a 'Stable' group status, rows and valid values for consumer id / client id / host columns in describe results for non-offset-committing group " + group + "."));
                }
                finally {
                    if (service == null) continue;
                    service.close();
                }
            }
            finally {
                if (protocolConsumerGroupExecutor == null) continue;
                protocolConsumerGroupExecutor.close();
            }
        }
    }

    @Test
    public void testDescribeWithUnrecognizedNewConsumerOption() {
        String group = "test.group.unrecognized";
        String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", "localhost:9092", "--describe", "--group", group};
        Assertions.assertThrows(OptionException.class, () -> ConsumerGroupCommandOptions.fromArgs((String[])cgcArgs));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDescribeWithMultipleSubActions() {
        String group = "test.group.multiple.sub.actions";
        AtomicInteger exitStatus = new AtomicInteger(0);
        AtomicReference<String> exitMessage = new AtomicReference<String>("");
        Exit.setExitProcedure((status, err) -> {
            exitStatus.set(status);
            exitMessage.set(err);
            throw new RuntimeException();
        });
        String[] cgcArgs = new String[]{"--bootstrap-server", "localhost:9092", "--describe", "--group", group, "--members", "--state"};
        try {
            Assertions.assertThrows(RuntimeException.class, () -> ConsumerGroupCommand.main((String[])cgcArgs));
        }
        finally {
            Exit.resetExitProcedure();
        }
        Assertions.assertEquals((int)1, (int)exitStatus.get());
        Assertions.assertTrue((boolean)exitMessage.get().contains("Option [describe] takes at most one of these options"));
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDescribeWithStateValue() {
        AtomicInteger exitStatus = new AtomicInteger(0);
        AtomicReference<String> exitMessage = new AtomicReference<String>("");
        Exit.setExitProcedure((status, err) -> {
            exitStatus.set(status);
            exitMessage.set(err);
            throw new RuntimeException();
        });
        String[] cgcArgs = new String[]{"--bootstrap-server", "localhost:9092", "--describe", "--all-groups", "--state", "Stable"};
        try {
            Assertions.assertThrows(RuntimeException.class, () -> ConsumerGroupCommand.main((String[])cgcArgs));
        }
        finally {
            Exit.resetExitProcedure();
        }
        Assertions.assertEquals((int)1, (int)exitStatus.get());
        Assertions.assertTrue((boolean)exitMessage.get().contains("Option [describe] does not take a value for [state]"));
    }

    @Test
    public void testPrintVersion() {
        ToolsTestUtils.MockExitProcedure exitProcedure = new ToolsTestUtils.MockExitProcedure();
        Exit.setExitProcedure((Exit.Procedure)exitProcedure);
        try {
            String out = ToolsTestUtils.captureStandardOut(() -> ConsumerGroupCommandOptions.fromArgs((String[])new String[]{"--version"}));
            Assertions.assertEquals((int)0, (int)exitProcedure.statusCode());
            Assertions.assertEquals((Object)AppInfoParser.getVersion(), (Object)out);
        }
        finally {
            Exit.resetExitProcedure();
        }
    }

    private static ConsumerGroupCommand.ConsumerGroupService consumerGroupService(String[] args) {
        return new ConsumerGroupCommand.ConsumerGroupService(ConsumerGroupCommandOptions.fromArgs((String[])args), Collections.singletonMap("retries", Integer.toString(Integer.MAX_VALUE)));
    }

    private void createTopic(String topic) {
        this.createTopic(topic, 1);
    }

    private void createTopic(String topic, int numPartitions) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            Assertions.assertDoesNotThrow(() -> (Uuid)admin.createTopics(Collections.singletonList(new NewTopic(topic, numPartitions, 1))).topicId(topic).get());
        }
    }

    private void deleteConsumerGroups(Collection<String> groupIds) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            Assertions.assertDoesNotThrow(() -> (Void)admin.deleteConsumerGroups(groupIds).all().get());
        }
    }

    private void deleteTopic(String topic) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            Assertions.assertDoesNotThrow(() -> (Void)((KafkaFuture)admin.deleteTopics(Collections.singletonList(topic)).topicNameValues().get(topic)).get());
        }
    }

    private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String groupId, Set<TopicPartition> topicPartitions, Map<String, Object> customConfigs) {
        Map<String, Object> configs = this.composeConfigs(groupId, protocol.name, customConfigs);
        return ConsumerGroupCommandTestUtils.buildConsumers(1, topicPartitions, () -> new KafkaConsumer(configs));
    }

    private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String groupId, String topicName, Map<String, Object> customConfigs) {
        return this.consumerGroupClosable(protocol, groupId, topicName, customConfigs, 1);
    }

    private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String groupId, String topicName, Map<String, Object> customConfigs, int numConsumers) {
        return this.consumerGroupClosable(protocol, groupId, Set.of(topicName), customConfigs, numConsumers);
    }

    private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String groupId, Set<String> topicNames, Map<String, Object> customConfigs, int numConsumers) {
        Map<String, Object> configs = this.composeConfigs(groupId, protocol.name, customConfigs);
        return ConsumerGroupCommandTestUtils.buildConsumers(numConsumers, true, () -> new KafkaConsumer(configs), consumer -> consumer.subscribe((Collection)topicNames));
    }

    private Map<String, Object> composeConfigs(String groupId, String groupProtocol, Map<String, Object> customConfigs) {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", this.clusterInstance.bootstrapServers());
        configs.put("group.id", groupId);
        configs.put("key.deserializer", StringDeserializer.class.getName());
        configs.put("value.deserializer", StringDeserializer.class.getName());
        configs.put("group.protocol", groupProtocol);
        configs.putAll(customConfigs);
        return configs;
    }

    private Runnable describeGroups(ConsumerGroupCommand.ConsumerGroupService service) {
        return () -> Assertions.assertDoesNotThrow(() -> ((ConsumerGroupCommand.ConsumerGroupService)service).describeGroups());
    }

    private boolean checkArgsHeaderOutput(List<String> args, String output) {
        if (!output.contains("GROUP")) {
            return false;
        }
        if (args.contains("--members")) {
            return this.checkMembersArgsHeaderOutput(output, args.contains("--verbose"));
        }
        if (args.contains("--state")) {
            return this.checkStateArgsHeaderOutput(output, args.contains("--verbose"));
        }
        return this.checkOffsetsArgsHeaderOutput(output, args.contains("--verbose"));
    }

    private boolean checkOffsetsArgsHeaderOutput(String output, boolean verbose) {
        List<String> expectedKeys = verbose ? List.of("GROUP", "TOPIC", "PARTITION", "LEADER-EPOCH", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID", "HOST", "CLIENT-ID") : List.of("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LOG-END-OFFSET", "LAG", "CONSUMER-ID", "HOST", "CLIENT-ID");
        return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
    }

    private boolean checkStateArgsOutput(String output) {
        return output.contains("COORDINATOR (ID)") && output.contains("ASSIGNMENT-STRATEGY") && output.contains("STATE") && output.contains("#MEMBERS");
    }

    private boolean checkMembersArgsHeaderOutput(String output, boolean verbose) {
        List<String> expectedKeys = verbose ? List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS", "CURRENT-EPOCH", "CURRENT-ASSIGNMENT", "TARGET-EPOCH", "TARGET-ASSIGNMENT") : List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "#PARTITIONS");
        return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
    }

    private boolean checkStateArgsHeaderOutput(String output, boolean verbose) {
        List<String> expectedKeys = verbose ? List.of("GROUP", "COORDINATOR", "(ID)", "ASSIGNMENT-STRATEGY", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS") : List.of("GROUP", "COORDINATOR", "(ID)", "ASSIGNMENT-STRATEGY", "STATE", "#MEMBERS");
        return Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
    }

    private void sendRecords(String topic, int partition, int recordsCount) {
        try (KafkaProducer producer = new KafkaProducer(Map.of("bootstrap.servers", this.clusterInstance.bootstrapServers(), "key.serializer", StringSerializer.class.getName(), "value.serializer", StringSerializer.class.getName()));){
            IntStream.range(0, recordsCount).forEach(i -> producer.send(new ProducerRecord(topic, Integer.valueOf(partition), (Object)Integer.toString(i), (Object)Integer.toString(i))));
            producer.flush();
        }
    }
}

