/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.consumer.group;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommand;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommandOptions;
import org.junit.jupiter.api.Assertions;

@ClusterTestDefaults(types={Type.CO_KRAFT}, serverProperties={@ClusterConfigProperty(key="offsets.topic.num.partitions", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1"), @ClusterConfigProperty(key="group.initial.rebalance.delay.ms", value="1000"), @ClusterConfigProperty(key="group.consumer.heartbeat.interval.ms", value="500"), @ClusterConfigProperty(key="group.consumer.min.heartbeat.interval.ms", value="500")})
public class DeleteOffsetsConsumerGroupCommandIntegrationTest {
    public static final String TOPIC_PREFIX = "foo.";
    public static final String GROUP_PREFIX = "test.group.";
    private final ClusterInstance clusterInstance;

    DeleteOffsetsConsumerGroupCommandIntegrationTest(ClusterInstance clusterInstance) {
        this.clusterInstance = clusterInstance;
    }

    @ClusterTest
    public void testDeleteOffsetsNonExistingGroup() {
        String group = "missing.group";
        String topic = "foo:1";
        try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = DeleteOffsetsConsumerGroupCommandIntegrationTest.consumerGroupService(this.getArgs(group, topic));){
            Map.Entry res = consumerGroupService.deleteOffsets(group, Collections.singletonList(topic));
            Assertions.assertEquals((Object)Errors.GROUP_ID_NOT_FOUND, res.getKey());
        }
    }

    @ClusterTest
    public void testDeleteOffsetsOfStableConsumerGroupWithTopicPartition() {
        for (GroupProtocol groupProtocol : this.clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            Runnable validateRunnable = this.getValidateRunnable(topic, group, 0, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
            this.testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
            this.removeTopic(topic);
        }
    }

    @ClusterTest
    public void testDeleteOffsetsOfStableConsumerGroupWithTopicOnly() {
        for (GroupProtocol groupProtocol : this.clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            Runnable validateRunnable = this.getValidateRunnable(topic, group, -1, 0, Errors.GROUP_SUBSCRIBED_TO_TOPIC);
            this.testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
            this.removeTopic(topic);
        }
    }

    @ClusterTest
    public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicPartition() {
        for (GroupProtocol groupProtocol : this.clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            Runnable validateRunnable = this.getValidateRunnable("foobar", group, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
            this.testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
        }
    }

    @ClusterTest
    public void testDeleteOffsetsOfStableConsumerGroupWithUnknownTopicOnly() {
        for (GroupProtocol groupProtocol : this.clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            Runnable validateRunnable = this.getValidateRunnable("foobar", group, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
            this.testWithConsumerGroup(topic, group, groupProtocol, true, validateRunnable);
        }
    }

    @ClusterTest
    public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicPartition() {
        for (GroupProtocol groupProtocol : this.clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            Runnable validateRunnable = this.getValidateRunnable(topic, group, 0, 0, Errors.NONE);
            this.testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
            this.removeTopic(topic);
        }
    }

    @ClusterTest
    public void testDeleteOffsetsOfEmptyConsumerGroupWithTopicOnly() {
        for (GroupProtocol groupProtocol : this.clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            this.createTopic(topic);
            Runnable validateRunnable = this.getValidateRunnable(topic, group, -1, 0, Errors.NONE);
            this.testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
            this.removeTopic(topic);
        }
    }

    @ClusterTest
    public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicPartition() {
        for (GroupProtocol groupProtocol : this.clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            Runnable validateRunnable = this.getValidateRunnable("foobar", group, 0, 0, Errors.UNKNOWN_TOPIC_OR_PARTITION);
            this.testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
        }
    }

    @ClusterTest
    public void testDeleteOffsetsOfEmptyConsumerGroupWithUnknownTopicOnly() {
        for (GroupProtocol groupProtocol : this.clusterInstance.supportedGroupProtocols()) {
            String topic = TOPIC_PREFIX + groupProtocol.name();
            String group = GROUP_PREFIX + groupProtocol.name();
            Runnable validateRunnable = this.getValidateRunnable("foobar", group, -1, -1, Errors.UNKNOWN_TOPIC_OR_PARTITION);
            this.testWithConsumerGroup(topic, group, groupProtocol, false, validateRunnable);
        }
    }

    private String[] getArgs(String group, String topic) {
        return new String[]{"--bootstrap-server", this.clusterInstance.bootstrapServers(), "--delete-offsets", "--group", group, "--topic", topic};
    }

    private static ConsumerGroupCommand.ConsumerGroupService consumerGroupService(String[] args) {
        return new ConsumerGroupCommand.ConsumerGroupService(ConsumerGroupCommandOptions.fromArgs((String[])args), Collections.singletonMap("retries", Integer.toString(Integer.MAX_VALUE)));
    }

    private Runnable getValidateRunnable(String inputTopic, String inputGroup, int inputPartition, int expectedPartition, Errors expectedError) {
        return () -> {
            String topic = inputPartition >= 0 ? inputTopic + ":" + inputPartition : inputTopic;
            try (ConsumerGroupCommand.ConsumerGroupService consumerGroupService = DeleteOffsetsConsumerGroupCommandIntegrationTest.consumerGroupService(this.getArgs(inputGroup, topic));){
                Map.Entry res = consumerGroupService.deleteOffsets(inputGroup, Collections.singletonList(topic));
                Errors topLevelError = (Errors)res.getKey();
                Map partitions = (Map)res.getValue();
                TopicPartition tp = new TopicPartition(inputTopic, expectedPartition);
                if (inputPartition >= 0) {
                    Assertions.assertEquals((Object)expectedError, (Object)topLevelError);
                }
                if (expectedError == Errors.NONE) {
                    Assertions.assertNull(partitions.get(tp));
                } else {
                    Assertions.assertEquals((Object)expectedError.exception(), (Object)((Throwable)partitions.get(tp)).getCause());
                }
            }
        };
    }

    private void testWithConsumerGroup(String inputTopic, String inputGroup, GroupProtocol groupProtocol, boolean isStable, Runnable validateRunnable) {
        this.produceRecord(inputTopic);
        try (Consumer<byte[], byte[]> consumer = this.createConsumer(inputGroup, groupProtocol);){
            consumer.subscribe(Collections.singletonList(inputTopic));
            ConsumerRecords records = consumer.poll(Duration.ofMillis(15000L));
            Assertions.assertNotEquals((int)0, (int)records.count());
            consumer.commitSync();
            if (isStable) {
                validateRunnable.run();
            }
        }
        if (!isStable) {
            validateRunnable.run();
        }
    }

    private void produceRecord(String topic) {
        try (Producer<byte[], byte[]> producer = this.createProducer();){
            Assertions.assertDoesNotThrow(() -> (RecordMetadata)producer.send(new ProducerRecord(topic, Integer.valueOf(0), null, null)).get());
        }
    }

    private Producer<byte[], byte[]> createProducer() {
        return this.clusterInstance.producer(Map.of("acks", "-1"));
    }

    private Consumer<byte[], byte[]> createConsumer(String group, GroupProtocol groupProtocol) {
        HashMap<String, String> consumerConfig = new HashMap<String, String>();
        consumerConfig.putIfAbsent("bootstrap.servers", this.clusterInstance.bootstrapServers());
        consumerConfig.putIfAbsent("group.protocol", groupProtocol.name());
        consumerConfig.putIfAbsent("auto.offset.reset", "earliest");
        consumerConfig.putIfAbsent("group.id", group);
        consumerConfig.putIfAbsent("key.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfig.putIfAbsent("value.deserializer", ByteArrayDeserializer.class.getName());
        consumerConfig.putIfAbsent("max.poll.interval.ms", Integer.toString(Integer.MAX_VALUE));
        if (groupProtocol == GroupProtocol.CLASSIC) {
            consumerConfig.putIfAbsent("session.timeout.ms", Integer.toString(1800000));
        }
        return new KafkaConsumer(consumerConfig);
    }

    private void createTopic(String topic) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            Assertions.assertDoesNotThrow(() -> (Uuid)admin.createTopics(Collections.singletonList(new NewTopic(topic, 1, 1))).topicId(topic).get());
        }
    }

    private void removeTopic(String topic) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", this.clusterInstance.bootstrapServers()));){
            Assertions.assertDoesNotThrow(() -> admin.deleteTopics(Collections.singletonList(topic)).all());
        }
    }
}

