package org.apache.storm.kafka.migration;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.builder.ToStringBuilder;
import org.apache.commons.lang.builder.ToStringStyle;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.PathAndBytesable;
import org.apache.curator.retry.RetryNTimes;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/kafka/migration/KafkaTridentSpoutMigration.class */
public class KafkaTridentSpoutMigration {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) KafkaSpoutMigration.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/kafka/migration/KafkaTridentSpoutMigration$Configuration.class */
    public static class Configuration {
        private String zkHosts;
        private String zkRoot;
        private String txId;
        private String topic;
        private boolean isWildcardTopic;
        private String newTopologyTxId;
        private int zkSessionTimeoutMs;
        private int zkConnectionTimeoutMs;
        private int zkRetryTimes;
        private int zkRetryIntervalMs;

        private Configuration() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/storm/kafka/migration/KafkaTridentSpoutMigration$PartitionMetadata.class */
    public static class PartitionMetadata {
        private final long firstOffset;
        private final long lastOffset;

        PartitionMetadata(long j, long j2) {
            this.firstOffset = j;
            this.lastOffset = j2;
        }

        public String toString() {
            return ToStringBuilder.reflectionToString(this, ToStringStyle.SIMPLE_STYLE);
        }
    }

    public static void main(String[] strArr) throws Exception {
        if (strArr.length != 1) {
            System.err.println("Args: confFile");
            System.exit(1);
        }
        Map findAndReadConfigFile = Utils.findAndReadConfigFile(strArr[0]);
        Configuration configuration = new Configuration();
        configuration.zkHosts = (String) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.servers");
        configuration.zkRoot = (String) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.root");
        configuration.txId = (String) MapUtil.getOrError(findAndReadConfigFile, "txid");
        configuration.topic = (String) MapUtil.getOrError(findAndReadConfigFile, "topic");
        configuration.isWildcardTopic = ((Boolean) MapUtil.getOrError(findAndReadConfigFile, "is.wildcard.topic")).booleanValue();
        configuration.newTopologyTxId = (String) MapUtil.getOrError(findAndReadConfigFile, "new.topology.txid");
        configuration.zkSessionTimeoutMs = ((Integer) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.session.timeout.ms")).intValue();
        configuration.zkConnectionTimeoutMs = ((Integer) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.connection.timeout.ms")).intValue();
        configuration.zkRetryTimes = ((Integer) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.retry.times")).intValue();
        configuration.zkRetryIntervalMs = ((Integer) MapUtil.getOrError(findAndReadConfigFile, "zookeeper.retry.interval.ms")).intValue();
        CuratorFramework newCurator = newCurator(configuration);
        try {
            newCurator.start();
            Map<TopicPartition, Map<Long, PartitionMetadata>> offsetsToMigrate = getOffsetsToMigrate(newCurator, configuration);
            LOG.info("Migrating offsets {}", offsetsToMigrate);
            migrateOffsets(newCurator, configuration, offsetsToMigrate);
            migrateCoordinator(newCurator, configuration, new ArrayList(offsetsToMigrate.keySet()));
            if (newCurator != null) {
                newCurator.close();
            }
        } catch (Throwable th) {
            if (newCurator != null) {
                try {
                    newCurator.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private static Map<TopicPartition, Map<Long, PartitionMetadata>> getOffsetsAtPath(CuratorFramework curatorFramework, ObjectMapper objectMapper2, String str) throws Exception {
        HashMap hashMap = new HashMap();
        if (curatorFramework.checkExists().forPath(str) == null) {
            throw new RuntimeException("No such path " + str);
        }
        Iterator<String> it = curatorFramework.getChildren().forPath(str).iterator();
        while (it.hasNext()) {
            String str2 = str + "/" + it.next();
            List<String> forPath = curatorFramework.getChildren().forPath(str2);
            HashMap hashMap2 = new HashMap();
            TopicPartition topicPartition = null;
            for (String str3 : forPath) {
                String str4 = str2 + "/" + str3;
                LOG.info("Reading offset data from path {}", str4);
                Map map = (Map) objectMapper2.readValue(curatorFramework.getData().forPath(str4), new TypeReference<Map<String, Object>>() { // from class: org.apache.storm.kafka.migration.KafkaTridentSpoutMigration.1
                });
                topicPartition = new TopicPartition((String) map.get("topic"), ((Number) map.get("partition")).intValue());
                hashMap2.put(Long.valueOf(Long.parseLong(str3)), new PartitionMetadata(((Number) map.get("offset")).longValue(), ((Number) map.get("nextOffset")).longValue() - 1));
            }
            if (topicPartition != null) {
                hashMap.put(topicPartition, hashMap2);
            }
        }
        return hashMap;
    }

    private static Map<TopicPartition, Map<Long, PartitionMetadata>> getOffsetsToMigrate(CuratorFramework curatorFramework, Configuration configuration) throws Exception {
        HashMap hashMap = new HashMap();
        String str = configuration.zkRoot + "/" + configuration.txId + "/user";
        if (curatorFramework.checkExists().forPath(str) == null) {
            throw new RuntimeException("No such path " + str);
        }
        if (configuration.isWildcardTopic) {
            LOG.info("Expecting wildcard topics, looking for topics in {}", str);
            for (String str2 : curatorFramework.getChildren().forPath(str)) {
                if (str2.matches(configuration.topic)) {
                    String str3 = str + "/" + str2;
                    LOG.info("Looking for partitions in {}", str3);
                    hashMap.putAll(getOffsetsAtPath(curatorFramework, objectMapper, str3));
                } else {
                    LOG.info("Skipping directory {} because it does not match topic pattern {}", str2, configuration.topic);
                }
            }
        } else {
            LOG.info("Expecting exact topic match, looking for offsets in {}", str);
            hashMap.putAll(getOffsetsAtPath(curatorFramework, objectMapper, str));
        }
        return hashMap;
    }

    private static String coordinatorPath(Configuration configuration, String str) {
        return configuration.zkRoot + "/" + str + "/coordinator";
    }

    private static void migrateCoordinator(CuratorFramework curatorFramework, Configuration configuration, List<TopicPartition> list) throws Exception {
        String coordinatorPath = coordinatorPath(configuration, configuration.txId);
        String coordinatorPath2 = coordinatorPath(configuration, configuration.newTopologyTxId);
        String str = coordinatorPath + "/currtx";
        String str2 = coordinatorPath2 + "/currtx";
        createOrUpdate(curatorFramework, str2).forPath(str2, curatorFramework.getData().forPath(str));
        String str3 = coordinatorPath + "/currattempts";
        String str4 = coordinatorPath2 + "/currattempts";
        createOrUpdate(curatorFramework, str4).forPath(str4, curatorFramework.getData().forPath(str3));
        List<String> forPath = curatorFramework.getChildren().forPath(coordinatorPath + "/meta");
        ArrayList arrayList = new ArrayList();
        Iterator<TopicPartition> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(tpMeta(it.next()));
        }
        Iterator<String> it2 = forPath.iterator();
        while (it2.hasNext()) {
            String str5 = coordinatorPath2 + "/meta/" + it2.next();
            createOrUpdate(curatorFramework, str5).forPath(str5, objectMapper.writeValueAsBytes(arrayList));
        }
        LOG.info("Migrated coordinator data to new path {}", coordinatorPath2);
    }

    private static PathAndBytesable<?> createOrUpdate(CuratorFramework curatorFramework, String str) throws Exception {
        return curatorFramework.checkExists().forPath(str) == null ? curatorFramework.create().creatingParentsIfNeeded() : curatorFramework.setData();
    }

    private static Map<String, Object> tpMeta(TopicPartition topicPartition) {
        HashMap hashMap = new HashMap();
        hashMap.put("topic", topicPartition.topic());
        hashMap.put("partition", Integer.valueOf(topicPartition.partition()));
        return hashMap;
    }

    private static void migrateOffsets(CuratorFramework curatorFramework, Configuration configuration, Map<TopicPartition, Map<Long, PartitionMetadata>> map) throws Exception {
        String str = configuration.zkRoot + "/" + configuration.newTopologyTxId + "/user";
        for (Map.Entry<TopicPartition, Map<Long, PartitionMetadata>> entry : map.entrySet()) {
            TopicPartition key = entry.getKey();
            for (Map.Entry<Long, PartitionMetadata> entry2 : entry.getValue().entrySet()) {
                PartitionMetadata value = entry2.getValue();
                HashMap hashMap = new HashMap();
                hashMap.put("firstOffset", Long.valueOf(value.firstOffset));
                hashMap.put("lastOffset", Long.valueOf(value.lastOffset));
                hashMap.put("tp", tpMeta(key));
                String str2 = str + "/" + key.topic() + "@" + key.partition() + "/" + entry2.getKey();
                LOG.info("Writing {} to path {}", hashMap, str2);
                createOrUpdate(curatorFramework, str2).forPath(str2, objectMapper.writeValueAsBytes(hashMap));
            }
        }
        LOG.info("Migrated offsets {} to new root {}", map, str);
    }

    private static CuratorFramework newCurator(Configuration configuration) throws Exception {
        return CuratorFrameworkFactory.newClient(configuration.zkHosts, configuration.zkSessionTimeoutMs, configuration.zkConnectionTimeoutMs, new RetryNTimes(configuration.zkRetryTimes, configuration.zkRetryIntervalMs));
    }
}
