/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.streams;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.GroupListing;
import org.apache.kafka.clients.admin.ListGroupsOptions;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.streams.StreamsGroupCommand;
import org.apache.kafka.tools.streams.StreamsGroupCommandOptions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

@Timeout(value=600L)
@Tag(value="integration")
public class DeleteStreamsGroupOffsetTest {
    /** Prefix of the randomly named input topics (see {@code generateRandomTopic}). */
    private static final String TOPIC_PREFIX = "foo-";
    /** Prefix of the randomly named application ids, which double as streams group ids. */
    private static final String APP_ID_PREFIX = "streams-group-command-test";
    /** Number of records the tests work with per input topic. */
    private static final int RECORD_TOTAL = 5;
    /** Embedded cluster shared by all tests; assigned in {@code startCluster}, stopped in {@code closeCluster}. */
    public static EmbeddedKafkaCluster cluster;
    /** Bootstrap address of {@link #cluster}; assigned in {@code startCluster}. */
    private static String bootstrapServers;
    /** Prefix of the randomly named output topics used by the test topology. */
    private static final String OUTPUT_TOPIC_PREFIX = "output-topic-";

    /**
     * Boots the shared embedded Kafka cluster once for the whole class and
     * records its bootstrap address for the command-line arguments used by the tests.
     */
    @BeforeAll
    public static void startCluster() {
        // No broker overrides are needed, so an empty Properties object is passed.
        cluster = new EmbeddedKafkaCluster(2, new Properties());
        cluster.start();
        bootstrapServers = cluster.bootstrapServers();
    }

    /**
     * Deletes every topic and every streams group after each test so that tests
     * do not observe each other's state.
     *
     * <p>An {@link UnknownTopicOrPartitionException}, whether thrown directly or
     * reported as the cause of a failed future, is tolerated and ends the cleanup
     * silently. Any other failure is rethrown wrapped in a {@link RuntimeException}.
     */
    @AfterEach
    public void deleteTopics() {
        try (Admin adminClient = cluster.createAdminClient()) {
            // Delete all topics.
            Set<String> topics = adminClient.listTopics().names().get();
            adminClient.deleteTopics(topics).all().get();

            // Delete all streams groups.
            List<String> groupIds = adminClient
                .listGroups(ListGroupsOptions.forStreamsGroups().timeoutMs(1000))
                .all()
                .get()
                .stream()
                .map(GroupListing::groupId)
                .toList();
            adminClient.deleteStreamsGroups(groupIds).all().get();
        } catch (UnknownTopicOrPartitionException ignored) {
            // Tolerated: something we tried to delete no longer exists.
        } catch (InterruptedException e) {
            // Restore the interrupt flag so the test runner can still see the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof UnknownTopicOrPartitionException)) {
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Builds the Kafka Streams configuration used by the test application.
     *
     * @param bootstrapServers broker address list stored under {@code bootstrap.servers}
     * @param appId            value stored under {@code application.id}
     * @return a fresh {@link Properties} instance holding the configuration
     */
    private Properties createStreamsConfig(String bootstrapServers, String appId) {
        Properties configs = new Properties();
        configs.put("auto.offset.reset", "earliest");
        configs.put("bootstrap.servers", bootstrapServers);
        configs.put("default.key.serde", Serdes.StringSerde.class);
        configs.put("default.value.serde", Serdes.StringSerde.class);
        // The protocol name is a machine-readable config value, so lower-case it
        // independently of the JVM's default locale.
        configs.put("group.protocol", GroupProtocol.STREAMS.name().toLowerCase(Locale.ROOT));
        configs.put("processing.guarantee", "exactly_once_v2");
        configs.put("application.id", appId);
        return configs;
    }

    /**
     * Stops the shared embedded cluster once all tests have run.
     *
     * <p>The null check keeps this teardown from throwing a
     * {@link NullPointerException} if the cluster was never assigned.
     */
    @AfterAll
    public static void closeCluster() {
        if (cluster != null) {
            cluster.stop();
        }
    }

    /**
     * Deleting offsets for a group id that was never created must report
     * {@code GROUP_ID_NOT_FOUND} as the top-level error.
     */
    @Test
    public void testDeleteOffsetsNonExistingGroup() {
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", "not-existing",
            "--input-topic", "foo:1"
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args)) {
            Map.Entry<Errors, Map<TopicPartition, Throwable>> result = service.deleteOffsets();
            Assertions.assertEquals(Errors.GROUP_ID_NOT_FOUND, result.getKey());
        }
    }

    /**
     * Passing more than one {@code --group} together with {@code --delete-offsets}
     * must trigger the exit procedure with a non-zero status and a message that
     * names both groups.
     *
     * <p>The overridden exit procedure is process-global, so it is always reset
     * afterwards to keep it from leaking into other tests.
     */
    @Test
    public void testDeleteStreamsGroupOffsetsMultipleGroups() {
        String group1 = generateRandomAppId();
        String group2 = generateRandomAppId();
        String topic1 = generateRandomTopic();
        String topic2 = generateRandomTopic();
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group1,
            "--group", group2,
            "--input-topic", topic1,
            "--input-topic", topic2
        };

        AtomicBoolean exited = new AtomicBoolean(false);
        Exit.setExitProcedure((statusCode, message) -> {
            Assertions.assertNotEquals(0, statusCode);
            Assertions.assertTrue(
                message.contains("Option [delete-offsets] supports only one [group] at a time, but found:")
                    && message.contains(group1)
                    && message.contains(group2));
            exited.set(true);
        });
        try {
            // The stubbed exit procedure returns normally, so a service (and its
            // admin client) may still be constructed; close it right away.
            getStreamsGroupService(args).close();
        } finally {
            Exit.resetExitProcedure();
        }
        // Asserted outside the finally block so it cannot mask an exception thrown above.
        Assertions.assertTrue(exited.get());
    }

    /**
     * While the streams application is running (group is stable), deleting the
     * offsets of {@code topic:0} must fail with {@code GROUP_SUBSCRIBED_TO_TOPIC}.
     */
    @Test
    public void testDeleteOffsetsOfStableStreamsGroupWithTopicPartition() {
        String group = generateRandomAppId();
        String topic = generateRandomTopic();
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group,
            "--input-topic", topic + ":0"
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
             KafkaStreams streams = startKSApp(group, topic, service)) {
            Map.Entry<Errors, Map<TopicPartition, Throwable>> result = service.deleteOffsets();
            assertError(result, topic, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * While the streams application is running (group is stable), deleting the
     * offsets of a whole topic must report {@code GROUP_SUBSCRIBED_TO_TOPIC}
     * for partition 0 of that topic.
     */
    @Test
    public void testDeleteOffsetsOfStableStreamsGroupWithTopicOnly() {
        String group = generateRandomAppId();
        String topic = generateRandomTopic();
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group,
            "--input-topic", topic
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
             KafkaStreams streams = startKSApp(group, topic, service)) {
            Map.Entry<Errors, Map<TopicPartition, Throwable>> result = service.deleteOffsets();
            assertError(result, topic, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * With a running application (stable group), deleting offsets of partition 0
     * of a topic that does not exist must report {@code UNKNOWN_TOPIC_OR_PARTITION}.
     */
    @Test
    public void testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicPartition() {
        String group = generateRandomAppId();
        String topic = generateRandomTopic();
        String unknownTopic = "unknown-topic";
        String unknownTopicPartition = unknownTopic + ":0";
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group,
            "--input-topic", unknownTopicPartition
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
             KafkaStreams streams = startKSApp(group, topic, service)) {
            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
            assertError(res, unknownTopic, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * With a running application (stable group), deleting offsets of a topic that
     * does not exist must report {@code UNKNOWN_TOPIC_OR_PARTITION} under
     * partition {@code -1} of that topic.
     */
    @Test
    public void testDeleteOffsetsOfStableStreamsGroupWithUnknownTopicOnly() {
        String group = generateRandomAppId();
        String topic = generateRandomTopic();
        String unknownTopic = "unknown-topic";
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group,
            "--input-topic", unknownTopic
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
             KafkaStreams streams = startKSApp(group, topic, service)) {
            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
            assertError(res, unknownTopic, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Once the application has been stopped (group is empty), deleting the offsets
     * of {@code topic:0} must succeed with no top-level or partition-level error.
     */
    @Test
    public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicPartition() {
        String group = generateRandomAppId();
        String topic = generateRandomTopic();
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group,
            "--input-topic", topic + ":0"
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
             KafkaStreams streams = startKSApp(group, topic, service)) {
            stopKSApp(group, topic, streams, service);
            Map.Entry<Errors, Map<TopicPartition, Throwable>> result = service.deleteOffsets();
            assertError(result, topic, 0, 0, Errors.NONE);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Once the application has been stopped (group is empty), deleting the offsets
     * of a whole topic must leave no error recorded for partition 0 of that topic.
     */
    @Test
    public void testDeleteOffsetsOfEmptyStreamsGroupWithTopicOnly() {
        String group = generateRandomAppId();
        String topic = generateRandomTopic();
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group,
            "--input-topic", topic
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
             KafkaStreams streams = startKSApp(group, topic, service)) {
            stopKSApp(group, topic, streams, service);
            Map.Entry<Errors, Map<TopicPartition, Throwable>> result = service.deleteOffsets();
            assertError(result, topic, -1, 0, Errors.NONE);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * With an empty group and two {@code --input-topic} values (one real, one
     * unknown), the real topic must report no error for partition 0 while the
     * unknown topic reports {@code UNKNOWN_TOPIC_OR_PARTITION} under partition {@code -1}.
     */
    @Test
    public void testDeleteOffsetsOfEmptyStreamsGroupWithMultipleTopics() {
        String group = generateRandomAppId();
        String topic1 = generateRandomTopic();
        String unknownTopic = "unknown-topic";
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group,
            "--input-topic", topic1,
            "--input-topic", unknownTopic
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
             KafkaStreams streams = startKSApp(group, topic1, service)) {
            stopKSApp(group, topic1, streams, service);
            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
            assertError(res, topic1, -1, 0, Errors.NONE);
            assertError(res, unknownTopic, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * With an empty group, deleting offsets of partition 0 of a topic that does
     * not exist must report {@code UNKNOWN_TOPIC_OR_PARTITION}.
     */
    @Test
    public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicPartition() {
        String group = generateRandomAppId();
        String topic = generateRandomTopic();
        String unknownTopic = "unknown-topic";
        String unknownTopicPartition = unknownTopic + ":0";
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group,
            "--input-topic", unknownTopicPartition
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
             KafkaStreams streams = startKSApp(group, topic, service)) {
            stopKSApp(group, topic, streams, service);
            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
            assertError(res, unknownTopic, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * With an empty group, deleting offsets of a topic that does not exist must
     * report {@code UNKNOWN_TOPIC_OR_PARTITION} under partition {@code -1} of that topic.
     */
    @Test
    public void testDeleteOffsetsOfEmptyStreamsGroupWithUnknownTopicOnly() {
        String group = generateRandomAppId();
        String topic = generateRandomTopic();
        String unknownTopic = "unknown-topic";
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group,
            "--input-topic", unknownTopic
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
             KafkaStreams streams = startKSApp(group, topic, service)) {
            stopKSApp(group, topic, streams, service);
            Map.Entry<Errors, Map<TopicPartition, Throwable>> res = service.deleteOffsets();
            assertError(res, unknownTopic, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * With an empty group and {@code --all-input-topics}, no error may be
     * recorded for partition 0 of the application's input topic.
     */
    @Test
    public void testDeleteOffsetsOfEmptyStreamsGroupWithAllTopics() {
        String group = generateRandomAppId();
        String topic = generateRandomTopic();
        // NOTE(review): `topic` follows the --all-input-topics flag here; confirm
        // whether the option parser consumes it or treats it as a stray argument.
        String[] args = {
            "--bootstrap-server", bootstrapServers,
            "--delete-offsets",
            "--group", group,
            "--all-input-topics", topic
        };
        try (StreamsGroupCommand.StreamsGroupService service = getStreamsGroupService(args);
             KafkaStreams streams = startKSApp(group, topic, service)) {
            stopKSApp(group, topic, streams, service);
            Map.Entry<Errors, Map<TopicPartition, Throwable>> result = service.deleteOffsets();
            assertError(result, topic, -1, 0, Errors.NONE);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Verifies the outcome of a {@code deleteOffsets()} call for one topic partition.
     *
     * @param res               result entry: top-level error plus per-partition failures
     * @param inputTopic        topic name used to look up the partition-level result
     * @param inputPartition    partition given on the command line, or a negative value
     *                          when only a topic was given; the top-level error is
     *                          checked only when this is {@code >= 0}
     * @param expectedPartition partition under which the partition-level result is looked up
     * @param expectedError     expected error; {@code Errors.NONE} means no partition-level
     *                          failure may be recorded
     */
    private void assertError(Map.Entry<Errors, Map<TopicPartition, Throwable>> res, String inputTopic, int inputPartition, int expectedPartition, Errors expectedError) {
        Errors topLevelError = res.getKey();
        Map<TopicPartition, Throwable> partitions = res.getValue();
        TopicPartition tp = new TopicPartition(inputTopic, expectedPartition);

        // The top-level error is only compared when a specific partition was requested.
        if (inputPartition >= 0) {
            Assertions.assertEquals(expectedError, topLevelError);
        }

        if (expectedError == Errors.NONE) {
            Assertions.assertNull(partitions.get(tp));
        } else {
            Throwable partitionFailure = partitions.get(tp);
            // Fail with a readable message instead of a NullPointerException when
            // the expected partition-level failure is missing altogether.
            Assertions.assertNotNull(partitionFailure,
                "Expected " + expectedError + " for " + tp + " but no partition-level error was reported");
            Assertions.assertEquals(expectedError.exception(), partitionFailure.getCause());
        }
    }

    /** Returns a fresh input topic name: {@code TOPIC_PREFIX} followed by a 10-character random string. */
    private String generateRandomTopic() {
        return generateRandomTopicId(TOPIC_PREFIX);
    }

    /** Returns a fresh application id: {@code APP_ID_PREFIX} followed by a 10-character random string. */
    private String generateRandomAppId() {
        String randomSuffix = TestUtils.randomString(10);
        return APP_ID_PREFIX + randomSuffix;
    }

    /**
     * Shuts the streams application down, cleans up its local state and waits
     * until the group is reported as empty with no members.
     *
     * @param appId   application id, used as the group id when polling the service
     * @param topic   unused by this helper
     * @param streams application to stop; nothing happens when {@code null}
     * @param service command service used to poll group state and membership
     * @throws InterruptedException if interrupted while waiting for a condition
     */
    private void stopKSApp(String appId, String topic, KafkaStreams streams, StreamsGroupCommand.StreamsGroupService service) throws InterruptedException {
        if (streams == null) {
            return;
        }

        // Close with a bounded timeout and explicitly leave the group.
        KafkaStreams.CloseOptions options = new KafkaStreams.CloseOptions()
            .timeout(Duration.ofSeconds(30L))
            .leaveGroup(true);
        streams.close(options);
        streams.cleanUp();

        TestUtils.waitForCondition(
            () -> checkGroupState(service, appId, GroupState.EMPTY),
            "The group did not become empty as expected.");
        TestUtils.waitForCondition(
            () -> service.collectGroupMembers(appId).isEmpty(),
            "The group size is not zero as expected.");
    }

    /**
     * Produces the test records to {@code inputTopic}, starts a streams
     * application reading from it and waits until the group has members and is stable.
     *
     * <p>If either wait fails, the already started application is closed before
     * the failure is rethrown, so a failed start does not leak a running instance.
     *
     * @param appId      application id, used as the group id
     * @param inputTopic topic the topology reads from
     * @param service    command service used to poll group membership and state
     * @return the started application; the caller is responsible for closing it
     * @throws Exception if starting the application or waiting for the group fails
     */
    private KafkaStreams startKSApp(String appId, String inputTopic, StreamsGroupCommand.StreamsGroupService service) throws Exception {
        String outputTopic = generateRandomTopicId(OUTPUT_TOPIC_PREFIX);
        StreamsBuilder builder = builder(inputTopic, outputTopic);
        produceMessages(inputTopic);

        KStream<String, String> inputStream = builder.stream(inputTopic);
        AtomicInteger recordCount = new AtomicInteger(0);
        KTable<String, String> valueCounts = inputStream
            .groupByKey()
            .aggregate(
                () -> "()",
                (key, value, aggregate) -> aggregate + ",(" + key + ": " + value + ")",
                Materialized.as("aggregated_value"));

        // Throws once more than RECORD_TOTAL aggregation updates have been observed.
        valueCounts.toStream().peek((key, value) -> {
            if (recordCount.incrementAndGet() > RECORD_TOTAL) {
                throw new IllegalStateException("Crash on the " + RECORD_TOTAL + " record");
            }
        });

        KafkaStreams streams = IntegrationTestUtils.getStartedStreams(createStreamsConfig(bootstrapServers, appId), builder, true);
        try {
            TestUtils.waitForCondition(
                () -> !service.collectGroupMembers(appId).isEmpty(),
                "The group did not initialize as expected.");
            TestUtils.waitForCondition(
                () -> checkGroupState(service, appId, GroupState.STABLE),
                "The group did not become stable as expected.");
        } catch (Throwable startupFailure) {
            // Do not leave a running application behind when the group never settles.
            streams.close(Duration.ofSeconds(30L));
            throw startupFailure;
        }
        return streams;
    }

    /**
     * Returns {@code prefix} followed by a 10-character random string.
     *
     * @param prefix leading part of the generated name
     */
    private String generateRandomTopicId(String prefix) {
        String randomSuffix = TestUtils.randomString(10);
        return prefix + randomSuffix;
    }

    /** Returns a fresh application id; produces the same kind of value as {@code generateRandomAppId()}. */
    private String generateGroupAppId() {
        return generateRandomAppId();
    }

    /**
     * Reports whether the group is currently in the given state.
     *
     * @param service command service queried for the group's state
     * @param groupId group to inspect
     * @param state   state to compare against
     * @return {@code true} when the state reported by the service equals {@code state}
     * @throws Exception if the service fails to collect the group state
     */
    private boolean checkGroupState(StreamsGroupCommand.StreamsGroupService service, String groupId, GroupState state) throws Exception {
        GroupState observed = service.collectGroupState(groupId);
        return Objects.equals(observed, state);
    }

    /**
     * Creates a builder holding a word-count style topology: values read from
     * {@code inputTopic} are lower-cased, split on non-word characters, grouped
     * by the resulting token, counted, and written to {@code outputTopic}.
     *
     * @param inputTopic  topic consumed with String key and value serdes
     * @param outputTopic topic the counts are written to (String key, Long value)
     * @return the builder with the topology registered
     */
    private static StreamsBuilder builder(String inputTopic, String outputTopic) {
        StreamsBuilder builder = new StreamsBuilder();
        // No raw Serde casts: they would erase the stream's generic types and
        // turn the lambda parameters below into plain Objects.
        builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()))
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy((key, value) -> value)
            .count()
            .toStream()
            .to(outputTopic, Produced.with(Serdes.String(), Serdes.Long()));
        return builder;
    }

    /**
     * Synchronously produces {@code RECORD_TOTAL} String records to {@code topic}.
     * Record {@code v} has key {@code v + "0" + topic} and value {@code v + "0"},
     * and is stamped with the cluster's current time.
     *
     * @param topic destination topic
     */
    private static void produceMessages(String topic) {
        List<KeyValueTimestamp<String, String>> data = new ArrayList<>(RECORD_TOTAL);
        for (long v = 0; v < RECORD_TOTAL; ++v) {
            data.add(new KeyValueTimestamp<>(v + "0" + topic, v + "0", cluster.time.milliseconds()));
        }
        IntegrationTestUtils.produceSynchronously(
            TestUtils.producerConfig(bootstrapServers, StringSerializer.class, StringSerializer.class),
            false,
            topic,
            Optional.empty(),
            data);
    }

    /**
     * Parses the command-line arguments and wraps them, together with a new admin
     * client for the test cluster, in a {@code StreamsGroupService}.
     *
     * @param args command-line arguments as they would be passed to the tool
     * @return the service built from the parsed options
     */
    private StreamsGroupCommand.StreamsGroupService getStreamsGroupService(String[] args) {
        // Parse first: the admin client is only created once the options exist.
        StreamsGroupCommandOptions parsedOptions = StreamsGroupCommandOptions.fromArgs(args);
        Admin adminClient = cluster.createAdminClient();
        return new StreamsGroupCommand.StreamsGroupService(parsedOptions, adminClient);
    }
}

