package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal.class */
public abstract class CapacityGoal extends AbstractGoal {
    private static final Logger LOG = LoggerFactory.getLogger(CapacityGoal.class);

    /* renamed from: com.linkedin.kafka.cruisecontrol.analyzer.goals.CapacityGoal$1, reason: invalid class name */
    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/CapacityGoal$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$linkedin$kafka$cruisecontrol$analyzer$ActionType = new int[ActionType.values().length];

        static {
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$analyzer$ActionType[ActionType.INTER_BROKER_REPLICA_SWAP.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$analyzer$ActionType[ActionType.INTER_BROKER_REPLICA_MOVEMENT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$linkedin$kafka$cruisecontrol$analyzer$ActionType[ActionType.LEADERSHIP_MOVEMENT.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public CapacityGoal() {
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public boolean isHardGoal() {
        return true;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CapacityGoal(BalancingConstraint balancingConstraint) {
        this.balancingConstraint = balancingConstraint;
    }

    protected abstract Resource resource();

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public ActionAcceptance actionAcceptance(BalancingAction balancingAction, ClusterModel clusterModel) {
        Replica replica = clusterModel.broker(balancingAction.sourceBrokerId().intValue()).replica(balancingAction.topicPartition());
        Broker broker = clusterModel.broker(balancingAction.destinationBrokerId().intValue());
        switch (AnonymousClass1.$SwitchMap$com$linkedin$kafka$cruisecontrol$analyzer$ActionType[balancingAction.balancingAction().ordinal()]) {
            case 1:
                return isSwapAcceptableForCapacity(replica, broker.replica(balancingAction.destinationTopicPartition())) ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            case KafkaCruiseControlConfig.DEFAULT_NUM_SAMPLE_LOADING_THREADS /* 2 */:
            case 3:
                return isMovementAcceptableForCapacity(replica, broker) ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + balancingAction.balancingAction() + " is provided.");
        }
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(1, this.minMonitoredPartitionPercentage, true);
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal, com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public abstract String name();

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction balancingAction) {
        return isMovementAcceptableForCapacity(clusterModel.broker(balancingAction.sourceBrokerId().intValue()).replica(balancingAction.topicPartition()), clusterModel.broker(balancingAction.destinationBrokerId().intValue()));
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        return clusterModel.brokers();
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        double expectedUtilizationFor = clusterModel.load().expectedUtilizationFor(resource());
        double capacityFor = clusterModel.capacityFor(resource()) * this.balancingConstraint.capacityThreshold(resource());
        if (capacityFor < expectedUtilizationFor) {
            throw new OptimizationFailureException(String.format("[%s] Insufficient healthy cluster capacity for resource: %s existing cluster utilization %f allowed capacity %f (capacity threshold: %f).", name(), resource(), Double.valueOf(expectedUtilizationFor), Double.valueOf(capacityFor), Double.valueOf(this.balancingConstraint.capacityThreshold(resource()))));
        }
        clusterModel.trackSortedReplicas(sortName(), optimizationOptions.onlyMoveImmigrantReplicas() ? ReplicaSortFunctionFactory.selectImmigrants() : null, ReplicaSortFunctionFactory.deprioritizeOfflineReplicasThenImmigrants(), ReplicaSortFunctionFactory.sortByMetricGroupValue(resource().name()));
        clusterModel.trackSortedReplicas(sortNameByLeader(), optimizationOptions.onlyMoveImmigrantReplicas() ? ReplicaSortFunctionFactory.selectImmigrantLeaders() : ReplicaSortFunctionFactory.selectLeaders(), ReplicaSortFunctionFactory.deprioritizeImmigrants(), ReplicaSortFunctionFactory.sortByMetricGroupValue(resource().name()));
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        try {
            ensureUtilizationUnderCapacity(clusterModel);
            GoalUtils.ensureNoOfflineReplicas(clusterModel, name());
            GoalUtils.ensureReplicasMoveOffBrokersWithBadDisks(clusterModel, name());
            finish();
        } finally {
            clusterModel.untrackSortedReplicas(sortName());
            clusterModel.untrackSortedReplicas(sortNameByLeader());
        }
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.Goal
    public void finish() {
        this.finished = true;
    }

    private void ensureUtilizationUnderCapacity(ClusterModel clusterModel) throws OptimizationFailureException {
        Resource resource = resource();
        double capacityThreshold = this.balancingConstraint.capacityThreshold(resource);
        for (Broker broker : clusterModel.brokers()) {
            if (resource.isHostResource()) {
                double expectedUtilizationFor = broker.host().load().expectedUtilizationFor(resource);
                double capacityFor = broker.host().capacityFor(resource) * capacityThreshold;
                if (!broker.host().replicas().isEmpty() && expectedUtilizationFor > capacityFor) {
                    throw new OptimizationFailureException(String.format("Optimization for goal %s failed because %s utilization for host %s is %f which is above capacity limit %f.", name(), resource, broker.host().name(), Double.valueOf(expectedUtilizationFor), Double.valueOf(capacityFor)));
                }
            }
            if (resource.isBrokerResource()) {
                double expectedUtilizationFor2 = broker.load().expectedUtilizationFor(resource);
                double capacityFor2 = broker.capacityFor(resource) * capacityThreshold;
                if (!broker.replicas().isEmpty() && expectedUtilizationFor2 > capacityFor2) {
                    throw new OptimizationFailureException(String.format("Optimization for goal %s failed because %s utilization for broker %d is %f which is above capacity limit %f.", name(), resource, Integer.valueOf(broker.id()), Double.valueOf(expectedUtilizationFor2), Double.valueOf(capacityFor2)));
                }
            }
        }
    }

    @Override // com.linkedin.kafka.cruisecontrol.analyzer.goals.AbstractGoal
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) throws OptimizationFailureException {
        LOG.debug("balancing broker {}, optimized goals = {}", broker, set);
        Resource resource = resource();
        double capacityThreshold = this.balancingConstraint.capacityThreshold(resource);
        double capacityFor = broker.capacityFor(resource) * capacityThreshold;
        double capacityFor2 = broker.host().capacityFor(resource) * capacityThreshold;
        boolean isUtilizationOverLimit = isUtilizationOverLimit(broker, resource, capacityFor, capacityFor2);
        if (isUtilizationOverLimit || !broker.currentOfflineReplicas().isEmpty()) {
            Set<String> excludedTopics = optimizationOptions.excludedTopics();
            if (resource == Resource.NW_OUT || resource == Resource.CPU) {
                for (Replica replica : broker.trackedSortedReplicas(sortNameByLeader()).reverselySortedReplicas()) {
                    if (!shouldExclude(replica, excludedTopics)) {
                        List<Replica> onlineFollowers = clusterModel.partition(replica.topicPartition()).onlineFollowers();
                        GoalUtils.sortReplicasInAscendingOrderByBrokerResourceUtilization(onlineFollowers, resource);
                        List list = (List) onlineFollowers.stream().map((v0) -> {
                            return v0.broker();
                        }).collect(Collectors.toList());
                        if (maybeApplyBalancingAction(clusterModel, replica, list, ActionType.LEADERSHIP_MOVEMENT, set, optimizationOptions) == null) {
                            LOG.debug("Failed to move leader replica {} to any other brokers in {}", replica, list);
                        }
                        isUtilizationOverLimit = isUtilizationOverLimit(broker, resource, capacityFor, capacityFor2);
                        if (!isUtilizationOverLimit) {
                            break;
                        }
                    }
                }
            }
            if (isUtilizationOverLimit || !broker.currentOfflineReplicas().isEmpty()) {
                List<Broker> sortedAliveBrokersUnderThreshold = clusterModel.sortedAliveBrokersUnderThreshold(resource, capacityThreshold);
                for (Replica replica2 : broker.trackedSortedReplicas(sortName()).reverselySortedReplicas()) {
                    if (!shouldExclude(replica2, excludedTopics)) {
                        if (maybeApplyBalancingAction(clusterModel, replica2, sortedAliveBrokersUnderThreshold, ActionType.INTER_BROKER_REPLICA_MOVEMENT, set, optimizationOptions) == null) {
                            LOG.debug("Failed to move replica {} to any broker in {}", replica2, sortedAliveBrokersUnderThreshold);
                        }
                        isUtilizationOverLimit = isUtilizationOverLimit(broker, resource, capacityFor, capacityFor2);
                        if (!isUtilizationOverLimit && broker.currentOfflineReplicas().isEmpty()) {
                            break;
                        }
                    }
                }
            }
            postSanityCheck(isUtilizationOverLimit, broker, capacityFor, capacityFor2);
        }
    }

    private void postSanityCheck(boolean z, Broker broker, double d, double d2) throws OptimizationFailureException {
        if (!z) {
            if (!broker.currentOfflineReplicas().isEmpty()) {
                throw new OptimizationFailureException("Failed to remove offline replicas from broker " + broker.id() + ".");
            }
        } else {
            Resource resource = resource();
            if (!resource.isHostResource()) {
                throw new OptimizationFailureException(String.format("[%s] Violated capacity limit of %f via broker utilization of %f with broker %d for resource %s.", name(), Double.valueOf(d), Double.valueOf(broker.load().expectedUtilizationFor(resource)), Integer.valueOf(broker.id()), resource));
            }
            throw new OptimizationFailureException(String.format("[%s] Violated capacity limit of %f via host utilization of %f with hostname %s for resource %s.", name(), Double.valueOf(d2), Double.valueOf(broker.host().load().expectedUtilizationFor(resource)), broker.host().name(), resource));
        }
    }

    private String sortName() {
        return name() + "-" + resource().name() + "-ALL";
    }

    private String sortNameByLeader() {
        return name() + "-" + resource().name() + "-LEADER";
    }

    private boolean isUtilizationOverLimit(Broker broker, Resource resource, double d, double d2) {
        if (broker.host().replicas().isEmpty() || !resource.isHostResource() || broker.host().load().expectedUtilizationFor(resource) <= d2) {
            return !broker.replicas().isEmpty() && resource.isBrokerResource() && broker.load().expectedUtilizationFor(resource) > d;
        }
        return true;
    }

    private boolean isMovementAcceptableForCapacity(Replica replica, Broker broker) {
        return isUtilizationUnderLimitAfterAddingLoad(broker, replica.load().expectedUtilizationFor(resource()));
    }

    private boolean isSwapAcceptableForCapacity(Replica replica, Replica replica2) {
        double expectedUtilizationFor = replica2.load().expectedUtilizationFor(resource()) - replica.load().expectedUtilizationFor(resource());
        return expectedUtilizationFor > 0.0d ? isUtilizationUnderLimitAfterAddingLoad(replica.broker(), expectedUtilizationFor) : isUtilizationUnderLimitAfterAddingLoad(replica2.broker(), -expectedUtilizationFor);
    }

    private boolean isUtilizationUnderLimitAfterAddingLoad(Broker broker, double d) {
        Resource resource = resource();
        double capacityThreshold = this.balancingConstraint.capacityThreshold(resource);
        if (resource.isHostResource()) {
            if (broker.host().load().expectedUtilizationFor(resource) + d >= broker.host().capacityFor(resource) * capacityThreshold) {
                return false;
            }
        }
        if (resource.isBrokerResource()) {
            return broker.load().expectedUtilizationFor(resource) + d < broker.capacityFor(resource) * capacityThreshold;
        }
        return true;
    }
}
