/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.streams;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.streams.StreamsGroupCommand;
import org.apache.kafka.tools.streams.StreamsGroupCommandOptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/**
 * Integration tests for the {@code --describe} action of {@link StreamsGroupCommand}.
 *
 * <p>One {@link EmbeddedKafkaCluster} and one {@link KafkaStreams} application are started once
 * for the whole class. Each test invokes {@code StreamsGroupCommand.main} with a set of
 * command-line options, captures the console output, and compares the header line and the set
 * of data rows against expected values.
 */
@Timeout(value = 600L)
@Tag(value = "integration")
public class DescribeStreamsGroupTest {
    public static EmbeddedKafkaCluster cluster = null;
    static KafkaStreams streams;
    private static final String APP_ID = "streams-group-command-test";
    private static final String INPUT_TOPIC = "customInputTopic";
    private static final String OUTPUT_TOPIC = "customOutputTopic";
    // Name of the internal repartition topic expected in the offsets output. The numeric suffix is
    // tied to the shape of the topology built in topology(); changing that topology may change it.
    private static final String REPARTITION_TOPIC = APP_ID + "-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition";

    /**
     * Starts the embedded cluster, creates the input topic and starts the Streams application,
     * blocking until {@link IntegrationTestUtils#startApplicationAndWaitUntilRunning} returns.
     *
     * @throws Exception if the cluster or the Streams application cannot be started
     */
    @BeforeAll
    public static void setup() throws Exception {
        Properties brokerProps = new Properties();
        brokerProps.setProperty("group.coordinator.rebalance.protocols", "classic,consumer,streams");
        cluster = new EmbeddedKafkaCluster(1, brokerProps);
        cluster.start();
        cluster.createTopic(INPUT_TOPIC, 2, 1);

        Properties streamsProp = new Properties();
        streamsProp.put("auto.offset.reset", "earliest");
        streamsProp.put("bootstrap.servers", cluster.bootstrapServers());
        streamsProp.put("default.key.serde", Serdes.String().getClass().getName());
        streamsProp.put("default.value.serde", Serdes.String().getClass().getName());
        streamsProp.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsProp.put("application.id", APP_ID);
        streamsProp.put("num.stream.threads", 2);
        // Locale.ROOT keeps the configuration value independent of the JVM's default locale.
        streamsProp.put("group.protocol", GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT));
        streams = new KafkaStreams(topology(), streamsProp);
        IntegrationTestUtils.startApplicationAndWaitUntilRunning(streams);
    }

    /**
     * Closes the Streams application and stops the embedded cluster.
     *
     * <p>Both fields are null-checked so teardown is safe even if {@link #setup()} did not get as
     * far as assigning them, and the cluster is stopped in a {@code finally} block so that a
     * failure while closing the Streams application does not leave the cluster running.
     */
    @AfterAll
    public static void closeCluster() {
        try {
            if (streams != null) {
                streams.close();
            }
        } finally {
            if (cluster != null) {
                cluster.stop();
                cluster = null;
            }
        }
    }

    /** Plain {@code --describe} and {@code --describe --offsets} must print the same offsets table. */
    @Test
    public void testDescribeStreamsGroup() throws Exception {
        List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "OFFSET-LAG");
        Set<List<String>> expectedRows = Set.of(
            List.of(APP_ID, INPUT_TOPIC, "0", "0"),
            List.of(APP_ID, INPUT_TOPIC, "1", "0"),
            List.of(APP_ID, REPARTITION_TOPIC, "0", "0"),
            List.of(APP_ID, REPARTITION_TOPIC, "1", "0"));

        validateDescribeOutput(describeArgs(), expectedHeader, expectedRows, List.of());
        validateDescribeOutput(describeArgs("--offsets"), expectedHeader, expectedRows, List.of());
    }

    /** {@code --verbose} adds offset and epoch columns to the offsets table, in any option order. */
    @Test
    public void testDescribeStreamsGroupWithVerboseOption() throws Exception {
        List<String> expectedHeader = List.of("GROUP", "TOPIC", "PARTITION", "CURRENT-OFFSET", "LEADER-EPOCH", "LOG-END-OFFSET", "OFFSET-LAG");
        Set<List<String>> expectedRows = Set.of(
            List.of(APP_ID, INPUT_TOPIC, "0", "-", "-", "0", "0"),
            List.of(APP_ID, INPUT_TOPIC, "1", "-", "-", "0", "0"),
            List.of(APP_ID, REPARTITION_TOPIC, "0", "-", "-", "0", "0"),
            List.of(APP_ID, REPARTITION_TOPIC, "1", "-", "-", "0", "0"));

        validateDescribeOutput(describeArgs("--verbose"), expectedHeader, expectedRows, List.of());
        validateDescribeOutput(describeArgs("--offsets", "--verbose"), expectedHeader, expectedRows, List.of());
        validateDescribeOutput(describeArgs("--verbose", "--offsets"), expectedHeader, expectedRows, List.of());
    }

    /** {@code --state} prints a single row describing the group state. */
    @Test
    public void testDescribeStreamsGroupWithStateOption() throws Exception {
        List<String> expectedHeader = List.of("GROUP", "COORDINATOR", "(ID)", "STATE", "#MEMBERS");
        Set<List<String>> expectedRows = Set.of(List.of(APP_ID, "", "", "Stable", "2"));
        // Columns 1 and 2 (COORDINATOR, (ID)) are masked before comparison.
        List<Integer> dontCares = List.of(1, 2);

        validateDescribeOutput(describeArgs("--state"), expectedHeader, expectedRows, dontCares);
    }

    /** {@code --state --verbose} adds the epoch columns to the state row, in any option order. */
    @Test
    public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws Exception {
        List<String> expectedHeader = List.of("GROUP", "COORDINATOR", "(ID)", "STATE", "GROUP-EPOCH", "TARGET-ASSIGNMENT-EPOCH", "#MEMBERS");
        Set<List<String>> expectedRows = Set.of(List.of(APP_ID, "", "", "Stable", "3", "3", "2"));
        // Columns 1 and 2 (COORDINATOR, (ID)) are masked before comparison.
        List<Integer> dontCares = List.of(1, 2);

        validateDescribeOutput(describeArgs("--state", "--verbose"), expectedHeader, expectedRows, dontCares);
        validateDescribeOutput(describeArgs("--verbose", "--state"), expectedHeader, expectedRows, dontCares);
    }

    /** {@code --members} prints one row per member together with its active task assignment. */
    @Test
    public void testDescribeStreamsGroupWithMembersOption() throws Exception {
        List<String> expectedHeader = List.of("GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
        // The ASSIGNMENTS column itself contains whitespace, so each row splits into more tokens
        // than the header has columns.
        Set<List<String>> expectedRows = Set.of(
            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0,1];"),
            List.of(APP_ID, "", "", "", "ACTIVE:", "1:[0,1];"));
        // Columns 1-3 (MEMBER, PROCESS, CLIENT-ID) are masked before comparison.
        List<Integer> dontCares = List.of(1, 2, 3);

        validateDescribeOutput(describeArgs("--members"), expectedHeader, expectedRows, dontCares);
    }

    /** {@code --members --verbose} adds epoch, protocol and target-assignment details per member. */
    @Test
    public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Exception {
        List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
        Set<List<String>> expectedRows = Set.of(
            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
        // Columns 3, 6 and 7 (MEMBER, PROCESS, CLIENT-ID) are masked before comparison.
        List<Integer> dontCares = List.of(3, 6, 7);

        validateDescribeOutput(describeArgs("--members", "--verbose"), expectedHeader, expectedRows, dontCares);
        validateDescribeOutput(describeArgs("--verbose", "--members"), expectedHeader, expectedRows, dontCares);
    }

    /**
     * Builds the topology run by the test application: read {@code INPUT_TOPIC}, split each value
     * into lower-cased words, group by word, count, and write the counts to {@code OUTPUT_TOPIC}.
     *
     * @return the built topology
     */
    private static Topology topology() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()))
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy((key, value) -> value)
            .count()
            .toStream()
            .to(OUTPUT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
        return builder.build();
    }

    /**
     * Creates a {@link StreamsGroupCommand.StreamsGroupService} from raw command-line arguments,
     * passing a {@code retries} setting of {@link Integer#MAX_VALUE}.
     *
     * <p>Not currently called from within this class.
     *
     * @param args command-line arguments to parse
     * @return the service configured from {@code args}
     */
    private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
        StreamsGroupCommandOptions opts = StreamsGroupCommandOptions.fromArgs(args);
        return new StreamsGroupCommand.StreamsGroupService(opts, Map.of("retries", Integer.toString(Integer.MAX_VALUE)));
    }

    /**
     * Builds the argument list shared by every test: the bootstrap servers of the embedded
     * cluster, {@code --describe}, and then any additional options in the order given.
     *
     * @param extraOptions options appended after {@code --describe}
     * @return a new mutable list of command-line arguments
     */
    private static List<String> describeArgs(String... extraOptions) {
        List<String> args = new ArrayList<>(List.of("--bootstrap-server", cluster.bootstrapServers(), "--describe"));
        args.addAll(Arrays.asList(extraOptions));
        return args;
    }

    /**
     * Runs {@code StreamsGroupCommand.main} repeatedly via {@link TestUtils#waitForCondition}
     * until its console output matches the expected header and rows.
     *
     * @param args command-line arguments passed to the tool
     * @param expectedHeader expected whitespace-separated tokens of the first output line
     * @param expectedRows expected whitespace-separated tokens of every remaining line, as a set
     * @param dontCareIndices token positions that are blanked in each output row before comparing;
     *     the corresponding positions in {@code expectedRows} must be empty strings
     * @throws InterruptedException if interrupted while waiting for the condition
     */
    private static void validateDescribeOutput(List<String> args, List<String> expectedHeader, Set<List<String>> expectedRows, List<Integer> dontCareIndices) throws InterruptedException {
        // Remembers the most recent output so the failure message can show what was actually printed.
        AtomicReference<String> lastOutput = new AtomicReference<>("");
        TestUtils.waitForCondition(() -> {
            String output = ToolsTestUtils.grabConsoleOutput(() -> StreamsGroupCommand.main(args.toArray(new String[0])));
            lastOutput.set(output);
            return outputMatches(output, expectedHeader, expectedRows, dontCareIndices);
        }, () -> String.format("Expected header=%s and groups=%s, but found:%n%s", expectedHeader, expectedRows, lastOutput.get()));
    }

    /** Returns whether the captured tool output has the expected header and, after masking, the expected rows. */
    private static boolean outputMatches(String output, List<String> expectedHeader, Set<List<String>> expectedRows, List<Integer> dontCareIndices) {
        String[] lines = output.split("\n");
        // An empty output splits into [""], an output of only newlines into []; neither has a header.
        if (lines.length == 0 || (lines.length == 1 && lines[0].isEmpty())) {
            return false;
        }

        List<String> header = Arrays.asList(lines[0].split("\\s+"));
        if (!expectedHeader.equals(header)) {
            return false;
        }

        Set<List<String>> rows = Arrays.stream(lines, 1, lines.length)
            .map(line -> Arrays.asList(line.split("\\s+")))
            .collect(Collectors.toSet());
        if (rows.size() != expectedRows.size()) {
            return false;
        }

        Set<List<String>> maskedRows = rows.stream()
            .map(row -> maskColumns(row, dontCareIndices))
            .collect(Collectors.toSet());
        return expectedRows.equals(maskedRows);
    }

    /** Returns a copy of {@code row} with every in-range position listed in {@code dontCareIndices} replaced by an empty string. */
    private static List<String> maskColumns(List<String> row, List<Integer> dontCareIndices) {
        List<String> masked = new ArrayList<>(row);
        for (int index : dontCareIndices) {
            // A row shorter than expected cannot match anyway; skip instead of throwing.
            if (index < masked.size()) {
                masked.set(index, "");
            }
        }
        return masked;
    }
}

