/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.streams;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import joptsimple.OptionException;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.GroupType;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.streams.StreamsGroupCommand;
import org.apache.kafka.tools.streams.StreamsGroupCommandOptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class ListStreamsGroupTest {
    public static EmbeddedKafkaCluster cluster = null;
    static KafkaStreams streams;
    private static final String APP_ID = "streams-group-command-test";
    private static final String INPUT_TOPIC = "customInputTopic";
    private static final String OUTPUT_TOPIC = "customOutputTopic";

    @BeforeAll
    public static void setup() throws Exception {
        Properties props = new Properties();
        props.setProperty("group.coordinator.rebalance.protocols", "classic,consumer,streams");
        cluster = new EmbeddedKafkaCluster(1, props);
        cluster.start();
        cluster.createTopic(INPUT_TOPIC, 2, 1);
        Properties streamsProp = new Properties();
        streamsProp.put("auto.offset.reset", "earliest");
        streamsProp.put("bootstrap.servers", cluster.bootstrapServers());
        streamsProp.put("default.key.serde", Serdes.String().getClass().getName());
        streamsProp.put("default.value.serde", Serdes.String().getClass().getName());
        streamsProp.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsProp.put("application.id", APP_ID);
        streamsProp.put("num.stream.threads", (Object)2);
        streamsProp.put("group.protocol", GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
        streams = new KafkaStreams(ListStreamsGroupTest.topology(), streamsProp);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning((KafkaStreams)streams);
    }

    @AfterAll
    public static void closeCluster() {
        streams.close();
        cluster.stop();
        cluster = null;
    }

    @Test
    public void testListStreamsGroupWithoutFilters() throws Exception {
        try (StreamsGroupCommand.StreamsGroupService service = this.getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list"});){
            HashSet<String> expectedGroups = new HashSet<String>(Collections.singleton(APP_ID));
            AtomicReference foundGroups = new AtomicReference();
            TestUtils.waitForCondition(() -> {
                foundGroups.set(new HashSet(service.listStreamsGroups()));
                return Objects.equals(expectedGroups, foundGroups.get());
            }, (String)("Expected --list to show streams groups " + String.valueOf(expectedGroups) + ", but found " + String.valueOf(foundGroups.get()) + "."));
        }
    }

    @Test
    public void testListWithUnrecognizedNewOption() {
        String[] cgcArgs = new String[]{"--new-option", "--bootstrap-server", cluster.bootstrapServers(), "--list"};
        Assertions.assertThrows(OptionException.class, () -> this.getStreamsGroupService(cgcArgs));
    }

    @Test
    public void testListStreamsGroupWithStates() throws Exception {
        try (StreamsGroupCommand.StreamsGroupService service = this.getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"});){
            Set<GroupListing> expectedListing = Set.of(new GroupListing(APP_ID, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)));
            AtomicReference foundListing = new AtomicReference();
            TestUtils.waitForCondition(() -> {
                foundListing.set(new HashSet(service.listStreamsGroupsInStates(Collections.emptySet())));
                return Objects.equals(expectedListing, foundListing.get());
            }, (String)("Expected --list to show streams groups " + String.valueOf(expectedListing) + ", but found " + String.valueOf(foundListing.get()) + "."));
        }
    }

    @Test
    public void testListStreamsGroupWithSpecifiedStates() throws Exception {
        AtomicReference foundListing;
        Set<Object> expectedListing;
        try (StreamsGroupCommand.StreamsGroupService service = this.getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"});){
            expectedListing = Set.of(new GroupListing(APP_ID, Optional.of(GroupType.STREAMS), "streams", Optional.of(GroupState.STABLE)));
            foundListing = new AtomicReference();
            TestUtils.waitForCondition(() -> {
                foundListing.set(new HashSet(service.listStreamsGroupsInStates(Collections.emptySet())));
                return Objects.equals(expectedListing, foundListing.get());
            }, (String)("Expected --list to show streams groups " + String.valueOf(expectedListing) + ", but found " + String.valueOf(foundListing.get()) + "."));
        }
        service = this.getStreamsGroupService(new String[]{"--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "PreparingRebalance"});
        try {
            expectedListing = Collections.emptySet();
            foundListing = new AtomicReference();
            TestUtils.waitForCondition(() -> {
                foundListing.set(new HashSet(service.listStreamsGroupsInStates(Collections.singleton(GroupState.PREPARING_REBALANCE))));
                return Objects.equals(expectedListing, foundListing.get());
            }, (String)("Expected --list to show streams groups " + String.valueOf(expectedListing) + ", but found " + String.valueOf(foundListing.get()) + "."));
        }
        finally {
            if (service != null) {
                service.close();
            }
        }
    }

    @Test
    public void testListStreamsGroupOutput() throws Exception {
        ListStreamsGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list"), Collections.emptyList(), Set.of(Collections.singletonList(APP_ID)));
        ListStreamsGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state"), Arrays.asList("GROUP", "STATE"), Set.of(Arrays.asList(APP_ID, "Stable")));
        ListStreamsGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "Stable"), Arrays.asList("GROUP", "STATE"), Set.of(Arrays.asList(APP_ID, "Stable")));
        ListStreamsGroupTest.validateListOutput(Arrays.asList("--bootstrap-server", cluster.bootstrapServers(), "--list", "--state", "stable"), Arrays.asList("GROUP", "STATE"), Set.of(Arrays.asList(APP_ID, "Stable")));
    }

    private static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC, Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String())).flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"))).groupBy((key, value) -> value).count().toStream().to(OUTPUT_TOPIC, Produced.with((Serde)Serdes.String(), (Serde)Serdes.Long()));
        return builder.build();
    }

    private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
        StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs((String[])args);
        return new StreamsGroupCommand.StreamsGroupService(opts, Collections.singletonMap("retries", Integer.toString(Integer.MAX_VALUE)));
    }

    private static void validateListOutput(List<String> args, List<String> expectedHeader, Set<List<String>> expectedRows) throws InterruptedException {
        AtomicReference<String> out = new AtomicReference<String>("");
        TestUtils.waitForCondition(() -> {
            List<String> header;
            String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main((String[])args.toArray(new String[0])));
            out.set(output);
            String[] lines = output.split("\n");
            if (lines.length == 1 && lines[0].isEmpty()) {
                lines = new String[]{};
            }
            if (!expectedHeader.isEmpty() && lines.length > 0 && !expectedHeader.equals(header = Arrays.asList(lines[0].split("\\s+")))) {
                return false;
            }
            Set groups = Arrays.stream(lines, expectedHeader.isEmpty() ? 0 : 1, lines.length).map(line -> Arrays.asList(line.split("\\s+"))).collect(Collectors.toSet());
            return expectedRows.equals(groups);
        }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, out.get()));
    }
}

