package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.stream.Collectors;
import kafka.utils.ShutdownableThread;
import kafka.zk.FailedBroker;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.utils.Time;
import org.apache.zookeeper.client.ZKClientConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector.class */
public class BrokerFailureDetector extends ShutdownableThread {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerFailureDetector.class);
    private static final String ZK_BROKER_FAILURE_METRIC_GROUP = "CruiseControlAnomaly";
    private static final String ZK_BROKER_FAILURE_METRIC_TYPE = "BrokerFailure";
    private static final String THREAD_NAME = "SBK_BrokerFailureDetector";
    private final KafkaCruiseControl _kafkaCruiseControl;
    private final KafkaZkClient kafkaZkClient;
    private final Map<Integer, Long> _failedBrokers;
    private final LoadMonitor _loadMonitor;
    private final Queue<Anomaly> _anomalies;
    private final Time _time;
    private final boolean _allowCapacityEstimation;
    private final boolean _excludeRecentlyDemotedBrokers;
    private final boolean _excludeRecentlyRemovedBrokers;
    private final List<String> _selfHealingGoals;
    private final ArrayBlockingQueue<BrokerChangedEvent> eventQueue;
    private boolean initialized;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector$BrokerChangedEvent.class */
    public class BrokerChangedEvent {
        private BrokerChangedEvent() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/BrokerFailureDetector$BrokerFailureListener.class */
    public class BrokerFailureListener implements KafkaZkClient.BrokerChangeListener {
        private BrokerFailureListener() {
        }

        public void handleChildChange() {
            BrokerFailureDetector.this.scheduleDetection();
        }
    }

    public BrokerFailureDetector(KafkaCruiseControlConfig kafkaCruiseControlConfig, Option<ZKClientConfig> option, LoadMonitor loadMonitor, Queue<Anomaly> queue, Time time, KafkaCruiseControl kafkaCruiseControl, List<String> list) {
        super(THREAD_NAME, true);
        this.kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(kafkaCruiseControlConfig, ZK_BROKER_FAILURE_METRIC_GROUP, ZK_BROKER_FAILURE_METRIC_TYPE, option);
        this._failedBrokers = new HashMap();
        this._loadMonitor = loadMonitor;
        this._anomalies = queue;
        this._time = time;
        this._kafkaCruiseControl = kafkaCruiseControl;
        this._allowCapacityEstimation = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG).booleanValue();
        this._excludeRecentlyDemotedBrokers = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG).booleanValue();
        this._excludeRecentlyRemovedBrokers = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.BROKER_FAILURE_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG).booleanValue();
        this._selfHealingGoals = list;
        this.eventQueue = new ArrayBlockingQueue<>(10);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void startDetection() {
        start();
    }

    private void detectBrokerFailures(Set<Integer> set) {
        if (updateFailedBrokers(set)) {
            persistFailedBrokerList();
        }
        reportBrokerFailures();
    }

    private void detectBrokerFailures() {
        detectBrokerFailures(aliveBrokers());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void scheduleDetection() {
        if (this.eventQueue.offer(new BrokerChangedEvent())) {
            return;
        }
        LOG.warn("Broker Failed Event Queue is full.");
    }

    Map<Integer, Long> failedBrokers() {
        return new HashMap(this._failedBrokers);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void shutdownNow() {
        shutdown();
        KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(this.kafkaZkClient);
    }

    private void persistFailedBrokerList() {
        this.kafkaZkClient.setOrCreateFailedBrokers((List) this._failedBrokers.entrySet().stream().map(entry -> {
            return new FailedBroker((Integer) entry.getKey(), (Long) entry.getValue());
        }).collect(Collectors.toList()));
    }

    private void loadPersistedFailedBrokerList() {
        JavaConverters.asJavaCollection(this.kafkaZkClient.getFailedBrokers()).stream().forEach(failedBroker -> {
            this._failedBrokers.put(failedBroker.brokerId(), failedBroker.failedAt());
        });
    }

    private boolean updateFailedBrokers(Set<Integer> set) {
        Set<Integer> brokersWithReplicas = this._loadMonitor.brokersWithReplicas(60000);
        brokersWithReplicas.removeAll(set);
        LOG.debug("Alive brokers: {}, failed brokers: {}", set, brokersWithReplicas);
        boolean removeIf = this._failedBrokers.entrySet().removeIf(entry -> {
            return !brokersWithReplicas.contains(entry.getKey());
        });
        Iterator<Integer> it = brokersWithReplicas.iterator();
        while (it.hasNext()) {
            if (this._failedBrokers.putIfAbsent(it.next(), Long.valueOf(this._time.milliseconds())) == null) {
                removeIf = true;
            }
        }
        return removeIf;
    }

    private Set<Integer> aliveBrokers() {
        return (Set) JavaConverters.asJavaCollection(this.kafkaZkClient.getSortedBrokerList()).stream().map(obj -> {
            return (Integer) obj;
        }).collect(Collectors.toSet());
    }

    private String failedBrokerString() {
        StringBuilder sb = new StringBuilder();
        Iterator<Map.Entry<Integer, Long>> it = this._failedBrokers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Long> next = it.next();
            sb.append(next.getKey()).append("=").append(next.getValue());
            if (it.hasNext()) {
                sb.append(",");
            }
        }
        return sb.toString();
    }

    private void parsePersistedFailedBrokers(String str) {
        if (str == null || str.isEmpty()) {
            return;
        }
        for (String str2 : str.split(",")) {
            String[] split = str2.split("=");
            if (split.length != 2) {
                throw new IllegalStateException("The persisted failed broker string cannot be parsed. The string is " + str);
            }
            this._failedBrokers.putIfAbsent(Integer.valueOf(Integer.parseInt(split[0])), Long.valueOf(Long.parseLong(split[1])));
        }
    }

    private void reportBrokerFailures() {
        if (this._failedBrokers.isEmpty()) {
            return;
        }
        this._anomalies.add(new BrokerFailures(this._kafkaCruiseControl, failedBrokers(), this._allowCapacityEstimation, this._excludeRecentlyDemotedBrokers, this._excludeRecentlyRemovedBrokers, this._selfHealingGoals));
    }

    private void initialize() {
        this.kafkaZkClient.registerBrokerChangeHandler(new BrokerFailureListener());
        loadPersistedFailedBrokerList();
        detectBrokerFailures();
        this.initialized = true;
    }

    public void doWork() {
        if (!this.initialized) {
            initialize();
            LOG.debug("Broker Failure Detector initialized.");
        }
        try {
            this.eventQueue.take();
            LOG.debug("Broker Failure detected.");
            this.eventQueue.clear();
            detectBrokerFailures();
        } catch (InterruptedException e) {
            LOG.info("Broker failure detector interrupted. Exiting the doWork loop.");
        }
    }
}
