package com.pinterest.doctorkafka;

import com.codahale.metrics.Histogram;
import com.codahale.metrics.SlidingWindowReservoir;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.pinterest.doctorkafka.KafkaBroker;
import com.pinterest.doctorkafka.config.DoctorKafkaClusterConfig;
import com.pinterest.doctorkafka.util.OutOfSyncReplica;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.math3.optimization.direct.CMAESOptimizer;
import org.apache.kafka.common.TopicPartition;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:com/pinterest/doctorkafka/KafkaCluster.class */
/**
 * In-memory view of a single Kafka cluster, built up from the {@link BrokerStats}
 * records passed to {@link #recordBrokerStats(BrokerStats)}.
 *
 * <p>The class tracks the known brokers, the topic partitions seen in leader replica
 * stats, per-partition bytes-in / bytes-out histograms, and the latest timestamp at
 * which each partition was reported as being in reassignment. It also offers helpers
 * that pick replacement brokers from priority queues of {@link KafkaBroker}s.
 *
 * <p>State is kept in concurrent maps; methods that return {@code null} on failure
 * keep that contract so existing callers continue to work.
 */
public class KafkaCluster {
    private static final Logger LOG = LogManager.getLogger(KafkaCluster.class);

    /** Maximum number of {@link BrokerStats} records retained per broker. */
    private static final int MAX_NUM_STATS = 5;

    /**
     * A broker whose latest stats timestamp is further than this from
     * {@link System#currentTimeMillis()} is treated as invalid.
     */
    private static final int INVALID_BROKERSTATS_TIME = 240000;

    /**
     * Minimum distance between a broker stats timestamp and the last recorded
     * reassignment timestamp of a partition before its traffic is sampled again.
     */
    private static final long REASSIGNMENT_COOLDOWN_WINDOW_IN_MS = 1800000;

    /** Number of samples kept by each per-partition sliding window reservoir. */
    private static final int SLIDING_WINDOW_SIZE = 5760;

    private final DoctorKafkaClusterConfig clusterConfig;
    public String zkUrl;
    public ConcurrentMap<String, Set<TopicPartition>> topicPartitions = new ConcurrentHashMap<>();
    private final ConcurrentMap<TopicPartition, Histogram> bytesInHistograms = new ConcurrentHashMap<>();
    private final ConcurrentMap<TopicPartition, Histogram> bytesOutHistograms = new ConcurrentHashMap<>();
    private final ConcurrentMap<TopicPartition, Long> reassignmentTimestamps = new ConcurrentHashMap<>();
    public ConcurrentMap<Integer, KafkaBroker> brokers = new ConcurrentHashMap<>();
    private final ConcurrentMap<Integer, LinkedList<BrokerStats>> brokerStatsMap = new ConcurrentHashMap<>();

    /**
     * Creates an empty cluster view.
     *
     * @param str                      value stored in {@link #zkUrl}
     * @param doctorKafkaClusterConfig configuration used for the cluster name, the
     *                                 network limits, and for creating brokers
     */
    public KafkaCluster(String str, DoctorKafkaClusterConfig doctorKafkaClusterConfig) {
        this.zkUrl = str;
        this.clusterConfig = doctorKafkaClusterConfig;
    }

    /** @return the number of brokers currently known to this cluster view */
    public int size() {
        return this.brokers.size();
    }

    /** @return the cluster name taken from the cluster configuration */
    public String name() {
        return this.clusterConfig.getClusterName();
    }

    /**
     * Records one broker stats report.
     *
     * <p>The report is appended to the broker's bounded history (at most
     * {@link #MAX_NUM_STATS} entries). If the report has no failure flag set, the
     * corresponding {@link KafkaBroker} is created on demand and updated. Every leader
     * replica stat in the report is then folded into the per-partition bookkeeping.
     *
     * <p>Any exception raised while processing is logged and swallowed, so a single bad
     * report never propagates to the caller.
     *
     * @param brokerStats the report to record
     */
    public void recordBrokerStats(BrokerStats brokerStats) {
        try {
            int brokerId = brokerStats.getId().intValue();
            LinkedList<BrokerStats> history =
                this.brokerStatsMap.computeIfAbsent(brokerId, id -> new LinkedList<>());
            synchronized (history) {
                if (history.size() >= MAX_NUM_STATS) {
                    history.removeFirst();
                }
                history.addLast(brokerStats);
            }
            if (!brokerStats.getHasFailure().booleanValue()) {
                this.brokers
                    .computeIfAbsent(brokerId, id -> new KafkaBroker(this.clusterConfig, this, id))
                    .update(brokerStats);
            }
            if (brokerStats.getLeaderReplicaStats() != null) {
                for (ReplicaStat replicaStat : brokerStats.getLeaderReplicaStats()) {
                    recordReplicaStat(brokerStats, replicaStat);
                }
            }
        } catch (Exception e) {
            LOG.error("Failed to read broker stats : {}", brokerStats, e);
        }
    }

    /**
     * Folds one leader replica stat into the partition set, the reassignment
     * timestamps, and (outside the reassignment cooldown window) the traffic histograms.
     */
    private void recordReplicaStat(BrokerStats brokerStats, ReplicaStat replicaStat) {
        String topic = replicaStat.getTopic();
        TopicPartition topicPartition = new TopicPartition(topic, replicaStat.getPartition().intValue());
        // The per-topic set is shared between the recording path and readers such as
        // getMaxBytesIn(), so it has to be a concurrent set rather than a plain HashSet.
        this.topicPartitions
            .computeIfAbsent(topic, t -> ConcurrentHashMap.<TopicPartition>newKeySet())
            .add(topicPartition);

        if (replicaStat.getInReassignment().booleanValue()) {
            // Keep the most recent timestamp at which the partition was seen in reassignment.
            this.reassignmentTimestamps.compute(topicPartition, (tp, previous) ->
                (previous == null || previous.longValue() < replicaStat.getTimestamp().longValue())
                    ? replicaStat.getTimestamp()
                    : previous);
            return;
        }

        long lastReassignment = this.reassignmentTimestamps.getOrDefault(topicPartition, 0L);
        if (brokerStats.getTimestamp().longValue() - lastReassignment < REASSIGNMENT_COOLDOWN_WINDOW_IN_MS) {
            // Still inside the cooldown window: skip sampling for this partition.
            return;
        }
        Histogram bytesIn = this.bytesInHistograms.computeIfAbsent(
            topicPartition, tp -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
        Histogram bytesOut = this.bytesOutHistograms.computeIfAbsent(
            topicPartition, tp -> new Histogram(new SlidingWindowReservoir(SLIDING_WINDOW_SIZE)));
        bytesIn.update(replicaStat.getBytesIn15MinMeanRate().longValue());
        bytesOut.update(replicaStat.getBytesOut15MinMeanRate().longValue());
    }

    /**
     * @return a JSON object with a single {@code "brokers"} array holding the JSON form
     *         of every known broker
     */
    public JsonElement toJson() {
        JsonObject json = new JsonObject();
        JsonArray brokerArray = new JsonArray();
        json.add("brokers", brokerArray);
        synchronized (this.brokers) {
            for (KafkaBroker broker : this.brokers.values()) {
                brokerArray.add(broker.toJson());
            }
        }
        return json;
    }

    /** @return the live map of per-partition bytes-in histograms */
    public ConcurrentMap<TopicPartition, Histogram> getBytesInHistograms() {
        return this.bytesInHistograms;
    }

    /** @return the live map of per-partition bytes-out histograms */
    public ConcurrentMap<TopicPartition, Histogram> getBytesOutHistograms() {
        return this.bytesOutHistograms;
    }

    /** @return the live map of the latest reassignment timestamp per partition */
    public ConcurrentMap<TopicPartition, Long> getReassignmentTimestamps() {
        return this.reassignmentTimestamps;
    }

    /**
     * @param i broker id
     * @return the broker with that id, or {@code null} if it is unknown
     */
    public KafkaBroker getBroker(int i) {
        return this.brokers.get(i);
    }

    /**
     * @param i broker id
     * @return the latest stats of that broker, or {@code null} (after logging) if the
     *         broker is unknown
     */
    public BrokerStats getLatestBrokerStats(int i) {
        synchronized (this.brokers) {
            KafkaBroker broker = this.brokers.get(i);
            if (broker != null) {
                return broker.getLatestStats();
            }
            LOG.info("Failed to find broker {} in cluster {}", i, this.zkUrl);
            return null;
        }
    }

    /**
     * Returns the retained stats history of a broker.
     *
     * @param i broker id
     * @return a snapshot copy of the retained stats, oldest first, or {@code null}
     *         (after logging) if no stats were ever recorded for that broker
     */
    public List<BrokerStats> getBrokerStatsList(int i) {
        LinkedList<BrokerStats> history;
        synchronized (this.brokers) {
            history = this.brokerStatsMap.get(i);
        }
        if (history == null) {
            LOG.info("Failed to find broker {} in cluster {}", i, this.zkUrl);
            return null;
        }
        // recordBrokerStats() mutates the history under its own monitor; copy under the
        // same lock so callers never iterate a list that is being modified.
        synchronized (history) {
            return new ArrayList<>(history);
        }
    }

    /**
     * Returns the brokers whose max bytes-in or bytes-out is at or above the per-broker
     * average of the cluster totals AND at or above the configured network limit.
     *
     * @return the matching brokers; empty when the cluster has no brokers
     */
    public List<KafkaBroker> getHighTrafficBrokers() {
        List<KafkaBroker> highTraffic = new ArrayList<>();
        int brokerCount = this.brokers.size();
        if (brokerCount == 0) {
            return highTraffic;
        }
        // Divide as doubles: integer division would truncate the average.
        double averageBytesIn = getMaxBytesIn() / (double) brokerCount;
        double averageBytesOut = getMaxBytesOut() / (double) brokerCount;
        double bytesInLimit = this.clusterConfig.getNetworkInLimitInBytes();
        double bytesOutLimit = this.clusterConfig.getNetworkOutLimitInBytes();
        synchronized (this.brokers) {
            for (KafkaBroker broker : this.brokers.values()) {
                double brokerBytesIn = broker.getMaxBytesIn();
                double brokerBytesOut = broker.getMaxBytesOut();
                boolean aboveAverage = brokerBytesIn >= averageBytesIn || brokerBytesOut >= averageBytesOut;
                boolean aboveLimit = brokerBytesIn >= bytesInLimit || brokerBytesOut >= bytesOutLimit;
                if (aboveAverage && aboveLimit) {
                    LOG.debug("High traffic broker: {} : [{}, {}]",
                        broker.getName(), broker.getMaxBytesIn(), broker.getMaxBytesOut());
                    highTraffic.add(broker);
                }
            }
        }
        return highTraffic;
    }

    /**
     * Returns the brokers whose max bytes-in AND max bytes-out are both below the
     * per-broker average of the cluster totals.
     *
     * @return the matching brokers; empty when the cluster has no brokers
     */
    public List<KafkaBroker> getLowTrafficBrokers() {
        List<KafkaBroker> lowTraffic = new ArrayList<>();
        int brokerCount = this.brokers.size();
        if (brokerCount == 0) {
            return lowTraffic;
        }
        // Divide as doubles: integer division would truncate the average.
        double averageBytesIn = getMaxBytesIn() / (double) brokerCount;
        double averageBytesOut = getMaxBytesOut() / (double) brokerCount;
        synchronized (this.brokers) {
            for (KafkaBroker broker : this.brokers.values()) {
                try {
                    double brokerBytesIn = broker.getMaxBytesIn();
                    double brokerBytesOut = broker.getMaxBytesOut();
                    if (brokerBytesIn < averageBytesIn && brokerBytesOut < averageBytesOut) {
                        LOG.info("Low traffic broker {} : [{}, {}]",
                            broker.getName(), broker.getMaxBytesIn(), broker.getMaxBytesOut());
                        lowTraffic.add(broker);
                    }
                } catch (Exception e) {
                    // Best effort: skip this broker, but keep the cause in the log.
                    LOG.info("catch unexpected exception", e);
                }
            }
        }
        return lowTraffic;
    }

    /**
     * @return a priority queue, ordered by {@code KafkaBroker.KafkaBrokerComparator},
     *         of all brokers that are not invalid according to
     *         {@link #isInvalidBroker(KafkaBroker)}
     */
    public PriorityQueue<KafkaBroker> getBrokerQueue() {
        PriorityQueue<KafkaBroker> queue = new PriorityQueue<>(new KafkaBroker.KafkaBrokerComparator());
        for (KafkaBroker broker : this.brokers.values()) {
            if (!isInvalidBroker(broker)) {
                queue.add(broker);
            }
        }
        return queue;
    }

    /**
     * @return one priority queue of valid brokers per rack id, each ordered by
     *         {@code KafkaBroker.KafkaBrokerComparator}; racks with no valid broker
     *         have no entry
     */
    public Map<String, PriorityQueue<KafkaBroker>> getBrokerQueueByLocality() {
        Map<String, PriorityQueue<KafkaBroker>> queuesByRack = new HashMap<>();
        KafkaBroker.KafkaBrokerComparator comparator = new KafkaBroker.KafkaBrokerComparator();
        for (KafkaBroker broker : this.brokers.values()) {
            if (!isInvalidBroker(broker)) {
                queuesByRack
                    .computeIfAbsent(broker.getRackId(), rack -> new PriorityQueue<>(comparator))
                    .add(broker);
            }
        }
        return queuesByRack;
    }

    /**
     * A broker is invalid when it has no stats, its latest stats report a failure, its
     * latest stats are older than {@link #INVALID_BROKERSTATS_TIME}, or it is
     * decommissioned.
     *
     * @param kafkaBroker the broker to check
     * @return {@code true} if the broker must not be used as a reassignment target
     */
    protected boolean isInvalidBroker(KafkaBroker kafkaBroker) {
        BrokerStats latestStats = kafkaBroker.getLatestStats();
        return latestStats == null
            || latestStats.getHasFailure().booleanValue()
            || System.currentTimeMillis() - latestStats.getTimestamp().longValue() > INVALID_BROKERSTATS_TIME
            || kafkaBroker.isDecommissioned();
    }

    /**
     * Picks one replacement broker for every out-of-sync broker of a replica.
     *
     * <p>Brokers polled from the queue during the search are put back before returning,
     * except for a broker whose bandwidth reservation failed (see
     * {@link #findNextBrokerForOosReplica}).
     *
     * @param priorityQueue    candidate brokers
     * @param outOfSyncReplica the replica to fix
     * @param d                forwarded to {@link #findNextBrokerForOosReplica} as {@code d}
     * @param d2               forwarded to {@link #findNextBrokerForOosReplica} as {@code d2}
     * @param i                forwarded to {@link #findNextBrokerForOosReplica} as {@code num2}
     * @return a map from out-of-sync broker id to its replacement, or {@code null} if
     *         any out-of-sync broker could not be replaced
     */
    public Map<Integer, KafkaBroker> getAlternativeBrokers(PriorityQueue<KafkaBroker> priorityQueue, OutOfSyncReplica outOfSyncReplica, double d, double d2, int i) {
        Map<Integer, KafkaBroker> replacements = new HashMap<>();
        Set<KafkaBroker> unusableBrokers = new HashSet<>();
        boolean success = true;
        for (int oosBrokerId : outOfSyncReplica.outOfSyncBrokers) {
            success = findNextBrokerForOosReplica(priorityQueue, unusableBrokers,
                outOfSyncReplica.replicaBrokers, replacements, oosBrokerId,
                outOfSyncReplica.topicPartition, d, d2, i);
            if (!success) {
                break;
            }
        }
        // Hand the brokers that were set aside back to the shared queue.
        priorityQueue.addAll(unusableBrokers);
        return success ? replacements : null;
    }

    /**
     * Same as {@link #getAlternativeBrokers}, but each out-of-sync broker is replaced by
     * a broker taken from the queue of its own rack id.
     *
     * @param map              candidate brokers keyed by rack id
     * @param outOfSyncReplica the replica to fix
     * @param d                forwarded to {@link #findNextBrokerForOosReplica} as {@code d}
     * @param d2               forwarded to {@link #findNextBrokerForOosReplica} as {@code d2}
     * @param i                forwarded to {@link #findNextBrokerForOosReplica} as {@code num2}
     * @return a map from out-of-sync broker id to its replacement, or {@code null} if
     *         an out-of-sync broker is unknown, its rack has no candidate queue, or no
     *         replacement could be reserved
     */
    public Map<Integer, KafkaBroker> getAlternativeBrokersByLocality(Map<String, PriorityQueue<KafkaBroker>> map, OutOfSyncReplica outOfSyncReplica, double d, double d2, int i) {
        Map<String, List<Integer>> oosBrokerIdsByRack = new HashMap<>();
        for (int oosBrokerId : outOfSyncReplica.outOfSyncBrokers) {
            KafkaBroker oosBroker = this.brokers.get(oosBrokerId);
            if (oosBroker == null) {
                // Without the broker we cannot tell which rack to search; nothing has
                // been polled yet, so failing here leaves the queues untouched.
                LOG.error("Failed to find broker {} in cluster {}", oosBrokerId, this.zkUrl);
                return null;
            }
            oosBrokerIdsByRack
                .computeIfAbsent(oosBroker.getRackId(), rack -> new ArrayList<>())
                .add(oosBrokerId);
        }

        Map<Integer, KafkaBroker> replacements = new HashMap<>();
        Map<String, Set<KafkaBroker>> unusableBrokersByRack = new HashMap<>();
        boolean success = true;
        for (Map.Entry<String, List<Integer>> rackEntry : oosBrokerIdsByRack.entrySet()) {
            String rackId = rackEntry.getKey();
            PriorityQueue<KafkaBroker> rackQueue = map.get(rackId);
            if (rackQueue == null) {
                // No candidate queue for this rack (e.g. it has no valid broker).
                LOG.error("Failed to find a usable broker in locality {} for fixing {}",
                    rackId, outOfSyncReplica.topicPartition);
                success = false;
                break;
            }
            Set<KafkaBroker> unusableBrokers =
                unusableBrokersByRack.computeIfAbsent(rackId, rack -> new HashSet<>());
            for (Integer oosBrokerId : rackEntry.getValue()) {
                success = findNextBrokerForOosReplica(rackQueue, unusableBrokers,
                    outOfSyncReplica.replicaBrokers, replacements, oosBrokerId,
                    outOfSyncReplica.topicPartition, d, d2, i);
                if (!success) {
                    break;
                }
            }
            if (!success) {
                break;
            }
        }

        // Hand the brokers that were set aside back to their rack queues.
        for (Map.Entry<String, Set<KafkaBroker>> unusableEntry : unusableBrokersByRack.entrySet()) {
            PriorityQueue<KafkaBroker> rackQueue = map.get(unusableEntry.getKey());
            if (rackQueue != null) {
                rackQueue.addAll(unusableEntry.getValue());
            }
        }
        return success ? replacements : null;
    }

    /**
     * Polls the queue for the first broker that does not already host the replica and
     * tries to reserve bandwidth on it.
     *
     * <p>Brokers skipped because they already host the replica, and the broker that
     * was successfully reserved, are added to {@code collection} so the caller can put
     * them back into the queue afterwards.
     *
     * @param priorityQueue  candidate brokers; polled destructively
     * @param collection     receives the brokers removed from the queue (see above)
     * @param collection2    ids of the brokers that already host the replica
     * @param map            receives {@code num -> chosen broker} on success
     * @param num            id of the out-of-sync broker being replaced
     * @param topicPartition partition being fixed
     * @param d              second argument of {@code KafkaBroker.reserveBandwidth}
     *                       (NOTE(review): presumably the inbound requirement - confirm)
     * @param d2             third argument of {@code KafkaBroker.reserveBandwidth}, used
     *                       only when {@code num2} equals {@code num}; otherwise 0 is passed
     *                       (NOTE(review): presumably the outbound requirement - confirm)
     * @param num2           broker id compared against {@code num}
     *                       (NOTE(review): presumably the preferred leader - confirm)
     * @return {@code true} if a broker was found and the reservation succeeded
     */
    protected boolean findNextBrokerForOosReplica(PriorityQueue<KafkaBroker> priorityQueue, Collection<KafkaBroker> collection, Collection<Integer> collection2, Map<Integer, KafkaBroker> map, Integer num, TopicPartition topicPartition, Double d, Double d2, Integer num2) {
        KafkaBroker candidate = priorityQueue.poll();
        while (candidate != null && collection2.contains(candidate.getId())) {
            collection.add(candidate);
            candidate = priorityQueue.poll();
        }
        if (candidate == null) {
            LOG.error("Failed to find a usable broker for fixing {}:{}", topicPartition, num);
            return false;
        }

        LOG.info("LeastUsedBroker for replacing {} : {}", num, candidate.getId());
        // Compare the boxed ids by value: '==' on Integer compares references and is
        // only reliable for values inside the small-integer cache.
        boolean sameBroker = (num2 == null) ? (num == null) : num2.equals(num);
        boolean reserved = sameBroker
            ? candidate.reserveBandwidth(topicPartition, d.doubleValue(), d2.doubleValue())
            : candidate.reserveBandwidth(topicPartition, d.doubleValue(), 0.0);
        if (reserved) {
            map.put(num, candidate);
            // The chosen broker must not be picked again for this partition.
            collection.add(candidate);
        } else {
            // NOTE(review): the candidate is not handed back to 'collection' here, so
            // it drops out of the caller's queue after a failed reservation. Kept as-is
            // because callers may rely on it; confirm whether that is intended.
            LOG.error("Failed to allocate resource to replace {}:{}", topicPartition, num);
        }
        return reserved;
    }

    /**
     * Picks the first broker, in {@code KafkaBroker.KafkaBrokerComparator} order, that
     * does not host the given partition and reserves bandwidth on it.
     *
     * @param topicPartition partition that needs a new broker
     * @param d              second argument of {@code KafkaBroker.reserveBandwidth}
     * @param d2             third argument of {@code KafkaBroker.reserveBandwidth}
     * @return the chosen broker, or {@code null} if there is no candidate or the
     *         reservation failed
     */
    public KafkaBroker getAlternativeBroker(TopicPartition topicPartition, double d, double d2) {
        PriorityQueue<KafkaBroker> candidates = new PriorityQueue<>(new KafkaBroker.KafkaBrokerComparator());
        for (KafkaBroker broker : this.brokers.values()) {
            if (!broker.hasTopicPartition(topicPartition)) {
                candidates.add(broker);
            }
        }
        KafkaBroker leastUsed = candidates.poll();
        if (leastUsed == null) {
            // Every known broker already hosts the partition, or there are no brokers.
            LOG.error("Failed to find a usable broker for fixing {}", topicPartition);
            return null;
        }
        LOG.info("LeastUsedBroker for replacing {} : {}", topicPartition, leastUsed.getId());
        if (leastUsed.reserveBandwidth(topicPartition, d, d2)) {
            return leastUsed;
        }
        LOG.error("Failed to allocate resource to replace {}", topicPartition);
        return null;
    }

    /**
     * @param topicPartition the partition to look up
     * @return the maximum of the partition's bytes-in histogram snapshot, or 0 when no
     *         sample has been recorded for it yet
     */
    public long getMaxBytesIn(TopicPartition topicPartition) {
        Histogram histogram = this.bytesInHistograms.get(topicPartition);
        // A partition can be known (e.g. first seen while in reassignment) before any
        // traffic sample exists for it.
        return histogram == null ? 0L : histogram.getSnapshot().getMax();
    }

    /**
     * @param topicPartition the partition to look up
     * @return the maximum of the partition's bytes-out histogram snapshot, or 0 when no
     *         sample has been recorded for it yet
     */
    public long getMaxBytesOut(TopicPartition topicPartition) {
        Histogram histogram = this.bytesOutHistograms.get(topicPartition);
        // See getMaxBytesIn(TopicPartition): known partitions may have no samples yet.
        return histogram == null ? 0L : histogram.getSnapshot().getMax();
    }

    /** @return the sum of {@link #getMaxBytesIn(TopicPartition)} over all known partitions */
    public long getMaxBytesIn() {
        long total = 0;
        for (Set<TopicPartition> partitions : this.topicPartitions.values()) {
            for (TopicPartition partition : partitions) {
                total += getMaxBytesIn(partition);
            }
        }
        return total;
    }

    /** @return the sum of {@link #getMaxBytesOut(TopicPartition)} over all known partitions */
    public long getMaxBytesOut() {
        long total = 0;
        for (Set<TopicPartition> partitions : this.topicPartitions.values()) {
            for (TopicPartition partition : partitions) {
                total += getMaxBytesOut(partition);
            }
        }
        return total;
    }

    /** Clears the resource allocation counters of every known broker. */
    public void clearResourceAllocationCounters() {
        for (KafkaBroker broker : this.brokers.values()) {
            broker.clearResourceAllocationCounters();
        }
    }

    /** @return one line per broker, ordered by broker id, in the form {@code "   id : broker"} */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Integer, KafkaBroker> entry : new TreeMap<>(this.brokers).entrySet()) {
            sb.append("   ").append(entry.getKey()).append(" : ")
              .append(entry.getValue()).append(StringUtils.LF);
        }
        return sb.toString();
    }
}
