package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/DiskFailureDetector.class */
/**
 * Periodic task that asks every broker for the state of its log directories and, when at least
 * one log directory reports an error, enqueues a {@link DiskFailures} anomaly.
 *
 * <p>A run is skipped when the cluster generation has not changed since the previous run, when
 * the load monitor reports dead brokers that still host replicas, or when
 * {@link AnomalyDetectorUtils#shouldSkipAnomalyDetection} says so.
 *
 * <p>{@code _lastCheckedClusterGeneration} is plain mutable state with no synchronization in this
 * class. NOTE(review): presumably the task is scheduled on a single thread - confirm against the
 * scheduler that owns it.
 */
public class DiskFailureDetector implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(DiskFailureDetector.class);
    private final KafkaCruiseControl _kafkaCruiseControl;
    private final ConfluentAdmin _adminClient;
    private final LoadMonitor _loadMonitor;
    private final Queue<Anomaly> _anomalies;
    private final Time _time;
    private final boolean _allowCapacityEstimation;
    // Cluster generation seen by the most recent run that got past the generation check; -1 until then.
    private int _lastCheckedClusterGeneration = -1;
    private final boolean _excludeRecentlyDemotedBrokers;
    private final boolean _excludeRecentlyRemovedBrokers;
    private final List<String> _selfHealingGoals;
    private final KafkaCruiseControlConfig _config;

    /**
     * Creates a detector; no detection work happens until {@link #run()} is invoked.
     *
     * @param kafkaCruiseControlConfig configuration; read here for the capacity-estimation and
     *                                 broker-exclusion flags, and in {@link #run()} for the log dir
     *                                 response timeout
     * @param loadMonitor              source of the cluster generation, dead brokers and broker nodes
     * @param confluentAdmin           admin client used to describe broker log directories
     * @param queue                    queue that detected {@link DiskFailures} anomalies are added to
     * @param time                     clock used to timestamp each failed log directory
     * @param kafkaCruiseControl       handed to the skip check and to created anomalies
     * @param list                     self-healing goals handed to created anomalies
     */
    public DiskFailureDetector(KafkaCruiseControlConfig kafkaCruiseControlConfig, LoadMonitor loadMonitor, ConfluentAdmin confluentAdmin, Queue<Anomaly> queue, Time time, KafkaCruiseControl kafkaCruiseControl, List<String> list) {
        _loadMonitor = loadMonitor;
        _adminClient = confluentAdmin;
        _anomalies = queue;
        _time = time;
        _kafkaCruiseControl = kafkaCruiseControl;
        _allowCapacityEstimation = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG).booleanValue();
        _excludeRecentlyDemotedBrokers = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG).booleanValue();
        _excludeRecentlyRemovedBrokers = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG).booleanValue();
        _selfHealingGoals = list;
        _config = kafkaCruiseControlConfig;
    }

    /**
     * Decides whether the current run should do nothing.
     *
     * <p>Side effect: when the cluster generation differs from the last one seen, it is recorded
     * before the remaining checks run, so a run skipped because of dead brokers still counts as
     * having checked that generation.
     *
     * @return {@code true} if detection should be skipped for this run
     */
    private boolean shouldSkipDiskFailureDetection() {
        int currentGeneration = _loadMonitor.clusterModelGeneration().clusterGeneration();
        if (currentGeneration == _lastCheckedClusterGeneration) {
            // Guarded so the second clusterModelGeneration() call is only made when it will be logged.
            if (LOG.isDebugEnabled()) {
                LOG.debug("Skipping disk failure detection because the model generation hasn't changed. Current model generation {}", _loadMonitor.clusterModelGeneration());
            }
            return true;
        }
        _lastCheckedClusterGeneration = currentGeneration;

        // NOTE(review): 60000 is presumably a wait timeout in milliseconds - confirm against
        // LoadMonitor#deadBrokersWithReplicas.
        Set<Integer> deadBrokersWithReplicas = _loadMonitor.deadBrokersWithReplicas(60000);
        if (!deadBrokersWithReplicas.isEmpty()) {
            LOG.debug("Skipping disk failure detection because there are dead broker in the cluster, dead broker: {}", deadBrokersWithReplicas);
            return true;
        }
        return AnomalyDetectorUtils.shouldSkipAnomalyDetection(_loadMonitor, _kafkaCruiseControl);
    }

    /**
     * Runs one detection pass.
     *
     * <p>Describes the log directories of every node in the load monitor's cluster view, collects
     * each log directory whose error is not {@link Errors#NONE} together with the time it was
     * observed, and adds a single {@link DiskFailures} anomaly if anything was collected. A broker
     * whose response fails or times out is logged and left out; the other brokers are still
     * processed. Any {@link Exception} is logged rather than propagated.
     */
    @Override
    public void run() {
        try {
            if (shouldSkipDiskFailureDetection()) {
                return;
            }

            // broker id -> (log dir path -> time in ms at which the failure was observed)
            Map<Integer, Map<String, Long>> failedDisksByBroker = new HashMap<>();
            Set<Integer> brokerIds = _loadMonitor.kafkaCluster().nodes().stream()
                    .mapToInt(node -> node.id())
                    .boxed()
                    .collect(Collectors.toSet());
            // The timeout is the same for every broker, so read it once.
            long responseTimeoutMs = _config.getLong(KafkaCruiseControlConfig.LOGDIR_RESPONSE_TIMEOUT_MS_CONFIG).longValue();

            _adminClient.describeLogDirs(brokerIds).values().forEach((brokerId, logDirsFuture) -> {
                try {
                    logDirsFuture.get(responseTimeoutMs, TimeUnit.MILLISECONDS).forEach((logDir, logDirInfo) -> {
                        if (logDirInfo.error != Errors.NONE) {
                            failedDisksByBroker.computeIfAbsent(brokerId, id -> new HashMap<>())
                                    .put(logDir, _time.milliseconds());
                        }
                    });
                } catch (InterruptedException e) {
                    // Restore the interrupt status so whoever owns this thread can still observe it.
                    Thread.currentThread().interrupt();
                    LOG.warn("Retrieving logdir information for broker {} encountered exception {}.", brokerId, e);
                } catch (ExecutionException | TimeoutException e) {
                    LOG.warn("Retrieving logdir information for broker {} encountered exception {}.", brokerId, e);
                }
            });

            if (!failedDisksByBroker.isEmpty()) {
                _anomalies.add(new DiskFailures(_kafkaCruiseControl, failedDisksByBroker, _allowCapacityEstimation, _excludeRecentlyDemotedBrokers, _excludeRecentlyRemovedBrokers, _selfHealingGoals));
            }
        } catch (Exception e) {
            LOG.error("Unexpected exception", e);
        } finally {
            // Logged on every exit path: skipped, completed, failed, or a propagating Error.
            LOG.debug("Disk failure detection finished.");
        }
    }
}
