package com.linkedin.kafka.cruisecontrol.detector;

import com.linkedin.cruisecontrol.detector.Anomaly;
import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionProposal;
import com.linkedin.kafka.cruisecontrol.executor.ExecutorState;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import com.linkedin.kafka.cruisecontrol.monitor.ModelGeneration;
import com.linkedin.kafka.cruisecontrol.servlet.response.CruiseControlState;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/detector/GoalViolationDetector.class */
/**
 * Runnable that checks the configured anomaly detection goals against the current cluster model.
 *
 * <p>On each run, every detection goal whose completeness requirements are met is optimized against
 * a cluster model. A goal counts as violated when optimizing it produces at least one proposal
 * (fixable) or when optimization fails (unfixable). If any goal is violated, the resulting
 * {@link GoalViolations} is added to the anomaly queue supplied at construction time.
 *
 * <p>The detector also maintains a balancedness score: it starts at 100, is recomputed at the end of
 * each completed run by subtracting the configured cost of every violated goal, and is set to 0
 * whenever detection is skipped because of offline replicas, dead brokers or bad disks.
 */
public class GoalViolationDetector implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(GoalViolationDetector.class);
    private final KafkaCruiseControl _kafkaCruiseControl;
    private final LoadMonitor _loadMonitor;
    private final List<Goal> _detectionGoals;
    private final Time _time;
    private final Queue<Anomaly> _anomalies;
    // Generation of the last cluster model that was actually checked; null until the first check.
    private ModelGeneration _lastCheckedModelGeneration;
    private final Pattern _excludedTopics;
    private final boolean _allowCapacityEstimation;
    private final boolean _excludeRecentlyDemotedBrokers;
    private final boolean _excludeRecentlyRemovedBrokers;
    private final List<String> _selfHealingGoals;
    private final Map<String, Double> _balancednessCostByGoal;
    // Read via balancednessScore(); volatile so readers observe the latest written value.
    private volatile double _balancednessScore = 100.0d;

    /**
     * Creates a detector from the given configuration and collaborators.
     *
     * @param kafkaCruiseControlConfig configuration supplying the detection goals, the excluded-topics
     *                                 pattern, the capacity-estimation and broker-exclusion flags, and
     *                                 the balancedness weights
     * @param loadMonitor              source of cluster models and model generations
     * @param queue                    queue that detected goal violations are added to
     * @param time                     clock used to timestamp cluster model requests
     * @param kafkaCruiseControl       handle used to query executor state and passed to created anomalies
     * @param list                     self-healing goal names passed to each {@link GoalViolations}
     */
    public GoalViolationDetector(KafkaCruiseControlConfig kafkaCruiseControlConfig, LoadMonitor loadMonitor, Queue<Anomaly> queue, Time time, KafkaCruiseControl kafkaCruiseControl, List<String> list) {
        this._loadMonitor = loadMonitor;
        this._detectionGoals = kafkaCruiseControlConfig.getConfiguredInstances(KafkaCruiseControlConfig.ANOMALY_DETECTION_GOALS_CONFIG, Goal.class);
        this._anomalies = queue;
        this._time = time;
        this._excludedTopics = Pattern.compile(kafkaCruiseControlConfig.getString(KafkaCruiseControlConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG));
        this._allowCapacityEstimation = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.ANOMALY_DETECTION_ALLOW_CAPACITY_ESTIMATION_CONFIG);
        this._excludeRecentlyDemotedBrokers = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.GOAL_VIOLATION_EXCLUDE_RECENTLY_DEMOTED_BROKERS_CONFIG);
        this._excludeRecentlyRemovedBrokers = kafkaCruiseControlConfig.getBoolean(KafkaCruiseControlConfig.GOAL_VIOLATION_EXCLUDE_RECENTLY_REMOVED_BROKERS_CONFIG);
        this._kafkaCruiseControl = kafkaCruiseControl;
        this._selfHealingGoals = list;
        double priorityWeight = kafkaCruiseControlConfig.getDouble(KafkaCruiseControlConfig.GOAL_BALANCEDNESS_PRIORITY_WEIGHT_CONFIG);
        double strictnessWeight = kafkaCruiseControlConfig.getDouble(KafkaCruiseControlConfig.GOAL_BALANCEDNESS_STRICTNESS_WEIGHT_CONFIG);
        this._balancednessCostByGoal = KafkaCruiseControlUtils.balancednessCostByGoal(this._detectionGoals, priorityWeight, strictnessWeight);
    }

    /**
     * @return the most recently computed balancedness score (100 when no goal is violated, 0 when
     *         detection was skipped because of offline replicas, dead brokers or bad disks).
     */
    public double balancednessScore() {
        return this._balancednessScore;
    }

    /**
     * Decides whether this detection round should be skipped before any cluster model is built.
     *
     * @return {@code true} if the model generation has not changed since the last check, if any
     *         broker has offline replicas (the balancedness score is then set to 0), or if
     *         {@link AnomalyDetectorUtils#shouldSkipAnomalyDetection} says so; {@code false} otherwise.
     */
    private boolean shouldSkipGoalViolationDetection() {
        // Read the generation once so the value that is logged is the value that was compared.
        ModelGeneration currentGeneration = this._loadMonitor.clusterModelGeneration();
        if (currentGeneration.equals(this._lastCheckedModelGeneration)) {
            LOG.debug("Skipping goal violation detection because the model generation hasn't changed. Current model generation {}", currentGeneration);
            return true;
        }
        // NOTE(review): 60000 is presumably a timeout in milliseconds - confirm against LoadMonitor.
        Set<Integer> brokersWithOfflineReplicas = this._loadMonitor.brokersWithOfflineReplicas(60000);
        if (!brokersWithOfflineReplicas.isEmpty()) {
            LOG.info("Skipping goal violation detection because there are dead brokers in the cluster, flawed brokers: {}", brokersWithOfflineReplicas);
            setBalancednessWithOfflineReplicas();
            return true;
        }
        return AnomalyDetectorUtils.shouldSkipAnomalyDetection(this._loadMonitor, this._kafkaCruiseControl);
    }

    /**
     * Runs one round of goal violation detection.
     *
     * <p>Known failure types are logged and swallowed so that an exception never escapes as an
     * {@link Exception}; the model-generation semaphore, if acquired, is released exactly once on
     * every exit path.
     */
    @Override
    public void run() {
        if (shouldSkipGoalViolationDetection()) {
            return;
        }
        LoadMonitor.AutoCloseableSemaphore clusterModelSemaphore = null;
        try {
            GoalViolations goalViolations = new GoalViolations(this._kafkaCruiseControl, this._allowCapacityEstimation, this._excludeRecentlyDemotedBrokers, this._excludeRecentlyRemovedBrokers, this._selfHealingGoals);
            long now = this._time.milliseconds();
            boolean newModelNeeded = true;
            ClusterModel clusterModel = null;

            // Executor state is only queried when at least one of the exclusion flags needs it.
            ExecutorState executorState = null;
            if (this._excludeRecentlyDemotedBrokers || this._excludeRecentlyRemovedBrokers) {
                executorState = this._kafkaCruiseControl.state(new OperationProgress(), Collections.singleton(CruiseControlState.SubState.EXECUTOR)).executorState();
            }
            Set<Integer> recentlyDemotedBrokers = this._excludeRecentlyDemotedBrokers ? executorState.recentlyDemotedBrokers() : Collections.emptySet();
            Set<Integer> recentlyRemovedBrokers = this._excludeRecentlyRemovedBrokers ? executorState.recentlyRemovedBrokers() : Collections.emptySet();

            List<Goal> skippedGoals = new ArrayList<>();
            for (Goal goal : this._detectionGoals) {
                if (!this._loadMonitor.meetCompletenessRequirements(goal.clusterModelCompletenessRequirements())) {
                    skippedGoals.add(goal);
                    continue;
                }
                LOG.debug("Detecting if {} is violated.", goal.name());
                if (newModelNeeded) {
                    if (clusterModelSemaphore != null) {
                        clusterModelSemaphore.close();
                        // Forget the released semaphore so a failure in the acquire below cannot close it again.
                        clusterModelSemaphore = null;
                    }
                    clusterModelSemaphore = this._loadMonitor.acquireForModelGeneration(new OperationProgress());
                    clusterModel = this._loadMonitor.clusterModel(now, goal.clusterModelCompletenessRequirements(), new OperationProgress());
                    if (skipDueToOfflineReplicas(clusterModel)) {
                        return;
                    }
                    KafkaCruiseControl.sanityCheckCapacityEstimation(this._allowCapacityEstimation, clusterModel.capacityEstimationInfoByBrokerId());
                    this._lastCheckedModelGeneration = clusterModel.generation();
                }
                newModelNeeded = optimizeForGoal(clusterModel, goal, goalViolations, recentlyDemotedBrokers, recentlyRemovedBrokers);
            }
            if (!skippedGoals.isEmpty()) {
                LOG.warn("Skipped goal violation detection for goals {} because load completeness requirement were not met.", skippedGoals);
            }
            Map<Boolean, List<String>> violatedGoalsByFixability = goalViolations.violatedGoalsByFixability();
            if (!violatedGoalsByFixability.isEmpty()) {
                this._anomalies.add(goalViolations);
            }
            refreshBalancednessScore(violatedGoalsByFixability);
        } catch (NotEnoughValidWindowsException e) {
            LOG.debug("Skipping goal violation detection because there are not enough valid windows.", e);
        } catch (KafkaCruiseControlException e) {
            LOG.warn("Goal violation detector received exception", e);
        } catch (Exception e) {
            LOG.error("Unexpected exception", e);
        } finally {
            // AutoCloseable.close() is not required to be idempotent, so release in exactly one place.
            if (clusterModelSemaphore != null) {
                try {
                    clusterModelSemaphore.close();
                } catch (Exception e) {
                    LOG.error("Received exception when closing auto closable semaphore", e);
                }
            }
            LOG.debug("Goal violation detection finished.");
        }
    }

    /**
     * Checks whether the given model contains dead brokers or brokers with bad disks; if so, logs the
     * reason and sets the balancedness score to 0.
     *
     * @param clusterModel the freshly built cluster model
     * @return {@code true} if detection should be skipped for this model, {@code false} otherwise
     */
    private boolean skipDueToOfflineReplicas(ClusterModel clusterModel) {
        if (!clusterModel.deadBrokers().isEmpty()) {
            LOG.info("Skipping goal violation detection due to dead brokers {}, which are reported by broker failure detector, and fixed if its self healing configuration is enabled.", clusterModel.deadBrokers());
            setBalancednessWithOfflineReplicas();
            return true;
        }
        if (!clusterModel.brokersWithBadDisks().isEmpty()) {
            LOG.info("Skipping goal violation detection due to brokers with bad disks {}, which are reported by disk failure detector, and fixed if its self healing configuration is enabled.", clusterModel.brokersWithBadDisks());
            setBalancednessWithOfflineReplicas();
            return true;
        }
        return false;
    }

    /** Sets the balancedness score to 0; used whenever detection is skipped because of offline replicas. */
    private void setBalancednessWithOfflineReplicas() {
        this._balancednessScore = 0.0d;
    }

    /**
     * Recomputes the balancedness score as 100 minus the configured cost of every violated goal.
     *
     * @param map violated goal names grouped by fixability; the grouping key is ignored here
     */
    private void refreshBalancednessScore(Map<Boolean, List<String>> map) {
        double score = 100.0d;
        for (List<String> violatedGoals : map.values()) {
            for (String goalName : violatedGoals) {
                score -= this._balancednessCostByGoal.get(goalName);
            }
        }
        this._balancednessScore = score;
    }

    /**
     * @param clusterModel model whose topics are examined
     * @return the topics in the model whose full name matches the configured excluded-topics pattern
     */
    private Set<String> excludedTopics(ClusterModel clusterModel) {
        return clusterModel.topics().stream()
                .filter(topic -> this._excludedTopics.matcher(topic).matches())
                .collect(Collectors.toSet());
    }

    /**
     * Optimizes the cluster model for a single goal and records a violation if one is found.
     *
     * <p>The replica, leader and observer distributions are captured before optimizing and diffed
     * afterwards; a non-empty diff is recorded as a fixable violation. An
     * {@link OptimizationFailureException} is recorded as an unfixable violation.
     *
     * @param clusterModel           model to optimize (it is passed to {@link Goal#optimize})
     * @param goal                   goal to check
     * @param goalViolations         collector that receives any detected violation
     * @param recentlyDemotedBrokers first broker set handed to {@link OptimizationOptions}
     * @param recentlyRemovedBrokers second broker set handed to {@link OptimizationOptions}
     * @return {@code true} if a violation was recorded; {@code run()} uses this to decide whether a
     *         fresh cluster model must be built before checking the next goal
     * @throws KafkaCruiseControlException if optimization fails for a reason other than
     *                                     {@link OptimizationFailureException}
     */
    private boolean optimizeForGoal(ClusterModel clusterModel, Goal goal, GoalViolations goalViolations, Set<Integer> recentlyDemotedBrokers, Set<Integer> recentlyRemovedBrokers) throws KafkaCruiseControlException {
        if (clusterModel.topics().isEmpty()) {
            LOG.info("Skipping goal violation detection because the cluster model does not have any topic.");
            return false;
        }
        // Snapshot the placement before optimizing so the diff below reflects only this goal's changes.
        Map<TopicPartition, List<ReplicaPlacementInfo>> initReplicaDistribution = clusterModel.getReplicaDistribution();
        Map<TopicPartition, ReplicaPlacementInfo> initLeaderDistribution = clusterModel.getLeaderDistribution();
        Map<TopicPartition, List<ReplicaPlacementInfo>> initObserverDistribution = clusterModel.getObserverDistribution();
        boolean canChangeReplicationFactor = goal.canChangeReplicationFactor();
        try {
            goal.optimize(clusterModel, new HashSet<>(), new OptimizationOptions(excludedTopics(clusterModel), recentlyDemotedBrokers, recentlyRemovedBrokers, true));
        } catch (OptimizationFailureException e) {
            goalViolations.addViolation(goal.name(), false);
            return true;
        }
        Set<ExecutionProposal> proposals = AnalyzerUtils.getDiff(initReplicaDistribution, initLeaderDistribution, initObserverDistribution, clusterModel, canChangeReplicationFactor);
        LOG.trace("{} generated {} proposals", goal.name(), proposals.size());
        if (proposals.isEmpty()) {
            return false;
        }
        goalViolations.addViolation(goal.name(), true);
        return true;
    }
}
