/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.streams;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import joptsimple.OptionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureUpdate;
import org.apache.kafka.clients.admin.UpdateFeaturesOptions;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.errors.GroupNotEmptyException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.ToolsTestUtils;
import org.apache.kafka.tools.streams.StreamsGroupCommand;
import org.apache.kafka.tools.streams.StreamsGroupCommandOptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class DeleteStreamsGroupTest {
    // Prefixes that get a random suffix appended to form per-test topic names and application ids.
    private static final String INPUT_TOPIC_PREFIX = "input-topic-";
    private static final String OUTPUT_TOPIC_PREFIX = "output-topic-";
    private static final String APP_ID_PREFIX = "delete-group-test-";
    // NOTE(review): presumably the number of records each test application produces and
    // waits to process — confirm against produceMessages/startKSApp.
    private static final int RECORD_TOTAL = 10;
    // Embedded cluster shared by every test; assigned in startCluster() and stopped in closeCluster().
    public static EmbeddedKafkaCluster cluster;
    // Bootstrap address of the shared cluster, captured once in startCluster().
    private static String bootstrapServers;

    /**
     * Creates and starts the embedded Kafka cluster shared by all tests, then records
     * its bootstrap address.
     * <p>
     * The cluster is built with an empty {@link Properties} instance; the leading
     * constructor argument ({@code 2}) is presumably the broker count — confirm against
     * {@code EmbeddedKafkaCluster}.
     */
    @BeforeAll
    public static void startCluster() {
        cluster = new EmbeddedKafkaCluster(2, new Properties());
        cluster.start();
        bootstrapServers = cluster.bootstrapServers();
    }

    /**
     * Stops the shared embedded cluster after all tests have run.
     * <p>
     * The null check covers the case where {@link #startCluster()} failed before the
     * cluster field was assigned; without it this method would throw a
     * {@link NullPointerException} that obscures the original setup failure.
     */
    @AfterAll
    public static void closeCluster() {
        if (cluster != null) {
            cluster.stop();
        }
    }

    /**
     * Building the service from a command line that contains an unknown flag must fail
     * with an {@link OptionException}.
     */
    @Test
    public void testDeleteWithUnrecognizedOption() {
        String[] cliArgs = {"--unrecognized-option", "--bootstrap-server", bootstrapServers, "--delete", "--all-groups"};
        Assertions.assertThrows(OptionException.class, () -> getStreamsGroupService(cliArgs));
    }

    /**
     * {@code --delete} without {@code --group} or {@code --all-groups} must trigger the
     * exit procedure with a non-zero status and a message naming the missing options.
     * <p>
     * A recording exit procedure is installed for the duration of the call and is always
     * reset afterwards so it cannot leak into other tests.
     */
    @Test
    public void testDeleteWithoutGroupOption() {
        String[] args = {"--bootstrap-server", bootstrapServers, "--delete"};
        AtomicBoolean exited = new AtomicBoolean(false);
        Exit.setExitProcedure((statusCode, message) -> {
            Assertions.assertNotEquals(0, statusCode);
            Assertions.assertTrue(message.contains("Option [delete] takes one of these options: [all-groups], [group]"));
            exited.set(true);
        });
        try {
            getStreamsGroupService(args);
        } finally {
            // Restore the default exit behaviour regardless of the outcome; the recording
            // procedure above must not stay installed for subsequent tests.
            Exit.resetExitProcedure();
        }
        // Checked outside the finally block so that an exception thrown while parsing
        // (including a failed assertion inside the exit procedure) is reported as-is
        // instead of being replaced by this assertion's failure.
        Assertions.assertTrue(exited.get(), "The exit procedure was not invoked");
    }

    /**
     * Exercises {@code --delete --group <appId>} through three phases:
     * <ol>
     *   <li>while the application is running, deletion is rejected with
     *       {@link GroupNotEmptyException};</li>
     *   <li>after the application is stopped, the group and its internal topics are
     *       deleted successfully;</li>
     *   <li>deleting the now-missing group again yields an
     *       {@link IllegalArgumentException}.</li>
     * </ol>
     *
     * @throws Exception if starting/stopping the application or querying the group fails
     */
    @Test
    public void testDeleteSingleGroup() throws Exception {
        String appId = generateGroupAppId();
        String[] args = {"--bootstrap-server", bootstrapServers, "--delete", "--group", appId};
        StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
        try (KafkaStreams streams = startKSApp(appId, service)) {
            // Phase 1: group still has members. Output and result are captured from the
            // same deleteGroups() invocation so both assertions describe one attempt.
            Map<String, Throwable> activeGroupResult = new HashMap<>();
            String output = ToolsTestUtils.grabConsoleOutput(() -> activeGroupResult.putAll(service.deleteGroups()));
            Assertions.assertTrue(
                output.contains("Group '" + appId + "' could not be deleted due to:")
                    && output.contains("Streams group '" + appId + "' is not EMPTY."),
                "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + output + ")");
            Assertions.assertNotNull(activeGroupResult.get(appId),
                "Group was deleted successfully, but it shouldn't have been. Result was:(" + activeGroupResult + ")");
            Assertions.assertEquals(1, activeGroupResult.size());
            Assertions.assertInstanceOf(GroupNotEmptyException.class, activeGroupResult.get(appId),
                "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + activeGroupResult + ")");

            // Phase 2: group is empty, so deletion of the group and its internal topics succeeds.
            stopKSApp(appId, streams, service);
            Map<String, Throwable> emptyGroupResult = new HashMap<>();
            output = ToolsTestUtils.grabConsoleOutput(() -> emptyGroupResult.putAll(service.deleteGroups()));
            Assertions.assertTrue(output.contains("Deletion of requested streams groups ('" + appId + "') was successful."),
                "The streams group could not be deleted as expected");
            Assertions.assertTrue(output.contains("Deletion of associated internal topics of the streams groups ('" + appId + "') was successful."),
                "The internal topics could not be deleted as expected.");
            Assertions.assertEquals(1, emptyGroupResult.size());
            Assertions.assertTrue(emptyGroupResult.containsKey(appId));
            // A null value for the group means no error was recorded for it.
            Assertions.assertNull(emptyGroupResult.get(appId), "The streams group could not be deleted as expected");
            Assertions.assertTrue(service.retrieveInternalTopics(List.of(appId)).isEmpty());

            // Phase 3: the group no longer exists.
            Map<String, Throwable> missingGroupResult = service.deleteGroups();
            Assertions.assertEquals(1, missingGroupResult.size());
            Assertions.assertNotNull(missingGroupResult.get(appId));
            Assertions.assertInstanceOf(IllegalArgumentException.class, missingGroupResult.get(appId),
                "The expected error was not detected while deleting streams group");
        }
    }

    /**
     * Exercises {@code --delete --all-groups} with three applications through three phases:
     * <ol>
     *   <li>all three running: every group is rejected with {@link GroupNotEmptyException};</li>
     *   <li>first application stopped: its group and internal topics are deleted while the
     *       other two are still rejected;</li>
     *   <li>remaining applications stopped: the last two groups and their internal topics
     *       are deleted.</li>
     * </ol>
     * Any application still running when the test ends (for example after a failed
     * assertion) is closed in the {@code finally} block.
     *
     * @throws Exception if starting/stopping an application or querying a group fails
     */
    @Test
    public void testDeleteMultipleGroup() throws Exception {
        String appId1 = generateGroupAppId();
        String appId2 = generateGroupAppId();
        String appId3 = generateGroupAppId();
        List<String> allAppIds = List.of(appId1, appId2, appId3);
        String[] args = {"--bootstrap-server", bootstrapServers, "--delete", "--all-groups"};
        StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);

        KafkaStreams streams1 = null;
        KafkaStreams streams2 = null;
        KafkaStreams streams3 = null;
        try {
            streams1 = startKSApp(appId1, service);
            streams2 = startKSApp(appId2, service);
            streams3 = startKSApp(appId3, service);

            // Phase 1: every group still has members, so every deletion must fail.
            Map<String, Throwable> result = new HashMap<>();
            String allRunningOutput = ToolsTestUtils.grabConsoleOutput(() -> result.putAll(service.deleteGroups()));
            for (String appId : allAppIds) {
                Assertions.assertTrue(
                    allRunningOutput.contains("Group '" + appId + "' could not be deleted due to:")
                        && allRunningOutput.contains("Streams group '" + appId + "' is not EMPTY."),
                    "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + allRunningOutput + ")");
            }
            for (String appId : allAppIds) {
                Assertions.assertNotNull(result.get(appId),
                    "Group was deleted successfully, but it shouldn't have been. Result was:(" + result + ")");
            }
            Assertions.assertEquals(3, result.size());
            for (String appId : allAppIds) {
                Assertions.assertInstanceOf(GroupNotEmptyException.class, result.get(appId),
                    "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Result was:(" + result + ")");
            }

            // Phase 2: only the first group is empty; it is deleted, the other two are rejected.
            stopKSApp(appId1, streams1, service);
            Map<String, Throwable> mixGrpsRes = new HashMap<>();
            String mixedOutput = ToolsTestUtils.grabConsoleOutput(() -> mixGrpsRes.putAll(service.deleteGroups()));
            Assertions.assertTrue(mixedOutput.contains("Deletion of some streams groups failed:"),
                "The streams groups deletion did not work as expected");
            for (String appId : List.of(appId2, appId3)) {
                Assertions.assertTrue(
                    mixedOutput.contains("Group '" + appId + "' could not be deleted due to:")
                        && mixedOutput.contains("Streams group '" + appId + "' is not EMPTY."),
                    "The expected error (" + Errors.NON_EMPTY_GROUP + ") was not detected while deleting streams group. Output was: (" + mixedOutput + ")");
            }
            Assertions.assertTrue(mixedOutput.contains("These streams groups were deleted successfully: '" + appId1 + "'"),
                "The streams groups deletion did not work as expected");
            Assertions.assertTrue(mixedOutput.contains("Deletion of associated internal topics of the streams groups ('" + appId1 + "') was successful."),
                "The internal topics could not be deleted as expected");
            Assertions.assertEquals(3, mixGrpsRes.size());
            Assertions.assertNull(mixGrpsRes.get(appId1));
            Assertions.assertNotNull(mixGrpsRes.get(appId2));
            Assertions.assertNotNull(mixGrpsRes.get(appId3));
            Assertions.assertTrue(service.retrieveInternalTopics(List.of(appId1)).isEmpty());
            Assertions.assertFalse(service.retrieveInternalTopics(List.of(appId2, appId3)).isEmpty());

            // Phase 3: the remaining groups are empty and get deleted. The command may list
            // the two groups in either order, so both orderings are accepted.
            stopKSApp(appId2, streams2, service);
            stopKSApp(appId3, streams3, service);
            Map<String, Throwable> allGrpsRes = new HashMap<>();
            String allStoppedOutput = ToolsTestUtils.grabConsoleOutput(() -> allGrpsRes.putAll(service.deleteGroups()));
            Assertions.assertTrue(
                allStoppedOutput.contains("Deletion of requested streams groups ('" + appId2 + "', '" + appId3 + "') was successful.")
                    || allStoppedOutput.contains("Deletion of requested streams groups ('" + appId3 + "', '" + appId2 + "') was successful."),
                "The streams groups deletion did not work as expected");
            Assertions.assertTrue(
                allStoppedOutput.contains("Deletion of associated internal topics of the streams groups ('" + appId2 + "', '" + appId3 + "') was successful.")
                    || allStoppedOutput.contains("Deletion of associated internal topics of the streams groups ('" + appId3 + "', '" + appId2 + "') was successful."),
                "The internal topics could not be deleted as expected");
            Assertions.assertEquals(2, allGrpsRes.size());
            Assertions.assertNull(allGrpsRes.get(appId2));
            Assertions.assertNull(allGrpsRes.get(appId3));
            Assertions.assertTrue(service.retrieveInternalTopics(List.of(appId1, appId2, appId3)).isEmpty());
        } finally {
            // Arrays.asList (unlike List.of) tolerates the nulls left behind when startup
            // failed part-way. Instances already closed by stopKSApp are closed again here.
            for (KafkaStreams streams : Arrays.asList(streams1, streams2, streams3)) {
                if (streams != null) {
                    streams.close(Duration.ofSeconds(30L));
                }
            }
        }
    }

    /**
     * Verifies {@code --delete --all-groups} after the {@code streams.version} feature
     * has been set to 0: the stopped application's group is still deleted, and the tool
     * reports that retrieving internal topics is not supported.
     * <p>
     * The feature is set back to version 1 in a {@code finally} block once the
     * application has been closed.
     *
     * @throws Exception if starting/stopping the application or querying the group fails
     */
    @Test
    public void testDeleteAllGroupsAfterVersionDowngrade() throws Exception {
        String appId = generateGroupAppId();
        String[] cliArgs = {"--bootstrap-server", bootstrapServers, "--delete", "--all-groups"};
        StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(cliArgs);
        try {
            try (KafkaStreams app = startKSApp(appId, service)) {
                stopKSApp(appId, app, service);
                updateStreamsGroupProtocol((short) 0);

                Map<String, Throwable> deletionResult = new HashMap<>();
                String console = ToolsTestUtils.grabConsoleOutput(() -> deletionResult.putAll(service.deleteGroups()));

                Assertions.assertTrue(
                    console.contains("Deletion of requested streams groups ('" + appId + "') was successful."),
                    "The streams group could not be deleted as expected");
                Assertions.assertTrue(
                    console.contains("Retrieving internal topics is not supported by the broker version. Use 'kafka-topics.sh' to list and delete the group's internal topics."));
                Assertions.assertEquals(1, deletionResult.size());
                Assertions.assertTrue(deletionResult.containsKey(appId));
                Assertions.assertNull(deletionResult.get(appId), "The streams group could not be deleted as expected");
                Assertions.assertTrue(service.retrieveInternalTopics(List.of(appId)).isEmpty());
            }
        } finally {
            updateStreamsGroupProtocol((short) 1);
        }
    }

    /**
     * Sets the cluster's {@code streams.version} feature to the given level and blocks
     * until the update has completed.
     * <p>
     * Version 0 is requested as a {@code SAFE_DOWNGRADE}; any other version as an
     * {@code UPGRADE}.
     *
     * @param version the feature level to set
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *         {@link ExecutionException} raised while waiting for the update; on
     *         interruption the thread's interrupt flag is restored first
     */
    private void updateStreamsGroupProtocol(short version) {
        FeatureUpdate.UpgradeType upgradeType = version == 0
            ? FeatureUpdate.UpgradeType.SAFE_DOWNGRADE
            : FeatureUpdate.UpgradeType.UPGRADE;
        try (Admin admin = cluster.createAdminClient()) {
            Map<String, FeatureUpdate> updates =
                Utils.mkMap(Utils.mkEntry("streams.version", new FeatureUpdate(version, upgradeType)));
            admin.updateFeatures(updates, new UpdateFeaturesOptions()).all().get();
        } catch (InterruptedException e) {
            // Preserve the interruption for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Builds the configuration for a test Streams application.
     * <p>
     * The application reads from the earliest offset, uses String serdes for keys and
     * values, runs with {@code exactly_once_v2} and uses the streams group protocol.
     *
     * @param bootstrapServers value for {@code bootstrap.servers}
     * @param appId            value for {@code application.id}
     * @return a new {@link Properties} instance holding the settings above
     */
    private static Properties createStreamsConfig(String bootstrapServers, String appId) {
        Properties streamsConfig = new Properties();
        streamsConfig.put("auto.offset.reset", "earliest");
        streamsConfig.put("bootstrap.servers", bootstrapServers);
        streamsConfig.put("default.key.serde", Serdes.StringSerde.class);
        streamsConfig.put("default.value.serde", Serdes.StringSerde.class);
        // The protocol name is an identifier, not user-facing text: lower-case it with
        // Locale.ROOT so the result does not depend on the host's default locale.
        streamsConfig.put("group.protocol", GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT));
        streamsConfig.put("processing.guarantee", "exactly_once_v2");
        streamsConfig.put("application.id", appId);
        return streamsConfig;
    }

    /**
     * Parses the given command-line arguments and wraps them in a
     * {@code StreamsGroupService} configured with {@code retries} set to
     * {@link Integer#MAX_VALUE}.
     *
     * @param args command-line arguments as passed to the streams group tool
     * @return the service built from the parsed options
     */
    private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
        StreamsGroupCommandOptions parsedOptions = StreamsGroupCommandOptions.fromArgs(args);
        Map<String, String> configOverrides = Map.of("retries", String.valueOf(Integer.MAX_VALUE));
        return new StreamsGroupCommand.StreamsGroupService(parsedOptions, configOverrides);
    }

    /**
     * Starts a Streams application with the given application id on freshly generated
     * input/output topics and waits until it is fully up.
     * <p>
     * Records are produced to the input topic before the application starts. The method
     * returns once the group has at least one member, the group state is {@code STABLE}
     * and {@link #RECORD_TOTAL} aggregated records have been observed. If any of these
     * waits fails, the application is closed before the failure is rethrown so that it
     * does not keep running in the background.
     *
     * @param appId   application id (and group id) of the application to start
     * @param service service used to query the group's members and state
     * @return the running {@link KafkaStreams} instance; the caller is responsible for closing it
     * @throws Exception if the application does not reach the expected state in time
     */
    private KafkaStreams startKSApp(String appId, StreamsGroupCommand.StreamsGroupService service) throws Exception {
        String inputTopic = generateRandomTopicId(INPUT_TOPIC_PREFIX);
        String outputTopic = generateRandomTopicId(OUTPUT_TOPIC_PREFIX);
        StreamsBuilder streamsBuilder = builder(inputTopic, outputTopic);
        produceMessages(inputTopic);

        KStream<String, String> inputStream = streamsBuilder.stream(inputTopic);
        AtomicInteger recordCount = new AtomicInteger(0);
        KTable<String, String> valueCounts = inputStream.groupByKey().aggregate(
            () -> "()",
            (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")",
            Materialized.as("aggregated_value"));
        // Counts every aggregated record and fails the stream thread once more than the
        // expected number of records has been seen.
        valueCounts.toStream().peek((key, value) -> {
            if (recordCount.incrementAndGet() > RECORD_TOTAL) {
                throw new IllegalStateException("Crash on the 10 record");
            }
        });

        KafkaStreams streams = IntegrationTestUtils.getStartedStreams(createStreamsConfig(bootstrapServers, appId), streamsBuilder, true);
        try {
            TestUtils.waitForCondition(() -> !service.collectGroupMembers(appId).isEmpty(), "The group did not initialize as expected.");
            TestUtils.waitForCondition(() -> checkGroupState(service, appId, GroupState.STABLE), "The group did not become stable as expected.");
            // The message is supplied lazily so that it reports the count at the time of
            // the failure rather than the count when the wait began.
            TestUtils.waitForCondition(
                () -> recordCount.get() == RECORD_TOTAL,
                () -> "Expected " + RECORD_TOTAL + " records processed but only got " + recordCount.get());
        } catch (Throwable failure) {
            streams.close(Duration.ofSeconds(30L));
            throw failure;
        }
        return streams;
    }

    /**
     * Shuts down the given Streams application and waits for its group to drain.
     * <p>
     * The application is closed with a 30 second timeout and asked to leave the group,
     * its local state is cleaned up, and the method then waits until the group state is
     * {@code EMPTY} and the group reports no members. Does nothing when {@code streams}
     * is {@code null}.
     *
     * @param appId   application id (and group id) of the application
     * @param streams the application to stop; may be {@code null}
     * @param service service used to query the group's members and state
     * @throws InterruptedException if interrupted while waiting for the group to drain
     */
    private void stopKSApp(String appId, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
        if (streams == null) {
            return;
        }
        streams.close(new KafkaStreams.CloseOptions()
            .timeout(Duration.ofSeconds(30L))
            .leaveGroup(true));
        streams.cleanUp();
        TestUtils.waitForCondition(
            () -> checkGroupState(service, appId, GroupState.EMPTY),
            "The group did not become empty as expected.");
        TestUtils.waitForCondition(
            () -> service.collectGroupMembers(appId).isEmpty(),
            "The group size is not zero as expected.");
    }

    /**
     * Returns {@code prefix} followed by a random string produced by
     * {@code TestUtils.randomString(10)}.
     *
     * @param prefix leading part of the generated topic name
     * @return the generated topic name
     */
    private String generateRandomTopicId(String prefix) {
        String randomSuffix = TestUtils.randomString(10);
        return prefix + randomSuffix;
    }

    /**
     * Returns a fresh application id: {@link #APP_ID_PREFIX} followed by a random string
     * produced by {@code TestUtils.randomString(10)}.
     *
     * @return the generated application id
     */
    private String generateGroupAppId() {
        String randomSuffix = TestUtils.randomString(10);
        return APP_ID_PREFIX + randomSuffix;
    }

    /**
     * Tells whether the group currently reports the expected state.
     *
     * @param service service used to look up the group's state
     * @param groupId id of the group to inspect
     * @param state   the state the group is expected to be in
     * @return {@code true} if the state returned by the service equals {@code state}
     *         (null-safe comparison)
     * @throws Exception if the state lookup fails
     */
    private boolean checkGroupState(StreamsGroupCommand.StreamsGroupService service, String groupId, GroupState state) throws Exception {
        GroupState currentState = service.collectGroupState(groupId);
        return Objects.equals(currentState, state);
    }

    /**
     * Synchronously produces {@link #RECORD_TOTAL} String records to the given topic.
     * <p>
     * Record {@code i} has key {@code i + "0" + topic} and value {@code i + "0"}, and is
     * stamped with the cluster's current time.
     *
     * @param topic the topic to write to
     */
    private static void produceMessages(String topic) {
        List<KeyValueTimestamp<String, String>> records = new ArrayList<>(RECORD_TOTAL);
        for (int i = 0; i < RECORD_TOTAL; i++) {
            String value = i + "0";
            String key = value + topic;
            records.add(new KeyValueTimestamp<>(key, value, cluster.time.milliseconds()));
        }
        Properties producerConfig = TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, Optional.empty(), records);
    }

    /**
     * Creates a {@link StreamsBuilder} holding a word-count style pipeline: values read
     * from {@code inputTopic} are lower-cased, split on runs of non-word characters,
     * grouped by the resulting token and counted; the counts are written to
     * {@code outputTopic} with a String key and a Long value.
     *
     * @param inputTopic  topic to consume String key/value records from
     * @param outputTopic topic the per-token counts are written to
     * @return the builder containing the pipeline
     */
    private static StreamsBuilder builder(String inputTopic, String outputTopic) {
        StreamsBuilder topology = new StreamsBuilder();
        topology.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
            // Locale.ROOT keeps tokenization independent of the host's default locale.
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.ROOT).split("\\W+")))
            .groupBy((key, value) -> value)
            .count()
            .toStream()
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
        return topology;
    }
}

