package io.confluent.databalancer;

import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalCallback;
import com.yammer.metrics.core.MetricName;
import io.confluent.databalancer.EngineInitializationContext;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalancerStatusStateMachine;
import io.confluent.databalancer.operation.BalancerStatusTracker;
import io.confluent.databalancer.operation.BrokerAdditionStateMachine;
import io.confluent.databalancer.operation.BrokerAdditionStateManager;
import io.confluent.databalancer.operation.BrokerRemovalCancellationMode;
import io.confluent.databalancer.operation.BrokerRemovalCancellationProposal;
import io.confluent.databalancer.operation.BrokerRemovalExclusionCancellationData;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.databalancer.operation.BrokerRemovalStateTracker;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import io.confluent.databalancer.persistence.BrokerRemovalStateRecord;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.common.AliveBrokersSnapshot;
import kafka.common.BalancerStatusDescriptionInternal;
import kafka.common.BrokerAdditionDescriptionInternal;
import kafka.common.BrokerRemovalDescriptionInternal;
import kafka.common.BrokerRemovalRequest;
import kafka.common.EvenClusterLoadStatusDescriptionInternal;
import kafka.controller.DataBalanceManager;
import kafka.metrics.KafkaYammerMetrics;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.ExclusionOp;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.BalancerBrokerExcludedForReplicaPlacementException;
import org.apache.kafka.common.errors.BalancerJbodEnabledMisconfigurationException;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalCanceledException;
import org.apache.kafka.common.errors.InvalidBrokerRemovalException;
import org.apache.kafka.common.protocol.BalancerOperationOverriddenException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

/* loaded from: input_file:io/confluent/databalancer/KafkaDataBalanceManager.class */
public class KafkaDataBalanceManager implements DataBalanceManager {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaDataBalanceManager.class);
    /** Name of the gauge that reports 1 when the current balance engine is active and 0 otherwise. */
    public static final String ACTIVE_BALANCER_COUNT_METRIC_NAME = "ActiveBalancerCount";
    /** Name of the per-broker gauge exposing the state of a broker removal operation. */
    public static final String BROKER_REMOVAL_STATE_METRIC_NAME = "BrokerRemovalOperationState";
    /** Name of the per-broker gauge exposing the state of a broker addition operation. */
    public static final String BROKER_ADDITION_STATE_METRIC_NAME = "BrokerAdditionOperationState";
    /** Name of the gauge exposing the balancer status of this broker. */
    public static final String BALANCER_STATE_METRIC_NAME = "BalancerState";
    // Most recent configuration; replaced wholesale by updateConfig().
    private KafkaConfig kafkaConfig;
    // Engine currently in use: the factory's active engine after onElection(), its inactive engine
    // initially and after onResignation().
    private DataBalanceEngine balanceEngine;
    // May be null: injected through the constructor or created lazily in onElection(), and reset
    // to null in onResignation().
    BalancerStatusTracker balancerStatusTracker;
    private final DataBalanceEngineFactory dbeFactory;
    private final DataBalancerMetricsRegistry dataBalancerMetricsRegistry;
    // Derived from "confluent.balancer.task.history.retention.days"; used to filter out operations
    // whose last update is older than this period.
    private final long taskHistoryRetentionPeriodMs;
    // Clock used for operation timestamps and retention cut-offs.
    private Time time;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/confluent/databalancer/KafkaDataBalanceManager$BrokerRemovalMetricRegistry.class */
    /**
     * Registers the gauges that expose the state of a broker removal operation, using the
     * enclosing manager's metrics registry.
     */
    public class BrokerRemovalMetricRegistry {
        BrokerRemovalMetricRegistry() {
        }

        /**
         * Registers one {@link #BROKER_REMOVAL_STATE_METRIC_NAME} gauge per broker id. Every gauge
         * reads from the same reference, so updating the returned reference updates all of them.
         *
         * @param set ids of the brokers taking part in the removal operation
         * @return the shared state holder, initialised to {@code "NOT_STARTED"}
         */
        public AtomicReference<String> registerBrokerRemovalMetric(Set<Integer> set) {
            final AtomicReference<String> removalState = new AtomicReference<>("NOT_STARTED");
            final DataBalancerMetricsRegistry registry = dataBalancerMetricsRegistry;
            set.forEach(brokerId ->
                registry.newGauge(ConfluentDataBalanceEngine.class, BROKER_REMOVAL_STATE_METRIC_NAME, removalState::get, true, brokerIdMetricTag(brokerId)));
            return removalState;
        }
    }

    /* loaded from: input_file:io/confluent/databalancer/KafkaDataBalanceManager$DataBalanceEngineFactory.class */
    /**
     * Holds the two engine instances the manager switches between: the engine used while this
     * broker runs the balancer and the engine used otherwise.
     */
    static class DataBalanceEngineFactory {
        private final DataBalanceEngine activeDataBalanceEngine;
        private final DataBalanceEngine inactiveDataBalanceEngine;

        /**
         * Creates a factory backed by a {@link ConfluentDataBalanceEngine} (active) and a
         * {@link NoOpDataBalanceEngine} (inactive).
         */
        DataBalanceEngineFactory(DataBalancerMetricsRegistry dataBalancerMetricsRegistry, KafkaConfig kafkaConfig) {
            this(new ConfluentDataBalanceEngine(dataBalancerMetricsRegistry, kafkaConfig), new NoOpDataBalanceEngine());
        }

        /**
         * @param dataBalanceEngine  engine returned by {@link #getActiveDataBalanceEngine()}
         * @param dataBalanceEngine2 engine returned by {@link #getInactiveDataBalanceEngine()}
         * @throws NullPointerException if either engine is null
         */
        DataBalanceEngineFactory(DataBalanceEngine dataBalanceEngine, DataBalanceEngine dataBalanceEngine2) {
            activeDataBalanceEngine = Objects.requireNonNull(dataBalanceEngine);
            inactiveDataBalanceEngine = Objects.requireNonNull(dataBalanceEngine2);
        }

        DataBalanceEngine getActiveDataBalanceEngine() {
            return activeDataBalanceEngine;
        }

        DataBalanceEngine getInactiveDataBalanceEngine() {
            return inactiveDataBalanceEngine;
        }

        /** Shuts down the active engine first, then the inactive one. */
        void shutdown() throws InterruptedException {
            activeDataBalanceEngine.shutdown();
            inactiveDataBalanceEngine.shutdown();
        }
    }

    /**
     * Builds the metrics allow-list handed to the default {@link DataBalancerMetricsRegistry}.
     * It contains a single entry: {@link #ACTIVE_BALANCER_COUNT_METRIC_NAME} for this class.
     */
    static Set<MetricName> getMetricsAllowList() {
        final DataBalancerMetricsRegistry.MetricsAllowListBuilder allowList = new DataBalancerMetricsRegistry.MetricsAllowListBuilder();
        allowList.addMetric(KafkaDataBalanceManager.class, ACTIVE_BALANCER_COUNT_METRIC_NAME);
        return allowList.buildAllowList();
    }

    /**
     * Creates a manager whose metrics registry wraps the default Kafka Yammer registry and is
     * restricted to the allow-list from {@link #getMetricsAllowList()}.
     *
     * @param kafkaConfig the broker configuration
     */
    public KafkaDataBalanceManager(KafkaConfig kafkaConfig) {
        this(kafkaConfig, new DataBalancerMetricsRegistry(KafkaYammerMetrics.defaultRegistry(), getMetricsAllowList()));
    }

    /**
     * Creates a manager with a default {@link DataBalanceEngineFactory}, the system clock and no
     * pre-existing status tracker (one is created on election).
     */
    private KafkaDataBalanceManager(KafkaConfig kafkaConfig, DataBalancerMetricsRegistry dataBalancerMetricsRegistry) {
        this(kafkaConfig, new DataBalanceEngineFactory(dataBalancerMetricsRegistry, kafkaConfig), dataBalancerMetricsRegistry, new SystemTime(), null);
    }

    KafkaDataBalanceManager(KafkaConfig kafkaConfig, DataBalanceEngineFactory dataBalanceEngineFactory, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, Time time, BalancerStatusTracker balancerStatusTracker) {
        this.kafkaConfig = (KafkaConfig) Objects.requireNonNull(kafkaConfig, "KafkaConfig must be non-null");
        this.dbeFactory = (DataBalanceEngineFactory) Objects.requireNonNull(dataBalanceEngineFactory, "DataBalanceEngineFactory must be non-null");
        this.dataBalancerMetricsRegistry = (DataBalancerMetricsRegistry) Objects.requireNonNull(dataBalancerMetricsRegistry, "MetricsRegistry must be non-null");
        this.time = time;
        this.balanceEngine = dataBalanceEngineFactory.getInactiveDataBalanceEngine();
        if (kafkaConfig.selfBalanceEnable().booleanValue()) {
            enableDatabalancerMetric();
        }
        this.balancerStatusTracker = balancerStatusTracker;
        enableBrokerIdLogging(kafkaConfig);
        this.taskHistoryRetentionPeriodMs = TimeUnit.DAYS.toMillis(kafkaConfig.getInt("confluent.balancer.task.history.retention.days").intValue());
    }

    /** Stores this broker's id in the logging MDC under the key {@code "brokerId"}. */
    private static void enableBrokerIdLogging(KafkaConfig kafkaConfig) {
        final String brokerId = DatabalancerUtils.getBrokerId(kafkaConfig).toString();
        MDC.put("brokerId", brokerId);
    }

    /**
     * Switches to the active engine and, if the balancer is enabled and correctly configured,
     * activates it.
     *
     * <p>A status tracker is created if none exists yet. Activation is skipped (and the matching
     * status event recorded) when the balancer is disabled or when more than one log directory is
     * configured.
     *
     * @param aliveBrokersSnapshot snapshot passed on to the engine on activation
     * @throws ConfigException if no log directory is configured
     */
    public synchronized void onElection(AliveBrokersSnapshot aliveBrokersSnapshot) {
        enableBrokerIdLogging(kafkaConfig);
        balanceEngine = dbeFactory.getActiveDataBalanceEngine();

        final int brokerId = DatabalancerUtils.getBrokerId(kafkaConfig).intValue();
        if (balancerStatusTracker == null) {
            balancerStatusTracker = new BalancerStatusTracker(brokerId, registerBalancerStatusMetric(brokerId), time);
            balancerStatusTracker.initialize();
        }

        if (!isBalancerEnabled(kafkaConfig)) {
            balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED);
            LOG.info("DataBalancer: Skipping DataBalancer Startup as its not enabled.");
            return;
        }

        final List<String> logDirs = DatabalancerUtils.getConfiguredLogDirs(kafkaConfig);
        if (logDirs == null || logDirs.isEmpty()) {
            throw new ConfigException("Broker configured with null or empty log directory");
        }
        if (logDirs.size() > 1) {
            // Multiple log directories: record the misconfiguration instead of activating.
            balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.JBOD_ENABLED, new BalancerJbodEnabledMisconfigurationException("SBC configured with multiple log directories"));
            return;
        }

        balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.INITIALIZING_CRUISE_CONTROL, (Exception) null);
        LOG.info("DataBalancer: Activating SBC with {}", aliveBrokersSnapshot);
        activateEngine(Optional.of(aliveBrokersSnapshot));
    }

    /** @return whether the engine currently in use reports itself as active */
    public boolean isActive() {
        return balanceEngine.isActive();
    }

    /** @return the current status tracker; null when none was injected or created, or after resignation */
    public BalancerStatusTracker balancerStatusTracker() {
        return balancerStatusTracker;
    }

    /**
     * Cancels ongoing broker removals (transiently), deactivates the engine, falls back to the
     * inactive engine and drops the status tracker. Runtime exceptions are logged, not rethrown.
     */
    public synchronized void onResignation() {
        try {
            enableBrokerIdLogging(kafkaConfig);

            final BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder resignationCancellation =
                new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder()
                    .setCancellationEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.BALANCER_RESIGNED)
                    .setCancellationMode(BrokerRemovalCancellationMode.TRANSIENT_CANCELLATION);
            tryCancelAllExistingBrokerRemovals(resignationCancellation);

            deactivateEngine(BalancerStatusStateMachine.BalancerEvent.CONTROLLER_FAILS_OVER);
            balanceEngine = dbeFactory.getInactiveDataBalanceEngine();
            balancerStatusTracker = null;
        } catch (RuntimeException e) {
            LOG.error("Error occurred during DataBalanceManager resignation", e);
        }
    }

    /**
     * Shuts down both engines held by the engine factory.
     *
     * <p>If the calling thread is interrupted during shutdown, the interruption is logged and the
     * thread's interrupt flag is restored so that callers further up the stack can still see it.
     */
    public synchronized void shutdown() {
        try {
            this.dbeFactory.shutdown();
        } catch (InterruptedException e) {
            LOG.warn("DataBalanceManager interrupted during shutdown.", e);
            // Catching InterruptedException clears the flag; set it again rather than hiding it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Applies a configuration change.
     *
     * <p>If the self-balancing flag was flipped, the engine is activated or deactivated accordingly
     * and no other setting is examined. Otherwise changes to the throttle, the uneven-load trigger
     * and the topic exclusions are pushed to the engine individually.
     *
     * @param kafkaConfig  the previous configuration
     * @param kafkaConfig2 the new configuration, which becomes the manager's current one
     */
    public synchronized void updateConfig(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        this.kafkaConfig = kafkaConfig2;

        if (!kafkaConfig2.selfBalanceEnable().equals(kafkaConfig.selfBalanceEnable())) {
            if (kafkaConfig2.selfBalanceEnable().booleanValue()) {
                enableDatabalancerMetric();
                if (balancerStatusTracker != null) {
                    balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.BALANCER_ENABLED);
                }
                activateEngine(Optional.empty(), EngineInitializationContext.EngineStartupType.ON_ENABLE);
            } else {
                tryCancelAllExistingBrokerRemovals(new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder().setCancellationEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.BALANCER_DISABLED));
                deactivateEngine(BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED);
                disableDatabalancerMetric();
            }
            return;
        }

        final String throttleKey = "confluent.balancer.throttle.bytes.per.second";
        final Long newThrottle = kafkaConfig2.getLong(throttleKey);
        if (!newThrottle.equals(kafkaConfig.getLong(throttleKey))) {
            LOG.info("Setting broker throttle to {}", newThrottle);
            balanceEngine.updateThrottle(newThrottle);
        }

        final String triggerKey = "confluent.balancer.heal.uneven.load.trigger";
        if (!kafkaConfig2.getString(triggerKey).equals(kafkaConfig.getString(triggerKey))) {
            final boolean autoHeal = DatabalancerUtils.anyUnevenLoadEnabled(kafkaConfig2);
            LOG.info("Setting DataBalancer auto heal mode to {}", autoHeal);
            balanceEngine.setAutoHealMode(autoHeal);
        }

        final String excludedNamesKey = "confluent.balancer.exclude.topic.names";
        final String excludedPrefixesKey = "confluent.balancer.exclude.topic.prefixes";
        final List<?> excludedNames = kafkaConfig2.getList(excludedNamesKey);
        final List<?> excludedPrefixes = kafkaConfig2.getList(excludedPrefixesKey);
        final boolean exclusionsUnchanged = excludedNames.equals(kafkaConfig.getList(excludedNamesKey))
            && excludedPrefixes.equals(kafkaConfig.getList(excludedPrefixesKey));
        if (!exclusionsUnchanged) {
            final String exclusionRegex = DatabalancerUtils.generateCcTopicExclusionRegex(kafkaConfig2);
            LOG.debug("Setting excluded topics to {} and excluded prefixes to {}", excludedNames, excludedPrefixes);
            balanceEngine.updateConfigPermanently(KafkaCruiseControlConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG, exclusionRegex);
        }
    }

    /**
     * Reacts to brokers coming online: notifies the engine, cancels removals that involve the
     * started brokers and, if any of them are empty, schedules a broker addition that also covers
     * the brokers already being added.
     *
     * @param set                  ids of the started brokers that are empty
     * @param set2                 ids of all newly started brokers; nothing happens if empty
     * @param aliveBrokersSnapshot snapshot forwarded to the engine's add-brokers call
     */
    public void onBrokersStartup(Set<Integer> set, Set<Integer> set2, AliveBrokersSnapshot aliveBrokersSnapshot) {
        if (set2.isEmpty()) {
            return;
        }
        if (!balanceEngine.isActive()) {
            LOG.warn("Notified of broker additions (empty broker ids {}, new brokers {}) but DataBalancer is disabled -- ignoring for now", set, set2);
            return;
        }

        LOG.info("Notify new broker arrival: {}", set2);
        balanceEngine.notifyBrokerChange(set2, BrokerChangeEvent.ONLINE_BROKER);
        tryCancelExistingBrokerRemovals(set2, BrokerRemovalStateMachine.BrokerRemovalEvent.BROKER_RESTARTED);

        if (!set.isEmpty()) {
            final Set<Integer> brokersToAdd = new HashSet<>(balanceEngine.getDataBalanceEngineContext().brokersBeingAdded());
            brokersToAdd.addAll(set);
            final String operationId = String.format("addBroker-%d", time.milliseconds());
            balanceEngine.addBrokers(brokersToAdd, operationId, aliveBrokersSnapshot);
        }
    }

    /**
     * Forwards a broker failure notification to the engine as a {@code DEAD_BROKER} change.
     *
     * @param set ids of the failed brokers
     */
    public void onBrokersFailure(Set<Integer> set) {
        LOG.info("Notify broker failure: {}", set);
        final BrokerChangeEvent failure = BrokerChangeEvent.DEAD_BROKER;
        balanceEngine.notifyBrokerChange(set, failure);
    }

    /**
     * Reacts to changes in replica placement exclusions.
     *
     * <p>Added exclusions override pending broker additions and try to cancel broker removals. The
     * engine is only notified of the additions if no removal is still in progress or at least one
     * was cancelled. Removed exclusions always try to cancel removals and notify the engine.
     *
     * @param set  broker ids that gained an exclusion
     * @param set2 broker ids whose exclusion was removed
     */
    public void onAlteredExclusions(Set<Integer> set, Set<Integer> set2) {
        LOG.info("Notified of replica placement exclusion alterations - removed exclusions: ({}); added exclusions: ({})", set2, set);

        if (!set.isEmpty()) {
            final String cause = String.format("altered replica exclusion (new exclusions: [%s], removed exclusions: [%s])", set, set2);
            overrideAllPendingBrokerAdditions(BrokerAdditionStateMachine.BrokerAdditionEvent.BROKER_EXCLUSION_DETECTED, cause);

            final boolean cancelledAny = tryCancelAllExistingBrokerRemovals(
                new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder()
                    .setCancellationEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.EXCLUSION_ADDED)
                    .setModifiedExclusionsData(new BrokerRemovalExclusionCancellationData(ExclusionOp.OpType.SET, set)));
            final boolean noRemovalInProgress = balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers().values().stream()
                .noneMatch(tracker -> !tracker.currentState().isTerminal());
            if (noRemovalInProgress || cancelledAny) {
                balanceEngine.notifyBrokerChange(set, BrokerChangeEvent.EXCLUDED_FOR_REPLICA_PLACEMENT);
            }
        }

        if (!set2.isEmpty()) {
            tryCancelAllExistingBrokerRemovals(
                new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder()
                    .setCancellationEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.EXCLUSION_REMOVED)
                    .setModifiedExclusionsData(new BrokerRemovalExclusionCancellationData(ExclusionOp.OpType.DELETE, set2)));
            balanceEngine.notifyBrokerChange(set2, BrokerChangeEvent.REMOVED_REPLICA_EXCLUSION);
        }
    }

    /**
     * Attempts to cancel every broker removal operation the engine currently tracks.
     *
     * @param brokerRemovalCancellationProposalBuilder describes why and how to cancel
     * @return true if at least one removal was cancelled; always false while the engine is inactive
     */
    private synchronized boolean tryCancelAllExistingBrokerRemovals(BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder brokerRemovalCancellationProposalBuilder) {
        if (!balanceEngine.isActive()) {
            return false;
        }
        // Copy the trackers: cancelling removes entries from the underlying map.
        final Collection<BrokerRemovalStateTracker> allTrackers = new ArrayList<>(balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers().values());
        return tryCancelExistingBrokerRemovals(allTrackers, brokerRemovalCancellationProposalBuilder);
    }

    /**
     * Attempts to cancel every tracked broker removal operation that involves at least one of the
     * given brokers.
     *
     * @param set                ids of the brokers whose removal operations should be cancelled
     * @param brokerRemovalEvent the event recorded as the cause of the cancellation
     * @return true if at least one removal operation was cancelled
     */
    private synchronized boolean tryCancelExistingBrokerRemovals(Set<Integer> set, BrokerRemovalStateMachine.BrokerRemovalEvent brokerRemovalEvent) {
        // Collect into a fresh list first: cancelling removes entries from the tracker map.
        List<BrokerRemovalStateTracker> affectedTrackers = this.balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers().entrySet().stream()
            .filter(entry -> entry.getKey().stream().anyMatch(set::contains))
            .map(Map.Entry::getValue)
            .collect(Collectors.toList());
        return tryCancelExistingBrokerRemovals(affectedTrackers, new BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder().setCancellationEvent(brokerRemovalEvent));
    }

    /**
     * Attempts to cancel each given removal operation. Trackers of successfully cancelled
     * operations are removed from the engine's tracker map.
     *
     * @param collection                               trackers of the operations to cancel
     * @param brokerRemovalCancellationProposalBuilder describes why and how to cancel
     * @return true if at least one operation with a non-empty broker set was cancelled
     */
    private synchronized boolean tryCancelExistingBrokerRemovals(Collection<BrokerRemovalStateTracker> collection, BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder brokerRemovalCancellationProposalBuilder) {
        final Map<ImmutableSet<Integer>, BrokerRemovalStateTracker> trackersByBrokers = balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers();
        // Key view of the tracker map: reflects removals made below by the time it is logged.
        final Set<ImmutableSet<Integer>> trackedBrokerSets = trackersByBrokers.keySet();

        final List<ImmutableSet<Integer>> cancelledBrokerSets = new ArrayList<>();
        for (BrokerRemovalStateTracker tracker : collection) {
            if (!tryCancelBrokerRemoval(tracker, brokerRemovalCancellationProposalBuilder)) {
                continue;
            }
            final ImmutableSet<Integer> brokerIds = tracker.brokerIds();
            trackersByBrokers.remove(brokerIds);
            if (!brokerIds.isEmpty()) {
                cancelledBrokerSets.add(brokerIds);
            }
        }

        final boolean cancelledAny = !cancelledBrokerSets.isEmpty();
        if (cancelledAny) {
            LOG.info("Cancelled the broker removal operations for brokers {} due to {} ({}). (new brokers {})", cancelledBrokerSets, brokerRemovalCancellationProposalBuilder.cancellationEvent(), brokerRemovalCancellationProposalBuilder.cancellationMode(), trackedBrokerSets);
        } else {
            LOG.debug("No broker removal operations were canceled for {}, either due to none being present/in-progress or a failure in cancellation. Cancellation cause was {} ({})", trackedBrokerSets, brokerRemovalCancellationProposalBuilder.cancellationEvent(), brokerRemovalCancellationProposalBuilder.cancellationMode());
        }
        return cancelledAny;
    }

    /**
     * Cancels a single broker removal operation in two steps: first the tracker is asked to move
     * to a cancelled state, then, only if that succeeded, the engine is asked to cancel the
     * operation itself.
     *
     * @param brokerRemovalStateTracker                tracker of the operation to cancel
     * @param brokerRemovalCancellationProposalBuilder cancellation details; its event exception is
     *                                                 set by this method before it is built
     * @return true only if the engine reports the operation as cancelled
     */
    private boolean tryCancelBrokerRemoval(BrokerRemovalStateTracker brokerRemovalStateTracker, BrokerRemovalCancellationProposal.BrokerRemovalCancellationProposalBuilder brokerRemovalCancellationProposalBuilder) {
        final ImmutableSet<Integer> removedBrokers = brokerRemovalStateTracker.brokerIds();
        LOG.info("Setting cancelled state due to {} with mode {} on broker removal operation for brokers {}", brokerRemovalCancellationProposalBuilder.cancellationEvent(), brokerRemovalCancellationProposalBuilder.cancellationMode(), removedBrokers);

        final String cancellationMessage = String.format("The broker removal operation for brokers %s was canceled,  due to a %s event.", removedBrokers, brokerRemovalCancellationProposalBuilder.cancellationEvent());
        brokerRemovalCancellationProposalBuilder.setEventException(new BrokerRemovalCanceledException(cancellationMessage));

        if (brokerRemovalStateTracker.maybeCancel(brokerRemovalCancellationProposalBuilder.build())) {
            LOG.info("Successfully set canceled status on broker removal task for brokers {}. Proceeding with cancellation of the operation", removedBrokers);
            final boolean engineCancelled = balanceEngine.cancelBrokerRemoval(removedBrokers);
            if (!engineCancelled) {
                LOG.error("Did not succeed in canceling the broker removal operation for brokers {}", removedBrokers);
            } else {
                LOG.info("Successfully canceled the broker removal operation for brokers {}.", removedBrokers);
            }
            return engineCancelled;
        }

        LOG.debug("Will not cancel broker removal operation for brokers {} because it is in state {}", removedBrokers, brokerRemovalStateTracker.currentState());
        return false;
    }

    /**
     * Describes the broker removal operations recorded in the persistence store whose last update
     * lies within the task history retention period.
     *
     * <p>Records are processed in ascending order of last update time and de-duplicated through a
     * hash set, so for descriptions that compare equal the one from the most recently updated
     * record is kept. The returned list is not ordered.
     *
     * @return the retained descriptions; empty if the engine has no persistence store
     * @throws BalancerOfflineException if the engine is not active
     */
    public List<BrokerRemovalDescriptionInternal> brokerRemovals() {
        if (!balanceEngine.isActive()) {
            LOG.error("Received request to describe broker removals while Databalancer is not started.");
            throw new BalancerOfflineException("Received request to describe broker removals while Databalancer is not started.");
        }

        final long nowMs = time.milliseconds();
        final long oldestRetainedMs = nowMs - taskHistoryRetentionPeriodMs;
        final ApiStatePersistenceStore store = balanceEngine.getDataBalanceEngineContext().getPersistenceStore();
        if (store == null) {
            return Collections.emptyList();
        }

        LOG.debug("Returning broker removal records from {} with task retention milliseconds: {}", nowMs, taskHistoryRetentionPeriodMs);
        final List<BrokerRemovalDescriptionInternal> orderedDescriptions = store.getAllBrokerRemovalStateRecords().values().stream()
            .peek(record -> LOG.debug(record.toString()))
            .filter(record -> record.lastUpdateTime() > oldestRetainedMs)
            .sorted(Comparator.comparingLong(BrokerRemovalStateRecord::lastUpdateTime))
            .flatMap(record -> record.toRemovalDescriptions().stream())
            .collect(Collectors.toList());

        // Remove-then-add so that a later description replaces an equal earlier one.
        final Set<BrokerRemovalDescriptionInternal> latestDescriptions = new HashSet<>();
        orderedDescriptions.forEach(description -> {
            latestDescriptions.remove(description);
            latestDescriptions.add(description);
        });
        return new ArrayList<>(latestDescriptions);
    }

    /**
     * Describes the broker addition operations known to the engine whose last update lies within
     * the task history retention period.
     *
     * @return one description per retained addition state manager
     * @throws BalancerOfflineException if the engine is not active
     */
    public List<BrokerAdditionDescriptionInternal> brokerAdditions() {
        if (!balanceEngine.isActive()) {
            LOG.error("Received request to describe broker additions while Databalancer is not started.");
            throw new BalancerOfflineException("Received request to describe broker additions while Databalancer is not started.");
        }

        final long oldestRetainedMs = time.milliseconds() - taskHistoryRetentionPeriodMs;
        return balanceEngine.getDataBalanceEngineContext().getBrokerAdditionsStateManagers().values().stream()
            .filter(manager -> manager.lastUpdateTimeMs() > oldestRetainedMs)
            .map(manager -> new BrokerAdditionDescriptionInternal(
                manager.brokerId(),
                manager.currentState().status(),
                BrokerAdditionStateMachine.convertBrokerAdditionStatus(manager.currentState().status()),
                manager.creationTimeMs(),
                manager.lastUpdateTimeMs(),
                manager.exception().orElse(null)))
            .collect(Collectors.toList());
    }

    /**
     * Describes the balancer's current status and the exception associated with it, if any.
     *
     * <p>The status tracker only exists once one has been injected or created on election, and it
     * is dropped again on resignation. The field is read once into a local so that a concurrent
     * resignation cannot null it between the two accesses below.
     *
     * @return the current status description
     * @throws BalancerOfflineException if there is currently no status tracker
     */
    public BalancerStatusDescriptionInternal balancerStatus() {
        final BalancerStatusTracker tracker = this.balancerStatusTracker;
        if (tracker == null) {
            throw new BalancerOfflineException("Received request to describe the balancer status while no balancer status tracker is available on this broker.");
        }
        return new BalancerStatusDescriptionInternal(tracker.currentState().status(), tracker.exception().orElse(null));
    }

    /** @return the even-cluster-load status the current engine reports for the current config */
    public EvenClusterLoadStatusDescriptionInternal evenClusterLoadStatus() {
        final KafkaConfig currentConfig = kafkaConfig;
        return balanceEngine.evenClusterLoadStatus(currentConfig);
    }

    /** @return the value of the config's self-balancing flag */
    private static boolean isBalancerEnabled(KafkaConfig kafkaConfig) {
        final Boolean selfBalanceEnabled = kafkaConfig.selfBalanceEnable();
        return selfBalanceEnabled.booleanValue();
    }

    /**
     * Validates a broker removal request and, unless it turns out to be a no-op, overrides all
     * pending broker additions and submits the removal to the engine.
     *
     * @param brokerRemovalRequest the brokers to remove and whether they should be shut down
     * @param aliveBrokersSnapshot snapshot used for validation and to look up each broker's epoch
     * @throws BalancerOfflineException if the engine is not active
     */
    public synchronized void scheduleBrokerRemoval(BrokerRemovalRequest brokerRemovalRequest, AliveBrokersSnapshot aliveBrokersSnapshot) {
        if (!this.balanceEngine.isActive()) {
            String message = String.format("Received request to remove brokers %s while DataBalancer is not started.", brokerRemovalRequest);
            LOG.error(message);
            throw new BalancerOfflineException(message);
        }
        if (validateAndCheckNoOp(brokerRemovalRequest, aliveBrokersSnapshot)) {
            return;
        }
        overrideAllPendingBrokerAdditions(BrokerAdditionStateMachine.BrokerAdditionEvent.BROKER_REMOVAL_REQUEST_OVERRIDES, String.format("broker removal request (for brokers %s)", brokerRemovalRequest.brokersEligibleForRemoval));

        // Pair every broker to remove with the epoch recorded for it in the snapshot.
        Map<Integer, Optional<Long>> brokerEpochs = brokerRemovalRequest.brokersEligibleForRemoval.stream()
            .collect(Collectors.toMap(Function.identity(), brokerId -> aliveBrokersSnapshot.epochFor(brokerId)));
        String uuid = BrokerRemovalCallback.uuid(brokerEpochs.keySet(), this.time.milliseconds());
        LOG.info("Submitting broker removal operation with UUID {} for brokers with epochs {}", uuid, brokerEpochs);
        this.balanceEngine.removeBrokers(brokerEpochs, brokerRemovalRequest.shouldShutdown.booleanValue(), uuid);
    }

    /**
     * Validates a broker removal request and reports whether there is nothing left to do.
     *
     * <p>Brokers listed as non-existent are accepted only if a removal record updated within the
     * task history retention period covers them. If the persistence store is not available, no
     * broker counts as recently removed. The request is also rejected if replica exclusions exist
     * on brokers that are not part of the request.
     *
     * @param brokerRemovalRequest the request to validate
     * @param aliveBrokersSnapshot snapshot providing the current replica exclusions
     * @return true if the request has no brokers eligible for removal because all requested
     *         brokers were already removed; false if the removal should proceed
     * @throws InvalidBrokerRemovalException if a non-existent broker has no recent removal record
     * @throws BalancerBrokerExcludedForReplicaPlacementException if other brokers have exclusions
     */
    private boolean validateAndCheckNoOp(BrokerRemovalRequest brokerRemovalRequest, AliveBrokersSnapshot aliveBrokersSnapshot) {
        final long oldestRetainedMs = this.time.milliseconds() - this.taskHistoryRetentionPeriodMs;
        if (!brokerRemovalRequest.nonExistentBrokers.isEmpty()) {
            ApiStatePersistenceStore persistenceStore = this.balanceEngine.getDataBalanceEngineContext().getPersistenceStore();
            final Set<Integer> recentlyRemovedBrokers;
            if (persistenceStore == null) {
                // No store means no removal history to consult.
                recentlyRemovedBrokers = Collections.emptySet();
            } else {
                recentlyRemovedBrokers = persistenceStore.getAllBrokerRemovalStateRecords().entrySet().stream()
                    .filter(entry -> entry.getValue().lastUpdateTime() > oldestRetainedMs)
                    .flatMap(entry -> entry.getKey().stream())
                    .collect(Collectors.toSet());
            }
            if (!recentlyRemovedBrokers.containsAll(brokerRemovalRequest.nonExistentBrokers)) {
                List<Integer> unknownBrokers = brokerRemovalRequest.nonExistentBrokers.stream()
                    .filter(brokerId -> !recentlyRemovedBrokers.contains(brokerId))
                    .collect(Collectors.toList());
                throw new InvalidBrokerRemovalException(String.format("Unknown broker ids specified %s", unknownBrokers));
            }
            if (brokerRemovalRequest.brokersEligibleForRemoval.isEmpty()) {
                LOG.info("The broker removal request {} is a no-op because all of its requested brokers were removed.", brokerRemovalRequest);
                return true;
            }
            LOG.info("Performing a no-op for a subset of broker ids ({}) in the broker removal request {} because the brokers were already removed.", brokerRemovalRequest.nonExistentBrokers, brokerRemovalRequest);
        }

        // Exclusions on brokers outside this request block the removal.
        Set<Integer> otherExcludedBrokers = aliveBrokersSnapshot.replicaExclusions().stream()
            .filter(brokerId -> !brokerRemovalRequest.nonExistentBrokers.contains(brokerId))
            .filter(brokerId -> !brokerRemovalRequest.brokersEligibleForRemoval.contains(brokerId))
            .collect(Collectors.toSet());
        if (otherExcludedBrokers.isEmpty()) {
            return false;
        }
        LOG.info("Rejecting broker removal request for brokers {} because there were replica exclusions present in the cluster for other brokers ({}) are not to be removed as part of this operation.", brokerRemovalRequest.brokersEligibleForRemoval, otherExcludedBrokers);
        throw new BalancerBrokerExcludedForReplicaPlacementException(String.format("Cannot remove brokers while there are active replica exclusions on brokers that are not be removed from the cluster. Consider removing the exclusions from brokers %s and retrying the removal operation", otherExcludedBrokers));
    }

    /**
     * Registers the given event on every broker addition that has not reached a terminal state.
     *
     * <p>An exception raised while processing one addition is logged and stops processing of the
     * remaining additions; it is not rethrown.
     *
     * @param brokerAdditionEvent the overriding event; must be able to cancel an addition
     * @param str                 human-readable cause, used in the exception and log messages
     * @throws IllegalArgumentException if the event cannot cancel
     */
    private void overrideAllPendingBrokerAdditions(BrokerAdditionStateMachine.BrokerAdditionEvent brokerAdditionEvent, String str) {
        if (!brokerAdditionEvent.canCancel()) {
            throw new IllegalArgumentException(String.format("Cannot override broker additions with event %s (cause %s)", brokerAdditionEvent, str));
        }

        final List<Integer> overriddenBrokers = new ArrayList<>();
        int currentBroker = -1; // -1 until the first state manager has been visited
        try {
            for (BrokerAdditionStateManager addition : balanceEngine.getDataBalanceEngineContext().getBrokerAdditionsStateManagers().values()) {
                currentBroker = addition.brokerId();
                if (addition.isAtATerminalState()) {
                    continue;
                }
                final String overrideMessage = String.format("The broker addition operation for broker %d was cancelled due to a %s which overrode it", currentBroker, str);
                addition.registerEvent(brokerAdditionEvent, (Exception) new BalancerOperationOverriddenException(overrideMessage));
                overriddenBrokers.add(currentBroker);
            }
        } catch (Exception e) {
            LOG.error("Received exception when trying to cancel the addition for broker {}", currentBroker, e);
        }

        if (!overriddenBrokers.isEmpty()) {
            LOG.info("Cancelled broker additions for brokers {} due to a {}", overriddenBrokers, str);
        }
    }

    /**
     * Registers the {@link #BROKER_ADDITION_STATE_METRIC_NAME} gauge for one broker.
     *
     * @param i the broker id used as the gauge's tag
     * @return the reference backing the gauge, initialised to {@code "NOT_STARTED"}
     */
    private AtomicReference<String> registerBrokerAdditionMetric(int i) {
        final AtomicReference<String> additionState = new AtomicReference<>("NOT_STARTED");
        dataBalancerMetricsRegistry.newGauge(ConfluentDataBalanceEngine.class, BROKER_ADDITION_STATE_METRIC_NAME, additionState::get, true, brokerIdMetricTag(i));
        return additionState;
    }

    /**
     * Registers the {@link #BALANCER_STATE_METRIC_NAME} gauge for one broker.
     *
     * @param i the broker id used as the gauge's tag
     * @return the reference backing the gauge, initialised to {@code "NOT_CONTROLLER"}
     */
    private AtomicReference<String> registerBalancerStatusMetric(int i) {
        final AtomicReference<String> balancerState = new AtomicReference<>("NOT_CONTROLLER");
        dataBalancerMetricsRegistry.newGauge(ConfluentDataBalanceEngine.class, BALANCER_STATE_METRIC_NAME, balancerState::get, true, brokerIdMetricTag(i));
        return balancerState;
    }

    /**
     * Builds the tag map attached to per-broker gauges.
     *
     * @param i the broker id
     * @return a new mutable map with the single entry {@code "broker" -> <id as decimal string>}
     */
    Map<String, String> brokerIdMetricTag(int i) {
        final Map<String, String> tags = new HashMap<>();
        tags.put("broker", Integer.toString(i));
        return tags;
    }

    /** Activates the engine using the {@code ON_FAILOVER} startup type. */
    private void activateEngine(Optional<AliveBrokersSnapshot> optional) {
        final EngineInitializationContext.EngineStartupType startupType = EngineInitializationContext.EngineStartupType.ON_FAILOVER;
        activateEngine(optional, startupType);
    }

    /**
     * Activates the current engine with an initialization context built from the current config,
     * the metric registration hooks of this manager and the current status tracker.
     *
     * @param optional          alive-brokers snapshot, if one is available
     * @param engineStartupType why the engine is being started
     */
    private void activateEngine(Optional<AliveBrokersSnapshot> optional, EngineInitializationContext.EngineStartupType engineStartupType) {
        final EngineInitializationContext initializationContext = new EngineInitializationContext(
            kafkaConfig,
            engineStartupType,
            optional,
            new BrokerRemovalMetricRegistry(),
            this::registerBrokerAdditionMetric,
            balancerStatusTracker);
        balanceEngine.onActivation(initializationContext);
    }

    /**
     * Tells the current engine to deactivate.
     *
     * @param balancerEvent the event passed to the engine as the reason for deactivation
     */
    private void deactivateEngine(BalancerStatusStateMachine.BalancerEvent balancerEvent) {
        final DataBalanceEngine engine = balanceEngine;
        engine.onDeactivation(balancerEvent);
    }

    /** @return the engine currently in use */
    public DataBalanceEngine getBalanceEngine() {
        return balanceEngine;
    }

    /** Replaces the engine currently in use. */
    void setBalanceEngine(DataBalanceEngine dataBalanceEngine) {
        balanceEngine = dataBalanceEngine;
    }

    /**
     * Registers the {@link #ACTIVE_BALANCER_COUNT_METRIC_NAME} gauge, which evaluates to 1 while the
     * engine in use at read time is active and to 0 otherwise.
     */
    private void enableDatabalancerMetric() {
        LOG.info("Registering metric {}", ACTIVE_BALANCER_COUNT_METRIC_NAME);
        dataBalancerMetricsRegistry.newGauge(
            KafkaDataBalanceManager.class,
            ACTIVE_BALANCER_COUNT_METRIC_NAME,
            () -> Integer.valueOf(balanceEngine.isActive() ? 1 : 0),
            false);
    }

    /** Removes the {@link #ACTIVE_BALANCER_COUNT_METRIC_NAME} gauge from the metrics registry. */
    private void disableDatabalancerMetric() {
        final String metricName = ACTIVE_BALANCER_COUNT_METRIC_NAME;
        LOG.info("De-registering metric {}", metricName);
        dataBalancerMetricsRegistry.clearLongLivedMetric(KafkaDataBalanceManager.class, metricName);
    }

    /** @return the configuration currently in effect, i.e. the most recent one passed to updateConfig */
    public KafkaConfig getKafkaConfig() {
        return kafkaConfig;
    }
}
