package io.confluent.databalancer;

import com.linkedin.cruisecontrol.exception.CruiseControlException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.common.KafkaCruiseControlThreadFactory;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalCallback;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalFuture;
import com.linkedin.kafka.cruisecontrol.operation.MultiBrokerAdditionOperation;
import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import io.confluent.databalancer.operation.BalancerStatusStateMachine;
import io.confluent.databalancer.operation.BalancerStatusTracker;
import io.confluent.databalancer.operation.BrokerAdditionStateMachine;
import io.confluent.databalancer.operation.BrokerAdditionStateManager;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.databalancer.operation.BrokerRemovalStateTracker;
import io.confluent.databalancer.operation.MultiBrokerBalancerOperationTerminationListener;
import io.confluent.databalancer.operation.PersistRemoveApiStateListener;
import io.confluent.databalancer.operation.SingleBrokerBalancerOperationProgressListener;
import io.confluent.databalancer.operation.SingleBrokerBalancerOperationTerminationListener;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import io.confluent.databalancer.persistence.BrokerRemovalStateRecord;
import io.confluent.databalancer.startup.CruiseControlStartable;
import io.confluent.databalancer.startup.StartupCheckInterruptedException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.common.AliveBrokersSnapshot;
import kafka.common.EvenClusterLoadStatusDescriptionInternal;
import kafka.server.KafkaConfig;
import org.apache.kafka.common.errors.BalancerBrokerExcludedForReplicaPlacementException;
import org.apache.kafka.common.errors.BalancerJbodEnabledMisconfigurationException;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalInProgressException;
import org.apache.kafka.common.protocol.BalancerOperationOverriddenException;
import org.apache.kafka.common.utils.SystemTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/databalancer/ConfluentDataBalanceEngine.class */
public class ConfluentDataBalanceEngine implements DataBalanceEngine {
    public static final String BROKER_ADD_COUNT_METRIC_NAME = "BrokerAddCount";
    private static final Logger LOG = LoggerFactory.getLogger(ConfluentDataBalanceEngine.class);
    private static final int SHUTDOWN_TIMEOUT_MS = 15000;
    private final ExecutorService ccRunner;
    private KafkaDataBalanceManager.BrokerRemovalMetricRegistry brokerRemovalMetricRegistry;
    private Function<Integer, AtomicReference<String>> brokerAdditionStateMetricRegistrationHandler;
    final MultiBrokerBalancerOperationTerminationListener<BrokerRemovalStateMachine.BrokerRemovalState> removalTerminationListener;
    final SingleBrokerBalancerOperationTerminationListener<BrokerAdditionStateMachine.BrokerAdditionState> additionTerminationListener;
    final SingleBrokerBalancerOperationProgressListener<BrokerAdditionStateMachine.BrokerAdditionState> additionProgressListener;
    final ConfluentDataBalanceEngineContext context;
    private Semaphore abortStartupCheck;
    volatile boolean canAcceptRequests;

    public ConfluentDataBalanceEngine(DataBalancerMetricsRegistry dataBalancerMetricsRegistry, KafkaConfig kafkaConfig) {
        this(Executors.newSingleThreadExecutor(createThreadFactory(kafkaConfig)), createContext(dataBalancerMetricsRegistry));
    }

    private static ConfluentDataBalanceEngineContext createContext(DataBalancerMetricsRegistry dataBalancerMetricsRegistry) {
        return new ConfluentDataBalanceEngineContext(dataBalancerMetricsRegistry, null, new SystemTime());
    }

    private static KafkaCruiseControlThreadFactory createThreadFactory(KafkaConfig kafkaConfig) {
        return new KafkaCruiseControlThreadFactory("DataBalanceEngine", true, LOG, Optional.of(DatabalancerUtils.getBrokerId(kafkaConfig)));
    }

    ConfluentDataBalanceEngine(ExecutorService executorService, ConfluentDataBalanceEngineContext confluentDataBalanceEngineContext) {
        this.abortStartupCheck = new Semaphore(0);
        this.canAcceptRequests = false;
        this.ccRunner = (ExecutorService) Objects.requireNonNull(executorService, "ExecutorService must be non-null");
        this.context = confluentDataBalanceEngineContext;
        this.removalTerminationListener = (set, brokerRemovalState, exc) -> {
            confluentDataBalanceEngineContext.brokerRemovalsStateTrackers.remove(new ImmutableSet(set));
            LOG.info("Removal for brokers {} reached terminal state {}", set, brokerRemovalState);
        };
        this.additionTerminationListener = (i, brokerAdditionState, exc2) -> {
            LOG.info("Addition operation for broker {} reached terminal state {}", new Object[]{Integer.valueOf(i), brokerAdditionState, exc2});
        };
        this.additionProgressListener = (i2, brokerAdditionState2, exc3) -> {
            LOG.info("Addition status for broker {} changed to {}", new Object[]{Integer.valueOf(i2), brokerAdditionState2, exc3});
        };
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public DataBalanceEngineContext getDataBalanceEngineContext() {
        return this.context;
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public synchronized void onActivation(EngineInitializationContext engineInitializationContext) {
        this.brokerRemovalMetricRegistry = engineInitializationContext.brokerRemovalMetricRegistry;
        this.brokerAdditionStateMetricRegistrationHandler = engineInitializationContext.brokerAdditionStateMetricRegistrationHandler;
        this.context.setBalancerStatusTracker(engineInitializationContext.balancerStatusTracker);
        LOG.info("DataBalancer: Scheduling DataBalanceEngine Startup");
        registerMetrics();
        this.abortStartupCheck.drainPermits();
        this.canAcceptRequests = true;
        this.ccRunner.submit(() -> {
            doStart(engineInitializationContext);
        });
    }

    private void registerMetrics() {
        this.context.getDataBalancerMetricsRegistry().newGauge(ConfluentDataBalanceEngine.class, BROKER_ADD_COUNT_METRIC_NAME, () -> {
            return Integer.valueOf(this.context.brokersBeingAdded().size());
        }, true);
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public synchronized void onDeactivation(BalancerStatusStateMachine.BalancerEvent balancerEvent) {
        LOG.info("DataBalancer: Scheduling DataBalanceEngine Shutdown.");
        this.abortStartupCheck.release();
        this.canAcceptRequests = false;
        submitToCcRunnerOrElse(() -> {
            stopCruiseControl(balancerEvent);
        }, () -> {
            if (this.context.getBalancerStatusTracker() != null) {
                this.context.getBalancerStatusTracker().registerEvent(balancerEvent);
            }
        }, LOG);
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public void shutdown() throws InterruptedException {
        this.ccRunner.shutdown();
        this.ccRunner.awaitTermination(15000L, TimeUnit.MILLISECONDS);
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public void updateThrottle(Long l) {
        LOG.info("DataBalancer: Scheduling DataBalanceEngine throttle update to {}", l);
        submitToCcRunner(() -> {
            updateThrottleHelper(l);
        }, "Cannot update throttle when no DataBalancer is active.", LOG);
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public void setAutoHealMode(boolean z) {
        LOG.info("DataBalancer: Scheduling DataBalanceEngine auto-heal update (setting to {})", Boolean.valueOf(z));
        submitToCcRunner(() -> {
            updateAutoHealHelper(z);
        }, "Attempt to update auto-heal mode (" + z + ") when no DataBalancer is active.", LOG);
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public void updateConfigPermanently(String str, Object obj) {
        LOG.debug("Permanently updating config {} to {}", str, obj.toString());
        submitToCcRunner(() -> {
            updateConfigPermanentlyHelper(str, obj);
        }, "", LOG);
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public void removeBrokers(Map<Integer, Optional<Long>> map, boolean z, String str) {
        HashSet hashSet = new HashSet(map.keySet());
        String logPrefix = BrokerRemovalCallback.logPrefix(str);
        if (!this.canAcceptRequests) {
            String format = String.format("Received request to remove brokers %s (uid %s) while DataBalancer is not started.", map, str);
            LOG.error(format);
            throw new BalancerOfflineException(format);
        }
        LOG.info("DataBalancer: {} Scheduling DataBalanceEngine broker removal: {} (uid: {}, shouldShutdown: {})", new Object[]{logPrefix, map, str, Boolean.valueOf(z)});
        ApiStatePersistenceStore persistenceStore = this.context.getPersistenceStore();
        Supplier supplier = () -> {
            return persistenceStore.getAllBrokerRemovalStateRecords().values().stream().filter(brokerRemovalStateRecord -> {
                return !BrokerRemovalStateMachine.isStateTerminal(brokerRemovalStateRecord.state());
            });
        };
        Set set = (Set) ((Stream) supplier.get()).flatMap(brokerRemovalStateRecord -> {
            return brokerRemovalStateRecord.brokerIds().stream();
        }).collect(Collectors.toSet());
        if (set.isEmpty()) {
            submitRemoveBroker(map, z, new BrokerRemovalStateTracker(hashSet, new PersistRemoveApiStateListener(this.context.getPersistenceStore(), z), this.removalTerminationListener, this.brokerRemovalMetricRegistry.registerBrokerRemovalMetric(hashSet), this.context.getTime()), str);
        } else if (set.containsAll(hashSet)) {
            LOG.info("DataBalancer: {} Performing a no-op and returning a successful response for the broker removal operation {} because the requested brokers to be removed ({}) are already being removed with an on-going in-progress removal operation (for brokers {}).", new Object[]{logPrefix, str, hashSet, set});
        } else {
            String str2 = "Cannot remove brokers " + hashSet + " as broker removals already in progress: " + ((String) ((Stream) supplier.get()).map(brokerRemovalStateRecord2 -> {
                return String.format("[%s]", brokerRemovalStateRecord2.brokerIds().toString());
            }).collect(Collectors.joining(",")));
            LOG.error(str2);
            throw new BrokerRemovalInProgressException(str2);
        }
    }

    private void submitRemoveBroker(Map<Integer, Optional<Long>> map, boolean z, BrokerRemovalStateTracker brokerRemovalStateTracker, String str) {
        this.context.brokerRemovalsStateTrackers.put(brokerRemovalStateTracker.brokerIds(), brokerRemovalStateTracker);
        brokerRemovalStateTracker.initialize();
        submitToCcRunner(() -> {
            doRemoveBroker(map, z, brokerRemovalStateTracker, str);
        }, "Broker removal operation with UID " + str + " was not initiated due to the data balance engine not being initialized", LOG);
    }

    private void doRemoveBroker(Map<Integer, Optional<Long>> map, boolean z, BrokerRemovalStateTracker brokerRemovalStateTracker, String str) {
        LOG.info("Initiating broker removal operation with UID {} for brokers with epochs {}", str, map);
        ImmutableSet<Integer> brokerIds = brokerRemovalStateTracker.brokerIds();
        try {
            this.context.putBrokerRemovalFuture(brokerIds, this.context.getCruiseControl().removeBrokers(map, z, (z2, th) -> {
                this.context.removeBrokerRemovalFuture(brokerIds);
            }, brokerRemovalStateTracker, str));
        } catch (Throwable th2) {
            LOG.error("Broker removal operation with UID {} for brokers {} failed due to ", new Object[]{str, brokerIds, th2});
        }
    }

    Future<?> submitToCcRunner(Runnable runnable, String str, Logger logger) {
        return submitToCcRunnerOrElse(runnable, () -> {
            LOG.info(str);
        }, logger);
    }

    Future<?> submitToCcRunnerOrElse(Runnable runnable, Runnable runnable2, Logger logger) {
        return this.ccRunner.submit(() -> {
            if (!isActive()) {
                runnable2.run();
                return;
            }
            try {
                runnable.run();
            } catch (Throwable th) {
                logger.error("Uncaught exception in " + Thread.currentThread().getName() + ": ", th);
                throw th;
            }
        });
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public void addBrokers(Set<Integer> set, String str, AliveBrokersSnapshot aliveBrokersSnapshot) {
        if (!this.canAcceptRequests) {
            String format = String.format("Received request to add brokers %s while DataBalancer is not started.", set);
            LOG.error(format);
            throw new BalancerOfflineException(format);
        }
        BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback = (z, th) -> {
            if (z) {
                LOG.info("The broker addition operation for brokers {} has completed successfully", set);
            } else if (th != null) {
                LOG.info("The broker addition operation for brokers {} has completed erroneously", set);
            } else {
                LOG.info("The broker addition operation for brokers {} completed unsuccessfully without any errors", set);
            }
        };
        MultiBrokerAdditionOperation initializeBrokerAdditionState = initializeBrokerAdditionState(set);
        Set replicaExclusions = aliveBrokersSnapshot.replicaExclusions();
        if (!replicaExclusions.isEmpty()) {
            String str2 = (String) set.stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(","));
            LOG.warn("Active broker replica exclusions detected (brokers {}), will not add new brokers {}", replicaExclusions, str2);
            initializeBrokerAdditionState.registerEvent(BrokerAdditionStateMachine.BrokerAdditionEvent.BROKER_EXCLUSION_DETECTED, (Exception) new BalancerBrokerExcludedForReplicaPlacementException(String.format("The broker addition operation for brokers %s was cancelled because there were active broker replica exclusions in the cluster (brokers %s were excluded)", str2, replicaExclusions)));
        } else if (this.context.getPersistenceStore().getAllBrokerRemovalStateRecords().values().stream().anyMatch(brokerRemovalStateRecord -> {
            return !BrokerRemovalStateMachine.isStateTerminal(brokerRemovalStateRecord.state());
        })) {
            LOG.warn("Broker removals ongoing, will not add new brokers {}", set);
            initializeBrokerAdditionState.registerEvent(BrokerAdditionStateMachine.BrokerAdditionEvent.BROKER_REMOVAL_REQUEST_OVERRIDES, (Exception) new BalancerOperationOverriddenException(String.format("The broker addition operation for brokers %s was cancelled because a higher-priority broker removal request was ongoing", set.stream().map((v0) -> {
                return v0.toString();
            }).collect(Collectors.joining(",")))));
        } else {
            LOG.info("DataBalancer: Scheduling DataBalanceEngine broker addition: {}", set);
            submitToCcRunner(() -> {
                doAddBrokers(initializeBrokerAdditionState, balanceOpExecutionCompletionCallback, str);
            }, "Broker addition operation with UID " + str + " was not initiated due to the data balance engine not being initialized", LOG);
        }
    }

    private MultiBrokerAdditionOperation initializeBrokerAdditionState(Set<Integer> set) {
        HashMap hashMap = new HashMap();
        for (Integer num : set) {
            BrokerAdditionStateManager brokerAdditionStateManager = new BrokerAdditionStateManager(num.intValue(), this.additionProgressListener, this.additionTerminationListener, this.brokerAdditionStateMetricRegistrationHandler.apply(num), new SystemTime());
            brokerAdditionStateManager.initialize();
            hashMap.put(num, brokerAdditionStateManager);
            this.context.putBrokerStateManager(brokerAdditionStateManager);
        }
        return new MultiBrokerAdditionOperation(hashMap.keySet(), new ArrayList(hashMap.values()));
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public boolean cancelBrokerRemoval(Set<Integer> set) {
        BrokerRemovalFuture brokerRemovalFuture = this.context.brokerRemovalFuture(new ImmutableSet<>(set));
        if (brokerRemovalFuture == null) {
            if (!LOG.isDebugEnabled()) {
                return false;
            }
            LOG.debug("Will not cancel broker removal for brokers {} as they are not being removed.", set);
            return false;
        }
        LOG.info("Canceling broker removal task for brokers {}", set);
        boolean cancel = brokerRemovalFuture.cancel();
        LOG.info("Canceled broker removal task for brokers {} (future canceled {})", set, Boolean.valueOf(cancel));
        return cancel;
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public void notifyBrokerChange(Set<Integer> set, BrokerChangeEvent brokerChangeEvent) {
        String format = String.format("Ignoring request to track %s event for brokers (%s) while the data balance engine is not initialized.", brokerChangeEvent, set);
        LOG.info("Notify {} event for brokers: {}", brokerChangeEvent, set);
        submitToCcRunner(() -> {
            this.context.getCruiseControl().notifyBrokerChange(set, brokerChangeEvent);
        }, format, LOG);
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public boolean isActive() {
        return this.context.isCruiseControlInitialized();
    }

    @Override // io.confluent.databalancer.DataBalanceEngine
    public EvenClusterLoadStatusDescriptionInternal evenClusterLoadStatus(KafkaConfig kafkaConfig) {
        if (this.canAcceptRequests) {
            return isActive() ? this.context.getEvenClusterLoadStateManager().evenClusterLoadStatusDescription() : DatabalancerUtils.anyUnevenLoadEnabled(kafkaConfig) ? EvenClusterLoadStatusDescriptionInternal.STARTING : EvenClusterLoadStatusDescriptionInternal.DISABLED;
        }
        LOG.error("Received request to describe the cluster even load status while the Confluent Balancer component is disabled or not started yet. Query the BalancerStatus Admin API for more details.");
        throw new BalancerOfflineException("Received request to describe the cluster even load status while the Confluent Balancer component is disabled or not started yet. Query the BalancerStatus Admin API for more details.");
    }

    void updateThrottleHelper(Long l) {
        LOG.info("Updating balancer throttle to {}", l);
        this.context.getCruiseControl().updateThrottle(l.longValue());
    }

    void updateAutoHealHelper(boolean z) {
        LOG.info("Changing GOAL_VIOLATION anomaly self-healing actions to {}", Boolean.valueOf(z));
        this.context.getCruiseControl().setGoalViolationSelfHealing(z);
    }

    public void updateConfigPermanentlyHelper(String str, Object obj) {
        LOG.debug("Permanently updating config {} to {}", str, obj.toString());
        this.context.getCruiseControl().updateConfigPermanently(str, obj);
    }

    void doStart(EngineInitializationContext engineInitializationContext) {
        doStart(engineInitializationContext, new CruiseControlStartable(this.context.getTime(), this.context.getDataBalancerMetricsRegistry()));
    }

    void doStart(EngineInitializationContext engineInitializationContext, CruiseControlStartable cruiseControlStartable) {
        if (isActive()) {
            LOG.warn("DataBalanceEngine already running when startUp requested.");
            return;
        }
        LOG.info("DataBalancer: Instantiating DataBalanceEngine");
        KafkaCruiseControl kafkaCruiseControl = null;
        BalancerStatusTracker balancerStatusTracker = this.context.getBalancerStatusTracker();
        try {
            kafkaCruiseControl = cruiseControlStartable.createKafkaCruiseControl(engineInitializationContext.kafkaConfig, engineInitializationContext.howStarted, this.abortStartupCheck);
            this.context.setPersistenceStore(new ApiStatePersistenceStore(engineInitializationContext.kafkaConfig, this.context.getTime(), CruiseControlStartable.generateClientConfigs(engineInitializationContext.kafkaConfig)));
            kafkaCruiseControl.startUp(this.context.getPersistenceStore());
            resubmitPendingOperations(engineInitializationContext);
            balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.CRUISE_CONTROL_INITIALIZATION_COMPLETED);
            this.context.setCruiseControl(kafkaCruiseControl);
            LOG.info("DataBalancer: DataBalanceEngine started");
        } catch (BalancerJbodEnabledMisconfigurationException e) {
            balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.JBOD_ENABLED, (Exception) e);
            if (kafkaCruiseControl != null && this.context.getCruiseControl() == null) {
                kafkaCruiseControl.shutdown();
            }
            this.context.closeAndClearState();
        } catch (StartupCheckInterruptedException e2) {
            LOG.warn("DataBalanceEngine startup aborted by shutdown.", e2);
            if (kafkaCruiseControl != null && this.context.getCruiseControl() == null) {
                kafkaCruiseControl.shutdown();
            }
            this.context.closeAndClearState();
        } catch (Throwable th) {
            balancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.CRUISE_CONTROL_ERRORED, (Exception) new CruiseControlException(th));
            LOG.warn("Unable to start up DataBalanceEngine", th);
            if (kafkaCruiseControl != null && this.context.getCruiseControl() == null) {
                kafkaCruiseControl.shutdown();
            }
            this.context.closeAndClearState();
        }
    }

    private void resubmitPendingOperations(EngineInitializationContext engineInitializationContext) {
        Set<BrokerRemovalStateRecord> set = (Set) this.context.getPersistenceStore().getAllBrokerRemovalStateRecords().values().stream().filter(brokerRemovalStateRecord -> {
            return !BrokerRemovalStateMachine.isStateTerminal(brokerRemovalStateRecord.state());
        }).collect(Collectors.toSet());
        if (set.isEmpty()) {
            LOG.info("No pending DataBalancer operations found at startup.");
            return;
        }
        for (BrokerRemovalStateRecord brokerRemovalStateRecord2 : set) {
            PersistRemoveApiStateListener persistRemoveApiStateListener = new PersistRemoveApiStateListener(this.context.getPersistenceStore(), brokerRemovalStateRecord2.shouldShutdown());
            ImmutableSet<Integer> brokerIds = brokerRemovalStateRecord2.brokerIds();
            BrokerRemovalStateMachine.BrokerRemovalState state = brokerRemovalStateRecord2.state();
            BrokerRemovalStateTracker brokerRemovalStateTracker = new BrokerRemovalStateTracker(brokerIds, state, persistRemoveApiStateListener, this.removalTerminationListener, this.brokerRemovalMetricRegistry.registerBrokerRemovalMetric(brokerIds), this.context.getTime());
            String uuid = BrokerRemovalCallback.uuid(brokerIds, this.context.getTime().milliseconds());
            AliveBrokersSnapshot orElse = engineInitializationContext.aliveBrokersSnapshotOpt.orElse(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
            Stream<Integer> stream = brokerIds.stream();
            Function function = num -> {
                return num;
            };
            orElse.getClass();
            submitRemoveBroker((Map) stream.collect(Collectors.toMap(function, (v1) -> {
                return r3.epochFor(v1);
            })), brokerRemovalStateRecord2.shouldShutdown(), brokerRemovalStateTracker, uuid);
            LOG.info("Submitted pending operation {} to remove broker ids {} with state {}.", new Object[]{uuid, brokerIds, state});
        }
    }

    void stopCruiseControl(BalancerStatusStateMachine.BalancerEvent balancerEvent) {
        LOG.info("DataBalancer: Starting DataBalanceEngine Shutdown");
        try {
            try {
                this.context.getCruiseControl().userTriggeredStopExecution();
                this.context.closeAndClearState();
                if (this.context.getBalancerStatusTracker() != null) {
                    this.context.getBalancerStatusTracker().registerEvent(balancerEvent);
                }
            } catch (Exception e) {
                LOG.warn("Unable to stop DataBalanceEngine", e);
                if (this.context.getBalancerStatusTracker() != null) {
                    this.context.getBalancerStatusTracker().registerEvent(balancerEvent);
                }
            }
            LOG.info("DataBalancer: DataBalanceEngine shutdown completed.");
        } catch (Throwable th) {
            if (this.context.getBalancerStatusTracker() != null) {
                this.context.getBalancerStatusTracker().registerEvent(balancerEvent);
            }
            throw th;
        }
    }

    void doAddBrokers(MultiBrokerAdditionOperation multiBrokerAdditionOperation, BalanceOpExecutionCompletionCallback balanceOpExecutionCompletionCallback, String str) {
        if (multiBrokerAdditionOperation.brokerIds().isEmpty()) {
            LOG.info("Will not be proceeding with the add broker operation as no new brokers were supplied.");
            return;
        }
        LOG.info("DataBalancer: Starting addBrokers call");
        try {
            this.context.getCruiseControl().addBrokers(multiBrokerAdditionOperation, balanceOpExecutionCompletionCallback, str);
        } catch (Exception e) {
            LOG.warn("Broker addition of {} failed", multiBrokerAdditionOperation.brokerIds(), e);
        }
    }
}
