package com.linkedin.kafka.cruisecontrol.analyzer.goals;

import com.linkedin.kafka.cruisecontrol.analyzer.ActionAcceptance;
import com.linkedin.kafka.cruisecontrol.analyzer.ActionType;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingAction;
import com.linkedin.kafka.cruisecontrol.analyzer.BalancingConstraint;
import com.linkedin.kafka.cruisecontrol.analyzer.OptimizationOptions;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.OptimizationFailureException;
import com.linkedin.kafka.cruisecontrol.model.Broker;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.Disk;
import com.linkedin.kafka.cruisecontrol.model.Replica;
import com.linkedin.kafka.cruisecontrol.model.ReplicaSortFunctionFactory;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/analyzer/goals/IntraBrokerDiskCapacityGoal.class */
/**
 * Hard goal that checks the disk utilization of every alive disk of a broker against its allowed capacity,
 * where the allowed capacity of a disk is {@code disk.capacity() * capacityThreshold(DISK)}.
 *
 * <p>For each alive broker, replicas hosted on over-utilized alive disks are offered to the other disks of the same
 * broker (via {@code maybeMoveReplicaBetweenDisks}) until the disk is no longer above its limit. If any alive disk
 * is still above its limit once balancing has finished, the optimization fails.
 */
public class IntraBrokerDiskCapacityGoal extends AbstractGoal {
    private static final int MIN_NUM_VALID_WINDOWS = 1;
    private static final Logger LOG = LoggerFactory.getLogger(IntraBrokerDiskCapacityGoal.class);
    private static final Resource RESOURCE = Resource.DISK;

    /**
     * Creates the goal without a balancing constraint; the constraint is expected to be supplied elsewhere
     * (NOTE(review): presumably through the goal's configuration path in {@code AbstractGoal} -- confirm).
     */
    public IntraBrokerDiskCapacityGoal() {
    }

    /**
     * Creates the goal with an explicitly supplied balancing constraint.
     *
     * @param balancingConstraint the constraint providing the capacity threshold for the disk resource.
     */
    IntraBrokerDiskCapacityGoal(BalancingConstraint balancingConstraint) {
        this.balancingConstraint = balancingConstraint;
    }

    /**
     * @return {@code true}; this goal is always a hard goal.
     */
    @Override
    public boolean isHardGoal() {
        return true;
    }

    /**
     * Sanity check run before balancing: verifies that, on every alive broker, the expected disk utilization does
     * not exceed the broker's disk capacity scaled by the capacity threshold.
     *
     * @param clusterModel        the cluster model whose alive brokers are checked.
     * @param optimizationOptions not used by this goal during initialization.
     * @throws OptimizationFailureException if any alive broker's utilization exceeds its allowed capacity.
     */
    @Override
    protected void initGoalState(ClusterModel clusterModel, OptimizationOptions optimizationOptions)
            throws OptimizationFailureException {
        for (Broker broker : clusterModel.aliveBrokers()) {
            double existingUtilization = broker.load().expectedUtilizationFor(RESOURCE);
            double allowedCapacity = broker.capacityFor(RESOURCE) * this.balancingConstraint.capacityThreshold(RESOURCE);
            if (allowedCapacity < existingUtilization) {
                throw new OptimizationFailureException("Insufficient disk capacity at broker " + broker.id() + ", existing broker utilization " + existingUtilization + " exceeds allowed capacity " + allowedCapacity);
            }
        }
    }

    /**
     * @param clusterModel the cluster model to take brokers from.
     * @return a sorted copy of all alive brokers of the cluster.
     */
    @Override
    protected SortedSet<Broker> brokersToBalance(ClusterModel clusterModel) {
        return new TreeSet<>(clusterModel.aliveBrokers());
    }

    /**
     * Decides whether the given action is acceptable for this goal.
     *
     * <ul>
     *   <li>Intra-broker swap: accepted when the disk that gains load stays under its limit.</li>
     *   <li>Intra-broker movement: accepted when the destination disk stays under its limit.</li>
     *   <li>Leadership movement: always accepted.</li>
     * </ul>
     *
     * @param balancingAction the action to evaluate; both source and destination logdir must be set.
     * @param clusterModel    the cluster model the action refers to.
     * @return {@link ActionAcceptance#ACCEPT} or {@link ActionAcceptance#REPLICA_REJECT}.
     * @throws IllegalArgumentException if a logdir is missing or the action type is not one of the three above.
     */
    @Override
    public ActionAcceptance actionAcceptance(BalancingAction balancingAction, ClusterModel clusterModel) {
        if (balancingAction.sourceBrokerLogdir() == null || balancingAction.destinationBrokerLogdir() == null) {
            throw new IllegalArgumentException(getClass().getSimpleName() + " does not support balancing action not specifying logdir.");
        }
        // Both lookups happen before the type dispatch, regardless of which branch is taken.
        Replica sourceReplica = clusterModel.broker(balancingAction.sourceBrokerId()).replica(balancingAction.topicPartition());
        Disk destinationDisk = clusterModel.broker(balancingAction.destinationBrokerId()).disk(balancingAction.destinationBrokerLogdir());

        // Switch on the enum itself rather than on hand-maintained ordinal indices.
        switch (balancingAction.balancingAction()) {
            case INTRA_BROKER_REPLICA_SWAP:
                Replica destinationReplica =
                        clusterModel.broker(balancingAction.destinationBrokerId()).replica(balancingAction.destinationTopicPartition());
                return isSwapAcceptableForCapacity(sourceReplica, destinationReplica)
                        ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            case INTRA_BROKER_REPLICA_MOVEMENT:
                return isMovementAcceptableForCapacity(sourceReplica, destinationDisk)
                        ? ActionAcceptance.ACCEPT : ActionAcceptance.REPLICA_REJECT;
            case LEADERSHIP_MOVEMENT:
                return ActionAcceptance.ACCEPT;
            default:
                throw new IllegalArgumentException("Unsupported balancing action " + balancingAction.balancingAction() + " is provided.");
        }
    }

    /**
     * Checks whether the proposed movement satisfies this goal's own requirements.
     *
     * @param clusterModel    the cluster model the action refers to.
     * @param balancingAction the proposed action.
     * @return {@code true} only if the source replica has a positive expected disk utilization and the destination
     *         disk stays under its limit after receiving that utilization.
     */
    @Override
    protected boolean selfSatisfied(ClusterModel clusterModel, BalancingAction balancingAction) {
        Replica sourceReplica = clusterModel.broker(balancingAction.sourceBrokerId()).replica(balancingAction.topicPartition());
        if (sourceReplica.load().expectedUtilizationFor(RESOURCE) <= 0.0d) {
            return false;
        }
        Disk destinationDisk = clusterModel.broker(balancingAction.destinationBrokerId()).disk(balancingAction.destinationBrokerLogdir());
        return isMovementAcceptableForCapacity(sourceReplica, destinationDisk);
    }

    /**
     * Tries to bring every over-utilized alive disk of the broker back under its limit by offering its replicas to
     * the broker's remaining disks, which are tried in order of descending remaining capacity.
     *
     * @param broker              the broker whose disks are balanced.
     * @param clusterModel        the cluster model the broker belongs to.
     * @param set                 the goals that have already been optimized.
     * @param optimizationOptions options providing the topics excluded from movement.
     */
    @Override
    protected void rebalanceForBroker(Broker broker, ClusterModel clusterModel, Set<Goal> set, OptimizationOptions optimizationOptions) {
        LOG.debug("balancing broker {}, optimized goals = {}.", broker, set);
        List<Disk> overUtilizedDisks = broker.disks().stream()
                .filter(Disk::isAlive)
                .filter(this::isUtilizationOverLimit)
                .collect(Collectors.toList());
        if (overUtilizedDisks.isEmpty()) {
            return;
        }
        Set<String> excludedTopics = optimizationOptions.excludedTopics();

        // Candidates are all disks of the broker that are not in the over-utilized list.
        List<Disk> candidateDisks = new ArrayList<>(broker.disks());
        candidateDisks.removeAll(overUtilizedDisks);
        // Descending remaining capacity. Double.compare keeps the ordering consistent even when two disks differ by
        // less than 1.0; truncating the difference to an int would report them as equal.
        candidateDisks.sort((first, second) -> Double.compare(remainingCapacity(second), remainingCapacity(first)));

        for (Disk overUtilizedDisk : overUtilizedDisks) {
            overUtilizedDisk.trackSortedReplicas(name(),
                                                 ReplicaSortFunctionFactory.selectOnlineReplicas(),
                                                 ReplicaSortFunctionFactory.deprioritizeDiskImmigrants(),
                                                 ReplicaSortFunctionFactory.sortByMetricGroupValue(RESOURCE.name()));
            for (Replica replica : overUtilizedDisk.trackedSortedReplicas(name()).reverselySortedReplicas()) {
                if (shouldExclude(replica, excludedTopics)) {
                    continue;
                }
                if (maybeMoveReplicaBetweenDisks(clusterModel, replica, candidateDisks, set) == null) {
                    LOG.debug("Failed to move replica {} to any disk {} in broker {}", replica, candidateDisks, replica.broker());
                }
                // Stop as soon as the disk is back within its limit.
                if (!isUtilizationOverLimit(overUtilizedDisk)) {
                    break;
                }
            }
            overUtilizedDisk.untrackSortedReplicas(name());
        }
    }

    /**
     * Verifies the outcome of balancing and marks the goal as finished.
     *
     * @param clusterModel the cluster model to verify.
     * @param set          not used by this goal.
     * @throws OptimizationFailureException if any alive disk of an alive broker is still above its capacity limit.
     */
    @Override
    protected void updateGoalState(ClusterModel clusterModel, Set<String> set) throws OptimizationFailureException {
        for (Broker broker : brokersToBalance(clusterModel)) {
            for (Disk disk : broker.disks()) {
                if (disk.isAlive() && isUtilizationOverLimit(disk)) {
                    throw new OptimizationFailureException(String.format("Optimization for goal %s failed because utilization for disk %s on broker %d is still above capacity limit.", name(), disk, broker.id()));
                }
            }
        }
        finish();
    }

    /**
     * @return completeness requirements built from {@link #MIN_NUM_VALID_WINDOWS}, the configured minimum monitored
     *         partition percentage and a {@code true} flag (NOTE(review): the flag's meaning is defined by
     *         {@code ModelCompletenessRequirements} -- confirm).
     */
    @Override
    public ModelCompletenessRequirements clusterModelCompletenessRequirements() {
        return new ModelCompletenessRequirements(MIN_NUM_VALID_WINDOWS, this.minMonitoredPartitionPercentage, true);
    }

    /**
     * @return the simple class name of this goal.
     */
    @Override
    public String name() {
        return getClass().getSimpleName();
    }

    /**
     * Marks this goal as finished.
     */
    @Override
    public void finish() {
        this.finished = true;
    }

    /**
     * @param disk the disk to inspect.
     * @return the disk capacity scaled by the capacity threshold of the disk resource.
     */
    private double allowedCapacity(Disk disk) {
        return disk.capacity() * this.balancingConstraint.capacityThreshold(RESOURCE);
    }

    /**
     * @param disk the disk to inspect.
     * @return how much more utilization the disk can take before reaching its allowed capacity (may be negative).
     */
    private double remainingCapacity(Disk disk) {
        return allowedCapacity(disk) - disk.utilization();
    }

    /**
     * @param disk the disk to inspect.
     * @return {@code true} if the disk's utilization is strictly above its allowed capacity.
     */
    private boolean isUtilizationOverLimit(Disk disk) {
        return disk.utilization() > allowedCapacity(disk);
    }

    /**
     * @param replica the replica that would be moved.
     * @param disk    the disk that would receive the replica.
     * @return {@code true} if the disk stays strictly under its allowed capacity after receiving the replica's
     *         expected disk utilization.
     */
    private boolean isMovementAcceptableForCapacity(Replica replica, Disk disk) {
        return isUtilizationUnderLimitAfterAddingLoad(disk, replica.load().expectedUtilizationFor(RESOURCE));
    }

    /**
     * Checks a swap by looking only at the disk that ends up with more load: the disk of {@code replica} when
     * {@code replica2} is the larger one, otherwise the disk of {@code replica2}.
     *
     * @param replica  the source replica of the swap.
     * @param replica2 the destination replica of the swap.
     * @return {@code true} if the disk gaining load stays strictly under its allowed capacity.
     */
    private boolean isSwapAcceptableForCapacity(Replica replica, Replica replica2) {
        double utilizationDelta =
                replica2.load().expectedUtilizationFor(RESOURCE) - replica.load().expectedUtilizationFor(RESOURCE);
        if (utilizationDelta > 0.0d) {
            return isUtilizationUnderLimitAfterAddingLoad(replica.disk(), utilizationDelta);
        }
        return isUtilizationUnderLimitAfterAddingLoad(replica2.disk(), -utilizationDelta);
    }

    /**
     * @param disk the disk that would receive additional load.
     * @param d    the amount of utilization to add.
     * @return {@code true} if the resulting utilization is strictly below the disk's allowed capacity.
     */
    private boolean isUtilizationUnderLimitAfterAddingLoad(Disk disk, double d) {
        return disk.utilization() + d < allowedCapacity(disk);
    }
}
