package com.linkedin.kafka.cruisecontrol.monitor.sampling;

import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityInfo;
import com.linkedin.kafka.cruisecontrol.metricsreporter.exception.UnknownVersionException;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.CruiseControlMetric;
import com.linkedin.kafka.cruisecontrol.metricsreporter.metric.RawMetricType;
import com.linkedin.kafka.cruisecontrol.monitor.MonitorUtils;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerLoad;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/CruiseControlMetricsProcessor.class */
public class CruiseControlMetricsProcessor {
    private static final Logger LOG = LoggerFactory.getLogger(CruiseControlMetricsProcessor.class);
    private static final long INIT_METRIC_TIMESTAMP = -1;
    private static final int MAX_PARTITION_ERROR_LOGS = 10;
    private final BrokerCapacityConfigResolver _brokerCapacityConfigResolver;
    private final boolean _allowCpuCapacityEstimation;
    private final Map<Integer, BrokerLoad> _brokerLoad = new HashMap();
    private final Map<Integer, Short> _cachedNumCoresByBroker = new HashMap();
    private long _maxMetricTimestamp = INIT_METRIC_TIMESTAMP;

    public CruiseControlMetricsProcessor(BrokerCapacityConfigResolver brokerCapacityConfigResolver, boolean z) {
        this._brokerCapacityConfigResolver = brokerCapacityConfigResolver;
        this._allowCpuCapacityEstimation = z;
    }

    public void addMetric(CruiseControlMetric cruiseControlMetric) {
        int brokerId = cruiseControlMetric.brokerId();
        LOG.trace("Adding cruise control metric {}", cruiseControlMetric);
        this._maxMetricTimestamp = Math.max(cruiseControlMetric.time(), this._maxMetricTimestamp);
        this._brokerLoad.compute(Integer.valueOf(brokerId), (num, brokerLoad) -> {
            BrokerLoad brokerLoad = brokerLoad == null ? new BrokerLoad() : brokerLoad;
            brokerLoad.recordMetric(cruiseControlMetric);
            return brokerLoad;
        });
    }

    private void updateCachedNumCoresByBroker(Cluster cluster) {
        Iterator<Integer> it = this._brokerLoad.keySet().iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            Node nodeById = cluster.nodeById(intValue);
            if (nodeById != null) {
                this._cachedNumCoresByBroker.computeIfAbsent(Integer.valueOf(intValue), num -> {
                    BrokerCapacityInfo capacityForBroker = this._brokerCapacityConfigResolver.capacityForBroker(MonitorUtils.getRackHandleNull(nodeById), nodeById.host(), num.intValue());
                    if (this._allowCpuCapacityEstimation || !capacityForBroker.isEstimated()) {
                        return Short.valueOf(capacityForBroker.numCpuCores());
                    }
                    return null;
                });
            }
        }
    }

    private void updateDiskCapacityByBroker(Cluster cluster) {
        for (Map.Entry<Integer, BrokerLoad> entry : this._brokerLoad.entrySet()) {
            Integer key = entry.getKey();
            BrokerLoad value = entry.getValue();
            Node nodeById = cluster.nodeById(key.intValue());
            if (nodeById != null && value.brokerMetricAvailable(RawMetricType.BROKER_DISK_CAPACITY)) {
                this._brokerCapacityConfigResolver.updateDiskCapacityForBroker(MonitorUtils.getRackHandleNull(nodeById), nodeById.host(), key.intValue(), value.brokerMetric(RawMetricType.BROKER_DISK_CAPACITY) / 1048576.0d);
            }
        }
    }

    Map<Integer, Short> cachedNumCoresByBroker() {
        return this._cachedNumCoresByBroker;
    }

    public MetricSampler.Samples process(Cluster cluster, Set<TopicPartition> set, MetricSampler.SamplingMode samplingMode) {
        updateCachedNumCoresByBroker(cluster);
        updateDiskCapacityByBroker(cluster);
        this._brokerLoad.forEach((num, brokerLoad) -> {
            brokerLoad.prepareBrokerMetrics(cluster, num.intValue(), this._maxMetricTimestamp);
        });
        int i = 0;
        HashSet hashSet = new HashSet();
        if (samplingMode == MetricSampler.SamplingMode.ALL || samplingMode == MetricSampler.SamplingMode.PARTITION_METRICS_ONLY) {
            i = addPartitionMetricSamples(cluster, set, hashSet);
        }
        int i2 = 0;
        HashSet hashSet2 = new HashSet();
        if (samplingMode == MetricSampler.SamplingMode.ALL || samplingMode == MetricSampler.SamplingMode.BROKER_METRICS_ONLY) {
            i2 = addBrokerMetricSamples(cluster, hashSet2);
        }
        Logger logger = LOG;
        Object[] objArr = new Object[5];
        objArr[0] = Integer.valueOf(hashSet.size());
        objArr[1] = i > 0 ? "(" + i + " skipped)" : "";
        objArr[2] = Integer.valueOf(hashSet2.size());
        objArr[3] = i2 > 0 ? "(" + i2 + " skipped)" : "";
        objArr[4] = Long.valueOf(this._maxMetricTimestamp);
        logger.info("Generated {}{} partition metric samples and {}{} broker metric samples for timestamp {}.", objArr);
        return new MetricSampler.Samples(hashSet, hashSet2);
    }

    public void clear() {
        this._brokerLoad.clear();
        this._maxMetricTimestamp = INIT_METRIC_TIMESTAMP;
    }

    private int addPartitionMetricSamples(Cluster cluster, Set<TopicPartition> set, Set<PartitionMetricSample> set2) {
        int i = 0;
        int i2 = 0;
        Map<Integer, Map<String, Integer>> leaderDistribution = SamplingUtils.leaderDistribution(cluster);
        for (TopicPartition topicPartition : set) {
            try {
                PartitionMetricSample buildPartitionMetricSample = SamplingUtils.buildPartitionMetricSample(cluster, leaderDistribution, topicPartition, this._brokerLoad, this._maxMetricTimestamp, this._cachedNumCoresByBroker);
                if (buildPartitionMetricSample != null) {
                    LOG.trace("Added partition metrics sample for {}.", topicPartition);
                    set2.add(buildPartitionMetricSample);
                } else {
                    i++;
                }
            } catch (Exception e) {
                if (i2 < MAX_PARTITION_ERROR_LOGS) {
                    LOG.error("Error building partition metric sample for {}.", topicPartition, e);
                    i2++;
                } else {
                    LOG.trace("Error building partition metric sample for {}.", topicPartition, e);
                }
                i++;
            }
        }
        return i;
    }

    private int addBrokerMetricSamples(Cluster cluster, Set<BrokerMetricSample> set) {
        int i = 0;
        for (Node node : cluster.nodes()) {
            try {
                BrokerMetricSample buildBrokerMetricSample = SamplingUtils.buildBrokerMetricSample(node, this._brokerLoad, this._maxMetricTimestamp);
                if (buildBrokerMetricSample != null) {
                    LOG.trace("Added broker metric sample for broker {}.", Integer.valueOf(node.id()));
                    set.add(buildBrokerMetricSample);
                } else {
                    i++;
                }
            } catch (UnknownVersionException e) {
                LOG.error("Unrecognized serde version detected during broker metric sampling.", e);
                i++;
            } catch (Exception e2) {
                LOG.error("Error building broker metric sample for {}.", Integer.valueOf(node.id()), e2);
                i++;
            }
        }
        return i;
    }
}
