/*
 * Decompiled with CFR 0.152.
 */
package kafka.tools;

import java.io.IOException;
import java.text.ParseException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import joptsimple.OptionException;
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;
import joptsimple.OptionSpecBuilder;
import kafka.utils.CommandLineUtils;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
import scala.collection.JavaConverters;
import scala.collection.Set;

@InterfaceStability.Unstable
public class StreamsResetter {
    private static final int EXIT_CODE_SUCCESS = 0;
    private static final int EXIT_CODE_ERROR = 1;
    private static OptionSpec<String> bootstrapServerOption;
    private static OptionSpec<String> applicationIdOption;
    private static OptionSpec<String> inputTopicsOption;
    private static OptionSpec<String> intermediateTopicsOption;
    private static OptionSpec<String> internalTopicsOption;
    private static OptionSpec<Long> toOffsetOption;
    private static OptionSpec<String> toDatetimeOption;
    private static OptionSpec<String> byDurationOption;
    private static OptionSpecBuilder toEarliestOption;
    private static OptionSpecBuilder toLatestOption;
    private static OptionSpec<String> fromFileOption;
    private static OptionSpec<Long> shiftByOption;
    private static OptionSpecBuilder dryRunOption;
    private static OptionSpec<Void> helpOption;
    private static OptionSpec<Void> versionOption;
    private static OptionSpec<String> commandConfigOption;
    private static OptionSpecBuilder forceOption;
    private static final String USAGE = "This tool helps to quickly reset an application in order to reprocess its data from scratch.\n* This tool resets offsets of input topics to the earliest available offset and it skips to the end of intermediate topics (topics that are input and output topics, e.g., used by deprecated through() method).\n* This tool deletes the internal topics that were created by Kafka Streams (topics starting with \"<application.id>-\").\nThe tool finds these internal topics automatically. If the topics flagged automatically for deletion by the dry-run are unsuitable, you can specify a subset with the \"--internal-topics\" option.\n* This tool will not delete output topics (if you want to delete them, you need to do it yourself with the bin/kafka-topics.sh command).\n* This tool will not clean up the local state on the stream application instances (the persisted stores used to cache aggregation results).\nYou need to call KafkaStreams#cleanUp() in your application or manually delete them from the directory specified by \"state.dir\" configuration (${java.io.tmpdir}/kafka-streams/<application.id> by default).\n* When long session timeout has been configured, active members could take longer to get expired on the broker thus blocking the reset job to complete. Use the \"--force\" option could remove those left-over members immediately. Make sure to stop all stream applications when this option is specified to avoid unexpected disruptions.\n\n*** Important! You will get wrong output if you don't clean up the local stores after running the reset tool!\n\n*** Warning! This tool makes irreversible changes to your application. It is strongly recommended that you run this once with \"--dry-run\" to preview your changes before making them.\n\n";
    private OptionSet options = null;
    private final List<String> allTopics = new LinkedList<String>();

    public int run(String[] args) {
        return this.run(args, new Properties());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int run(String[] args, Properties config) {
        int exitCode;
        Admin adminClient = null;
        try {
            this.parseArguments(args);
            boolean dryRun = this.options.has((OptionSpec)dryRunOption);
            String groupId = (String)this.options.valueOf(applicationIdOption);
            Properties properties = new Properties();
            if (this.options.has(commandConfigOption)) {
                properties.putAll((Map<?, ?>)Utils.loadProps((String)((String)this.options.valueOf(commandConfigOption))));
            }
            properties.put("bootstrap.servers", this.options.valueOf(bootstrapServerOption));
            adminClient = Admin.create((Properties)properties);
            this.maybeDeleteActiveConsumers(groupId, adminClient);
            this.allTopics.clear();
            this.allTopics.addAll((Collection)adminClient.listTopics().names().get(60L, TimeUnit.SECONDS));
            if (dryRun) {
                System.out.println("----Dry run displays the actions which will be performed when running Streams Reset Tool----");
            }
            HashMap<Object, Object> consumerConfig = new HashMap<Object, Object>(config);
            consumerConfig.putAll(properties);
            exitCode = this.maybeResetInputAndSeekToEndIntermediateTopicOffsets(consumerConfig, dryRun);
            exitCode |= this.maybeDeleteInternalTopics(adminClient, dryRun);
        }
        catch (Throwable e) {
            exitCode = 1;
            System.err.println("ERROR: " + e);
            e.printStackTrace(System.err);
        }
        finally {
            if (adminClient != null) {
                adminClient.close(Duration.ofSeconds(60L));
            }
        }
        return exitCode;
    }

    private void maybeDeleteActiveConsumers(String groupId, Admin adminClient) throws ExecutionException, InterruptedException {
        DescribeConsumerGroupsResult describeResult = adminClient.describeConsumerGroups(Collections.singleton(groupId), (DescribeConsumerGroupsOptions)new DescribeConsumerGroupsOptions().timeoutMs(Integer.valueOf(10000)));
        ArrayList members = new ArrayList(((ConsumerGroupDescription)((KafkaFuture)describeResult.describedGroups().get(groupId)).get()).members());
        if (!members.isEmpty()) {
            if (this.options.has((OptionSpec)forceOption)) {
                System.out.println("Force deleting all active members in the group: " + groupId);
                adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all().get();
            } else {
                throw new IllegalStateException("Consumer group '" + groupId + "' is still active and has following members: " + members + ". Make sure to stop all running application instances before running the reset tool. You can use option '--force' to remove active members from the group.");
            }
        }
    }

    private void parseArguments(String[] args) {
        OptionParser optionParser = new OptionParser(false);
        applicationIdOption = optionParser.accepts("application-id", "The Kafka Streams application ID (application.id).").withRequiredArg().ofType(String.class).describedAs("id").required();
        bootstrapServerOption = optionParser.accepts("bootstrap-servers", "Comma-separated list of broker urls with format: HOST1:PORT1,HOST2:PORT2").withRequiredArg().ofType(String.class).defaultsTo((Object)"localhost:9092", (Object[])new String[0]).describedAs("urls");
        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics. For these topics, the tool will reset the offset to the earliest available offset.").withRequiredArg().ofType(String.class).withValuesSeparatedBy(',').describedAs("list");
        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics that are input and output topics, e.g., used in the deprecated through() method). For these topics, the tool will skip to the end.").withRequiredArg().ofType(String.class).withValuesSeparatedBy(',').describedAs("list");
        internalTopicsOption = optionParser.accepts("internal-topics", "Comma-separated list of internal topics to delete. Must be a subset of the internal topics marked for deletion by the default behaviour (do a dry-run without this option to view these topics).").withRequiredArg().ofType(String.class).withValuesSeparatedBy(',').describedAs("list");
        toOffsetOption = optionParser.accepts("to-offset", "Reset offsets to a specific offset.").withRequiredArg().ofType(Long.class);
        toDatetimeOption = optionParser.accepts("to-datetime", "Reset offsets to offset from datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'").withRequiredArg().ofType(String.class);
        byDurationOption = optionParser.accepts("by-duration", "Reset offsets to offset by duration from current timestamp. Format: 'PnDTnHnMnS'").withRequiredArg().ofType(String.class);
        toEarliestOption = optionParser.accepts("to-earliest", "Reset offsets to earliest offset.");
        toLatestOption = optionParser.accepts("to-latest", "Reset offsets to latest offset.");
        fromFileOption = optionParser.accepts("from-file", "Reset offsets to values defined in CSV file.").withRequiredArg().ofType(String.class);
        shiftByOption = optionParser.accepts("shift-by", "Reset offsets shifting current offset by 'n', where 'n' can be positive or negative").withRequiredArg().describedAs("number-of-offsets").ofType(Long.class);
        commandConfigOption = optionParser.accepts("config-file", "Property file containing configs to be passed to admin clients and embedded consumer.").withRequiredArg().ofType(String.class).describedAs("file name");
        forceOption = optionParser.accepts("force", "Force the removal of members of the consumer group (intended to remove stopped members if a long session timeout was used). Make sure to shut down all stream applications when this option is specified to avoid unexpected rebalances.");
        dryRunOption = optionParser.accepts("dry-run", "Display the actions that would be performed without executing the reset commands.");
        helpOption = optionParser.accepts("help", "Print usage information.").forHelp();
        versionOption = optionParser.accepts("version", "Print version information and exit.").forHelp();
        try {
            this.options = optionParser.parse(args);
            if (args.length == 0 || this.options.has(helpOption)) {
                CommandLineUtils.printUsageAndDie(optionParser, USAGE);
            }
            if (this.options.has(versionOption)) {
                CommandLineUtils.printVersionAndDie();
            }
        }
        catch (OptionException e) {
            CommandLineUtils.printUsageAndDie(optionParser, e.getMessage());
        }
        HashSet allScenarioOptions = new HashSet();
        allScenarioOptions.add(toOffsetOption);
        allScenarioOptions.add(toDatetimeOption);
        allScenarioOptions.add(byDurationOption);
        allScenarioOptions.add((OptionSpec<?>)toEarliestOption);
        allScenarioOptions.add((OptionSpec<?>)toLatestOption);
        allScenarioOptions.add(fromFileOption);
        allScenarioOptions.add(shiftByOption);
        this.checkInvalidArgs(optionParser, this.options, allScenarioOptions, toOffsetOption);
        this.checkInvalidArgs(optionParser, this.options, allScenarioOptions, toDatetimeOption);
        this.checkInvalidArgs(optionParser, this.options, allScenarioOptions, byDurationOption);
        this.checkInvalidArgs(optionParser, this.options, allScenarioOptions, (OptionSpec)toEarliestOption);
        this.checkInvalidArgs(optionParser, this.options, allScenarioOptions, (OptionSpec)toLatestOption);
        this.checkInvalidArgs(optionParser, this.options, allScenarioOptions, fromFileOption);
        this.checkInvalidArgs(optionParser, this.options, allScenarioOptions, shiftByOption);
    }

    private <T> void checkInvalidArgs(OptionParser optionParser, OptionSet options, java.util.Set<OptionSpec<?>> allOptions, OptionSpec<T> option) {
        HashSet invalidOptions = new HashSet(allOptions);
        invalidOptions.remove(option);
        CommandLineUtils.checkInvalidArgs(optionParser, options, option, (Set)JavaConverters.asScalaSetConverter(invalidOptions).asScala());
    }

    private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(Map<Object, Object> consumerConfig, boolean dryRun) throws IOException, ParseException {
        List inputTopics = this.options.valuesOf(inputTopicsOption);
        List intermediateTopics = this.options.valuesOf(intermediateTopicsOption);
        int topicNotFound = 0;
        ArrayList<String> notFoundInputTopics = new ArrayList<String>();
        ArrayList<String> notFoundIntermediateTopics = new ArrayList<String>();
        String groupId = (String)this.options.valueOf(applicationIdOption);
        if (inputTopics.size() == 0 && intermediateTopics.size() == 0) {
            System.out.println("No input or intermediate topics specified. Skipping seek.");
            return 0;
        }
        if (inputTopics.size() != 0) {
            System.out.println("Reset-offsets for input topics " + inputTopics);
        }
        if (intermediateTopics.size() != 0) {
            System.out.println("Seek-to-end for intermediate topics " + intermediateTopics);
        }
        HashSet<String> topicsToSubscribe = new HashSet<String>(inputTopics.size() + intermediateTopics.size());
        for (String topic : inputTopics) {
            if (!this.allTopics.contains(topic)) {
                notFoundInputTopics.add(topic);
                continue;
            }
            topicsToSubscribe.add(topic);
        }
        for (String topic : intermediateTopics) {
            if (!this.allTopics.contains(topic)) {
                notFoundIntermediateTopics.add(topic);
                continue;
            }
            topicsToSubscribe.add(topic);
        }
        if (!notFoundInputTopics.isEmpty()) {
            System.out.println("Following input topics are not found, skipping them");
            for (String topic : notFoundInputTopics) {
                System.out.println("Topic: " + topic);
            }
            topicNotFound = 1;
        }
        if (!notFoundIntermediateTopics.isEmpty()) {
            System.out.println("Following intermediate topics are not found, skipping them");
            for (String topic : notFoundIntermediateTopics) {
                System.out.println("Topic:" + topic);
            }
            topicNotFound = 1;
        }
        if (topicsToSubscribe.isEmpty()) {
            return topicNotFound;
        }
        Properties config = new Properties();
        config.putAll(consumerConfig);
        config.setProperty("group.id", groupId);
        config.setProperty("enable.auto.commit", "false");
        try (KafkaConsumer client = new KafkaConsumer(config, (Deserializer)new ByteArrayDeserializer(), (Deserializer)new ByteArrayDeserializer());){
            Collection partitions = topicsToSubscribe.stream().map(arg_0 -> ((KafkaConsumer)client).partitionsFor(arg_0)).flatMap(Collection::stream).map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toList());
            client.assign(partitions);
            HashSet<TopicPartition> inputTopicPartitions = new HashSet<TopicPartition>();
            HashSet<TopicPartition> intermediateTopicPartitions = new HashSet<TopicPartition>();
            for (TopicPartition p : partitions) {
                String topic = p.topic();
                if (this.isInputTopic(topic)) {
                    inputTopicPartitions.add(p);
                    continue;
                }
                if (this.isIntermediateTopic(topic)) {
                    intermediateTopicPartitions.add(p);
                    continue;
                }
                System.err.println("Skipping invalid partition: " + p);
            }
            this.maybeReset(groupId, (Consumer<byte[], byte[]>)client, inputTopicPartitions);
            this.maybeSeekToEnd(groupId, (Consumer<byte[], byte[]>)client, intermediateTopicPartitions);
            if (!dryRun) {
                for (TopicPartition p : partitions) {
                    client.position(p);
                }
                client.commitSync();
            }
        }
        catch (IOException | ParseException e) {
            System.err.println("ERROR: Resetting offsets failed.");
            throw e;
        }
        System.out.println("Done.");
        return topicNotFound;
    }

    public void maybeSeekToEnd(String groupId, Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> intermediateTopicPartitions) {
        if (intermediateTopicPartitions.size() > 0) {
            System.out.println("Following intermediate topics offsets will be reset to end (for consumer group " + groupId + ")");
            for (TopicPartition topicPartition : intermediateTopicPartitions) {
                if (!this.allTopics.contains(topicPartition.topic())) continue;
                System.out.println("Topic: " + topicPartition.topic());
            }
            client.seekToEnd(intermediateTopicPartitions);
        }
    }

    private void maybeReset(String groupId, Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions) throws IOException, ParseException {
        if (inputTopicPartitions.size() > 0) {
            System.out.println("Following input topics offsets will be reset to (for consumer group " + groupId + ")");
            if (this.options.has(toOffsetOption)) {
                this.resetOffsetsTo(client, inputTopicPartitions, (Long)this.options.valueOf(toOffsetOption));
            } else if (this.options.has((OptionSpec)toEarliestOption)) {
                client.seekToBeginning(inputTopicPartitions);
            } else if (this.options.has((OptionSpec)toLatestOption)) {
                client.seekToEnd(inputTopicPartitions);
            } else if (this.options.has(shiftByOption)) {
                this.shiftOffsetsBy(client, inputTopicPartitions, (Long)this.options.valueOf(shiftByOption));
            } else if (this.options.has(toDatetimeOption)) {
                String ts = (String)this.options.valueOf(toDatetimeOption);
                long timestamp = Utils.getDateTime((String)ts);
                this.resetToDatetime(client, inputTopicPartitions, timestamp);
            } else if (this.options.has(byDurationOption)) {
                String duration = (String)this.options.valueOf(byDurationOption);
                this.resetByDuration(client, inputTopicPartitions, Duration.parse(duration));
            } else if (this.options.has(fromFileOption)) {
                String resetPlanPath = (String)this.options.valueOf(fromFileOption);
                Map<TopicPartition, Long> topicPartitionsAndOffset = this.getTopicPartitionOffsetFromResetPlan(resetPlanPath);
                this.resetOffsetsFromResetPlan(client, inputTopicPartitions, topicPartitionsAndOffset);
            } else {
                client.seekToBeginning(inputTopicPartitions);
            }
            for (TopicPartition p : inputTopicPartitions) {
                System.out.println("Topic: " + p.topic() + " Partition: " + p.partition() + " Offset: " + client.position(p));
            }
        }
    }

    public void resetOffsetsFromResetPlan(Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions, Map<TopicPartition, Long> topicPartitionsAndOffset) {
        Map endOffsets = client.endOffsets(inputTopicPartitions);
        Map beginningOffsets = client.beginningOffsets(inputTopicPartitions);
        Map<TopicPartition, Long> validatedTopicPartitionsAndOffset = this.checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
        for (TopicPartition topicPartition : inputTopicPartitions) {
            client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition).longValue());
        }
    }

    private Map<TopicPartition, Long> getTopicPartitionOffsetFromResetPlan(String resetPlanPath) throws IOException, ParseException {
        String resetPlanCsv = Utils.readFileAsString((String)resetPlanPath);
        return this.parseResetPlan(resetPlanCsv);
    }

    private void resetByDuration(Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions, Duration duration) {
        this.resetToDatetime(client, inputTopicPartitions, Instant.now().minus(duration).toEpochMilli());
    }

    public void resetToDatetime(Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions, Long timestamp) {
        HashMap<TopicPartition, Long> topicPartitionsAndTimes = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        for (TopicPartition topicPartition : inputTopicPartitions) {
            topicPartitionsAndTimes.put(topicPartition, timestamp);
        }
        Map topicPartitionsAndOffset = client.offsetsForTimes(topicPartitionsAndTimes);
        for (TopicPartition topicPartition : inputTopicPartitions) {
            Optional<Long> partitionOffset = Optional.ofNullable(topicPartitionsAndOffset.get(topicPartition)).map(OffsetAndTimestamp::offset).filter(offset -> offset != -1L);
            if (partitionOffset.isPresent()) {
                client.seek(topicPartition, partitionOffset.get().longValue());
                continue;
            }
            client.seekToEnd(Collections.singletonList(topicPartition));
            System.out.println("Partition " + topicPartition.partition() + " from topic " + topicPartition.topic() + " is empty, without a committed record. Falling back to latest known offset.");
        }
    }

    public void shiftOffsetsBy(Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions, long shiftBy) {
        Map endOffsets = client.endOffsets(inputTopicPartitions);
        Map beginningOffsets = client.beginningOffsets(inputTopicPartitions);
        HashMap<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        for (TopicPartition topicPartition : inputTopicPartitions) {
            long position = client.position(topicPartition);
            long offset = position + shiftBy;
            topicPartitionsAndOffset.put(topicPartition, offset);
        }
        Map<TopicPartition, Long> validatedTopicPartitionsAndOffset = this.checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
        for (TopicPartition topicPartition : inputTopicPartitions) {
            client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition).longValue());
        }
    }

    public void resetOffsetsTo(Consumer<byte[], byte[]> client, java.util.Set<TopicPartition> inputTopicPartitions, Long offset) {
        Map endOffsets = client.endOffsets(inputTopicPartitions);
        Map beginningOffsets = client.beginningOffsets(inputTopicPartitions);
        HashMap<TopicPartition, Long> topicPartitionsAndOffset = new HashMap<TopicPartition, Long>(inputTopicPartitions.size());
        for (TopicPartition topicPartition : inputTopicPartitions) {
            topicPartitionsAndOffset.put(topicPartition, offset);
        }
        Map<TopicPartition, Long> validatedTopicPartitionsAndOffset = this.checkOffsetRange(topicPartitionsAndOffset, beginningOffsets, endOffsets);
        for (TopicPartition topicPartition : inputTopicPartitions) {
            client.seek(topicPartition, validatedTopicPartitionsAndOffset.get(topicPartition).longValue());
        }
    }

    private Map<TopicPartition, Long> parseResetPlan(String resetPlanCsv) throws ParseException {
        String[] resetPlanCsvParts;
        HashMap<TopicPartition, Long> topicPartitionAndOffset = new HashMap<TopicPartition, Long>();
        if (resetPlanCsv == null || resetPlanCsv.isEmpty()) {
            throw new ParseException("Error parsing reset plan CSV file. It is empty,", 0);
        }
        for (String line : resetPlanCsvParts = resetPlanCsv.split("\n")) {
            String[] lineParts = line.split(",");
            if (lineParts.length != 3) {
                throw new ParseException("Reset plan CSV file is not following the format `TOPIC,PARTITION,OFFSET`.", 0);
            }
            String topic = lineParts[0];
            int partition = Integer.parseInt(lineParts[1]);
            long offset = Long.parseLong(lineParts[2]);
            TopicPartition topicPartition = new TopicPartition(topic, partition);
            topicPartitionAndOffset.put(topicPartition, offset);
        }
        return topicPartitionAndOffset;
    }

    private Map<TopicPartition, Long> checkOffsetRange(Map<TopicPartition, Long> inputTopicPartitionsAndOffset, Map<TopicPartition, Long> beginningOffsets, Map<TopicPartition, Long> endOffsets) {
        HashMap<TopicPartition, Long> validatedTopicPartitionsOffsets = new HashMap<TopicPartition, Long>();
        for (Map.Entry<TopicPartition, Long> topicPartitionAndOffset : inputTopicPartitionsAndOffset.entrySet()) {
            long endOffset = endOffsets.get(topicPartitionAndOffset.getKey());
            long offset = topicPartitionAndOffset.getValue();
            if (offset < endOffset) {
                long beginningOffset = beginningOffsets.get(topicPartitionAndOffset.getKey());
                if (offset > beginningOffset) {
                    validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), offset);
                    continue;
                }
                System.out.println("New offset (" + offset + ") is lower than earliest offset. Value will be set to " + beginningOffset);
                validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), beginningOffset);
                continue;
            }
            System.out.println("New offset (" + offset + ") is higher than latest offset. Value will be set to " + endOffset);
            validatedTopicPartitionsOffsets.put(topicPartitionAndOffset.getKey(), endOffset);
        }
        return validatedTopicPartitionsOffsets;
    }

    private boolean isInputTopic(String topic) {
        return this.options.valuesOf(inputTopicsOption).contains(topic);
    }

    private boolean isIntermediateTopic(String topic) {
        return this.options.valuesOf(intermediateTopicsOption).contains(topic);
    }

    private int maybeDeleteInternalTopics(Admin adminClient, boolean dryRun) {
        List topicsToDelete;
        List inferredInternalTopics = this.allTopics.stream().filter(this::isInferredInternalTopic).collect(Collectors.toList());
        List specifiedInternalTopics = this.options.valuesOf(internalTopicsOption);
        if (!specifiedInternalTopics.isEmpty()) {
            if (!inferredInternalTopics.containsAll(specifiedInternalTopics)) {
                throw new IllegalArgumentException("Invalid topic specified in the --internal-topics option. Ensure that the topics specified are all internal topics. Do a dry run without the --internal-topics option to see the list of all internal topics that can be deleted.");
            }
            topicsToDelete = specifiedInternalTopics;
            System.out.println("Deleting specified internal topics " + topicsToDelete);
        } else {
            topicsToDelete = inferredInternalTopics;
            System.out.println("Deleting inferred internal topics " + topicsToDelete);
        }
        if (!dryRun) {
            this.doDelete(topicsToDelete, adminClient);
        }
        System.out.println("Done.");
        return 0;
    }

    public void doDelete(List<String> topicsToDelete, Admin adminClient) {
        boolean hasDeleteErrors = false;
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(topicsToDelete);
        Map results = deleteTopicsResult.values();
        for (Map.Entry entry : results.entrySet()) {
            try {
                ((KafkaFuture)entry.getValue()).get(30L, TimeUnit.SECONDS);
            }
            catch (Exception e) {
                System.err.println("ERROR: deleting topic " + (String)entry.getKey());
                e.printStackTrace(System.err);
                hasDeleteErrors = true;
            }
        }
        if (hasDeleteErrors) {
            throw new RuntimeException("Encountered an error deleting one or more topics");
        }
    }

    private boolean isInferredInternalTopic(String topicName) {
        return !this.isInputTopic(topicName) && !this.isIntermediateTopic(topicName) && topicName.startsWith((String)this.options.valueOf(applicationIdOption) + "-") && StreamsResetter.matchesInternalTopicFormat(topicName);
    }

    public static boolean matchesInternalTopicFormat(String topicName) {
        return topicName.endsWith("-changelog") || topicName.endsWith("-repartition") || topicName.endsWith("-subscription-registration-topic") || topicName.endsWith("-subscription-response-topic") || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-\\d+-topic") || topicName.matches(".+-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-\\d+-topic");
    }

    public static void main(String[] args) {
        Exit.exit((int)new StreamsResetter().run(args));
    }
}

