/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.tools.consumer.group;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import joptsimple.OptionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.RangeAssignor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.GroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
import org.apache.kafka.common.test.api.Type;
import org.apache.kafka.test.TestUtils;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommand;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommandOptions;
import org.apache.kafka.tools.consumer.group.ConsumerGroupCommandTestUtils;
import org.junit.jupiter.api.Assertions;

@ClusterTestDefaults(types={Type.CO_KRAFT}, serverProperties={@ClusterConfigProperty(key="offsets.topic.num.partitions", value="1"), @ClusterConfigProperty(key="offsets.topic.replication.factor", value="1"), @ClusterConfigProperty(key="group.initial.rebalance.delay.ms", value="1000"), @ClusterConfigProperty(key="group.consumer.heartbeat.interval.ms", value="500"), @ClusterConfigProperty(key="group.consumer.min.heartbeat.interval.ms", value="500")})
public class ResetConsumerGroupOffsetTest {
    private static final String TOPIC_PREFIX = "foo-";
    private static final String GROUP_PREFIX = "test.group-";

    /**
     * Builds the argument prefix shared by every command line assembled in this test:
     * the {@code --reset-offsets} action, the cluster's bootstrap servers and a
     * {@code --timeout} of 15000.
     *
     * @param cluster cluster whose bootstrap servers are handed to the command
     * @return a new array containing the common arguments
     */
    private String[] basicArgs(ClusterInstance cluster) {
        String bootstrapServers = cluster.bootstrapServers();
        String timeout = Long.toString(15000L);
        return new String[]{"--reset-offsets", "--bootstrap-server", bootstrapServers, "--timeout", timeout};
    }

    /**
     * Assembles a command line made of the common arguments, one {@code --group <id>}
     * pair per entry of {@code groups}, and finally the caller-supplied extras.
     *
     * @param cluster cluster used to build the common arguments
     * @param groups  group ids, each emitted as its own {@code --group} option
     * @param args    additional arguments appended after the group options
     * @return the complete argument array
     */
    private String[] buildArgsForGroups(ClusterInstance cluster, List<String> groups, String ... args) {
        List<String> command = new ArrayList<>(Arrays.asList(basicArgs(cluster)));
        groups.forEach(groupId -> {
            command.add("--group");
            command.add(groupId);
        });
        Collections.addAll(command, args);
        return command.toArray(new String[0]);
    }

    /**
     * Single-group convenience wrapper around {@code buildArgsForGroups}.
     *
     * @param cluster cluster used to build the common arguments
     * @param group   the only group id to target
     * @param args    additional arguments appended after the group option
     * @return the complete argument array
     */
    private String[] buildArgsForGroup(ClusterInstance cluster, String group, String ... args) {
        List<String> onlyGroup = Collections.singletonList(group);
        return buildArgsForGroups(cluster, onlyGroup, args);
    }

    /**
     * Assembles a command line made of the common arguments, the {@code --all-groups}
     * flag, and the caller-supplied extras.
     *
     * @param cluster cluster used to build the common arguments
     * @param args    additional arguments appended after {@code --all-groups}
     * @return the complete argument array
     */
    private String[] buildArgsForAllGroups(ClusterInstance cluster, String ... args) {
        List<String> command = new ArrayList<>(Arrays.asList(basicArgs(cluster)));
        command.add("--all-groups");
        Collections.addAll(command, args);
        return command.toArray(new String[0]);
    }

    /**
     * Resets offsets ({@code --to-current --execute}) for the group id
     * {@code missing.group} and checks that the result for that group is an empty map
     * and that no offsets are committed for a freshly generated topic name.
     *
     * @param cluster cluster under test
     * @throws Exception if the reset or the service shutdown fails
     */
    @ClusterTest
    public void testResetOffsetsNotExistingGroup(ClusterInstance cluster) throws Exception {
        String topic = generateRandomTopic();
        String group = "missing.group";
        String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-current", "--execute");

        try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
            // Typed instead of a raw Map so the compiler checks the element types.
            Map<TopicPartition, OffsetAndMetadata> resetOffsets = service.resetOffsets().get(group);
            Assertions.assertTrue(resetOffsets.isEmpty());
            Assertions.assertTrue(committedOffsets(cluster, topic, group).isEmpty());
        }
    }

    /**
     * Produces 100 records to a new topic and runs {@code --to-offset 50} for the group
     * {@code new.group} three times: with no mode flag, with {@code --dry-run} and with
     * {@code --execute}. Each run is checked by {@code resetAndAssertOffsets} against an
     * expected offset of 50.
     *
     * @param cluster cluster under test
     */
    @ClusterTest
    public void testResetOffsetsExistingTopic(ClusterInstance cluster) {
        String topicName = generateRandomTopic();
        String groupId = "new.group";
        String[] baseArgs = buildArgsForGroup(cluster, groupId, "--topic", topicName, "--to-offset", "50");

        produceMessages(cluster, topicName, 100);

        // The boolean is true for the two non-executing runs and false for --execute
        // (presumably a dry-run marker; the helper is defined elsewhere in this class).
        resetAndAssertOffsets(cluster, baseArgs, 50L, true, Collections.singletonList(topicName));
        resetAndAssertOffsets(cluster, addTo(baseArgs, "--dry-run"), 50L, true, Collections.singletonList(topicName));
        resetAndAssertOffsets(cluster, addTo(baseArgs, "--execute"), 50L, false, Collections.singletonList(topicName));
    }

    /**
     * For every supported group protocol: produces 100 records, lets each generated
     * group consume the topic until {@code awaitConsumerProgress} is satisfied for 100,
     * then runs {@code --to-offset 50} against exactly those groups (no mode flag,
     * {@code --dry-run}, {@code --execute}).
     *
     * @param cluster cluster under test
     * @throws Exception if a consumer group cannot be closed or a helper fails
     */
    @ClusterTest
    public void testResetOffsetsExistingTopicSelectedGroups(ClusterInstance cluster) throws Exception {
        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
            String topic = generateRandomTopic();
            produceMessages(cluster, topic, 100);

            List<String> groups = generateIds(topic);
            for (String group : groups) {
                // try-with-resources closes the consumers on every path and never
                // discards a failure raised while waiting for progress.
                try (AutoCloseable consumerGroupCloseable = consumerGroupClosable(cluster, 1, topic, group, groupProtocol)) {
                    awaitConsumerProgress(cluster, topic, group, 100L);
                }
            }

            String[] args = buildArgsForGroups(cluster, groups, "--topic", topic, "--to-offset", "50");
            resetAndAssertOffsets(cluster, args, 50L, true, Collections.singletonList(topic));
            resetAndAssertOffsets(cluster, addTo(args, "--dry-run"), 50L, true, Collections.singletonList(topic));
            resetAndAssertOffsets(cluster, addTo(args, "--execute"), 50L, false, Collections.singletonList(topic));
        }
    }

    /**
     * For every supported group protocol: produces 100 records, runs three randomly
     * named groups against the topic one after another, then runs
     * {@code --all-groups --to-offset 50} with no mode flag, with {@code --dry-run}
     * and with {@code --execute}.
     *
     * @param cluster cluster under test
     * @throws Exception if a consumer group cannot be closed or a helper fails
     */
    @ClusterTest
    public void testResetOffsetsExistingTopicAllGroups(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String topicName = generateRandomTopic();
            String[] baseArgs = buildArgsForAllGroups(cluster, "--topic", topicName, "--to-offset", "50");
            produceMessages(cluster, topicName, 100);

            for (int round = 0; round < 3; round++) {
                String groupId = generateRandomGroupId();
                try (AutoCloseable consumers = consumerGroupClosable(cluster, 1, topicName, groupId, protocol)) {
                    awaitConsumerProgress(cluster, topicName, groupId, 100L);
                }
            }

            resetAndAssertOffsets(cluster, baseArgs, 50L, true, Collections.singletonList(topicName));
            resetAndAssertOffsets(cluster, addTo(baseArgs, "--dry-run"), 50L, true, Collections.singletonList(topicName));
            resetAndAssertOffsets(cluster, addTo(baseArgs, "--execute"), 50L, false, Collections.singletonList(topicName));
        }
    }

    /**
     * For every supported group protocol: produces 100 records to each generated
     * topic, lets every generated group consume every topic, runs
     * {@code --all-groups --all-topics --to-offset 50} (no mode flag, {@code --dry-run},
     * {@code --execute}) and finally deletes the generated groups.
     *
     * @param cluster cluster under test
     * @throws Exception if a resource cannot be closed or an admin call fails
     */
    @ClusterTest
    public void testResetOffsetsAllTopicsAllGroups(ClusterInstance cluster) throws Exception {
        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicId = generateRandomTopic();

            String[] args = buildArgsForAllGroups(cluster, "--all-topics", "--to-offset", "50");
            // Topic names derive from the topic id and group names from the group id;
            // the two seeds were previously crossed over.
            List<String> topics = generateIds(topicId);
            List<String> groups = generateIds(groupId);

            for (String topic : topics) {
                produceMessages(cluster, topic, 100);
            }

            for (String topic : topics) {
                for (String group : groups) {
                    // try-with-resources closes the consumers on every path and never
                    // discards a failure raised while waiting for progress.
                    try (AutoCloseable consumerGroupCloseable = consumerGroupClosable(cluster, 3, topic, group, groupProtocol)) {
                        awaitConsumerProgress(cluster, topic, group, 100L);
                    }
                }
            }

            resetAndAssertOffsets(cluster, args, 50L, true, topics);
            resetAndAssertOffsets(cluster, addTo(args, "--dry-run"), 50L, true, topics);
            resetAndAssertOffsets(cluster, addTo(args, "--execute"), 50L, false, topics);

            // Remove the groups so a later protocol iteration using --all-groups does not see them.
            try (Admin admin = cluster.admin()) {
                admin.deleteConsumerGroups(groups).all().get();
            }
        }
    }

    /**
     * For every supported group protocol: produces and consumes 100 records, then
     * resets with {@code --to-datetime} set to a zone-less timestamp one day in the
     * past. The expected offset handed to {@code resetAndAssertOffsets} is 0.
     *
     * @param cluster cluster under test
     * @throws Exception if the consumer group cannot be closed or a helper fails
     */
    @ClusterTest
    public void testResetOffsetsToLocalDateTime(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();

            DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS");
            LocalDateTime oneDayAgo = LocalDateTime.now().minusDays(1L);
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--to-datetime", formatter.format(oneDayAgo), "--execute");

            produceMessages(cluster, topicName, 100);
            try (AutoCloseable consumers = consumerGroupClosable(cluster, 1, topicName, groupId, protocol)) {
                awaitConsumerProgress(cluster, topicName, groupId, 100L);
            }

            resetAndAssertOffsets(cluster, topicName, resetArgs, 0L);
        }
    }

    /**
     * For every supported group protocol: produces 50 records, records a zoned
     * checkpoint timestamp, produces 50 more, consumes all 100 and then resets with
     * {@code --to-datetime} set to the checkpoint. The expected offset handed to
     * {@code resetAndAssertOffsets} is 50.
     *
     * @param cluster cluster under test
     * @throws Exception if the consumer group cannot be closed or a helper fails
     */
    @ClusterTest
    public void testResetOffsetsToZonedDateTime(ClusterInstance cluster) throws Exception {
        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
            String group = generateRandomGroupId();
            String topic = generateRandomTopic();
            DateTimeFormatter format = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSXXX");

            produceMessages(cluster, topic, 50);
            // Take the checkpoint from the current instant. Going through
            // LocalDateTime.atZone() resolves a DST overlap to the earlier offset and
            // can place the checkpoint an hour before the first batch.
            ZonedDateTime checkpoint = ZonedDateTime.now(ZoneId.systemDefault());
            produceMessages(cluster, topic, 50);

            String[] args = buildArgsForGroup(cluster, group, "--all-topics", "--to-datetime", format.format(checkpoint), "--execute");

            try (AutoCloseable consumerGroupCloseable = consumerGroupClosable(cluster, 1, topic, group, groupProtocol)) {
                awaitConsumerProgress(cluster, topic, group, 100L);
            }

            resetAndAssertOffsets(cluster, topic, args, 50L);
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown} and
     * then resets with {@code --by-duration PT1M}. The expected offset handed to
     * {@code resetAndAssertOffsets} is 0.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsByDuration(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--by-duration", "PT1M", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 0L);
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown} and
     * then resets with {@code --by-duration PT0.1S}. The expected offset handed to
     * {@code resetAndAssertOffsets} is 100.
     *
     * NOTE(review): the method name says "ToEarliest" while the expected offset is
     * 100 - confirm the intended naming against the upstream test.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsByDurationToEarliest(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--by-duration", "PT0.1S", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 100L);
        }
    }

    /**
     * Creates an empty single-partition topic, resets a random group with
     * {@code --by-duration PT1M --execute} and checks, via
     * {@code resetAndAssertOffsets}, an expected offset of 0. The topic is deleted
     * afterwards.
     *
     * @param cluster cluster under test
     * @throws ExecutionException   if topic creation or deletion fails
     * @throws InterruptedException if interrupted while waiting on an admin call
     */
    @ClusterTest
    public void testResetOffsetsByDurationFallbackToLatestWhenNoRecords(ClusterInstance cluster) throws ExecutionException, InterruptedException {
        String group = generateRandomGroupId();
        String topic = generateRandomTopic();
        String[] args = buildArgsForGroup(cluster, group, "--topic", topic, "--by-duration", "PT1M", "--execute");

        try (Admin admin = cluster.admin()) {
            // The replication factor parameter is a short; an int literal does not
            // match any NewTopic constructor.
            admin.createTopics(Collections.singleton(new NewTopic(topic, 1, (short) 1))).all().get();
            resetAndAssertOffsets(cluster, args, 0L, false, Collections.singletonList(topic));
            admin.deleteTopics(Collections.singleton(topic)).all().get();
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown} and
     * then resets with {@code --to-earliest}. The expected offset handed to
     * {@code resetAndAssertOffsets} is 0.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsToEarliest(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--to-earliest", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 0L);
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown},
     * produces 100 further records and then resets with {@code --to-latest}. The
     * expected offset handed to {@code resetAndAssertOffsets} is 200.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsToLatest(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--to-latest", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            produceMessages(cluster, topicName, 100);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 200L);
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown},
     * produces 100 further records and then resets with {@code --to-current}. The
     * expected offset handed to {@code resetAndAssertOffsets} is 100.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsToCurrentOffset(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--to-current", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            produceMessages(cluster, topicName, 100);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 100L);
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown} and
     * then resets with {@code --to-offset 1}. The expected offset handed to
     * {@code resetAndAssertOffsets} is 1.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsToSpecificOffset(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--to-offset", "1", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 1L);
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown},
     * produces 100 further records and then resets with {@code --shift-by 50}. The
     * expected offset handed to {@code resetAndAssertOffsets} is 150.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsShiftPlus(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--shift-by", "50", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            produceMessages(cluster, topicName, 100);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 150L);
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown},
     * produces 100 further records and then resets with {@code --shift-by -50}. The
     * expected offset handed to {@code resetAndAssertOffsets} is 50.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsShiftMinus(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--shift-by", "-50", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            produceMessages(cluster, topicName, 100);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 50L);
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown},
     * produces 100 further records and then resets with {@code --shift-by -150}. The
     * expected offset handed to {@code resetAndAssertOffsets} is 0.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsShiftByLowerThanEarliest(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--shift-by", "-150", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            produceMessages(cluster, topicName, 100);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 0L);
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown},
     * produces 100 further records and then resets with {@code --shift-by 150}. The
     * expected offset handed to {@code resetAndAssertOffsets} is 200.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsShiftByHigherThanLatest(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--all-topics", "--shift-by", "150", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            produceMessages(cluster, topicName, 100);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 200L);
        }
    }

    /**
     * For every supported group protocol: runs {@code produceConsumeAndShutdown} and
     * then resets a single named topic ({@code --topic <topic>}) with
     * {@code --to-earliest}. The expected offset handed to
     * {@code resetAndAssertOffsets} is 0.
     *
     * @param cluster cluster under test
     * @throws Exception if a helper fails
     */
    @ClusterTest
    public void testResetOffsetsToEarliestOnOneTopic(ClusterInstance cluster) throws Exception {
        for (GroupProtocol protocol : cluster.supportedGroupProtocols()) {
            String groupId = generateRandomGroupId();
            String topicName = generateRandomTopic();
            String[] resetArgs = buildArgsForGroup(cluster, groupId,
                "--topic", topicName, "--to-earliest", "--execute");

            produceConsumeAndShutdown(cluster, topicName, groupId, 1, protocol);
            resetAndAssertOffsets(cluster, topicName, resetArgs, 0L);
        }
    }

    /**
     * For every supported group protocol: creates a two-partition topic, runs
     * {@code produceConsumeAndShutdown}, then resets only partition 1
     * ({@code --topic <topic>:1 --to-earliest --execute}). Partition 0 is expected to
     * keep its previously committed offset while partition 1 is expected at 0.
     *
     * @param cluster cluster under test
     * @throws Exception if an admin call, a helper or a resource close fails
     */
    @ClusterTest
    public void testResetOffsetsToEarliestOnOneTopicAndPartition(ClusterInstance cluster) throws Exception {
        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
            String group = generateRandomGroupId();
            String topic = generateRandomTopic();
            String[] args = buildArgsForGroup(cluster, group, "--topic", topic + ":1", "--to-earliest", "--execute");

            // Resources close in reverse order (service, then admin) and a failure in
            // the body is never masked by the cleanup.
            try (Admin admin = cluster.admin();
                 ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
                // The replication factor parameter is a short.
                admin.createTopics(Collections.singleton(new NewTopic(topic, 2, (short) 1))).all().get();

                produceConsumeAndShutdown(cluster, topic, group, 2, groupProtocol);
                Map<TopicPartition, Long> priorCommittedOffsets = committedOffsets(cluster, topic, group);

                TopicPartition tp0 = new TopicPartition(topic, 0);
                TopicPartition tp1 = new TopicPartition(topic, 1);
                Map<TopicPartition, Long> expectedOffsets = new HashMap<>();
                expectedOffsets.put(tp0, priorCommittedOffsets.get(tp0));
                expectedOffsets.put(tp1, 0L);

                resetAndAssertOffsetsCommitted(cluster, service, expectedOffsets, topic);
                admin.deleteTopics(Collections.singleton(topic)).all().get();
            }
        }
    }

    /**
     * For every supported group protocol: creates two single-partition topics, runs
     * {@code produceConsumeAndShutdown} on each with the same group, then resets both
     * topics with {@code --to-earliest --execute}. Both the returned reset plan and
     * the committed offsets are expected to be 0 for partition 0 of each topic.
     *
     * @param cluster cluster under test
     * @throws Exception if an admin call, a helper or a resource close fails
     */
    @ClusterTest
    public void testResetOffsetsToEarliestOnTopics(ClusterInstance cluster) throws Exception {
        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
            String group = generateRandomGroupId();
            String topic1 = generateRandomTopic();
            String topic2 = generateRandomTopic();
            String[] args = buildArgsForGroup(cluster, group, "--topic", topic1, "--topic", topic2, "--to-earliest", "--execute");

            // Resources close in reverse order (service, then admin) and a failure in
            // the body is never masked by the cleanup.
            try (Admin admin = cluster.admin();
                 ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
                // The replication factor parameter is a short.
                admin.createTopics(Arrays.asList(
                    new NewTopic(topic1, 1, (short) 1),
                    new NewTopic(topic2, 1, (short) 1))).all().get();

                produceConsumeAndShutdown(cluster, topic1, group, 1, groupProtocol);
                produceConsumeAndShutdown(cluster, topic2, group, 1, groupProtocol);

                TopicPartition tp1 = new TopicPartition(topic1, 0);
                TopicPartition tp2 = new TopicPartition(topic2, 0);

                Map<TopicPartition, Long> allResetOffsets = toOffsetMap(resetOffsets(service).get(group));
                Map<TopicPartition, Long> expMap = new HashMap<>();
                expMap.put(tp1, 0L);
                expMap.put(tp2, 0L);

                Assertions.assertEquals(expMap, allResetOffsets);
                Assertions.assertEquals(Collections.singletonMap(tp1, 0L), committedOffsets(cluster, topic1, group));
                Assertions.assertEquals(Collections.singletonMap(tp2, 0L), committedOffsets(cluster, topic2, group));

                admin.deleteTopics(Arrays.asList(topic1, topic2)).all().get();
            }
        }
    }

    /**
     * For every supported group protocol: creates two two-partition topics, runs
     * {@code produceConsumeAndShutdown} on each with the same group, then resets only
     * partition 1 of each topic with {@code --to-earliest --execute}. The reset plan
     * is expected to contain exactly those two partitions at 0, and the committed
     * offsets are expected to equal the prior ones with partition 1 replaced by 0.
     *
     * @param cluster cluster under test
     * @throws Exception if an admin call, a helper or a resource close fails
     */
    @ClusterTest
    public void testResetOffsetsToEarliestOnTopicsAndPartitions(ClusterInstance cluster) throws Exception {
        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
            String group = generateRandomGroupId();
            String topic1 = generateRandomTopic();
            String topic2 = generateRandomTopic();
            String[] args = buildArgsForGroup(cluster, group, "--topic", topic1 + ":1", "--topic", topic2 + ":1", "--to-earliest", "--execute");

            // Resources close in reverse order (service, then admin) and a failure in
            // the body is never masked by the cleanup.
            try (Admin admin = cluster.admin();
                 ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
                // The replication factor parameter is a short.
                admin.createTopics(Arrays.asList(
                    new NewTopic(topic1, 2, (short) 1),
                    new NewTopic(topic2, 2, (short) 1))).all().get();

                produceConsumeAndShutdown(cluster, topic1, group, 2, groupProtocol);
                produceConsumeAndShutdown(cluster, topic2, group, 2, groupProtocol);

                Map<TopicPartition, Long> priorCommittedOffsets1 = committedOffsets(cluster, topic1, group);
                Map<TopicPartition, Long> priorCommittedOffsets2 = committedOffsets(cluster, topic2, group);

                TopicPartition tp1 = new TopicPartition(topic1, 1);
                TopicPartition tp2 = new TopicPartition(topic2, 1);

                Map<TopicPartition, Long> allResetOffsets = toOffsetMap(resetOffsets(service).get(group));
                Map<TopicPartition, Long> expMap = new HashMap<>();
                expMap.put(tp1, 0L);
                expMap.put(tp2, 0L);
                Assertions.assertEquals(expMap, allResetOffsets);

                // Only partition 1 changes; every other prior offset must be untouched.
                priorCommittedOffsets1.put(tp1, 0L);
                Assertions.assertEquals(priorCommittedOffsets1, committedOffsets(cluster, topic1, group));
                priorCommittedOffsets2.put(tp2, 0L);
                Assertions.assertEquals(priorCommittedOffsets2, committedOffsets(cluster, topic2, group));

                admin.deleteTopics(Arrays.asList(topic1, topic2)).all().get();
            }
        }
    }

    /**
     * For every supported group protocol: creates a two-partition topic, runs
     * {@code produceConsumeAndShutdown}, exports a {@code --to-offset 2} reset plan
     * for one group to a CSV file and then re-imports that file with
     * {@code --from-file ... --dry-run}. Both the exported and the imported plan are
     * expected to map both partitions to offset 2.
     *
     * @param cluster cluster under test
     * @throws Exception if an admin call, file I/O, a helper or a resource close fails
     */
    @ClusterTest
    public void testResetOffsetsExportImportPlanSingleGroupArg(ClusterInstance cluster) throws Exception {
        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
            String group = generateRandomGroupId();
            String topic = generateRandomTopic();

            TopicPartition tp0 = new TopicPartition(topic, 0);
            TopicPartition tp1 = new TopicPartition(topic, 1);
            String[] cgcArgs = buildArgsForGroup(cluster, group, "--all-topics", "--to-offset", "2", "--export");
            File file = TestUtils.tempFile("reset", ".csv");

            // Resources close in reverse order (service, then admin) and a failure in
            // the body is never masked by the cleanup.
            try (Admin admin = cluster.admin();
                 ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) {
                // The replication factor parameter is a short.
                admin.createTopics(Collections.singleton(new NewTopic(topic, 2, (short) 1))).all().get();
                produceConsumeAndShutdown(cluster, topic, group, 2, groupProtocol);

                Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = service.resetOffsets();
                writeContentToFile(file, service.exportOffsetsToCsv(exportedOffsets));

                Map<TopicPartition, Long> exp1 = new HashMap<>();
                exp1.put(tp0, 2L);
                exp1.put(tp1, 2L);
                Assertions.assertEquals(exp1, toOffsetMap(exportedOffsets.get(group)));

                String[] cgcArgsExec = buildArgsForGroup(cluster, group, "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
                try (ConsumerGroupCommand.ConsumerGroupService serviceExec = getConsumerGroupService(cgcArgsExec)) {
                    Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets = serviceExec.resetOffsets();
                    Assertions.assertEquals(exp1, toOffsetMap(importedOffsets.get(group)));
                }

                // Wait for the deletion so a failure surfaces here, as the sibling tests do.
                admin.deleteTopics(Collections.singleton(topic)).all().get();
            }
        }
    }

    /**
     * For every supported group protocol: creates two two-partition topics, runs
     * {@code produceConsumeAndShutdown} for group1/topic1 and group2/topic2, waits for
     * both groups via {@code awaitConsumerGroupInactive}, exports a
     * {@code --to-offset 2} plan for both groups to a CSV file and re-imports it with
     * {@code --dry-run}, first for both groups and then for group1 alone. Every plan
     * is expected to map both partitions of the group's topic to offset 2.
     *
     * @param cluster cluster under test
     * @throws Exception if an admin call, file I/O, a helper or a resource close fails
     */
    @ClusterTest
    public void testResetOffsetsExportImportPlan(ClusterInstance cluster) throws Exception {
        for (GroupProtocol groupProtocol : cluster.supportedGroupProtocols()) {
            String group1 = generateRandomGroupId();
            String group2 = generateRandomGroupId();
            String topic1 = generateRandomTopic();
            String topic2 = generateRandomTopic();

            TopicPartition t1p0 = new TopicPartition(topic1, 0);
            TopicPartition t1p1 = new TopicPartition(topic1, 1);
            TopicPartition t2p0 = new TopicPartition(topic2, 0);
            TopicPartition t2p1 = new TopicPartition(topic2, 1);
            String[] cgcArgs = buildArgsForGroups(cluster, Arrays.asList(group1, group2), "--all-topics", "--to-offset", "2", "--export");
            File file = TestUtils.tempFile("reset", ".csv");

            // Resources close in reverse order (service, then admin) and a failure in
            // the body is never masked by the cleanup.
            try (Admin admin = cluster.admin();
                 ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(cgcArgs)) {
                // The replication factor parameter is a short.
                admin.createTopics(Arrays.asList(
                    new NewTopic(topic1, 2, (short) 1),
                    new NewTopic(topic2, 2, (short) 1))).all().get();

                produceConsumeAndShutdown(cluster, topic1, group1, 1, groupProtocol);
                produceConsumeAndShutdown(cluster, topic2, group2, 1, groupProtocol);

                awaitConsumerGroupInactive(service, group1);
                awaitConsumerGroupInactive(service, group2);

                Map<String, Map<TopicPartition, OffsetAndMetadata>> exportedOffsets = service.resetOffsets();
                writeContentToFile(file, service.exportOffsetsToCsv(exportedOffsets));

                Map<TopicPartition, Long> exp1 = new HashMap<>();
                exp1.put(t1p0, 2L);
                exp1.put(t1p1, 2L);
                Map<TopicPartition, Long> exp2 = new HashMap<>();
                exp2.put(t2p0, 2L);
                exp2.put(t2p1, 2L);

                Assertions.assertEquals(exp1, toOffsetMap(exportedOffsets.get(group1)));
                Assertions.assertEquals(exp2, toOffsetMap(exportedOffsets.get(group2)));

                // Import the plan for both groups.
                String[] cgcArgsExec = buildArgsForGroups(cluster, Arrays.asList(group1, group2), "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
                try (ConsumerGroupCommand.ConsumerGroupService serviceExec = getConsumerGroupService(cgcArgsExec)) {
                    Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets = serviceExec.resetOffsets();
                    Assertions.assertEquals(exp1, toOffsetMap(importedOffsets.get(group1)));
                    Assertions.assertEquals(exp2, toOffsetMap(importedOffsets.get(group2)));
                }

                // Import the same multi-group file while naming only group1.
                String[] cgcArgsExec2 = buildArgsForGroup(cluster, group1, "--all-topics", "--from-file", file.getCanonicalPath(), "--dry-run");
                try (ConsumerGroupCommand.ConsumerGroupService serviceExec2 = getConsumerGroupService(cgcArgsExec2)) {
                    Map<String, Map<TopicPartition, OffsetAndMetadata>> importedOffsets2 = serviceExec2.resetOffsets();
                    Assertions.assertEquals(exp1, toOffsetMap(importedOffsets2.get(group1)));
                }

                // Wait for the deletion so a failure surfaces here, as the sibling tests do.
                admin.deleteTopics(Arrays.asList(topic1, topic2)).all().get();
            }
        }
    }

    /**
     * Checks that building the service from a command line containing
     * {@code --new-consumer} throws an {@link OptionException}.
     *
     * @param cluster cluster under test
     */
    @ClusterTest
    public void testResetWithUnrecognizedNewConsumerOption(ClusterInstance cluster) {
        String groupId = generateRandomGroupId();
        String[] cgcArgs = {
            "--new-consumer",
            "--bootstrap-server", cluster.bootstrapServers(),
            "--reset-offsets",
            "--group", groupId,
            "--all-topics",
            "--to-offset", "2",
            "--export"
        };
        Assertions.assertThrows(OptionException.class, () -> getConsumerGroupService(cgcArgs));
    }

    /**
     * @return {@code TOPIC_PREFIX} followed by the result of
     *         {@code TestUtils.randomString(10)}
     */
    private String generateRandomTopic() {
        String suffix = TestUtils.randomString(10);
        return TOPIC_PREFIX + suffix;
    }

    /**
     * @return {@code GROUP_PREFIX} followed by the result of
     *         {@code TestUtils.randomString(10)}
     */
    private String generateRandomGroupId() {
        String suffix = TestUtils.randomString(10);
        return GROUP_PREFIX + suffix;
    }

    /**
     * Looks up the offsets currently committed by {@code group} and keeps only the
     * partitions belonging to {@code topic}.
     *
     * @param cluster cluster whose bootstrap servers the admin client connects to
     * @param topic   topic whose partitions are kept
     * @param group   consumer group whose committed offsets are listed
     * @return a mutable map from partition to committed offset
     * @throws RuntimeException wrapping the {@link InterruptedException} or
     *                          {@link ExecutionException} raised while waiting for the lookup
     */
    private Map<TopicPartition, Long> committedOffsets(ClusterInstance cluster, String topic, String group) {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", cluster.bootstrapServers()))) {
            Map<TopicPartition, OffsetAndMetadata> groupOffsets =
                admin.listConsumerGroupOffsets(group).all().get().get(group);
            // Collectors.toMap is kept because callers modify the returned map.
            return groupOffsets.entrySet().stream()
                .filter(entry -> entry.getKey().topic().equals(topic))
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().offset()));
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates a consumer group service from command-line style arguments, passing a single
     * config override that sets {@code retries} to {@link Integer#MAX_VALUE}.
     *
     * @param args command-line arguments to parse
     * @return the service built from the parsed options
     */
    private ConsumerGroupCommand.ConsumerGroupService getConsumerGroupService(String[] args) {
        ConsumerGroupCommandOptions options = ConsumerGroupCommandOptions.fromArgs(args);
        Map<String, String> overrides = Collections.singletonMap("retries", Integer.toString(Integer.MAX_VALUE));
        return new ConsumerGroupCommand.ConsumerGroupService(options, overrides);
    }

    /**
     * Sends {@code numMessages} records to {@code topic}, each with a zero-filled
     * 100000-byte value and no key.
     *
     * @param cluster     cluster to produce to
     * @param topic       destination topic
     * @param numMessages number of records to send
     */
    private void produceMessages(ClusterInstance cluster, String topic, int numMessages) {
        List<ProducerRecord<byte[], byte[]>> batch = new ArrayList<>();
        for (int i = 0; i < numMessages; i++) {
            // Every record gets its own payload array.
            batch.add(new ProducerRecord<byte[], byte[]>(topic, new byte[100000]));
        }
        produceMessages(cluster, batch);
    }

    /**
     * Sends every record in {@code records} through a producer that is created for this call
     * and closed before the method returns.
     *
     * @param cluster cluster to produce to
     * @param records records to send, in iteration order
     */
    private void produceMessages(ClusterInstance cluster, List<ProducerRecord<byte[], byte[]>> records) {
        try (Producer<byte[], byte[]> producer = createProducer(cluster)) {
            for (ProducerRecord<byte[], byte[]> record : records) {
                producer.send(record);
            }
        }
    }

    /**
     * Creates a byte-array producer connected to {@code cluster} with {@code acks} set to
     * {@code "1"}.
     *
     * @param cluster cluster providing the bootstrap servers
     * @return a new producer; the caller is responsible for closing it
     */
    private Producer<byte[], byte[]> createProducer(ClusterInstance cluster) {
        String serializer = ByteArraySerializer.class.getName();
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", cluster.bootstrapServers());
        producerProps.put("acks", "1");
        producerProps.put("key.serializer", serializer);
        producerProps.put("value.serializer", serializer);
        return new KafkaProducer<>(producerProps);
    }

    /**
     * Single-topic convenience overload: runs the reset as a non-dry-run and asserts the
     * resulting offsets for {@code topic}.
     *
     * @param cluster        cluster under test
     * @param topic          the only topic to verify
     * @param args           command-line arguments for the reset
     * @param expectedOffset offset expected after the reset
     */
    private void resetAndAssertOffsets(ClusterInstance cluster, String topic, String[] args, long expectedOffset) {
        List<String> singleTopic = Collections.singletonList(topic);
        resetAndAssertOffsets(cluster, args, expectedOffset, false, singleTopic);
    }

    /**
     * Runs an offset reset with {@code args} and, for every topic and every group in the
     * result, asserts both the offsets reported by the reset and the offsets committed
     * afterwards.
     *
     * @param cluster        cluster under test
     * @param args           command-line arguments for the reset
     * @param expectedOffset offset expected for partition 0 of each topic
     * @param dryRun         when {@code true}, committed offsets are compared against the
     *                       value read at the start of the check instead of
     *                       {@code expectedOffset}
     * @param topics         topics to verify
     */
    private void resetAndAssertOffsets(ClusterInstance cluster, String[] args, long expectedOffset, boolean dryRun, List<String> topics) {
        try (ConsumerGroupCommand.ConsumerGroupService service = getConsumerGroupService(args)) {
            Map<String, Map<TopicPartition, Long>> expectedByTopic = getTopicExceptOffsets(topics, expectedOffset);
            Map<String, Map<TopicPartition, OffsetAndMetadata>> resetByGroup = resetOffsets(service);
            for (String topic : topics) {
                Map<TopicPartition, Long> expected = expectedByTopic.get(topic);
                for (Map.Entry<String, Map<TopicPartition, OffsetAndMetadata>> groupResult : resetByGroup.entrySet()) {
                    String group = groupResult.getKey();
                    // NOTE(review): this baseline is read after resetOffsets() has already run,
                    // so the dry-run comparison below compares two post-reset reads.
                    Map<TopicPartition, Long> priorOffsets = committedOffsets(cluster, topic, group);
                    Assertions.assertEquals(expected, partitionToOffsets(topic, groupResult.getValue()));
                    Map<TopicPartition, Long> expectedCommitted = dryRun ? priorOffsets : expected;
                    Assertions.assertEquals(expectedCommitted, committedOffsets(cluster, topic, group));
                }
            }
        }
    }

    /**
     * Maps each topic name to a single-entry map that assigns {@code expectedOffset} to
     * partition 0 of that topic.
     *
     * @param topics         topic names used as keys
     * @param expectedOffset offset stored for partition 0 of every topic
     * @return expected offsets keyed by topic name
     */
    private Map<String, Map<TopicPartition, Long>> getTopicExceptOffsets(List<String> topics, long expectedOffset) {
        Function<String, Map<TopicPartition, Long>> partitionZeroOffset =
            name -> Collections.singletonMap(new TopicPartition(name, 0), expectedOffset);
        return topics.stream().collect(Collectors.toMap(name -> name, partitionZeroOffset));
    }

    /**
     * Delegates to {@code resetOffsets()} on the given service.
     *
     * @param consumerGroupService service to run the reset on
     * @return whatever the service's {@code resetOffsets()} returns
     */
    private Map<String, Map<TopicPartition, OffsetAndMetadata>> resetOffsets(ConsumerGroupCommand.ConsumerGroupService consumerGroupService) {
        Map<String, Map<TopicPartition, OffsetAndMetadata>> resetResult = consumerGroupService.resetOffsets();
        return resetResult;
    }

    /**
     * Extracts the numeric offsets of the partitions that belong to {@code topic}.
     *
     * @param topic         topic whose partitions are kept
     * @param partitionInfo offsets with metadata, keyed by partition
     * @return offset per partition of {@code topic}
     */
    private Map<TopicPartition, Long> partitionToOffsets(String topic, Map<TopicPartition, OffsetAndMetadata> partitionInfo) {
        return partitionInfo.entrySet()
            .stream()
            .filter(entry -> Objects.equals(entry.getKey().topic(), topic))
            .collect(Collectors.toMap(entry -> entry.getKey(), entry -> entry.getValue().offset()));
    }

    /**
     * Returns two ids formed by appending {@code 1} and {@code 2} to {@code name}.
     *
     * @param name prefix shared by both ids
     * @return list containing {@code name + 1} and {@code name + 2}, in that order
     */
    private static List<String> generateIds(String name) {
        List<String> ids = new ArrayList<>();
        for (int suffix = 1; suffix <= 2; suffix++) {
            ids.add(name + suffix);
        }
        return ids;
    }

    /**
     * Produces 100 messages to {@code topic}, starts a consumer group, waits until the
     * group's committed offsets for the topic sum to 100, then closes the consumers.
     *
     * @param cluster       cluster under test
     * @param topic         topic to produce to and consume from
     * @param group         consumer group id
     * @param numConsumers  number of consumers to start
     * @param groupProtocol group protocol the consumers are configured with
     * @throws Exception if waiting for progress or closing the consumers fails
     */
    private void produceConsumeAndShutdown(ClusterInstance cluster, String topic, String group, int numConsumers, GroupProtocol groupProtocol) throws Exception {
        produceMessages(cluster, topic, 100);
        // The handle is held only so the consumers are shut down when the block exits.
        try (AutoCloseable consumers = consumerGroupClosable(cluster, numConsumers, topic, group, groupProtocol)) {
            awaitConsumerProgress(cluster, topic, group, 100L);
        }
    }

    /**
     * Overwrites {@code file} with {@code content}.
     *
     * @param file    destination file
     * @param content text to write
     * @throws IOException if the file cannot be opened or written
     */
    private void writeContentToFile(File file, String content) throws IOException {
        try (FileWriter fileWriter = new FileWriter(file);
             BufferedWriter buffered = new BufferedWriter(fileWriter)) {
            buffered.write(content);
        }
    }

    /**
     * Builds {@code numConsumers} consumers for {@code topic} via
     * {@link ConsumerGroupCommandTestUtils#buildConsumers} and returns the resulting handle.
     *
     * @param cluster       cluster the consumers connect to
     * @param numConsumers  number of consumers to build
     * @param topic         topic passed to the consumer builder
     * @param group         consumer group id
     * @param groupProtocol group protocol the consumers are configured with
     * @return the closeable handle returned by the builder
     */
    private AutoCloseable consumerGroupClosable(ClusterInstance cluster, int numConsumers, String topic, String group, GroupProtocol groupProtocol) {
        Map<String, Object> consumerConfigs = composeConsumerConfigs(cluster, group, groupProtocol);
        return ConsumerGroupCommandTestUtils.buildConsumers(
            numConsumers,
            false,
            topic,
            () -> new KafkaConsumer<>(consumerConfigs));
    }

    /**
     * Assembles the consumer configuration used by these tests: bootstrap servers, group id,
     * group protocol, string deserializers and two 1000 ms settings. For the
     * {@code CLASSIC} protocol the range assignor is configured as well.
     *
     * @param cluster       cluster providing the bootstrap servers
     * @param group         consumer group id
     * @param groupProtocol group protocol to configure
     * @return a mutable map of consumer configs
     */
    private Map<String, Object> composeConsumerConfigs(ClusterInstance cluster, String group, GroupProtocol groupProtocol) {
        String deserializer = StringDeserializer.class.getName();
        Map<String, Object> consumerConfigs = new HashMap<>();
        consumerConfigs.put("bootstrap.servers", cluster.bootstrapServers());
        consumerConfigs.put("group.id", group);
        consumerConfigs.put("group.protocol", groupProtocol.name);
        consumerConfigs.put("key.deserializer", deserializer);
        consumerConfigs.put("value.deserializer", deserializer);
        consumerConfigs.put("auto.commit.interval.ms", 1000);
        consumerConfigs.put("group.initial.rebalance.delay.ms", 1000);
        // The assignment strategy is set only for the classic protocol.
        if (groupProtocol == GroupProtocol.CLASSIC) {
            consumerConfigs.put("partition.assignment.strategy", RangeAssignor.class.getName());
        }
        return consumerConfigs;
    }

    /**
     * Waits until the committed offsets of {@code group} for all partitions of {@code topic}
     * sum to {@code count}.
     *
     * @param cluster cluster providing the bootstrap servers for the admin client
     * @param topic   topic whose partition offsets are summed
     * @param group   consumer group whose committed offsets are read
     * @param count   expected sum of committed offsets
     * @throws Exception if the wait is interrupted, or from closing the admin client
     */
    private void awaitConsumerProgress(ClusterInstance cluster, String topic, String group, long count) throws Exception {
        try (Admin admin = Admin.create(Collections.singletonMap("bootstrap.servers", cluster.bootstrapServers()))) {
            Supplier<Long> offsets = () -> {
                try {
                    return admin.listConsumerGroupOffsets(group).all().get().get(group).entrySet().stream()
                        .filter(e -> e.getKey().topic().equals(topic))
                        .mapToLong(e -> e.getValue().offset())
                        .sum();
                } catch (InterruptedException | ExecutionException e) {
                    throw new RuntimeException(e);
                }
            };
            // The details are passed as a supplier so the "Actual offset" in the failure message
            // is read when the message is built, not once before waiting starts.
            TestUtils.waitForCondition(
                () -> offsets.get() == count,
                () -> "Expected that consumer group has consumed all messages from topic/partition. Expected offset: " + count + ". Actual offset: " + offsets.get());
        }
    }

    /**
     * Waits until the state reported by {@code service} for {@code group} is either
     * {@link GroupState#EMPTY} or {@link GroupState#DEAD}.
     *
     * @param service service used to query the group state
     * @param group   consumer group id
     * @throws Exception if the wait is interrupted or the condition is never met
     */
    private void awaitConsumerGroupInactive(ConsumerGroupCommand.ConsumerGroupService service, String group) throws Exception {
        // The state for the failure message is read lazily, so the message shows the state at
        // the time it is built and no extra query is issued before waiting starts.
        Supplier<String> failureDetails = () -> {
            try {
                return "Expected that consumer group is inactive. Actual state: " + service.collectGroupState(group).groupState;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        TestUtils.waitForCondition(() -> {
            GroupState state = service.collectGroupState(group).groupState;
            return Objects.equals(state, GroupState.EMPTY) || Objects.equals(state, GroupState.DEAD);
        }, failureDetails);
    }

    /**
     * Runs an offset reset on {@code service} and, for every partition of every group in the
     * result, asserts that the reported offset matches {@code expectedOffsets} and that the
     * offsets committed for {@code topic} equal {@code expectedOffsets}.
     *
     * @param cluster         cluster used to read back committed offsets
     * @param service         service that performs the reset
     * @param expectedOffsets expected offset per partition
     * @param topic           topic whose committed offsets are verified
     */
    private void resetAndAssertOffsetsCommitted(ClusterInstance cluster, ConsumerGroupCommand.ConsumerGroupService service, Map<TopicPartition, Long> expectedOffsets, String topic) {
        Map<String, Map<TopicPartition, OffsetAndMetadata>> allResetOffsets = resetOffsets(service);
        allResetOffsets.forEach((group, offsetsInfo) -> offsetsInfo.forEach((tp, offsetMetadata) -> {
            // Expected value goes first so a failure message labels expected and actual correctly.
            Assertions.assertEquals(expectedOffsets.get(tp), offsetMetadata.offset());
            Assertions.assertEquals(expectedOffsets, committedOffsets(cluster, topic, group));
        }));
    }

    /**
     * Converts a partition-to-{@link OffsetAndMetadata} map into a partition-to-offset map.
     *
     * @param map offsets with metadata, keyed by partition
     * @return a new map holding only the numeric offset for each partition
     */
    private Map<TopicPartition, Long> toOffsetMap(Map<TopicPartition, OffsetAndMetadata> map) {
        Map<TopicPartition, Long> offsetsByPartition = new HashMap<>();
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
            offsetsByPartition.put(entry.getKey(), entry.getValue().offset());
        }
        return offsetsByPartition;
    }

    /**
     * Returns a new array containing the elements of {@code args} followed by {@code extra}.
     *
     * @param args  leading elements
     * @param extra elements appended after {@code args}
     * @return a freshly allocated array; neither input is modified
     */
    private String[] addTo(String[] args, String ... extra) {
        int headLength = args.length;
        String[] combined = Arrays.copyOf(args, headLength + extra.length);
        System.arraycopy(extra, 0, combined, headLength, extra.length);
        return combined;
    }
}

