/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.consumer.group;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import joptsimple.OptionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommand;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommandOptions;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTestUtils;
import org.junit.jupiter.api.Assertions;

@ClusterTestDefaults(types={Type.CO_KRAFT}, serverProperties={@ClusterConfigProperty(key="offsets.topic.num.partitions", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1"), @ClusterConfigProperty(key="group.initial.rebalance.delay.ms", value="1000"), @ClusterConfigProperty(key="group.consumer.heartbeat.interval.ms", value="500"), @ClusterConfigProperty(key="group.consumer.min.heartbeat.interval.ms", value="500")})
public class ListConsumerGroupTest {
    private static final String TOPIC_PREFIX = "test.topic.";
    private static final String TOPIC_PARTITIONS_GROUP_PREFIX = "test.topic.partitions.group.";
    private static final String TOPIC_GROUP_PREFIX = "test.topic.group.";
    private static final String PROTOCOL_GROUP_PREFIX = "test.protocol.group.";
    private final ClusterInstance clusterInstance;

    ListConsumerGroupTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    private List<GroupProtocol> supportedGroupProtocols() {
        return new ArrayList<GroupProtocol>(this.clusterInstance.supportedGroupProtocols());
    }

    @ClusterTest
    public void testListConsumerGroupsWithoutFilters() throws Exception {
        for (int i = 0; i < this.supportedGroupProtocols().size(); ++i) {
            GroupProtocol groupProtocol = this.supportedGroupProtocols().get(i);
            String topic = TOPIC_PREFIX + groupProtocol.name;
            String protocolGroup = PROTOCOL_GROUP_PREFIX + groupProtocol.name;
            String topicGroup = TOPIC_GROUP_PREFIX + i;
            String topicPartitionsGroup = TOPIC_PARTITIONS_GROUP_PREFIX + i;
            this.createTopic(topic);
            try (AutoCloseable topicPartitionsConsumerGroupExecutor = this.consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0)));
                 AutoCloseable topicConsumerGroupExecutor = this.consumerGroupClosable(GroupProtocol.CLASSIC, topicGroup, topic);
                 AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, protocolGroup, topic);
                 ConsumerGroupCommand.ConsumerGroupService service = this.getConsumerGroupService(new String[]{"--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list"});){
                Set<String> expectedGroups = ListConsumerGroupTest.set(Arrays.asList(topicPartitionsGroup, topicGroup, protocolGroup));
                AtomicReference foundGroups = new AtomicReference();
                TestUtils.waitForCondition(() -> {
                    foundGroups.set(ListConsumerGroupTest.set(service.listConsumerGroups()));
                    return Objects.equals(expectedGroups, foundGroups.get());
                }, (String)("Expected --list to show groups " + String.valueOf(expectedGroups) + ", but found " + String.valueOf(foundGroups.get()) + "."));
            }
            this.removeConsumer(ListConsumerGroupTest.set(Arrays.asList(topicPartitionsGroup, topicGroup, protocolGroup)));
            this.deleteTopic(topic);
        }
    }

    @ClusterTest
    public void testListWithUnrecognizedNewConsumerOption() {
        String[] cgcArgs = new String[]{"--new-consumer", "--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list"};
        Assertions.assertThrows(OptionException.class, () -> this.getConsumerGroupService(cgcArgs));
    }

    @ClusterTest
    public void testListConsumerGroupsWithStates() throws Exception {
        for (int i = 0; i < this.supportedGroupProtocols().size(); ++i) {
            GroupProtocol groupProtocol = this.supportedGroupProtocols().get(i);
            String topic = TOPIC_PREFIX + groupProtocol.name;
            String protocolGroup = PROTOCOL_GROUP_PREFIX + groupProtocol.name;
            String topicPartitionsGroup = TOPIC_PARTITIONS_GROUP_PREFIX + i;
            this.createTopic(topic);
            try (AutoCloseable topicPartitionsConsumerGroupExecutor = this.consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0)));
                 AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, protocolGroup, topic);
                 ConsumerGroupCommand.ConsumerGroupService service = this.getConsumerGroupService(new String[]{"--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--state"});){
                Set<GroupListing> expectedListing = Set.of(new GroupListing(topicPartitionsGroup, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY)), new GroupListing(protocolGroup, Optional.of(GroupType.parse((String)groupProtocol.name())), "consumer", Optional.of(GroupState.STABLE)));
                ListConsumerGroupTest.assertGroupListing(service, Collections.emptySet(), EnumSet.allOf(GroupState.class), expectedListing);
                expectedListing = Set.of(new GroupListing(protocolGroup, Optional.of(GroupType.parse((String)groupProtocol.name())), "consumer", Optional.of(GroupState.STABLE)));
                ListConsumerGroupTest.assertGroupListing(service, Collections.emptySet(), Set.of(GroupState.STABLE), expectedListing);
                ListConsumerGroupTest.assertGroupListing(service, Collections.emptySet(), Set.of(GroupState.PREPARING_REBALANCE), Collections.emptySet());
            }
            this.removeConsumer(ListConsumerGroupTest.set(Arrays.asList(topicPartitionsGroup, protocolGroup)));
            this.deleteTopic(topic);
        }
    }

    @ClusterTest
    public void testListConsumerGroupsWithTypesClassicProtocol() throws Exception {
        GroupProtocol groupProtocol = GroupProtocol.CLASSIC;
        String topic = TOPIC_PREFIX + groupProtocol.name;
        String protocolGroup = PROTOCOL_GROUP_PREFIX + groupProtocol.name;
        String topicPartitionsGroup = "test.topic.partitions.group.0";
        this.createTopic(topic);
        try (AutoCloseable topicPartitionsConsumerGroupExecutor = this.consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0)));
             AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, protocolGroup, topic);
             ConsumerGroupCommand.ConsumerGroupService service = this.getConsumerGroupService(new String[]{"--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--state"});){
            Set<GroupListing> expectedListing = Set.of(new GroupListing(topicPartitionsGroup, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY)), new GroupListing(protocolGroup, Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)));
            ListConsumerGroupTest.assertGroupListing(service, Collections.emptySet(), Collections.emptySet(), expectedListing);
            ListConsumerGroupTest.assertGroupListing(service, Set.of(GroupType.CONSUMER), Collections.emptySet(), Collections.emptySet());
            ListConsumerGroupTest.assertGroupListing(service, Set.of(GroupType.CLASSIC), Collections.emptySet(), expectedListing);
        }
    }

    @ClusterTest
    public void testListConsumerGroupsWithTypesConsumerProtocol() throws Exception {
        GroupProtocol groupProtocol = GroupProtocol.CONSUMER;
        String topic = TOPIC_PREFIX + groupProtocol.name;
        String protocolGroup = PROTOCOL_GROUP_PREFIX + groupProtocol.name;
        String topicGroup = "test.topic.group.0";
        String topicPartitionsGroup = "test.topic.partitions.group.0";
        this.createTopic(topic);
        try (AutoCloseable topicPartitionsConsumerGroupExecutor = this.consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0)));
             AutoCloseable topicConsumerGroupExecutor = this.consumerGroupClosable(GroupProtocol.CLASSIC, topicGroup, topic);
             AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, protocolGroup, topic);
             ConsumerGroupCommand.ConsumerGroupService service = this.getConsumerGroupService(new String[]{"--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list"});){
            Set<GroupListing> expectedListing = Set.of(new GroupListing(topicPartitionsGroup, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY)), new GroupListing(topicGroup, Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)), new GroupListing(protocolGroup, Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)));
            ListConsumerGroupTest.assertGroupListing(service, Collections.emptySet(), Collections.emptySet(), expectedListing);
            expectedListing = Set.of(new GroupListing(protocolGroup, Optional.of(GroupType.CONSUMER), "consumer", Optional.of(GroupState.STABLE)));
            ListConsumerGroupTest.assertGroupListing(service, Set.of(GroupType.CONSUMER), Collections.emptySet(), expectedListing);
            expectedListing = Set.of(new GroupListing(topicPartitionsGroup, Optional.of(GroupType.CLASSIC), "", Optional.of(GroupState.EMPTY)), new GroupListing(topicGroup, Optional.of(GroupType.CLASSIC), "consumer", Optional.of(GroupState.STABLE)));
            ListConsumerGroupTest.assertGroupListing(service, Set.of(GroupType.CLASSIC), Collections.emptySet(), expectedListing);
        }
    }

    @ClusterTest
    public void testListGroupCommandClassicProtocol() throws Exception {
        GroupProtocol groupProtocol = GroupProtocol.CLASSIC;
        String topic = TOPIC_PREFIX + groupProtocol.name;
        String protocolGroup = PROTOCOL_GROUP_PREFIX + groupProtocol.name;
        String topicPartitionsGroup = "test.topic.partitions.group.0";
        this.createTopic(topic);
        try (AutoCloseable topicPartitionsConsumerGroupExecutor = this.consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0)));
             AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, protocolGroup, topic);){
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list"), Collections.emptyList(), Set.of(Collections.singletonList(protocolGroup), Collections.singletonList(topicPartitionsGroup)));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--state"), Arrays.asList("GROUP", "STATE"), Set.of(Arrays.asList(protocolGroup, "Stable"), Arrays.asList(topicPartitionsGroup, "Empty")));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--type"), Arrays.asList("GROUP", "TYPE"), Set.of(Arrays.asList(protocolGroup, "Classic"), Arrays.asList(topicPartitionsGroup, "Classic")));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--type", "--state"), Arrays.asList("GROUP", "TYPE", "STATE"), Set.of(Arrays.asList(protocolGroup, "Classic", "Stable"), Arrays.asList(topicPartitionsGroup, "Classic", "Empty")));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--state", "Stable"), Arrays.asList("GROUP", "STATE"), Set.of(Arrays.asList(protocolGroup, "Stable")));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--state", "stable"), Arrays.asList("GROUP", "STATE"), Set.of(Arrays.asList(protocolGroup, "Stable")));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--type", "Classic"), Arrays.asList("GROUP", "TYPE"), Set.of(Arrays.asList(protocolGroup, "Classic"), Arrays.asList(topicPartitionsGroup, "Classic")));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--type", "classic"), Arrays.asList("GROUP", "TYPE"), Set.of(Arrays.asList(protocolGroup, "Classic"), Arrays.asList(topicPartitionsGroup, "Classic")));
        }
    }

    @ClusterTest
    public void testListGroupCommandConsumerProtocol() throws Exception {
        GroupProtocol groupProtocol = GroupProtocol.CONSUMER;
        String topic = TOPIC_PREFIX + groupProtocol.name;
        String protocolGroup = PROTOCOL_GROUP_PREFIX + groupProtocol.name;
        String topicPartitionsGroup = "test.topic.partitions.group.0";
        this.createTopic(topic);
        try (AutoCloseable topicPartitionsConsumerGroupExecutor = this.consumerGroupClosable(topicPartitionsGroup, Collections.singleton(new TopicPartition(topic, 0)));
             AutoCloseable protocolConsumerGroupExecutor = this.consumerGroupClosable(groupProtocol, protocolGroup, topic);){
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list"), Collections.emptyList(), Set.of(Collections.singletonList(protocolGroup), Collections.singletonList(topicPartitionsGroup)));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--state"), Arrays.asList("GROUP", "STATE"), Set.of(Arrays.asList(protocolGroup, "Stable"), Arrays.asList(topicPartitionsGroup, "Empty")));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--type"), Arrays.asList("GROUP", "TYPE"), Set.of(Arrays.asList(protocolGroup, "Consumer"), Arrays.asList(topicPartitionsGroup, "Classic")));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--type", "--state"), Arrays.asList("GROUP", "TYPE", "STATE"), Set.of(Arrays.asList(protocolGroup, "Consumer", "Stable"), Arrays.asList(topicPartitionsGroup, "Classic", "Empty")));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--type", "consumer"), Arrays.asList("GROUP", "TYPE"), Set.of(Arrays.asList(protocolGroup, "Consumer")));
            ListConsumerGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", this.clusterInstance.bootstrapServers(), "--list", "--type", "consumer", "--state", "Stable"), Arrays.asList("GROUP", "TYPE", "STATE"), Set.of(Arrays.asList(protocolGroup, "Consumer", "Stable")));
        }
    }

    private AutoCloseable consumerGroupClosable(GroupProtocol protocol, String groupId, String topicName) {
        Map<String, Object> configs = this.composeConfigs(groupId, protocol.name, Collections.emptyMap());
        return ConsumerGroupCommandTestUtils.buildConsumers(1, false, topicName, () -> new KafkaConsumer(configs));
    }

    private AutoCloseable consumerGroupClosable(String groupId, Set<TopicPartition> topicPartitions) {
        Map<String, Object> configs = this.composeConfigs(groupId, GroupProtocol.CLASSIC.name, Collections.emptyMap());
        return ConsumerGroupCommandTestUtils.buildConsumers(1, topicPartitions, () -> new KafkaConsumer(configs));
    }

    private Map<String, Object> composeConfigs(String groupId, String groupProtocol, Map<String, Object> customConfigs) {
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put("bootstrap.servers", this.clusterInstance.bootstrapServers());
        configs.put("group.id", groupId);
        configs.put("key.deserializer", StringDeserializer.class.getName());
        configs.put("value.deserializer", StringDeserializer.class.getName());
        configs.put("group.protocol", groupProtocol);
        if (GroupProtocol.CLASSIC.name.equalsIgnoreCase(groupProtocol)) {
            configs.put("partition.assignment.strategy", RangeAssignor.class.getName());
        }
        configs.putAll(customConfigs);
        return configs;
    }

    private ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) {
        ConsumerGroupCommandOptions opts = ConsumerGroupCommandOptions.fromArgs((String[])args);
        ConsumerGroupCommand.ConsumerGroupService service = new ConsumerGroupCommand.ConsumerGroupService(opts, Collections.singletonMap("retries", Integer.toString(Integer.MAX_VALUE)));
        return service;
    }

    private void createTopic(String topic) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            Assertions.assertDoesNotThrow(() -> (Uuid)admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, 1))).topicId(topic).get());
        }
    }

    private void deleteTopic(String topic) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            Assertions.assertDoesNotThrow(() -> (Void)admin.deleteTopics(Collections.singleton(topic)).all().get());
        }
    }

    private void removeConsumer(Set<String> groupIds) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            Assertions.assertDoesNotThrow(() -> (Void)admin.deleteConsumerGroups((Collection)groupIds).all().get());
        }
    }

    private static void assertGroupListing(ConsumerGroupCommand.ConsumerGroupService service, Set<GroupType> typeFilterSet, Set<GroupState> groupStateFilterSet, Set<GroupListing> expectedListing) throws Exception {
        AtomicReference foundListing = new AtomicReference();
        TestUtils.waitForCondition(() -> {
            foundListing.set(ListConsumerGroupTest.set(service.listConsumerGroupsWithFilters(ListConsumerGroupTest.set(typeFilterSet), ListConsumerGroupTest.set(groupStateFilterSet))));
            return Objects.equals(ListConsumerGroupTest.set(expectedListing), foundListing.get());
        }, () -> "Expected to show groups " + String.valueOf(expectedListing) + ", but found " + String.valueOf(foundListing.get()) + ".");
    }

    private static void validateListOutput(List<String> args, List<String> expectedHeader, Set<List<String>> expectedRows) throws InterruptedException {
        AtomicReference<String> out = new AtomicReference<String>("");
        TestUtils.waitForCondition(() -> {
            String output = ToolsTestUtils.grabConsoleOutput(() -> ConsumerGroupCommand.main((String[])args.toArray(new String[0])));
            out.set(output);
            int index = 0;
            String[] lines = output.split("\n");
            if (!expectedHeader.isEmpty()) {
                List header;
                if (lines.length == 0) {
                    return false;
                }
                if (!expectedHeader.equals(header = Arrays.stream(lines[index++].split("\\s+")).collect(Collectors.toList()))) {
                    return false;
                }
            }
            HashSet groups = new HashSet();
            while (index < lines.length) {
                groups.add(Arrays.stream(lines[index].split("\\s+")).collect(Collectors.toList()));
                ++index;
            }
            return expectedRows.equals(groups);
        }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
    }

    public static <T> Set<T> set(Collection<T> set) {
        return new HashSet<T>(set);
    }
}

