package org.apache.storm.kafka.migration;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.ZKPaths;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/migration/KafkaSpoutMigration.class */
public class KafkaSpoutMigration {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaSpoutMigration.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/kafka/migration/KafkaSpoutMigration$Configuration.class */
    public static class Configuration {
        private String zkHosts;
        private String zkRoot;
        private String spoutId;
        private String topic;
        private boolean isWildcardTopic;
        private String kafkaBootstrapServers;
        private String newSpoutConsumerGroup;
        private int zkSessionTimeoutMs;
        private int zkConnectionTimeoutMs;
        private int zkRetryTimes;
        private int zkRetryIntervalMs;

        private Configuration() {
        }
    }

    public static void main(String[] strArr) throws Exception {
        if (strArr.length != 1) {
            System.err.println("Args: confFile");
            System.exit(1);
        }
        Map findAndReadConfigFile = Utils.findAndReadConfigFile(strArr[0]);
        Configuration configuration = new Configuration();
        configuration.zkHosts = (String) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.servers");
        configuration.zkRoot = (String) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.root");
        configuration.spoutId = (String) MapUtil.getOrError(findAndReadConfigFile, "spout.id");
        configuration.topic = (String) MapUtil.getOrError(findAndReadConfigFile, "topic");
        configuration.isWildcardTopic = ((Boolean) MapUtil.getOrError(findAndReadConfigFile, "is.wildcard.topic")).booleanValue();
        configuration.kafkaBootstrapServers = (String) MapUtil.getOrError(findAndReadConfigFile, "kafka.bootstrap.servers");
        configuration.newSpoutConsumerGroup = (String) MapUtil.getOrError(findAndReadConfigFile, "new.spout.consumer.group");
        configuration.zkSessionTimeoutMs = ((Integer) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.session.timeout.ms")).intValue();
        configuration.zkConnectionTimeoutMs = ((Integer) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.connection.timeout.ms")).intValue();
        configuration.zkRetryTimes = ((Integer) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.retry.times")).intValue();
        configuration.zkRetryIntervalMs = ((Integer) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.retry.interval.ms")).intValue();
        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = getOffsetsToCommit(configuration);
        LOG.info("Migrating offsets {}", offsetsToCommit);
        Properties properties = new Properties();
        properties.put("bootstrap.servers", configuration.kafkaBootstrapServers);
        properties.put("group.id", configuration.newSpoutConsumerGroup);
        properties.put("key.deserializer", ByteArrayDeserializer.class);
        properties.put("value.deserializer", ByteArrayDeserializer.class);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        Throwable th = null;
        try {
            try {
                kafkaConsumer.assign(offsetsToCommit.keySet());
                kafkaConsumer.commitSync(offsetsToCommit);
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                LOG.info("Migrated offsets {} to consumer group {}", offsetsToCommit, configuration.newSpoutConsumerGroup);
            } finally {
            }
        } catch (Throwable th3) {
            if (kafkaConsumer != null) {
                if (th != null) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th3;
        }
    }

    private static Map<TopicPartition, OffsetAndMetadata> getOffsetsAtPath(CuratorFramework curatorFramework, ObjectMapper objectMapper, String str) throws Exception {
        HashMap hashMap = new HashMap();
        if (curatorFramework.checkExists().forPath(str) == null) {
            throw new RuntimeException("No such path " + str);
        }
        Iterator<String> it = curatorFramework.getChildren().forPath(str).iterator();
        while (it.hasNext()) {
            String str2 = str + ZKPaths.PATH_SEPARATOR + it.next();
            LOG.info("Reading offset data from path {}", str2);
            Map map = (Map) objectMapper.readValue(curatorFramework.getData().forPath(str2), new TypeReference<Map<String, Object>>() { // from class: org.apache.storm.kafka.migration.KafkaSpoutMigration.1
            });
            hashMap.put(new TopicPartition((String) map.get("topic"), ((Number) map.get("partition")).intValue()), new OffsetAndMetadata(((Number) map.get("offset")).longValue()));
        }
        return hashMap;
    }

    private static Map<TopicPartition, OffsetAndMetadata> getOffsetsToCommit(Configuration configuration) throws Exception {
        HashMap hashMap = new HashMap();
        CuratorFramework newCurator = newCurator(configuration);
        Throwable th = null;
        try {
            newCurator.start();
            ObjectMapper objectMapper = new ObjectMapper();
            String str = configuration.zkRoot + ZKPaths.PATH_SEPARATOR + configuration.spoutId;
            if (newCurator.checkExists().forPath(str) == null) {
                throw new RuntimeException("No such path " + str);
            }
            if (configuration.isWildcardTopic) {
                LOG.info("Expecting wildcard topics, looking for topics in {}", str);
                for (String str2 : newCurator.getChildren().forPath(str)) {
                    if (str2.matches(configuration.topic)) {
                        String str3 = str + ZKPaths.PATH_SEPARATOR + str2;
                        LOG.info("Looking for partitions in {}", str3);
                        hashMap.putAll(getOffsetsAtPath(newCurator, objectMapper, str3));
                    } else {
                        LOG.info("Skipping directory {} because it doesn't match the topic pattern {}", str2, configuration.topic);
                    }
                }
            } else {
                LOG.info("Expecting exact topic match, looking for offsets in {}", str);
                hashMap.putAll(getOffsetsAtPath(newCurator, objectMapper, str));
            }
            return hashMap;
        } finally {
            if (newCurator != null) {
                if (0 != 0) {
                    try {
                        newCurator.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    newCurator.close();
                }
            }
        }
    }

    private static CuratorFramework newCurator(Configuration configuration) throws Exception {
        return CuratorFrameworkFactory.newClient(configuration.zkHosts, configuration.zkSessionTimeoutMs, configuration.zkConnectionTimeoutMs, new RetryNTimes(configuration.zkRetryTimes, configuration.zkRetryIntervalMs));
    }
}
