package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.AnalyzerUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.common.Statistic;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ClusterModelStats;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Soft goal ({@link #isHardGoal()} returns {@code false}) that tries to keep the inbound network
 * utilization ({@link Resource#NW_IN}) of each broker's leadership load at or below a per-broker
 * balance threshold.
 *
 * <p>The threshold for a broker is the larger of
 * <ul>
 *   <li>the mean leadership NW_IN utilization over alive brokers multiplied by the configured
 *       resource balance percentage for NW_IN, and</li>
 *   <li>the configured low utilization threshold for NW_IN multiplied by that broker's NW_IN
 *       capacity.</li>
 * </ul>
 * Brokers above their threshold are rebalanced by attempting {@link ActionType#LEADERSHIP_MOVEMENT}
 * actions only.
 */
public class LeaderBytesInDistributionGoal extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(LeaderBytesInDistributionGoal.class);

    /** Cached mean leadership NW_IN utilization; {@code 0.0} doubles as the "not computed yet" marker. */
    private double _meanLeaderBytesIn;

    /**
     * Ids of brokers that were still above their threshold after {@link #rebalanceForBroker}.
     * Initialized eagerly so that {@link #finish()} is safe even if {@link #initGoalState} never ran.
     */
    private Set<Integer> _overLimitBrokerIds = new HashSet<>();

    /**
     * Compares two cluster model stats by their NW_IN utilization statistics and remembers a
     * human-readable reason whenever the comparison result is negative.
     */
    private class LeaderBytesInDistributionGoalStatsComparator implements Goal.ClusterModelStatsComparator {
        /** Explanation of the most recent negative result; {@code null} until one occurs. */
        private String _reasonForLastNegativeResult;

        private LeaderBytesInDistributionGoalStatsComparator() {
        }

        /**
         * Returns {@code 1} when the maximum NW_IN utilization in {@code clusterModelStats} does not
         * exceed its average NW_IN utilization multiplied by the NW_IN balance percentage. Otherwise
         * returns the result of {@link AnalyzerUtils#compare} applied to the square roots of the
         * {@link Statistic#ST_DEV} values of the second and first argument, in that order.
         *
         * @param clusterModelStats  first stats; the balance check above looks only at this argument
         * @param clusterModelStats2 second stats
         * @return the comparison result as described above
         */
        @Override
        public int compare(ClusterModelStats clusterModelStats, ClusterModelStats clusterModelStats2) {
            double balancedUpperLimit = nwInStat(clusterModelStats, Statistic.AVG)
                * LeaderBytesInDistributionGoal.this._balancingConstraint.resourceBalancePercentage(Resource.NW_IN);
            if (nwInStat(clusterModelStats, Statistic.MAX) <= balancedUpperLimit) {
                return 1;
            }
            // NOTE(review): the ST_DEV statistic is square-rooted here and reported as a "variance"
            // in the message below - confirm which quantity the stats map actually stores.
            double firstSpread = nwInStat(clusterModelStats, Statistic.ST_DEV);
            double secondSpread = nwInStat(clusterModelStats2, Statistic.ST_DEV);
            int result = AnalyzerUtils.compare(Math.sqrt(secondSpread), Math.sqrt(firstSpread), Resource.NW_IN);
            if (result < 0) {
                this._reasonForLastNegativeResult = String.format(
                    "Violated leader bytes in balancing. preVariance: %.3f postVariance: %.3f.", secondSpread, firstSpread);
            }
            return result;
        }

        /**
         * @return the explanation stored by the last {@link #compare} call that produced a negative
         *         result, or {@code null} if no such call happened; it is not cleared by later
         *         non-negative comparisons
         */
        @Override
        public String explainLastComparison() {
            return this._reasonForLastNegativeResult;
        }

        /** Reads the NW_IN value of the given statistic from the stats' resource utilization map. */
        private double nwInStat(ClusterModelStats stats, Statistic statistic) {
            return stats.resourceUtilizationStats().get(statistic).get(Resource.NW_IN).doubleValue();
        }
    }

    public LeaderBytesInDistributionGoal() {
    }

    /**
     * Package-private constructor that sets the balancing constraint directly.
     *
     * @param balancingConstraint constraint supplying the NW_IN balance percentage and low utilization threshold
     */
    LeaderBytesInDistributionGoal(BalancingConstraint balancingConstraint) {
        this._balancingConstraint = balancingConstraint;
    }

    /**
     * Decides whether the given action keeps the affected brokers' leadership NW_IN utilization
     * within their balance thresholds.
     *
     * <p>If the source replica is a follower, replica movements are accepted, and swaps are accepted
     * when the destination replica is a follower too; a swap with a leader on the destination falls
     * through to the utilization check below.
     *
     * @param balancingAction action to evaluate
     * @param clusterModel    cluster model the action would be applied to
     * @return {@link ActionAcceptance#ACCEPT} or {@link ActionAcceptance#REPLICA_REJECT}
     * @throws IllegalStateException    if a leadership movement is requested for a follower replica
     * @throws IllegalArgumentException if the action type is not one of the three handled types
     */
    @Override
    public ActionAcceptance actionAcceptance(BalancingAction balancingAction, ClusterModel clusterModel) {
        Broker sourceBroker = clusterModel.broker(balancingAction.sourceBrokerId());
        Replica sourceReplica = sourceBroker.replica(balancingAction.topicPartition());
        Broker destinationBroker = clusterModel.broker(balancingAction.destinationBrokerId());
        initMeanLeaderBytesIn(clusterModel);

        if (!sourceReplica.isLeader()) {
            switch (balancingAction.balancingAction()) {
                case INTER_BROKER_REPLICA_SWAP:
                    if (!destinationBroker.replica(balancingAction.destinationTopicPartition()).isLeader()) {
                        // Follower swapped with follower: no leadership load changes hands.
                        return ActionAcceptance.ACCEPT;
                    }
                    break;
                case INTER_BROKER_REPLICA_MOVEMENT:
                    // Moving a follower does not move leadership load.
                    return ActionAcceptance.ACCEPT;
                case LEADERSHIP_MOVEMENT:
                    throw new IllegalStateException("Attempt to move leadership from the follower.");
                default:
                    throw new IllegalArgumentException("Unsupported balancing action " + balancingAction.balancingAction() + " is provided.");
            }
        }

        double sourceReplicaNwIn = sourceReplica.load().expectedUtilizationFor(Resource.NW_IN);
        double newDestinationLeaderNwIn;
        switch (balancingAction.balancingAction()) {
            case INTER_BROKER_REPLICA_SWAP:
                double destinationReplicaNwIn = destinationBroker.replica(balancingAction.destinationTopicPartition())
                    .load().expectedUtilizationFor(Resource.NW_IN);
                newDestinationLeaderNwIn = (leaderNwInUtilization(destinationBroker) + sourceReplicaNwIn) - destinationReplicaNwIn;
                // A swap also changes the source broker, so it must stay within its own threshold.
                double newSourceLeaderNwIn = (leaderNwInUtilization(sourceBroker) + destinationReplicaNwIn) - sourceReplicaNwIn;
                if (newSourceLeaderNwIn > balanceThreshold(clusterModel, sourceBroker.id())) {
                    return ActionAcceptance.REPLICA_REJECT;
                }
                break;
            case INTER_BROKER_REPLICA_MOVEMENT:
            case LEADERSHIP_MOVEMENT:
                newDestinationLeaderNwIn = leaderNwInUtilization(destinationBroker) + sourceReplicaNwIn;
                break;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + balancingAction.balancingAction() + " is provided.");
        }
        return newDestinationLeaderNwIn <= balanceThreshold(clusterModel, destinationBroker.id())
            ? ActionAcceptance.ACCEPT
            : ActionAcceptance.REPLICA_REJECT;
    }

    /** @return a new comparator instance bound to this goal's balancing constraint */
    @Override
    public Goal.ClusterModelStatsComparator clusterModelStatsComparator() {
        return new LeaderBytesInDistributionGoalStatsComparator();
    }

    /**
     * @return requirements built from half of {@code _numWindows} (at least 1), the configured
     *         minimum monitored partition percentage, and {@code false} as the third argument
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        int requiredWindows = Math.max(1, this._numWindows / 2);
        return new ModelCompletenessRequirements(requiredWindows, this._minMonitoredPartitionPercentage, false);
    }

    /** @return the simple class name of this goal */
    @Override
    public String name() {
        return LeaderBytesInDistributionGoal.class.getSimpleName();
    }

    /** @return {@code false}; this goal is not a hard goal */
    @Override
    public boolean isHardGoal() {
        return false;
    }

    /**
     * Selects the brokers whose leadership NW_IN utilization is strictly above their balance threshold.
     *
     * <p>NOTE(review): entries are removed from the set returned by {@code clusterModel.brokers()};
     * this presumably is a copy that is safe to mutate - confirm against {@code ClusterModel}.
     *
     * @param clusterModel cluster model to inspect
     * @return the set obtained from the cluster model with all brokers at or below threshold removed
     */
    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        SortedSet<Broker> candidates = clusterModel.brokers();
        for (Iterator<Broker> brokerIt = candidates.iterator(); brokerIt.hasNext();) {
            Broker candidate = brokerIt.next();
            if (leaderNwInUtilization(candidate) <= balanceThreshold(clusterModel, candidate.id())) {
                brokerIt.remove();
            }
        }
        return candidates;
    }

    /**
     * Checks that a leadership movement proposed by this goal is acceptable to the goal itself.
     *
     * @param clusterModel    cluster model the action would be applied to
     * @param balancingAction action to check; must be a leadership movement
     * @return {@code true} if {@link #actionAcceptance} returns {@link ActionAcceptance#ACCEPT}
     * @throws IllegalStateException if the action is not a leadership movement
     */
    @Override
    protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction balancingAction) {
        if (balancingAction.balancingAction() != ActionType.LEADERSHIP_MOVEMENT) {
            throw new IllegalStateException("Found balancing action " + balancingAction.balancingAction() + " but expected leadership movement.");
        }
        return actionAcceptance(balancingAction, clusterModel) == ActionAcceptance.ACCEPT;
    }

    /** Resets the cached mean and the set of over-limit brokers. */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) {
        this._meanLeaderBytesIn = 0.0d;
        this._overLimitBrokerIds = new HashSet<>();
    }

    /** Marks the goal as finished and forgets the recorded over-limit brokers. */
    @Override
    public void finish() {
        this._finished = true;
        this._overLimitBrokerIds.clear();
    }

    /**
     * Marks the goal as not succeeded (and logs a warning) if any broker was recorded as still
     * over its threshold, then finishes the goal.
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) {
        if (!this._overLimitBrokerIds.isEmpty()) {
            LOG.warn("There were still {} brokers over the limit.", this._overLimitBrokerIds.size());
            this._succeeded = false;
        }
        finish();
    }

    /**
     * Tries to bring {@code broker} to or below its balance threshold by offering leadership of its
     * leader replicas, largest NW_IN load first, to the brokers hosting online followers of the same
     * partition, least leadership NW_IN load first. Stops as soon as the broker is no longer above
     * the threshold; if it still is after all eligible leaders were tried, its id is recorded as
     * over the limit.
     *
     * @param broker              broker to rebalance
     * @param clusterModel        cluster model to act on
     * @param set                 goals passed through to {@code maybeApplyBalancingAction}
     * @param optimizationOptions options supplying the excluded topics
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        double balanceThreshold = balanceThreshold(clusterModel, broker.id());
        // A broker exactly at the threshold counts as balanced, consistent with brokersToBalance,
        // actionAcceptance and the loop exit condition below.
        if (leaderNwInUtilization(broker) <= balanceThreshold) {
            return;
        }

        Set<String> excludedTopics = optimizationOptions.excludedTopics();
        List<Replica> leadersByDescendingLoad = broker.replicas().stream()
            .filter(Replica::isLeader)
            .filter(replica -> !shouldExclude(replica, excludedTopics))
            .sorted((first, second) -> Double.compare(
                second.load().expectedUtilizationFor(Resource.NW_IN),
                first.load().expectedUtilizationFor(Resource.NW_IN)))
            .collect(Collectors.toList());

        boolean stillOverThreshold = true;
        for (Replica leader : leadersByDescendingLoad) {
            List<Broker> followerBrokersByAscendingLoad = clusterModel.partition(leader.topicPartition())
                .onlineFollowers().stream()
                .map(Replica::broker)
                .sorted(Comparator.comparingDouble(LeaderBytesInDistributionGoal::leaderNwInUtilization))
                .collect(Collectors.toList());
            maybeApplyBalancingAction(clusterModel, leader, followerBrokersByAscendingLoad,
                ActionType.LEADERSHIP_MOVEMENT, set, optimizationOptions);
            stillOverThreshold = leaderNwInUtilization(broker) > balanceThreshold;
            if (!stillOverThreshold) {
                break;
            }
        }
        if (stillOverThreshold) {
            this._overLimitBrokerIds.add(broker.id());
        }
    }

    /** Computes and caches the mean leadership NW_IN utilization if the cache still holds 0.0. */
    private void initMeanLeaderBytesIn(ClusterModel clusterModel) {
        if (this._meanLeaderBytesIn == 0.0d) {
            this._meanLeaderBytesIn = meanLeaderResourceUtilization(clusterModel.brokers(), Resource.NW_IN);
        }
    }

    /**
     * Averages the leadership-load utilization of {@code resource} over the alive brokers.
     *
     * @param brokers  brokers to consider; dead brokers are skipped
     * @param resource resource whose utilization is averaged
     * @return the mean over alive brokers, or {@code 0.0} when none is alive (avoids a NaN result
     *         that would make every threshold comparison false)
     */
    private static double meanLeaderResourceUtilization(Collection<Broker> brokers, Resource resource) {
        double total = 0.0d;
        int aliveCount = 0;
        for (Broker broker : brokers) {
            if (broker.isAlive()) {
                total += broker.leadershipLoadForNwResources().expectedUtilizationFor(resource);
                aliveCount++;
            }
        }
        return aliveCount == 0 ? 0.0d : total / aliveCount;
    }

    /** Returns the expected NW_IN utilization of the broker's leadership load. */
    private static double leaderNwInUtilization(Broker broker) {
        return broker.leadershipLoadForNwResources().expectedUtilizationFor(Resource.NW_IN);
    }

    /**
     * Computes the leadership NW_IN utilization a broker may reach while still counting as balanced.
     *
     * @param clusterModel cluster model used for the mean and the broker's capacity
     * @param brokerId     id of the broker the threshold applies to
     * @return the larger of (mean * NW_IN balance percentage) and
     *         (NW_IN low utilization threshold * the broker's NW_IN capacity)
     */
    private double balanceThreshold(ClusterModel clusterModel, int brokerId) {
        initMeanLeaderBytesIn(clusterModel);
        double meanBasedLimit = this._meanLeaderBytesIn * this._balancingConstraint.resourceBalancePercentage(Resource.NW_IN);
        double capacityBasedLimit = this._balancingConstraint.lowUtilizationThreshold(Resource.NW_IN)
            * clusterModel.broker(brokerId).capacityFor(Resource.NW_IN);
        return Math.max(meanBasedLimit, capacityBasedLimit);
    }
}
