package io.confluent.databalancer;

import com.yammer.metrics.core.MetricName;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BrokerRemovalCancellationMode;
import io.confluent.databalancer.operation.BrokerRemovalStateTracker;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.common.BrokerRemovalDescriptionInternal;
import kafka.controller.DataBalanceManager;
import kafka.metrics.KafkaYammerMetrics;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalCanceledException;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import scala.Option;

/* loaded from: input_file:io/confluent/databalancer/KafkaDataBalanceManager.class */
public class KafkaDataBalanceManager implements DataBalanceManager {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaDataBalanceManager.class);
    /** Name of the gauge registered in the constructor; reports 1 while the current engine is active, otherwise 0. */
    public static final String ACTIVE_BALANCER_COUNT_METRIC_NAME = "ActiveBalancerCount";
    /** Name of the per-broker gauge created by {@code registerBrokerRemovalMetric}. */
    public static final String BROKER_REMOVAL_STATE_METRIC_NAME = "BrokerRemovalOperationState";
    /** Name of the gauge created by {@code activateEngine}; reports the size of {@link #brokersToAdd}. */
    public static final String BROKER_ADD_COUNT_METRIC_NAME = "BrokerAddCount";
    /**
     * Broker ids submitted for addition whose add operation has not yet completed.
     * Initialized to a concurrent key set in the constructor; entries are removed by the
     * completion callback in {@code onBrokersStartup}.
     */
    Set<Integer> brokersToAdd;
    /** Current broker configuration; replaced by {@code updateConfig}. */
    private KafkaConfig kafkaConfig;
    /**
     * Engine currently in use. Starts as the factory's inactive engine, is swapped to the
     * active engine in {@code onElection} and back to the inactive one in {@code onResignation}.
     */
    DataBalanceEngine balanceEngine;
    /** Supplies the active and inactive engine instances and shuts both down. */
    private final DataBalanceEngineFactory dbeFactory;
    /** Registry used to create this class's gauges. */
    private final DataBalancerMetricsRegistry dataBalancerMetricsRegistry;
    /** Clock used to build the millisecond suffix of operation ids. */
    private Time time;

    /* loaded from: input_file:io/confluent/databalancer/KafkaDataBalanceManager$DataBalanceEngineFactory.class */
    /**
     * Holds one "active" and one "inactive" {@link DataBalanceEngine} instance and hands out
     * the same two instances for the lifetime of the factory.
     */
    static class DataBalanceEngineFactory {
        private final DataBalanceEngine activeDataBalanceEngine;
        private final DataBalanceEngine inactiveDataBalanceEngine;

        /**
         * Builds the default pairing: a {@code ConfluentDataBalanceEngine} as the active engine
         * and a {@code NoOpDataBalanceEngine} as the inactive one.
         */
        DataBalanceEngineFactory(DataBalancerMetricsRegistry dataBalancerMetricsRegistry, KafkaConfig kafkaConfig) {
            this(new ConfluentDataBalanceEngine(dataBalancerMetricsRegistry, kafkaConfig), new NoOpDataBalanceEngine());
        }

        /**
         * Uses the supplied engines directly.
         *
         * @param dataBalanceEngine  engine returned by {@link #getActiveDataBalanceEngine()}; must not be null
         * @param dataBalanceEngine2 engine returned by {@link #getInactiveDataBalanceEngine()}; must not be null
         * @throws NullPointerException if either engine is null
         */
        DataBalanceEngineFactory(DataBalanceEngine dataBalanceEngine, DataBalanceEngine dataBalanceEngine2) {
            this.activeDataBalanceEngine = Objects.requireNonNull(dataBalanceEngine);
            this.inactiveDataBalanceEngine = Objects.requireNonNull(dataBalanceEngine2);
        }

        /** @return the engine designated as active */
        DataBalanceEngine getActiveDataBalanceEngine() {
            return activeDataBalanceEngine;
        }

        /** @return the engine designated as inactive */
        DataBalanceEngine getInactiveDataBalanceEngine() {
            return inactiveDataBalanceEngine;
        }

        /**
         * Shuts down the active engine and then the inactive engine.
         *
         * @throws InterruptedException propagated from either engine's shutdown; if the active
         *                              engine throws, the inactive engine is not shut down
         */
        void shutdown() throws InterruptedException {
            activeDataBalanceEngine.shutdown();
            inactiveDataBalanceEngine.shutdown();
        }
    }

    /**
     * Builds the metric whitelist handed to the metrics registry by the public constructor.
     * Only the {@code ActiveBalancerCount} metric of this class is added.
     *
     * @return the whitelist produced by the builder
     */
    static Set<MetricName> getMetricsWhiteList() {
        final DataBalancerMetricsRegistry.MetricsWhitelistBuilder whitelist =
                new DataBalancerMetricsRegistry.MetricsWhitelistBuilder();
        whitelist.addMetric(KafkaDataBalanceManager.class, ACTIVE_BALANCER_COUNT_METRIC_NAME);
        return whitelist.buildWhitelist();
    }

    /**
     * Creates a manager backed by a new {@link DataBalancerMetricsRegistry} built from the
     * default Yammer registry and the whitelist from {@link #getMetricsWhiteList()}.
     *
     * @param kafkaConfig broker configuration
     */
    public KafkaDataBalanceManager(KafkaConfig kafkaConfig) {
        this(kafkaConfig, new DataBalancerMetricsRegistry(KafkaYammerMetrics.defaultRegistry(), getMetricsWhiteList()));
    }

    /**
     * Intermediate constructor: shares one metrics registry between the newly created
     * {@link DataBalanceEngineFactory} and the manager itself, and uses {@link SystemTime}
     * as the clock.
     */
    private KafkaDataBalanceManager(KafkaConfig kafkaConfig, DataBalancerMetricsRegistry dataBalancerMetricsRegistry) {
        this(kafkaConfig, new DataBalanceEngineFactory(dataBalancerMetricsRegistry, kafkaConfig), dataBalancerMetricsRegistry, new SystemTime());
    }

    /**
     * Fully parameterized constructor. The manager starts out on the factory's inactive
     * engine, registers the {@code ActiveBalancerCount} gauge, and tags the logging context
     * with the broker id.
     *
     * @param kafkaConfig                 broker configuration; must not be null
     * @param dataBalanceEngineFactory    source of the active/inactive engines; must not be null
     * @param dataBalancerMetricsRegistry registry used for gauges; must not be null
     * @param time                        clock used for operation ids (not null-checked here)
     * @throws NullPointerException if any of the first three arguments is null
     */
    KafkaDataBalanceManager(KafkaConfig kafkaConfig, DataBalanceEngineFactory dataBalanceEngineFactory, DataBalancerMetricsRegistry dataBalancerMetricsRegistry, Time time) {
        this.kafkaConfig = Objects.requireNonNull(kafkaConfig, "KafkaConfig must be non-null");
        this.dbeFactory = Objects.requireNonNull(dataBalanceEngineFactory, "DataBalanceEngineFactory must be non-null");
        this.dataBalancerMetricsRegistry = Objects.requireNonNull(dataBalancerMetricsRegistry, "MetricsRegistry must be non-null");
        this.time = time;
        this.brokersToAdd = ConcurrentHashMap.newKeySet();
        this.balanceEngine = this.dbeFactory.getInactiveDataBalanceEngine();
        // The gauge reads the field on every poll, so it follows later engine swaps.
        this.dataBalancerMetricsRegistry.newGauge(
                KafkaDataBalanceManager.class,
                ACTIVE_BALANCER_COUNT_METRIC_NAME,
                () -> this.balanceEngine.isActive() ? 1 : 0,
                false);
        enableBrokerIdLogging(this.kafkaConfig);
    }

    /**
     * Reads the broker id from the given configuration.
     *
     * @param kafkaConfig configuration to read from
     * @return the integer stored under the key named by {@code KafkaConfig.BrokerIdProp}
     */
    public static Integer getBrokerId(KafkaConfig kafkaConfig) {
        final String brokerIdKey = KafkaConfig$.MODULE$.BrokerIdProp();
        return kafkaConfig.getInt(brokerIdKey);
    }

    /**
     * Stores the broker id under the {@code "brokerId"} key of the SLF4J mapped diagnostic
     * context.
     *
     * @param kafkaConfig configuration the broker id is read from
     */
    private static void enableBrokerIdLogging(KafkaConfig kafkaConfig) {
        final Integer brokerId = getBrokerId(kafkaConfig);
        MDC.put("brokerId", brokerId.toString());
    }

    /**
     * Switches to the active engine and, when {@code confluent.balancer.enable} is true,
     * activates it.
     *
     * @param map forwarded unchanged to the engine initialization context
     */
    public synchronized void onElection(Map<Integer, Long> map) {
        enableBrokerIdLogging(this.kafkaConfig);
        this.balanceEngine = this.dbeFactory.getActiveDataBalanceEngine();

        final boolean balancerEnabled = this.kafkaConfig.getBoolean("confluent.balancer.enable");
        if (!balancerEnabled) {
            LOG.info("DataBalancer: Skipping DataBalancer Startup as its not enabled.");
            return;
        }
        activateEngine(map);
    }

    /**
     * Reports whether the engine currently in use is active.
     *
     * @return the result of {@code isActive()} on the current engine
     */
    public boolean isActive() {
        final DataBalanceEngine currentEngine = this.balanceEngine;
        return currentEngine.isActive();
    }

    /**
     * Cancels all tracked broker removals with {@code TRANSIENT_CANCELLATION}, deactivates
     * the current engine, and then switches back to the inactive engine.
     */
    public synchronized void onResignation() {
        enableBrokerIdLogging(this.kafkaConfig);
        // Cancellation and deactivation must run against the current engine, before the swap below.
        cancelAllExistingBrokerRemovals(BrokerRemovalCancellationMode.TRANSIENT_CANCELLATION);
        deactivateEngine();
        this.balanceEngine = this.dbeFactory.getInactiveDataBalanceEngine();
    }

    /**
     * Shuts down both engines held by the factory.
     *
     * <p>If the shutdown is interrupted, the interruption is logged together with its
     * exception and the thread's interrupt flag is restored so callers further up the stack
     * can still observe it.
     */
    public synchronized void shutdown() {
        try {
            this.dbeFactory.shutdown();
        } catch (InterruptedException e) {
            LOG.warn("DataBalanceManager interrupted during shutdown.", e);
            // Catching InterruptedException clears the flag; re-assert it rather than swallow it.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Applies a configuration change.
     *
     * <p>The new configuration is stored first. If {@code confluent.balancer.enable} changed,
     * the engine is activated or (after cancelling broker removals with
     * {@code PERSISTENT_CANCELLATION}) deactivated, and nothing else is processed. Otherwise
     * a changed throttle and a changed uneven-load trigger are pushed to the current engine.
     *
     * @param kafkaConfig  the previous configuration
     * @param kafkaConfig2 the new configuration
     */
    public synchronized void updateConfig(KafkaConfig kafkaConfig, KafkaConfig kafkaConfig2) {
        this.kafkaConfig = kafkaConfig2;

        final Boolean enabledNow = this.kafkaConfig.getBoolean("confluent.balancer.enable");
        final Boolean enabledBefore = kafkaConfig.getBoolean("confluent.balancer.enable");
        if (!enabledNow.equals(enabledBefore)) {
            if (enabledNow.booleanValue()) {
                activateEngine(Collections.emptyMap());
            } else {
                cancelAllExistingBrokerRemovals(BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION);
                deactivateEngine();
            }
            // An enable/disable flip short-circuits the remaining checks.
            return;
        }

        final Long throttleNow = this.kafkaConfig.getLong("confluent.balancer.throttle.bytes.per.second");
        final Long throttleBefore = kafkaConfig.getLong("confluent.balancer.throttle.bytes.per.second");
        if (!throttleNow.equals(throttleBefore)) {
            LOG.info("Setting broker throttle to {}", throttleNow);
            this.balanceEngine.updateThrottle(throttleNow);
        }

        final String triggerNow = this.kafkaConfig.getString("confluent.balancer.heal.uneven.load.trigger");
        final String triggerBefore = kafkaConfig.getString("confluent.balancer.heal.uneven.load.trigger");
        if (!triggerNow.equals(triggerBefore)) {
            final boolean autoHeal = triggerNow.equals(ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString());
            LOG.info("Setting DataBalancer auto heal mode to {}", autoHeal);
            this.balanceEngine.setAutoHealMode(autoHeal);
        }
    }

    /**
     * Reacts to brokers starting up.
     *
     * <p>Does nothing when {@code set2} is empty or the engine is inactive. Otherwise any
     * tracked removals for the brokers in {@code set2} are cancelled, and if {@code set} is
     * non-empty its ids are added to the pending set and an add operation is submitted for
     * everything currently pending.
     *
     * @param set  ids of the started brokers that are empty (per the log message below)
     * @param set2 ids of all newly started brokers
     */
    public void onBrokersStartup(Set<Integer> set, Set<Integer> set2) {
        if (set2.isEmpty()) {
            return;
        }
        if (!this.balanceEngine.isActive()) {
            LOG.warn("Notified of broker additions (empty broker ids {}, new brokers {}) but DataBalancer is disabled -- ignoring for now", set, set2);
            return;
        }

        cancelExistingBrokerRemovals(set2);

        if (!set.isEmpty()) {
            this.brokersToAdd.addAll(set);
            // Snapshot: the callback must remove exactly the ids submitted with this request.
            final Set<Integer> submitted = new HashSet<>(this.brokersToAdd);
            this.balanceEngine.addBrokers(submitted, (succeeded, failure) -> {
                LOG.info("Add Operation completed with success value {}", succeeded);
                // Pending ids are cleared on success and also when the operation ended with an exception.
                if (succeeded || failure != null) {
                    this.brokersToAdd.removeAll(submitted);
                    LOG.info("Broker Add op (of brokers {}) completion, remaining brokers to add: {}", submitted, this.brokersToAdd);
                }
            }, String.format("addBroker-%d", this.time.milliseconds()));
        }
    }

    /**
     * Cancels every broker removal tracked by the current engine, provided the engine is
     * active; otherwise does nothing.
     *
     * @param brokerRemovalCancellationMode cancellation mode passed to each tracker
     */
    private synchronized void cancelAllExistingBrokerRemovals(BrokerRemovalCancellationMode brokerRemovalCancellationMode) {
        if (!this.balanceEngine.isActive()) {
            return;
        }
        final Collection<BrokerRemovalStateTracker> allTrackers =
                this.balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers().values();
        cancelExistingBrokerRemovals(allTrackers, brokerRemovalCancellationMode);
    }

    /**
     * Cancels, with {@code PERSISTENT_CANCELLATION}, the tracked removal operations of the
     * given brokers. Broker ids without a tracker are ignored.
     *
     * @param set ids of the brokers whose removal operations should be cancelled
     */
    private synchronized void cancelExistingBrokerRemovals(Set<Integer> set) {
        final Map<Integer, BrokerRemovalStateTracker> trackersByBroker =
                this.balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers();
        // Look up each broker's tracker; brokers with no removal in flight map to null and are dropped.
        final Set<BrokerRemovalStateTracker> trackersToCancel = set.stream()
                .map(trackersByBroker::get)
                .filter(Objects::nonNull)
                .collect(Collectors.toSet());
        cancelExistingBrokerRemovals(trackersToCancel, BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION);
    }

    /**
     * Attempts to cancel each given removal operation and drops its tracker from the engine
     * context's tracker map, whether or not the cancellation succeeded.
     *
     * @param collection                    trackers of the removal operations to cancel; may be a live
     *                                      view of the tracker map itself
     * @param brokerRemovalCancellationMode cancellation mode passed to each tracker
     */
    private synchronized void cancelExistingBrokerRemovals(Collection<BrokerRemovalStateTracker> collection, BrokerRemovalCancellationMode brokerRemovalCancellationMode) {
        final Map<Integer, BrokerRemovalStateTracker> trackersByBroker =
                this.balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers();
        // NOTE(review): keySet() is a live view, so the log lines below show the keys that
        // remain after the removals in the loop -- confirm that this is what should be logged.
        final Set<Integer> trackedBrokerIds = trackersByBroker.keySet();

        // Iterate over a snapshot: callers may pass trackersByBroker.values(), and removing
        // from the map while iterating its own view can throw ConcurrentModificationException.
        final List<BrokerRemovalStateTracker> trackersToCancel = new ArrayList<>(collection);
        final List<Integer> cancelledBrokerIds = new ArrayList<>();
        for (BrokerRemovalStateTracker tracker : trackersToCancel) {
            if (tryCancelBrokerRemoval(tracker, brokerRemovalCancellationMode)) {
                cancelledBrokerIds.add(tracker.brokerId());
            }
            trackersByBroker.remove(tracker.brokerId());
        }

        if (cancelledBrokerIds.isEmpty()) {
            LOG.debug("No broker removal operations were canceled for {}, either due to none being present or a failure in cancellation", trackedBrokerIds);
        } else {
            LOG.info("Cancelled the broker removal operations for brokers {}. (new brokers {})", cancelledBrokerIds, trackedBrokerIds);
        }
    }

    /**
     * Cancels a single broker removal in two steps: first the tracker is asked to accept the
     * cancelled state, and only if it does is the engine asked to cancel the operation.
     *
     * @param brokerRemovalStateTracker     tracker of the removal to cancel
     * @param brokerRemovalCancellationMode cancellation mode passed to the tracker
     * @return {@code true} only if the tracker accepted the cancellation and the engine
     *         reported that it cancelled the operation
     */
    private boolean tryCancelBrokerRemoval(BrokerRemovalStateTracker brokerRemovalStateTracker, BrokerRemovalCancellationMode brokerRemovalCancellationMode) {
        final int targetBroker = brokerRemovalStateTracker.brokerId();
        LOG.info("Setting cancelled state on broker removal operation {}", targetBroker);

        final BrokerRemovalCanceledException reason = new BrokerRemovalCanceledException(
                String.format("The broker removal operation for broker %d was canceled, likely due to the broker starting back up while it was being removed or as part of shutdown.", targetBroker));
        final boolean stateAccepted = brokerRemovalStateTracker.cancel(reason, brokerRemovalCancellationMode);
        if (!stateAccepted) {
            LOG.info("Will not cancel broker removal operation for broker {} because it is in state {}", targetBroker, brokerRemovalStateTracker.currentState());
            return false;
        }

        LOG.info("Successfully set canceled status on broker removal task for broker {}. Proceeding with cancellation of the operation", targetBroker);
        final boolean engineCancelled = this.balanceEngine.cancelBrokerRemoval(targetBroker);
        if (!engineCancelled) {
            LOG.error("Did not succeed in canceling the broker removal operation for broker {}", targetBroker);
        } else {
            LOG.info("Successfully canceled the broker removal operation for broker {}.", targetBroker);
        }
        return engineCancelled;
    }

    /**
     * Describes all broker removals recorded in the engine's persistence store.
     *
     * @return one description per stored removal record, or an empty list when the engine
     *         context has no persistence store
     * @throws BalancerOfflineException if the engine is not active
     */
    public List<BrokerRemovalDescriptionInternal> brokerRemovals() {
        if (!this.balanceEngine.isActive()) {
            final String offlineMessage = "Received request to describe broker removals while Databalancer is not started.";
            LOG.error(offlineMessage);
            throw new BalancerOfflineException(offlineMessage);
        }

        final ApiStatePersistenceStore store = this.balanceEngine.getDataBalanceEngineContext().getPersistenceStore();
        if (store == null) {
            return Collections.emptyList();
        }
        return store.getAllBrokerRemovalStateRecords().values().stream()
                .map(removalRecord -> removalRecord.toRemovalDescription())
                .collect(Collectors.toList());
    }

    /**
     * Submits a broker removal to the engine under an id of the form
     * {@code remove-broker-<brokerId>-<currentMillis>}.
     *
     * @param i      id of the broker to remove
     * @param option optional broker epoch, converted from a Scala {@code Option} to a Java
     *               {@code Optional} before being handed to the engine
     * @throws BalancerOfflineException if the engine is not active
     */
    public synchronized void scheduleBrokerRemoval(int i, Option<Long> option) {
        if (!this.balanceEngine.isActive()) {
            final String offlineMessage = String.format("Received request to remove broker %d while DataBalancer is not started.", i);
            LOG.error(offlineMessage);
            throw new BalancerOfflineException(offlineMessage);
        }

        final Optional<Long> brokerEpoch;
        if (option.isEmpty()) {
            brokerEpoch = Optional.empty();
        } else {
            brokerEpoch = Optional.of(option.get());
        }
        final String operationId = String.format("remove-broker-%d-%d", i, this.time.milliseconds());
        LOG.info("Submitting broker removal operation with UUID {} for broker {} (epoch {})", operationId, i, option);
        this.balanceEngine.removeBroker(i, brokerEpoch, operationId);
    }

    /**
     * Registers a {@code BrokerRemovalOperationState} gauge tagged with the broker id. The
     * gauge reports whatever string the returned reference currently holds.
     *
     * @param i id of the broker the gauge is tagged with
     * @return the reference backing the gauge, initially {@code "NOT_STARTED"}
     */
    private AtomicReference<String> registerBrokerRemovalMetric(int i) {
        final AtomicReference<String> removalState = new AtomicReference<>("NOT_STARTED");
        this.dataBalancerMetricsRegistry.newGauge(
                ConfluentDataBalanceEngine.class,
                BROKER_REMOVAL_STATE_METRIC_NAME,
                removalState::get,
                true,
                brokerIdMetricTag(i));
        return removalState;
    }

    /**
     * Builds the metric tag map for a broker.
     *
     * @param i broker id
     * @return a new mutable map with the single entry {@code "broker" -> <decimal id>}
     */
    Map<String, String> brokerIdMetricTag(int i) {
        final Map<String, String> tags = new HashMap<>();
        tags.put("broker", Integer.toString(i));
        return tags;
    }

    /**
     * Activates the current engine and registers the {@code BrokerAddCount} gauge, which
     * reports the number of broker ids still pending addition.
     *
     * @param map forwarded unchanged to the {@code EngineInitializationContext}
     */
    private void activateEngine(Map<Integer, Long> map) {
        final EngineInitializationContext initContext =
                new EngineInitializationContext(this.kafkaConfig, map, this::registerBrokerRemovalMetric);
        this.balanceEngine.onActivation(initContext);

        this.dataBalancerMetricsRegistry.newGauge(
                ConfluentDataBalanceEngine.class,
                BROKER_ADD_COUNT_METRIC_NAME,
                this.brokersToAdd::size,
                true);
    }

    /** Notifies the current engine that it is being deactivated. */
    private void deactivateEngine() {
        this.balanceEngine.onDeactivation();
    }
}
