package com.pinterest.doctorkafka;

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
import com.pinterest.doctorkafka.config.DoctorKafkaConfig;
import com.pinterest.doctorkafka.notification.Email;
import com.pinterest.doctorkafka.replicastats.ReplicaStatsManager;
import com.pinterest.doctorkafka.util.BrokerReplacer;
import com.pinterest.doctorkafka.util.KafkaUtils;
import com.pinterest.doctorkafka.util.OpenTsdbMetricConverter;
import com.pinterest.doctorkafka.util.OperatorUtil;
import com.pinterest.doctorkafka.util.OutOfSyncReplica;
import com.pinterest.doctorkafka.util.PreferredReplicaElectionInfo;
import com.pinterest.doctorkafka.util.ReassignmentInfo;
import com.pinterest.doctorkafka.util.UnderReplicatedReason;
import com.pinterest.doctorkafka.util.ZookeeperClient;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import kafka.cluster.Broker;
import kafka.common.TopicAndPartition;
import kafka.utils.ZkUtils;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import scala.Tuple2;
import scala.collection.JavaConverters;
import scala.collection.Seq;
import scala.collection.mutable.HashSet;

/* loaded from: input_file:com/pinterest/doctorkafka/KafkaClusterManager.class */
public class KafkaClusterManager implements Runnable {
    // Shared logger and Gson instance. Neither is initialised on its declaration line;
    // presumably a static initializer outside the visible part of the file assigns them.
    private static final Logger LOG;
    private static final Gson gson;
    // Time limits: 300000 ms = 5 minutes, 1800 s = 30 minutes.
    private static final long MAX_HOST_REBOOT_TIME_MS = 300000;
    private static final long MAX_TIMEOUT_MS = 300000;
    // Compared against the elapsed replacement time in checkAndReplaceDeadBrokers().
    private static final long MAX_HOST_REPLACEMENT_TIME_SECONDS = 1800;
    // isDeadBroker() requires exactly this many broker stats, all flagged as failed.
    private static final int NUM_BROKER_STATS = 4;
    // ZooKeeper connection string for the cluster; passed to KafkaUtils to obtain zkUtils and consumers.
    private String zkUrl;
    private SecurityProtocol securityProtocol;
    private Map<String, String> consumerConfigs;
    private ZkUtils zkUtils;
    private KafkaCluster kafkaCluster;
    private DoctorKafkaConfig drkafkaConfig;
    private DoctorKafkaClusterConfig clusterConfig;
    private DoctorKafkaActionReporter actionReporter;
    // Network limits copied from the cluster config in the constructor.
    private double bytesInLimit;
    private double bytesOutLimit;
    // Only created when dead-broker replacement is enabled in the cluster config; otherwise null.
    private BrokerReplacer brokerReplacer;
    private ZookeeperClient zookeeperClient;
    // Decompiler rendering of the compiler-generated assertion switch; guards the assert-style checks below.
    static final /* synthetic */ boolean $assertionsDisabled;
    // Set to true by stop(); the code that clears it is not visible in this part of the file.
    private boolean stopped = true;
    // Worker thread created by start().
    private Thread thread = null;
    private List<PartitionInfo> underReplicatedPartitions = new ArrayList();
    // Per-topic cache of partition -> replica assignment, filled lazily by getReplicaAssignmentForTopic().
    private Map<String, scala.collection.Map<Object, Seq<Object>>> topicPartitionAssignments = new HashMap();
    // (broker, partition) pairs for which no alternative broker could be found.
    private List<MutablePair<KafkaBroker, TopicPartition>> reassignmentFailures = new ArrayList();
    // Planned partition moves and preferred-leader elections accumulated while generating a balancing plan.
    private Map<TopicPartition, ReassignmentInfo> reassignmentMap = new HashMap();
    private Map<TopicPartition, PreferredReplicaElectionInfo> preferredLeaders = new HashMap();
    private AtomicBoolean maintenanceMode = new AtomicBoolean(false);

    public KafkaClusterManager(String str, KafkaCluster kafkaCluster, DoctorKafkaClusterConfig doctorKafkaClusterConfig, DoctorKafkaConfig doctorKafkaConfig, DoctorKafkaActionReporter doctorKafkaActionReporter, ZookeeperClient zookeeperClient, ReplicaStatsManager replicaStatsManager) {
        this.kafkaCluster = null;
        this.drkafkaConfig = null;
        this.actionReporter = null;
        if (!$assertionsDisabled && doctorKafkaClusterConfig == null) {
            throw new AssertionError();
        }
        this.zkUrl = str;
        this.zkUtils = KafkaUtils.getZkUtils(str);
        this.securityProtocol = doctorKafkaClusterConfig.getSecurityProtocol();
        this.consumerConfigs = doctorKafkaClusterConfig.getConsumerConfigurations();
        this.kafkaCluster = kafkaCluster;
        this.clusterConfig = doctorKafkaClusterConfig;
        this.drkafkaConfig = doctorKafkaConfig;
        this.actionReporter = doctorKafkaActionReporter;
        this.bytesInLimit = doctorKafkaClusterConfig.getNetworkInLimitInBytes();
        this.bytesOutLimit = doctorKafkaClusterConfig.getNetworkOutLimitInBytes();
        this.zookeeperClient = zookeeperClient;
        if (doctorKafkaClusterConfig.enabledDeadbrokerReplacement()) {
            this.brokerReplacer = new BrokerReplacer(doctorKafkaConfig.getBrokerReplacementCommand());
        }
    }

    /**
     * @return the cluster model this manager operates on
     */
    public KafkaCluster getCluster() {
        KafkaCluster managedCluster = this.kafkaCluster;
        return managedCluster;
    }

    /**
     * Spawns the worker thread that runs this manager, named {@code ClusterManager:<cluster name>}.
     */
    public void start() {
        Thread worker = new Thread(this);
        this.thread = worker;
        String workerName = "ClusterManager:" + getClusterName();
        worker.setName(workerName);
        worker.start();
    }

    /**
     * Raises the {@code stopped} flag; nothing else is done here.
     */
    public void stop() {
        stopped = true;
    }

    /**
     * Serialises the manager's current view of the cluster to JSON: ZooKeeper URL, network limits,
     * under-replicated partitions, cached partition assignments, the cluster model and the topic
     * listing obtained from a Kafka consumer.
     *
     * @return a JSON object describing this cluster
     */
    public JsonElement toJson() {
        JsonObject json = new JsonObject();
        // The consumer is obtained up front, before any property is written.
        KafkaConsumer<?, ?> consumer = KafkaUtils.getKafkaConsumer(this.zkUrl, this.securityProtocol, this.consumerConfigs);

        json.addProperty("zkUrl", this.zkUrl);

        JsonElement inLimit = gson.toJsonTree(Double.valueOf(this.bytesInLimit));
        json.add("bytesInLimit", inLimit);

        JsonElement outLimit = gson.toJsonTree(Double.valueOf(this.bytesOutLimit));
        json.add("bytesOutLimit", outLimit);

        JsonElement urps = gson.toJsonTree(this.underReplicatedPartitions);
        json.add("underReplicatedPartitions", urps);

        JsonElement assignments = gson.toJsonTree(this.topicPartitionAssignments);
        json.add("topicPartitionAssignments", assignments);

        JsonElement cluster = gson.toJsonTree(this.kafkaCluster.toJson());
        json.add("kafkaCluster", cluster);

        JsonElement topics = gson.toJsonTree(consumer.listTopics());
        json.add("topics", topics);

        return json;
    }

    /**
     * @return the cluster name taken from the cluster configuration
     */
    public String getClusterName() {
        String clusterName = this.clusterConfig.getClusterName();
        return clusterName;
    }

    /**
     * @return the ZooKeeper connection string this manager was created with
     */
    public String getZkUrl() {
        return zkUrl;
    }

    /**
     * @return the live (not copied) per-topic cache of partition-to-replica assignments
     */
    public Map<String, scala.collection.Map<Object, Seq<Object>>> getTopicPartitionAssignments() {
        return topicPartitionAssignments;
    }

    /**
     * Returns the size reported by the cluster model.
     *
     * <p>A missing cluster model is logged as an error but not otherwise handled, so the call
     * still fails with a {@link NullPointerException} in that case.
     *
     * @return {@code kafkaCluster.size()}
     */
    public int getClusterSize() {
        boolean clusterMissing = (this.kafkaCluster == null);
        if (clusterMissing) {
            LOG.error("kafkaCluster is null for {}", this.zkUrl);
        }
        int size = this.kafkaCluster.size();
        return size;
    }

    /**
     * @return the live (not copied) list of under-replicated partitions held by this manager
     */
    public List<PartitionInfo> getUnderReplicatedPartitions() {
        return underReplicatedPartitions;
    }

    /**
     * Returns the partition-to-replicas assignment of a topic, consulting the local cache first
     * and falling back to ZooKeeper on a miss. A freshly fetched assignment is cached.
     *
     * @param zkUtils ZooKeeper helper used for the lookup on a cache miss
     * @param str topic name
     * @return the assignment map for the topic
     */
    private scala.collection.Map<Object, Seq<Object>> getReplicaAssignmentForTopic(ZkUtils zkUtils, String str) {
        boolean cached = this.topicPartitionAssignments.containsKey(str);
        if (!cached) {
            // ZkUtils expects a Scala sequence of topic names; wrap the single topic.
            ArrayList topics = new ArrayList();
            topics.add(str);
            scala.collection.Map<Object, Seq<Object>> fetched = (scala.collection.Map) zkUtils.getPartitionAssignmentForTopics(JavaConverters.asScalaBuffer(topics).toSeq()).get(str).get();
            this.topicPartitionAssignments.put(str, fetched);
            return fetched;
        }
        return this.topicPartitionAssignments.get(str);
    }

    /**
     * Looks up the replica broker ids assigned to a single topic partition.
     *
     * @param topicPartition partition to look up
     * @return the assigned broker ids, in assignment order, as a new list
     */
    private List<Integer> getReplicaAssignment(TopicPartition topicPartition) {
        scala.collection.Map<Object, Seq<Object>> topicAssignment = getReplicaAssignmentForTopic(this.zkUtils, topicPartition.topic());
        Seq replicaSeq = (Seq) topicAssignment.get(Integer.valueOf(topicPartition.partition())).get();
        List<?> scalaView = JavaConverters.seqAsJavaList(replicaSeq);

        // Copy into a plain Java list, narrowing each element to Integer.
        List<Integer> replicaIds = new ArrayList<>();
        for (Object replicaId : scalaView) {
            replicaIds.add((Integer) replicaId);
        }
        return replicaIds;
    }

    /* JADX WARN: Removed duplicated region for block: B:23:0x0163 A[Catch: Exception -> 0x0236, TryCatch #0 {Exception -> 0x0236, blocks: (B:8:0x002b, B:9:0x0057, B:11:0x0061, B:37:0x00d1, B:42:0x010f, B:21:0x014f, B:23:0x0163, B:25:0x01f9, B:35:0x020c, B:14:0x0142), top: B:7:0x002b }] */
    /* JADX WARN: Removed duplicated region for block: B:35:0x020c A[Catch: Exception -> 0x0236, TryCatch #0 {Exception -> 0x0236, blocks: (B:8:0x002b, B:9:0x0057, B:11:0x0061, B:37:0x00d1, B:42:0x010f, B:21:0x014f, B:23:0x0163, B:25:0x01f9, B:35:0x020c, B:14:0x0142), top: B:7:0x002b }] */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    // Placeholder emitted by the decompiler: the real body could not be recovered, so this
    // version always throws UnsupportedOperationException.
    //
    // getWorkloadBalancingPlanInJson() calls it for brokers whose outbound traffic exceeds the
    // configured limit, passing:
    //   r8       - the overloaded broker
    //   r9       - the partitions that broker currently leads (may be null if it leads none)
    //   r10, r12 - the cluster-wide average bytes-in / bytes-out per broker
    //
    // NOTE(review): presumably the original fills reassignmentMap / preferredLeaders /
    // reassignmentFailures like the follower variant does - confirm against the bytecode.
    private void generateLeadersReassignmentPlan(com.pinterest.doctorkafka.KafkaBroker r8, java.util.List<org.apache.kafka.common.TopicPartition> r9, double r10, double r12) {
        /*
            Method dump skipped, instructions count: 599
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.pinterest.doctorkafka.KafkaClusterManager.generateLeadersReassignmentPlan(com.pinterest.doctorkafka.KafkaBroker, java.util.List, double, double):void");
    }

    /**
     * Plans moving follower partitions off a broker whose inbound traffic is too high.
     *
     * <p>Follower partitions are visited in the order produced by
     * {@code sortTopicPartitionsByTraffic}. A partition is skipped when moving it would push the
     * broker's remaining inbound traffic (including reserved bytes) below {@code bytesInLimit}.
     * Otherwise an alternative broker is requested from the cluster model; a hit is recorded in
     * {@code reassignmentMap}, a miss in {@code reassignmentFailures}. Planning stops as soon as
     * the broker's inbound traffic minus the moved traffic is at or below the limit.
     *
     * <p>Any exception raised while planning is logged and swallowed.
     *
     * @param kafkaBroker the overloaded broker
     */
    private void generateFollowerReassignmentPlan(KafkaBroker kafkaBroker) {
        LOG.info("Begin generating follower reassignment plan for {}", kafkaBroker.getName());
        Map<TopicPartition, Double> trafficByPartition = sortTopicPartitionsByTraffic(kafkaBroker.getFollowerTopicPartitions());
        for (TopicPartition tp : trafficByPartition.keySet()) {
            LOG.info("     traffic :{} : {}", tp, trafficByPartition.get(tp));
        }
        try {
            double brokerBytesIn = kafkaBroker.getMaxBytesIn() + kafkaBroker.getReservedBytesIn();
            double movedBytesIn = 0.0d;
            for (Map.Entry<TopicPartition, Double> candidate : trafficByPartition.entrySet()) {
                TopicPartition tp = candidate.getKey();
                double tpBytesIn = this.kafkaCluster.getMaxBytesIn(tp);

                // Moving this partition would leave the broker under the limit: keep it here.
                if (brokerBytesIn - movedBytesIn - tpBytesIn < this.bytesInLimit) {
                    continue;
                }

                KafkaBroker target = this.kafkaCluster.getAlternativeBroker(tp, tpBytesIn, 0.0d);
                if (target == null) {
                    LOG.info("Could not find an alternative broker for {}:{}", kafkaBroker.getName(), tp);
                    this.reassignmentFailures.add(new MutablePair<>(kafkaBroker, tp));
                    continue;
                }

                LOG.info("  Alternative broker for {} : {} -> {}, bytesIn: {}", tp, kafkaBroker.getName(), target.getName(), Double.valueOf(tpBytesIn));
                ReassignmentInfo move = new ReassignmentInfo(tp, kafkaBroker, target);
                this.reassignmentMap.put(tp, move);
                LOG.info("    {} : {} -> {}", tp, move.source.getName(), move.dest.getName());
                movedBytesIn += tpBytesIn;

                // Enough traffic has been moved away; stop planning.
                if (kafkaBroker.getMaxBytesIn() - movedBytesIn <= this.bytesInLimit) {
                    break;
                }
            }
        } catch (Exception e) {
            LOG.info("Exception in generating follower reassignment plan", e);
        }
    }

    /**
     * Asks a Kafka consumer for the cluster's topic listing.
     *
     * @return partition info lists keyed by topic name, as returned by {@code listTopics()}
     */
    private Map<String, List<PartitionInfo>> getTopicPartitionInfoMap() {
        KafkaConsumer consumer = KafkaUtils.getKafkaConsumer(this.zkUrl, this.securityProtocol, this.consumerConfigs);
        Map<String, List<PartitionInfo>> topics = consumer.listTopics();
        return topics;
    }

    /**
     * Returns the cluster's high-traffic brokers in descending natural order and logs each one
     * with its maximum inbound and outbound byte rates.
     *
     * @return the list obtained from {@code kafkaCluster.getHighTrafficBrokers()}, sorted in place
     */
    public List<KafkaBroker> getHighTrafficBroker() {
        List<KafkaBroker> highTrafficBrokers = this.kafkaCluster.getHighTrafficBrokers();
        // Sort ascending, then reverse. This is deliberately not a single reverse-order sort, so
        // the relative order of equal brokers stays exactly as before.
        Collections.sort(highTrafficBrokers);
        Collections.reverse(highTrafficBrokers);
        for (KafkaBroker kafkaBroker : highTrafficBrokers) {
            // Log text fixed: it used to read "borker".
            LOG.info("high traffic broker: {} : [{}, {}]", kafkaBroker.getName(), Long.valueOf(kafkaBroker.getMaxBytesIn()), Long.valueOf(kafkaBroker.getMaxBytesOut()));
        }
        return highTrafficBrokers;
    }

    /**
     * Builds a partition reassignment plan that relieves the given overloaded brokers.
     *
     * <p>Steps:
     * <ol>
     *   <li>Reset the cluster's resource allocation counters and the failure list.</li>
     *   <li>For each broker, plan leader moves when its outbound traffic exceeds the configured
     *       limit, otherwise follower moves when its inbound traffic exceeds the limit.
     *       Exceptions for a single broker are logged and do not stop the loop.</li>
     *   <li>If any partition could not be placed, give up and return {@code null}.</li>
     *   <li>If preferred-leader elections were planned, write them to
     *       {@code /admin/preferred_replica_election} unless that path already exists.</li>
     *   <li>Translate the planned moves into new replica lists and format them as JSON.</li>
     * </ol>
     *
     * <p>NOTE(review): {@code reassignmentMap} and {@code preferredLeaders} are not cleared here;
     * verify they are reset elsewhere between runs.
     *
     * @param list brokers to relieve
     * @return the reassignment JSON, {@code ""} when there is nothing to move, or {@code null}
     *     when at least one partition had no alternative broker
     */
    public String getWorkloadBalancingPlanInJson(List<KafkaBroker> list) {
        this.kafkaCluster.clearResourceAllocationCounters();
        this.reassignmentFailures.clear();

        Map<String, List<PartitionInfo>> partitionsByTopic = getTopicPartitionInfoMap();
        Map<Integer, List<TopicPartition>> leaderPartitionsByBroker = getBrokerLeaderPartitions(partitionsByTopic);
        double avgBytesIn = this.kafkaCluster.getMaxBytesIn() / this.kafkaCluster.size();
        double avgBytesOut = this.kafkaCluster.getMaxBytesOut() / this.kafkaCluster.size();
        LOG.info("Cluster {}: bytesInAvg={}, bytesOutAvg={}", this.zkUrl, Double.valueOf(avgBytesIn), Double.valueOf(avgBytesOut));

        // Per-broker planning; outbound overload takes precedence over inbound overload.
        for (KafkaBroker overloaded : list) {
            try {
                if (overloaded.getMaxBytesOut() > this.clusterConfig.getNetworkOutLimitInBytes()) {
                    List<TopicPartition> ledPartitions = leaderPartitionsByBroker.get(Integer.valueOf(overloaded.getId()));
                    generateLeadersReassignmentPlan(overloaded, ledPartitions, avgBytesIn, avgBytesOut);
                } else if (overloaded.getMaxBytesIn() > this.clusterConfig.getNetworkInLimitInBytes()) {
                    generateFollowerReassignmentPlan(overloaded);
                }
            } catch (Exception e) {
                LOG.info("Exception in generating assignment plan for {}", overloaded.getName(), e);
            }
        }

        if (!this.reassignmentFailures.isEmpty()) {
            return null;
        }

        LOG.info("Printing reassignment map + preferred leaders.");
        for (ReassignmentInfo plannedMove : this.reassignmentMap.values()) {
            LOG.info(plannedMove);
        }
        for (PreferredReplicaElectionInfo plannedElection : this.preferredLeaders.values()) {
            LOG.info(plannedElection);
        }
        LOG.info("End of printing reassignment map + preferred leaders.");

        // Request preferred replica elections through ZooKeeper when any were planned.
        if (!this.preferredLeaders.isEmpty()) {
            HashSet electionTargets = new HashSet();
            for (PreferredReplicaElectionInfo plannedElection : this.preferredLeaders.values()) {
                TopicPartition tp = plannedElection.topicPartition;
                electionTargets.add(new TopicAndPartition(tp.topic(), tp.partition()));
            }
            String electionData = ZkUtils.preferredReplicaLeaderElectionZkData(electionTargets);
            List acls = KafkaUtils.getZookeeperAcls(false);
            boolean electionPending = this.zkUtils.pathExists("/admin/preferred_replica_election");
            if (!electionPending) {
                this.zkUtils.createPersistentPath("/admin/preferred_replica_election", electionData, acls);
            }
        }

        // Index the current partition metadata by topic and partition number.
        Map<String, Map<Integer, PartitionInfo>> partitionIndex = new HashMap<>();
        for (Map.Entry<String, List<PartitionInfo>> topicEntry : partitionsByTopic.entrySet()) {
            Map<Integer, PartitionInfo> byPartition = partitionIndex.computeIfAbsent(topicEntry.getKey(), topic -> new HashMap<>());
            for (PartitionInfo info : topicEntry.getValue()) {
                byPartition.put(Integer.valueOf(info.partition()), info);
            }
        }

        // Substitute the destination broker for the source broker in each replica list.
        Map<TopicPartition, Integer[]> replicaPlan = new HashMap<>();
        for (Map.Entry<TopicPartition, ReassignmentInfo> moveEntry : this.reassignmentMap.entrySet()) {
            TopicPartition tp = moveEntry.getKey();
            ReassignmentInfo move = moveEntry.getValue();
            PartitionInfo current = partitionIndex.get(tp.topic()).get(Integer.valueOf(tp.partition()));
            Node[] currentReplicas = current.replicas();
            int sourceId = move.source.getId();
            int destId = move.dest.getId();
            Integer[] newReplicas = new Integer[currentReplicas.length];
            for (int idx = 0; idx < currentReplicas.length; idx++) {
                int replicaId = currentReplicas[idx].id();
                newReplicas[idx] = Integer.valueOf(replicaId == sourceId ? destId : replicaId);
            }
            replicaPlan.put(tp, newReplicas);
        }

        if (replicaPlan.isEmpty()) {
            return "";
        }
        return ZkUtils.formatAsReassignmentJson(getAssignmentPlan(replicaPlan));
    }

    /**
     * Generates a balancing plan for the current high-traffic brokers and, when the plan is
     * non-empty, logs it and submits it as a partition reassignment.
     *
     * <p>Nothing is submitted when plan generation failed ({@code null}) or found nothing to
     * move (empty string).
     */
    public void balanceWorkload() {
        String workloadBalancingPlanInJson = getWorkloadBalancingPlanInJson(getHighTrafficBroker());
        if (workloadBalancingPlanInJson == null || workloadBalancingPlanInJson.isEmpty()) {
            return;
        }
        // Pass the plan as a log parameter. The previous code concatenated it onto the format
        // string, which left a literal "{}" in the output.
        LOG.info("Assignment plan: {}", workloadBalancingPlanInJson);
        reassignTopicPartitions(workloadBalancingPlanInJson);
    }

    /**
     * Computes the combined inbound plus outbound traffic of each partition and hands the
     * result to {@code OperatorUtil.sortByValue}.
     *
     * <p>A partition whose traffic lookup throws is logged and left out of the result.
     *
     * @param list partitions to rank
     * @return the map returned by {@code OperatorUtil.sortByValue}
     */
    private Map<TopicPartition, Double> sortTopicPartitionsByTraffic(List<TopicPartition> list) {
        HashMap trafficByPartition = new HashMap();
        for (TopicPartition tp : list) {
            try {
                double combined = this.kafkaCluster.getMaxBytesIn(tp) + this.kafkaCluster.getMaxBytesOut(tp);
                trafficByPartition.put(tp, Double.valueOf(combined));
            } catch (Exception e) {
                LOG.info("Exception in sorting topic partition {}", tp, e);
            }
        }
        Map<TopicPartition, Double> ranked = OperatorUtil.sortByValue(trafficByPartition);
        return ranked;
    }

    /**
     * Submits a reassignment plan by creating {@code /admin/reassign_partitions} in ZooKeeper,
     * then reports the action and sends a notification email.
     *
     * <p>Does nothing (beyond a warning) when a reassignment node already exists, and nothing at
     * all when the cluster is configured for dry-run.
     *
     * @param str reassignment plan in JSON form
     */
    private void reassignTopicPartitions(String str) {
        boolean reassignmentInFlight = this.zkUtils.pathExists("/admin/reassign_partitions");
        if (reassignmentInFlight) {
            LOG.warn("{} : There is an existing assignment.", this.clusterConfig.getClusterName());
        } else if (!this.clusterConfig.dryRun()) {
            this.zkUtils.createPersistentPath("/admin/reassign_partitions", str, KafkaUtils.getZookeeperAcls(false));
            LOG.info("Set the reassignment data: ");
            String action = "partition reassignment : " + str;
            this.actionReporter.sendMessage(this.clusterConfig.getClusterName(), action);
            Email.notifyOnPartitionReassignment(this.drkafkaConfig.getNotificationEmails(), this.clusterConfig.getClusterName(), str);
        }
    }

    /**
     * Converts a Java replica plan into the Scala map shape that
     * {@code ZkUtils.formatAsReassignmentJson} accepts.
     *
     * @param map replica broker ids per topic partition
     * @return a Scala mutable map keyed by {@code TopicAndPartition} with Scala sequences of ids
     */
    private scala.collection.Map<TopicAndPartition, Seq<Object>> getAssignmentPlan(Map<TopicPartition, Integer[]> map) {
        scala.collection.mutable.HashMap scalaPlan = new scala.collection.mutable.HashMap();
        for (TopicPartition tp : map.keySet()) {
            Integer[] replicaIds = map.get(tp);
            // Copy into a fresh list before wrapping it for Scala.
            List<Integer> replicaList = new ArrayList<>(Arrays.asList(replicaIds));
            TopicAndPartition scalaKey = new TopicAndPartition(tp.topic(), tp.partition());
            scalaPlan.put(scalaKey, JavaConverters.asScalaBuffer((List) replicaList).seq());
        }
        // Assert-style sanity check: no entry may be lost in the conversion.
        if (!$assertionsDisabled && map.size() != scalaPlan.size()) {
            throw new AssertionError();
        }
        LOG.debug("replicaMap.size = {}, result.size = {}", Integer.valueOf(map.size()), Integer.valueOf(scalaPlan.size()));
        return scalaPlan;
    }

    /**
     * Filters under-replicated partitions, setting aside those that look like they are merely in
     * the middle of a reassignment.
     *
     * <p>A partition with no more replicas than the topic's expected replication factor is always
     * kept. A partition with extra replicas is kept only if fewer replica hosts than the
     * replication factor answer a ping on port 9092 within 5000 ms.
     *
     * @param list under-replicated partitions to examine
     * @param map expected replication factor per topic; must contain every topic in {@code list}
     * @return the partitions that still need handling
     */
    public List<PartitionInfo> filterOutInReassignmentUrps(List<PartitionInfo> list, Map<String, Integer> map) {
        List<PartitionInfo> remaining = new ArrayList<>();
        for (PartitionInfo urp : list) {
            int replicationFactor = map.get(urp.topic()).intValue();
            if (urp.replicas().length <= replicationFactor) {
                remaining.add(urp);
                continue;
            }

            // More replicas than expected: count the distinct replica brokers that are reachable.
            java.util.HashSet<Integer> reachableBrokers = new java.util.HashSet<>();
            for (Node replica : urp.replicas()) {
                if (replica.host() == null) {
                    continue;
                }
                if (OperatorUtil.pingKafkaBroker(replica.host(), 9092, 5000)) {
                    reachableBrokers.add(Integer.valueOf(replica.id()));
                }
            }
            if (reachableBrokers.size() < map.get(urp.topic()).intValue()) {
                remaining.add(urp);
            }
        }
        return remaining;
    }

    /**
     * Diagnoses why a replica is out of sync.
     *
     * <p>Checks are made in this order and the first match wins:
     * <ol>
     *   <li>the out-of-sync broker itself is dead: {@code FOLLOWER_FAILURE}</li>
     *   <li>there is no leader ({@code i3 < 0}): {@code NO_LEADER_FAILURE}</li>
     *   <li>the leader is dead: {@code LEADER_FAILURE}</li>
     *   <li>the leader's network is saturated: {@code LEADER_NETWORK_SATURATION}</li>
     *   <li>the follower's network is saturated: {@code FOLLOWER_NETWORK_SATURATION}</li>
     * </ol>
     * Otherwise the reason is {@code UNKNOWN}.
     *
     * @param str host name of the out-of-sync broker; the follower liveness check is skipped
     *     when {@code null}
     * @param i Kafka port used for the liveness probes (follower and leader)
     * @param i2 id of the out-of-sync (follower) broker
     * @param i3 id of the partition leader, negative when there is none
     * @param topicPartition partition used for the fetch probe
     * @return the diagnosed reason
     */
    public UnderReplicatedReason getUnderReplicatedReason(String str, int i, int i2, int i3, TopicPartition topicPartition) {
        if (str != null && isDeadBroker(str, i, i2, topicPartition)) {
            return UnderReplicatedReason.FOLLOWER_FAILURE;
        }
        if (i3 < 0) {
            LOG.error("No live leader {}:{}", str, Integer.valueOf(i2));
            return UnderReplicatedReason.NO_LEADER_FAILURE;
        }
        KafkaBroker leader = this.kafkaCluster.getBroker(i3);
        if (leader != null && isDeadBroker(leader.getName(), i, i3, topicPartition)) {
            return UnderReplicatedReason.LEADER_FAILURE;
        }
        if (isNetworkSaturated(i3)) {
            return UnderReplicatedReason.LEADER_NETWORK_SATURATION;
        }
        // Test the follower (i2) here. The earlier code re-tested the leader (i3), which made
        // this branch unreachable.
        if (isNetworkSaturated(i2)) {
            return UnderReplicatedReason.FOLLOWER_NETWORK_SATURATION;
        }
        return UnderReplicatedReason.UNKNOWN;
    }

    /**
     * Plans replacement brokers for every out-of-sync replica.
     *
     * <p>Alternative brokers are requested from the cluster model, per locality when rack
     * awareness is enabled and from a single broker queue otherwise. When no alternatives exist
     * for a partition, each of its out-of-sync brokers is recorded in
     * {@code reassignmentFailures} and planning continues with the next partition.
     *
     * @param list out-of-sync replicas to fix
     * @return new replica lists per partition, or {@code null} if any partition could not be
     *     placed (in which case the cluster's resource allocation counters are cleared when a
     *     partial plan had been built)
     */
    private Map<TopicPartition, Integer[]> generateReassignmentPlanForDeadBrokers(List<OutOfSyncReplica> list) {
        Map<TopicPartition, Integer[]> plan = new HashMap<>();
        boolean everyPartitionPlaced = true;
        boolean rackAware = this.clusterConfig.enabledRackAwareness();

        // Only the broker source matching the rack-awareness setting is fetched.
        Map<String, PriorityQueue<KafkaBroker>> queuesByLocality = null;
        PriorityQueue<KafkaBroker> brokerQueue = null;
        if (rackAware) {
            queuesByLocality = this.kafkaCluster.getBrokerQueueByLocality();
        } else {
            brokerQueue = this.kafkaCluster.getBrokerQueue();
        }

        for (OutOfSyncReplica oosReplica : list) {
            double bytesIn = this.kafkaCluster.getMaxBytesIn(oosReplica.topicPartition);
            double bytesOut = this.kafkaCluster.getMaxBytesOut(oosReplica.topicPartition);
            int firstReplicaId = oosReplica.replicaBrokers.get(0).intValue();

            Map<Integer, KafkaBroker> substitutes;
            if (rackAware) {
                substitutes = this.kafkaCluster.getAlternativeBrokersByLocality(queuesByLocality, oosReplica, bytesIn, bytesOut, firstReplicaId);
            } else {
                substitutes = this.kafkaCluster.getAlternativeBrokers(brokerQueue, oosReplica, bytesIn, bytesOut, firstReplicaId);
            }

            if (substitutes == null) {
                everyPartitionPlaced = false;
                for (Integer failedBrokerId : oosReplica.outOfSyncBrokers) {
                    KafkaBroker failedBroker = this.kafkaCluster.getBroker(failedBrokerId.intValue());
                    this.reassignmentFailures.add(new MutablePair<>(failedBroker, oosReplica.topicPartition));
                }
                continue;
            }

            // Keep healthy replicas in place; swap in the substitute where one was assigned.
            List<Integer> currentReplicas = oosReplica.replicaBrokers;
            Integer[] newReplicas = new Integer[currentReplicas.size()];
            for (int idx = 0; idx < currentReplicas.size(); idx++) {
                int replicaId = currentReplicas.get(idx).intValue();
                int chosenId = replicaId;
                if (substitutes.containsKey(Integer.valueOf(replicaId))) {
                    chosenId = substitutes.get(Integer.valueOf(replicaId)).getId();
                }
                newReplicas[idx] = Integer.valueOf(chosenId);
            }
            plan.put(oosReplica.topicPartition, newReplicas);
        }

        if (everyPartitionPlaced) {
            return plan;
        }
        if (plan.size() > 0) {
            this.kafkaCluster.clearResourceAllocationCounters();
        }
        return null;
    }

    /**
     * Diagnoses the given under-replicated partitions and, when every diagnosed cause is a failed
     * follower, submits a reassignment that replaces the dead brokers. In every other case (or
     * when no plan could be generated) an alert email is sent.
     *
     * <p>The cached topic assignments are cleared first so replica lists are re-read. Each
     * (out-of-sync broker, leader) pair is diagnosed at most three times per invocation; brokers
     * already found dead are not probed again.
     *
     * @param list under-replicated partitions reported for the cluster
     * @param map expected replication factor per topic
     */
    public void handleUnderReplicatedPartitions(List<PartitionInfo> list, Map<String, Integer> map) {
        final int maxChecksPerPair = 3;

        LOG.info("Start handling under-replicated partitions for {}", this.clusterConfig.getClusterName());
        this.topicPartitionAssignments.clear();
        List<PartitionInfo> filterOutInReassignmentUrps = filterOutInReassignmentUrps(list, map);

        List<OutOfSyncReplica> oosReplicas = new ArrayList<>();
        for (PartitionInfo partitionInfo : filterOutInReassignmentUrps) {
            TopicPartition topicPartition = new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
            OutOfSyncReplica outOfSyncReplica = new OutOfSyncReplica(partitionInfo);
            outOfSyncReplica.replicaBrokers = getReplicaAssignment(topicPartition);
            oosReplicas.add(outOfSyncReplica);
        }

        Map<MutablePair<Integer, Integer>, UnderReplicatedReason> reasons = new HashMap<>();
        Map<MutablePair<Integer, Integer>, Integer> checkCounts = new HashMap<>();
        java.util.HashSet<Integer> downBrokers = new java.util.HashSet<>();

        for (OutOfSyncReplica outOfSyncReplica : oosReplicas) {
            int leaderId = outOfSyncReplica.leader == null ? -1 : outOfSyncReplica.leader.id();
            for (Integer oosBroker : outOfSyncReplica.outOfSyncBrokers) {
                int oosBrokerId = oosBroker.intValue();
                KafkaBroker broker = this.kafkaCluster.getBroker(oosBrokerId);
                MutablePair<Integer, Integer> nodePair = new MutablePair<>(Integer.valueOf(oosBrokerId), Integer.valueOf(leaderId));
                int checksSoFar = checkCounts.getOrDefault(nodePair, 0);
                if (reasons.containsKey(nodePair) && checksSoFar >= maxChecksPerPair) {
                    continue;
                }

                UnderReplicatedReason underReplicatedReason;
                if (downBrokers.contains(Integer.valueOf(oosBrokerId))) {
                    underReplicatedReason = UnderReplicatedReason.FOLLOWER_FAILURE;
                } else if (downBrokers.contains(Integer.valueOf(leaderId))) {
                    underReplicatedReason = UnderReplicatedReason.LEADER_FAILURE;
                } else {
                    // NOTE(review): getBroker() can return null (getNoStatsBrokers relies on
                    // that); a broker without stats would fail here as it did before.
                    underReplicatedReason = getUnderReplicatedReason(broker.getName(), broker.getPort(), oosBrokerId, leaderId, outOfSyncReplica.topicPartition);
                    if (underReplicatedReason == UnderReplicatedReason.FOLLOWER_FAILURE) {
                        downBrokers.add(Integer.valueOf(oosBrokerId));
                    } else if (underReplicatedReason == UnderReplicatedReason.LEADER_FAILURE) {
                        downBrokers.add(Integer.valueOf(leaderId));
                    }
                }
                reasons.put(nodePair, underReplicatedReason);
                // Store the incremented count. The earlier code discarded the increment and
                // wrote the old value back, so the cap above could never take effect.
                checkCounts.put(nodePair, Integer.valueOf(checksSoFar + 1));
            }
        }
        LOG.info("URP Reasons: {}", reasons);

        // Automatic repair is attempted only when every diagnosed cause is a follower failure.
        boolean onlyFollowerFailures = true;
        for (UnderReplicatedReason reason : reasons.values()) {
            onlyFollowerFailures &= reason == UnderReplicatedReason.FOLLOWER_FAILURE;
        }
        LOG.info("Down brokers: {}", downBrokers);

        boolean alertNeeded = true;
        if (onlyFollowerFailures) {
            Map<TopicPartition, Integer[]> replacementPlan = generateReassignmentPlanForDeadBrokers(oosReplicas);
            if (replacementPlan == null || replacementPlan.isEmpty()) {
                LOG.error("Failed to generate a reassignment plan");
                OpenTsdbMetricConverter.incr(DoctorKafkaMetrics.HANDLE_URP_FAILURE, 1, new String[]{"cluster=" + this.zkUrl});
            } else {
                String formatAsReassignmentJson = ZkUtils.formatAsReassignmentJson(getAssignmentPlan(replacementPlan));
                LOG.info("Reassignment plan: {}", formatAsReassignmentJson);
                reassignTopicPartitions(formatAsReassignmentJson);
                alertNeeded = false;
            }
        }
        if (alertNeeded) {
            Email.alertOnFailureInHandlingUrps(this.drkafkaConfig.getNotificationEmails(), this.clusterConfig.getClusterName(), filterOutInReassignmentUrps, this.reassignmentFailures, downBrokers);
        }
    }

    /**
     * Decides whether a broker should be treated as dead.
     *
     * <ul>
     *   <li>Reachable and data can be fetched for the partition: alive.</li>
     *   <li>Reachable but the fetch fails: dead.</li>
     *   <li>Unreachable and unknown to the cluster model: dead only while this JVM's uptime is
     *       below 300000 ms.</li>
     *   <li>Unreachable and the latest stats are older than 300000 ms: dead.</li>
     *   <li>Otherwise dead only if exactly {@code NUM_BROKER_STATS} stats are available and all
     *       of them report a failure.</li>
     * </ul>
     *
     * @param str broker host name
     * @param i broker port used for the ping and fetch probes
     * @param i2 broker id
     * @param topicPartition partition used for the fetch probe
     * @return {@code true} when the broker is considered dead
     */
    private boolean isDeadBroker(String str, int i, int i2, TopicPartition topicPartition) {
        if (OperatorUtil.pingKafkaBroker(str, i, 5000)) {
            // Report the port that was actually probed (the message used to hard-code 9092).
            LOG.debug("Broker {} is alive as {}:{} is reachable", Integer.valueOf(i2), str, Integer.valueOf(i));
            if (OperatorUtil.canFetchData(str, i, topicPartition.topic(), topicPartition.partition())) {
                LOG.debug("We are able to fetch data from broker {}", Integer.valueOf(i2));
                return false;
            }
            LOG.warn("We are not able to fetch data from broker {} topic {}, par {}", Integer.valueOf(i2), topicPartition.topic(), Integer.valueOf(topicPartition.partition()));
            return true;
        }

        long uptime = ManagementFactory.getRuntimeMXBean().getUptime();
        KafkaBroker kafkaBroker = this.kafkaCluster.brokers.get(Integer.valueOf(i2));
        if (kafkaBroker == null) {
            return uptime < 300000;
        }

        List<BrokerStats> brokerStatsList = this.kafkaCluster.getBrokerStatsList(i2);
        BrokerStats latestStats = kafkaBroker.getLatestStats();
        // Capture "now" once. The decompiled code referenced an undefined register (r0) here.
        long now = System.currentTimeMillis();
        long statsAgeMs = now - latestStats.getTimestamp().longValue();
        if (statsAgeMs > 300000) {
            LOG.info("Haven't received {} brokerstats info for {} seconds", Integer.valueOf(i2), Double.valueOf(statsAgeMs / 1000.0d));
            return true;
        }

        boolean allStatsHaveFailure = true;
        for (BrokerStats stats : brokerStatsList) {
            allStatsHaveFailure &= stats.getHasFailure().booleanValue();
        }
        LOG.info("# brokerstats={}, allStatsHaveFailure={}", Integer.valueOf(brokerStatsList.size()), Boolean.valueOf(allStatsHaveFailure));
        return brokerStatsList.size() == NUM_BROKER_STATS && allStatsHaveFailure;
    }

    /**
     * Tells whether a broker's latest leader traffic (1-minute bytes-in plus bytes-out rate)
     * exceeds the configured network bandwidth.
     *
     * @param i broker id
     * @return {@code false} when no stats are available for the broker, otherwise the comparison
     *     result
     */
    private boolean isNetworkSaturated(int i) {
        BrokerStats latest = this.kafkaCluster.getLatestBrokerStats(i);
        if (latest == null) {
            return false;
        }
        long leaderTraffic = latest.getLeadersBytesIn1MinRate().longValue() + latest.getLeadersBytesOut1MinRate().longValue();
        return ((double) leaderTraffic) > this.clusterConfig.getNetworkBandwidthInBytes();
    }

    /**
     * Groups partitions by the id of the broker that currently leads them.
     *
     * <p>Partitions without a leader are ignored; topics whose partition list is {@code null}
     * are logged as errors and skipped.
     *
     * @param map partition info lists keyed by topic
     * @return led partitions keyed by leader broker id
     */
    public Map<Integer, List<TopicPartition>> getBrokerLeaderPartitions(Map<String, List<PartitionInfo>> map) {
        Map<Integer, List<TopicPartition>> partitionsByLeader = new HashMap<>();
        for (Map.Entry<String, List<PartitionInfo>> topicEntry : map.entrySet()) {
            List<PartitionInfo> partitions = topicEntry.getValue();
            if (partitions == null) {
                LOG.error("Failed to get partition info for {}", topicEntry.getKey());
                continue;
            }
            for (PartitionInfo info : partitions) {
                Node leader = info.leader();
                if (leader == null) {
                    continue;
                }
                List<TopicPartition> led = partitionsByLeader.computeIfAbsent(Integer.valueOf(leader.id()), id -> new ArrayList<>());
                led.add(new TopicPartition(info.topic(), info.partition()));
            }
        }
        return partitionsByLeader;
    }

    /**
     * Collects the under-replicated partitions of the given topics.
     *
     * <p>A partition reported by the consumer counts as under-replicated when it has fewer
     * in-sync replicas than replicas and fewer in-sync replicas than the topic's expected
     * replication factor. A partition the consumer does not report at all is added with no
     * leader, no in-sync replicas, and replicas rebuilt from the supplied assignment (host
     * {@code ""}, port {@code -1}).
     *
     * @param str ZooKeeper connection string used to obtain the consumer
     * @param securityProtocol security protocol for the consumer
     * @param map extra consumer configuration
     * @param list topics to inspect
     * @param map2 partition-to-replica assignments per topic
     * @param map3 expected replication factor per topic
     * @param map4 expected partition count per topic
     * @return the under-replicated partitions found
     */
    public static List<PartitionInfo> getUnderReplicatedPartitions(String str, SecurityProtocol securityProtocol, Map<String, String> map, List<String> list, scala.collection.mutable.Map<String, scala.collection.Map<Object, Seq<Object>>> map2, Map<String, Integer> map3, Map<String, Integer> map4) {
        List<PartitionInfo> underReplicated = new ArrayList<>();
        KafkaConsumer consumer = KafkaUtils.getKafkaConsumer(str, securityProtocol, map);
        for (String topic : list) {
            List<PartitionInfo> reported = consumer.partitionsFor(topic);
            if (reported == null) {
                LOG.error("Failed to get partition info for {}", topic);
                continue;
            }

            // Every expected partition starts out as "missing" until the consumer reports it.
            int expectedPartitions = map4.get(topic).intValue();
            boolean[] missing = new boolean[expectedPartitions];
            Arrays.fill(missing, true);

            for (PartitionInfo info : reported) {
                int inSyncCount = info.inSyncReplicas().length;
                if (inSyncCount < info.replicas().length && map3.get(info.topic()).intValue() > inSyncCount) {
                    underReplicated.add(info);
                }
                missing[info.partition()] = false;
            }

            for (int partition = 0; partition < expectedPartitions; partition++) {
                if (!missing[partition]) {
                    continue;
                }
                // Rebuild the replica list from the recorded assignment.
                Seq assignedSeq = (Seq) ((scala.collection.Map) map2.get(topic).get()).get(Integer.valueOf(partition)).get();
                List<?> assignedIds = JavaConverters.seqAsJavaList(assignedSeq);
                Node[] replicas = new Node[assignedIds.size()];
                int slot = 0;
                for (Object assignedId : assignedIds) {
                    replicas[slot++] = new Node(((Integer) assignedId).intValue(), "", -1);
                }
                underReplicated.add(new PartitionInfo(topic, partition, (Node) null, replicas, new Node[0]));
            }
        }
        return underReplicated;
    }

    /**
     * Lists the brokers returned by {@code zkUtils.getAllBrokersInCluster()} for which
     * {@code kafkaCluster.getBroker(id)} returns {@code null}.
     *
     * @return a new list of such brokers; empty when every broker is known to {@code kafkaCluster}
     */
    public List<Broker> getNoStatsBrokers() {
        List<Broker> registeredBrokers = JavaConverters.seqAsJavaList(this.zkUtils.getAllBrokersInCluster());
        return registeredBrokers.stream()
                .filter(candidate -> this.kafkaCluster.getBroker(candidate.id()) == null)
                .collect(Collectors.toCollection(ArrayList::new));
    }

    /**
     * Returns a copy of the brokers currently held in {@code kafkaCluster.brokers}.
     *
     * @return a new list containing the map's values
     */
    public List<KafkaBroker> getAllBrokers() {
        List<KafkaBroker> snapshot = new ArrayList<>();
        snapshot.addAll(this.kafkaCluster.brokers.values());
        return snapshot;
    }

    /**
     * Finds the first non-decommissioned broker whose last stats timestamp is older than the
     * configured no-stats threshold and, if allowed, asks the broker replacer to replace it.
     *
     * @return {@code false} when the replacer is already busy, when the replacement frequency limit
     *     blocks the action, or when an exception occurs; {@code true} when there is no candidate
     *     broker, in dry-run mode, or after a replacement has been requested
     */
    private boolean checkAndReplaceDeadBrokers() {
        long now = System.currentTimeMillis();

        if (this.brokerReplacer.busy()) {
            long elapsedSeconds = (now - this.brokerReplacer.getReplacementStartTime()) / 1000;
            if (elapsedSeconds > MAX_HOST_REPLACEMENT_TIME_SECONDS) {
                Email.alertOnProlongedBrokerReplacement(this.drkafkaConfig.getNotificationEmails(), this.clusterConfig.getClusterName(), this.brokerReplacer.getReplacedBroker(), elapsedSeconds);
            }
            LOG.info("{} broker replacer is busy with replacing {}", this.clusterConfig.getClusterName(), this.brokerReplacer.getReplacedBroker());
            return false;
        }

        // Pick the first broker that is still in service but has been silent for too long.
        KafkaBroker deadBroker = null;
        for (Map.Entry<Integer, KafkaBroker> entry : this.kafkaCluster.brokers.entrySet()) {
            KafkaBroker candidate = entry.getValue();
            if (candidate.isDecommissioned()) {
                continue;
            }
            double silentSeconds = (now - candidate.getLastStatsTimestamp()) / 1000.0d;
            if (silentSeconds > this.clusterConfig.getBrokerReplacementNoStatsSeconds()) {
                deadBroker = candidate;
                break;
            }
        }
        if (deadBroker == null) {
            return true;
        }

        String brokerName = deadBroker.getName();
        String clusterName = this.clusterConfig.getClusterName();
        try {
            long lastReplacementTime = this.zookeeperClient.getLastBrokerReplacementTime(this.clusterConfig.getClusterName());
            long secondsSinceLastReplacement = (now - lastReplacementTime) / 1000;
            if (secondsSinceLastReplacement < this.drkafkaConfig.getBrokerReplacementIntervalInSeconds()) {
                LOG.info("Cannot replace {}:{} due to replace frequency limitation", clusterName, brokerName);
                return false;
            }
            if (this.clusterConfig.dryRun()) {
                LOG.info("Dry run: Replacing {} in {}", brokerName, clusterName);
                return true;
            }
            LOG.info("Replacing {}  in {}", brokerName, clusterName);
            this.brokerReplacer.replaceBroker(brokerName);
            this.zookeeperClient.recordBrokerTermination(clusterName, brokerName);
            this.actionReporter.sendMessage(clusterName, "broker replacement : " + brokerName);
            Email.notifyOnBrokerReplacement(this.drkafkaConfig.getNotificationEmails(), clusterName, brokerName);
            return true;
        } catch (Exception e) {
            LOG.error("Failed to check last broker replacement info for {}", clusterName, e);
            return false;
        }
    }

    /**
     * Switches the maintenance-mode flag on, logs the change, and sends the maintenance-mode
     * notification email with the flag's current value.
     */
    public void enableMaintenanceMode() {
        this.maintenanceMode.set(true);
        String logLine = "Enabled maintenace mode for:" + this.clusterConfig.getClusterName();
        LOG.info(logLine);
        Email.notifyOnMaintenanceMode(
                this.drkafkaConfig.getNotificationEmails(),
                this.clusterConfig.getClusterName(),
                this.maintenanceMode.get());
    }

    /**
     * Switches the maintenance-mode flag off, logs the change, and sends the maintenance-mode
     * notification email with the flag's current value.
     */
    public void disableMaintenanceMode() {
        this.maintenanceMode.set(false);
        String logLine = "Disabled maintenace mode for:" + this.clusterConfig.getClusterName();
        LOG.info(logLine);
        Email.notifyOnMaintenanceMode(
                this.drkafkaConfig.getNotificationEmails(),
                this.clusterConfig.getClusterName(),
                this.maintenanceMode.get());
    }

    /**
     * Calls {@code decommission()} on the broker with the given id and sends the decommissioning
     * notification email when that call returns {@code false}.
     *
     * <p>NOTE(review): presumably {@code decommission()} returns the previous decommission state,
     * so the email goes out only on an actual state change — confirm against {@code KafkaBroker}.
     *
     * @param num id of the broker to decommission
     */
    public void decommissionBroker(Integer num) {
        boolean decommissionResult = this.kafkaCluster.getBroker(num.intValue()).decommission();
        if (!decommissionResult) {
            Email.notifyOnDecommissioningBroker(this.drkafkaConfig.getNotificationEmails(), this.kafkaCluster.name(), String.valueOf(num));
        }
    }

    /**
     * Calls {@code cancelDecommission()} on the broker with the given id and sends the
     * cancelled-decommissioning notification email when that call returns {@code true}.
     *
     * <p>NOTE(review): presumably {@code cancelDecommission()} returns the previous decommission
     * state — confirm against {@code KafkaBroker}.
     *
     * @param num id of the broker whose decommissioning is cancelled
     */
    public void cancelDecommissionBroker(Integer num) {
        boolean cancelResult = this.kafkaCluster.getBroker(num.intValue()).cancelDecommission();
        if (!cancelResult) {
            return;
        }
        Email.notifyOnCancelledDecommissioningBroker(this.drkafkaConfig.getNotificationEmails(), this.kafkaCluster.name(), String.valueOf(num));
    }

    /**
     * Periodic check loop for this cluster; runs until {@code stopped} becomes {@code true} or the
     * thread is interrupted.
     *
     * <p>Each cycle sleeps for the configured check interval and then:
     * <ol>
     *   <li>does nothing more if maintenance mode is on;</li>
     *   <li>sends an alert and skips the rest of the cycle if {@link #getNoStatsBrokers()} is
     *       non-empty;</li>
     *   <li>otherwise recomputes the under-replicated partitions, handles them (alerting once they
     *       have persisted longer than the configured alert time) or, when there are none, runs
     *       workload balancing if enabled;</li>
     *   <li>finally runs dead-broker replacement if enabled.</li>
     * </ol>
     *
     * <p>Any other exception thrown during a cycle is logged and the loop continues.
     */
    @Override // java.lang.Runnable
    public void run() {
        // 1000L keeps the multiplication in long arithmetic regardless of the getter's return type.
        long checkIntervalInMs = this.clusterConfig.getCheckIntervalInSeconds() * 1000L;
        this.stopped = false;
        boolean foundUrps = false;
        long firstSeenUrpsTimestamp = 0;
        while (!this.stopped) {
            try {
                Thread.sleep(checkIntervalInMs);
                if (this.maintenanceMode.get()) {
                    LOG.debug("Cluster:" + this.clusterConfig.getClusterName() + " is in maintenace mode");
                    continue;
                }

                ZkUtils zkUtils = KafkaUtils.getZkUtils(this.zkUrl);
                List<Broker> noStatsBrokers = getNoStatsBrokers();
                if (!noStatsBrokers.isEmpty()) {
                    Email.alertOnNoStatsBrokers(this.drkafkaConfig.getAlertEmails(), this.clusterConfig.getClusterName(), noStatsBrokers);
                    continue;
                }

                Seq<String> allTopics = zkUtils.getAllTopics();
                List<String> topics = JavaConverters.seqAsJavaList(allTopics);
                scala.collection.mutable.Map<String, scala.collection.Map<Object, Seq<Object>>> partitionAssignments = zkUtils.getPartitionAssignmentForTopics(allTopics);
                Map<String, Integer> replicationFactors = new HashMap<>();
                Map<String, Integer> partitionCounts = new HashMap<>();
                for (String topic : topics) {
                    scala.collection.Map<Object, Seq<Object>> assignment = partitionAssignments.get(topic).get();
                    int partitionCount = assignment.size();
                    // The replica-list size of the first assignment entry stands in for the topic's replication factor.
                    int replicationFactor = assignment.head()._2().size();
                    partitionCounts.put(topic, Integer.valueOf(partitionCount));
                    replicationFactors.put(topic, Integer.valueOf(replicationFactor));
                }

                this.underReplicatedPartitions = getUnderReplicatedPartitions(this.zkUrl, this.securityProtocol, this.consumerConfigs, topics, partitionAssignments, replicationFactors, partitionCounts);
                LOG.info("Under-replicated partitions: {}", Integer.valueOf(this.underReplicatedPartitions.size()));
                for (PartitionInfo underReplicated : this.underReplicatedPartitions) {
                    LOG.info("under-replicated : {}", underReplicated);
                }
                this.kafkaCluster.clearResourceAllocationCounters();

                if (this.underReplicatedPartitions.size() > 0) {
                    if (!foundUrps) {
                        // First cycle of this streak: remember when it started.
                        foundUrps = true;
                        firstSeenUrpsTimestamp = System.currentTimeMillis();
                    } else if (System.currentTimeMillis() - firstSeenUrpsTimestamp > this.clusterConfig.getUnderReplicatedAlertTimeInMs()) {
                        Email.alertOnProlongedUnderReplicatedPartitions(this.drkafkaConfig.getAlertEmails(), this.clusterConfig.getClusterName(), this.clusterConfig.getUnderReplicatedAlertTimeInSeconds(), this.underReplicatedPartitions);
                    }
                    LOG.info("Under-replicated partitions in cluster {} : {}", this.clusterConfig.getClusterName(), Integer.valueOf(this.underReplicatedPartitions.size()));
                    handleUnderReplicatedPartitions(this.underReplicatedPartitions, replicationFactors);
                } else {
                    foundUrps = false;
                    firstSeenUrpsTimestamp = Long.MAX_VALUE;
                    if (this.clusterConfig.enabledWorloadBalancing()) {
                        this.preferredLeaders.clear();
                        this.reassignmentMap.clear();
                        balanceWorkload();
                    }
                }

                if (this.clusterConfig.enabledDeadbrokerReplacement()) {
                    checkAndReplaceDeadBrokers();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop; continuing would make every
                // subsequent sleep() throw immediately and spin.
                Thread.currentThread().interrupt();
                LOG.warn("Cluster manager for {} was interrupted; exiting check loop", this.zkUrl);
                break;
            } catch (Exception e) {
                LOG.error("Unexpected failure in cluster manager for {}: ", this.zkUrl, e);
            }
        }
    }

    /**
     * Reports the current value of the maintenance-mode flag.
     *
     * @return {@code true} when maintenance mode is on
     */
    public boolean isMaintenanceModeEnabled() {
        boolean enabled = this.maintenanceMode.get();
        return enabled;
    }

    // Static initializer: sets up the class-level logger and the shared Gson instance.
    static {
        // NOTE(review): looks like the decompiler-emitted synthetic flag backing `assert`
        // statements in this class — confirm before touching.
        $assertionsDisabled = !KafkaClusterManager.class.desiredAssertionStatus();
        // Logger for this class.
        LOG = LogManager.getLogger(KafkaClusterManager.class);
        // Gson instance stored in the class-level `gson` field.
        gson = new Gson();
    }
}
