package io.confluent.databalancer;

import com.linkedin.cruisecontrol.exception.CruiseControlException;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalCallback;
import com.linkedin.kafka.cruisecontrol.operation.BrokerRemovalFuture;
import com.linkedin.kafka.cruisecontrol.operation.MultiBrokerAdditionOperation;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import io.confluent.databalancer.EngineInitializationContext;
import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import io.confluent.databalancer.operation.BalancerOperationState;
import io.confluent.databalancer.operation.BalancerStatusStateMachine;
import io.confluent.databalancer.operation.BalancerStatusTracker;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.databalancer.operation.BrokerRemovalStateTracker;
import io.confluent.databalancer.operation.SelfHealingEvenClusterLoadStateManager;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import io.confluent.databalancer.persistence.BrokerRemovalStateRecord;
import io.confluent.databalancer.startup.CruiseControlStartable;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import kafka.common.AliveBrokersSnapshot;
import kafka.common.EvenClusterLoadStatusDescriptionInternal;
import kafka.metrics.KafkaYammerMetrics;
import kafka.server.KafkaConfig;
import kafka.utils.MockTime;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.kafka.clients.admin.EvenClusterLoadStatus;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalInProgressException;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.internal.verification.VerificationModeFactory;
import org.slf4j.Logger;

/* loaded from: input_file:io/confluent/databalancer/ConfluentDataBalanceEngineTest.class */
public class ConfluentDataBalanceEngineTest {
    // Clock handed to every ConfluentDataBalanceEngineContext built by these tests.
    private Time mockTime = new MockTime();

    // Mocked Cruise Control instance; setUp() stubs userTriggeredStopExecution() as a no-op.
    @Mock
    private KafkaCruiseControl mockCruiseControl;

    // Mocked metrics registry used by the no-argument getTestDataBalanceEngine() overload.
    @Mock
    private DataBalancerMetricsRegistry mockMetricsRegistry;

    // Mocked persistence store; getTestDataBalanceEngine(...) routes save() into brokerRemovalStateRecordMap.
    @Mock
    private ApiStatePersistenceStore persistenceStore;

    // Mocked status tracker returned by the spied engine context.
    @Mock
    private BalancerStatusTracker balancerStatusTracker;

    @Mock
    Logger logger;
    // Records captured from persistenceStore.save(...), keyed by broker id set; re-created for each engine.
    private Map<ImmutableSet<Integer>, BrokerRemovalStateRecord> brokerRemovalStateRecordMap;
    // Spied executor handed to the engine under test; re-created for each engine.
    private ExecutorService executorService;

    /* loaded from: input_file:io/confluent/databalancer/ConfluentDataBalanceEngineTest$RejectedExecutorService.class */
    /**
     * A {@link ThreadPoolExecutor} that never schedules work itself: every task given to
     * {@link #execute(Runnable)} is forwarded straight to the configured rejection handler.
     */
    public static class RejectedExecutorService extends ThreadPoolExecutor {
        /** Passes all arguments through unchanged to the {@link ThreadPoolExecutor} constructor. */
        public RejectedExecutorService(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        /** Bypasses the pool and delegates the task to the rejection handler. */
        @Override
        public void execute(Runnable task) {
            RejectedExecutionHandler handler = getRejectedExecutionHandler();
            handler.rejectedExecution(task, this);
        }
    }

    /**
     * Builds a {@link RejectedExecutorService} configured with a {@link SynchronousQueue} and
     * {@link ThreadPoolExecutor.CallerRunsPolicy} as its rejection handler.
     */
    private static ExecutorService currentThreadExecutorService() {
        BlockingQueue<Runnable> handoffQueue = new SynchronousQueue<>();
        RejectedExecutionHandler callerRuns = new ThreadPoolExecutor.CallerRunsPolicy();
        return new RejectedExecutorService(0, 1, 0L, TimeUnit.SECONDS, handoffQueue, callerRuns);
    }

    /**
     * Initialises the {@code @Mock} fields and stubs {@code userTriggeredStopExecution()} on the
     * mocked Cruise Control to do nothing.
     */
    @BeforeEach
    public void setUp() {
        MockitoAnnotations.openMocks(this);
        Mockito.doNothing().when(mockCruiseControl).userTriggeredStopExecution();
    }

    /** Convenience overload that builds the engine with the mocked metrics registry field. */
    private ConfluentDataBalanceEngine getTestDataBalanceEngine() throws InterruptedException {
        DataBalancerMetricsRegistry registry = mockMetricsRegistry;
        return getTestDataBalanceEngine(registry);
    }

    /**
     * Wraps a real engine context (built from the given registry, the mocked Cruise Control and the
     * mock clock) in a spy whose persistence store and status tracker getters return the test's
     * mocks, then builds the engine around it.
     */
    private ConfluentDataBalanceEngine getTestDataBalanceEngine(DataBalancerMetricsRegistry dataBalancerMetricsRegistry) throws InterruptedException {
        ConfluentDataBalanceEngineContext realContext = new ConfluentDataBalanceEngineContext(dataBalancerMetricsRegistry, mockCruiseControl, mockTime);
        ConfluentDataBalanceEngineContext spiedContext = Mockito.spy(realContext);
        Mockito.when(spiedContext.getPersistenceStore()).thenReturn(persistenceStore);
        Mockito.when(spiedContext.getBalancerStatusTracker()).thenReturn(balancerStatusTracker);
        return getTestDataBalanceEngine(spiedContext);
    }

    /**
     * Creates the engine under test around the supplied context.
     *
     * <p>Side effects: replaces {@code executorService} with a fresh spied executor, resets
     * {@code brokerRemovalStateRecordMap}, and stubs the mocked persistence store so that saved
     * records land in that map and are returned by {@code getAllBrokerRemovalStateRecords()}.
     */
    private ConfluentDataBalanceEngine getTestDataBalanceEngine(ConfluentDataBalanceEngineContext confluentDataBalanceEngineContext) throws InterruptedException {
        executorService = Mockito.spy(currentThreadExecutorService());
        brokerRemovalStateRecordMap = new ConcurrentHashMap<>();

        // Capture every saved record in the in-memory map, keyed by its broker id set.
        Mockito.doAnswer(invocation -> {
            BrokerRemovalStateRecord saved = (BrokerRemovalStateRecord) invocation.getArguments()[0];
            brokerRemovalStateRecordMap.put(new ImmutableSet<>(saved.brokerIds()), saved);
            return null;
        }).when(persistenceStore).save(ArgumentMatchers.any(BrokerRemovalStateRecord.class), ArgumentMatchers.anyBoolean());
        Mockito.when(persistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(brokerRemovalStateRecordMap);

        return new ConfluentDataBalanceEngine(executorService, confluentDataBalanceEngineContext);
    }

    /**
     * Deactivates an engine whose context holds no Cruise Control instance and checks that only the
     * balancer event is registered: no shutdown and no metric clean-up happen.
     */
    @Test
    public void testStopCruiseControlNotInitialized() {
        BalancerStatusStateMachine.BalancerEvent disabledEvent = BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED;
        ConfluentDataBalanceEngineContext context = Mockito.spy(new ConfluentDataBalanceEngineContext(mockMetricsRegistry, (KafkaCruiseControl) null, mockTime));
        Mockito.when(context.getPersistenceStore()).thenReturn(persistenceStore);
        Mockito.when(context.getBalancerStatusTracker()).thenReturn(balancerStatusTracker);
        ConfluentDataBalanceEngine engine = new ConfluentDataBalanceEngine(currentThreadExecutorService(), context);

        engine.onDeactivation(disabledEvent);

        Mockito.verify(balancerStatusTracker).registerEvent(disabledEvent);
        Mockito.verify(mockCruiseControl, Mockito.never()).shutdown();
        Mockito.verify(mockMetricsRegistry, Mockito.never()).clearShortLivedMetrics();
    }

    /**
     * Starts the engine with no Cruise Control instance in its context and expects a
     * CRUISE_CONTROL_ERRORED event carrying a {@link CruiseControlException}; a subsequent
     * deactivation must register its event and clear short-lived metrics once.
     */
    @Test
    public void testCruiseControlStartUpFailure() throws Exception {
        KafkaCruiseControl unusedCruiseControl = Mockito.mock(KafkaCruiseControl.class);
        CruiseControlStartable startable = Mockito.mock(CruiseControlStartable.class);
        BalancerStatusTracker statusTracker = Mockito.mock(BalancerStatusTracker.class);
        EngineInitializationContext initContext = new EngineInitializationContext(
                Mockito.mock(KafkaConfig.class),
                EngineInitializationContext.EngineStartupType.ON_FAILOVER,
                (Optional) null,
                (KafkaDataBalanceManager.BrokerRemovalMetricRegistry) null,
                (Function) null,
                (BalancerStatusTracker) null);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        ConfluentDataBalanceEngineContext engineContext = engine.getDataBalanceEngineContext();
        engineContext.setCruiseControl((KafkaCruiseControl) null);
        Mockito.when(engineContext.getBalancerStatusTracker()).thenReturn(statusTracker);
        Mockito.when(engineContext.getPersistenceStore()).thenReturn(persistenceStore);
        // NOTE(review): this stub matches ON_ENABLE while initContext is built with ON_FAILOVER;
        // confirm the stubbed exception is really what drives the failure asserted below.
        Mockito.doThrow(new RuntimeException("Test exception"))
                .when(startable)
                .createKafkaCruiseControl(
                        ArgumentMatchers.any(KafkaConfig.class),
                        ArgumentMatchers.eq(EngineInitializationContext.EngineStartupType.ON_ENABLE),
                        ArgumentMatchers.any(Semaphore.class));

        engine.doStart(initContext, startable);

        Mockito.verify(statusTracker).registerEvent(
                ArgumentMatchers.eq(BalancerStatusStateMachine.BalancerEvent.CRUISE_CONTROL_ERRORED),
                ArgumentMatchers.any(CruiseControlException.class));

        BalancerStatusStateMachine.BalancerEvent disabledEvent = BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED;
        engine.onDeactivation(disabledEvent);

        Mockito.verify(statusTracker).registerEvent(disabledEvent);
        Mockito.verify(unusedCruiseControl, Mockito.never()).shutdown();
        Mockito.verify(mockMetricsRegistry, VerificationModeFactory.times(1)).clearShortLivedMetrics();
    }

    /**
     * Deactivates the same engine twice: Cruise Control must be shut down exactly once overall,
     * while the balancer event is registered on both calls.
     */
    @Test
    public void testStopCruiseControlAfterShutdown() throws InterruptedException {
        BalancerStatusStateMachine.BalancerEvent disabledEvent = BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED;
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        // First deactivation performs the actual shutdown.
        engine.onDeactivation(disabledEvent);
        Mockito.verify(balancerStatusTracker).registerEvent(disabledEvent);
        Mockito.verify(mockCruiseControl, VerificationModeFactory.times(1)).shutdown();
        Mockito.verify(mockMetricsRegistry, VerificationModeFactory.times(1)).clearShortLivedMetrics();

        // Second deactivation must not shut Cruise Control down again.
        engine.onDeactivation(disabledEvent);
        Mockito.verify(mockCruiseControl, VerificationModeFactory.times(1)).shutdown();
        Mockito.verify(balancerStatusTracker, VerificationModeFactory.times(2)).registerEvent(disabledEvent);
    }

    /**
     * Calls {@code stopCruiseControl} directly and checks that the event is registered, the running
     * execution is stopped, Cruise Control is shut down and short-lived metrics are cleared.
     */
    @Test
    public void testStopCruiseControl() throws InterruptedException {
        BalancerStatusStateMachine.BalancerEvent disabledEvent = BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED;
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        engine.stopCruiseControl(disabledEvent);

        Mockito.verify(balancerStatusTracker).registerEvent(disabledEvent);
        Mockito.verify(mockCruiseControl).shutdown();
        Mockito.verify(mockCruiseControl).userTriggeredStopExecution();
        Mockito.verify(mockMetricsRegistry).clearShortLivedMetrics();
    }

    /**
     * Marks the engine as accepting requests, deactivates it with a controller-failover event and
     * checks that Cruise Control is torn down and the engine stops accepting requests.
     */
    @Test
    public void testDeactivation() throws InterruptedException {
        BalancerStatusStateMachine.BalancerEvent failoverEvent = BalancerStatusStateMachine.BalancerEvent.CONTROLLER_FAILS_OVER;
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.canAcceptRequests = true;

        engine.onDeactivation(failoverEvent);

        Mockito.verify(balancerStatusTracker).registerEvent(failoverEvent);
        Mockito.verify(mockCruiseControl).shutdown();
        Mockito.verify(mockCruiseControl).userTriggeredStopExecution();
        Mockito.verify(mockMetricsRegistry).clearShortLivedMetrics();
        Assertions.assertFalse(engine.canAcceptRequests, "DatabalanceEngine is not stopped");
    }

    /**
     * Calls {@code doStart} on an engine whose context already holds the mocked Cruise Control and
     * checks that the context still references that very same instance afterwards.
     */
    @Test
    public void testStartCruiseControlNoOp() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.doStart((EngineInitializationContext) null);
        // assertSame takes (expected, actual); the mock is the expected instance, so a failure
        // message reports the two values the right way round.
        Assertions.assertSame(mockCruiseControl, engine.context.getCruiseControl());
    }

    /**
     * Makes {@code setPersistenceStore} on the engine context throw during start-up and checks that
     * the Cruise Control instance produced by the startable is shut down.
     */
    @Test
    public void testStartCruiseControlPersistenceThrowsError() throws Exception {
        KafkaCruiseControl createdCruiseControl = Mockito.mock(KafkaCruiseControl.class);
        CruiseControlStartable startable = Mockito.mock(CruiseControlStartable.class);
        BalancerStatusTracker statusTracker = Mockito.mock(BalancerStatusTracker.class);
        Mockito.when(startable.createKafkaCruiseControl(
                        ArgumentMatchers.any(KafkaConfig.class),
                        ArgumentMatchers.eq(EngineInitializationContext.EngineStartupType.ON_FAILOVER),
                        ArgumentMatchers.any(Semaphore.class)))
                .thenReturn(createdCruiseControl);
        EngineInitializationContext initContext = new EngineInitializationContext(
                Mockito.mock(KafkaConfig.class),
                EngineInitializationContext.EngineStartupType.ON_FAILOVER,
                (Optional) null,
                (KafkaDataBalanceManager.BrokerRemovalMetricRegistry) null,
                (Function) null,
                (BalancerStatusTracker) null);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        ConfluentDataBalanceEngineContext engineContext = engine.getDataBalanceEngineContext();
        engineContext.setCruiseControl((KafkaCruiseControl) null);
        Mockito.when(engineContext.getBalancerStatusTracker()).thenReturn(statusTracker);
        Mockito.doThrow(new RuntimeException("Test exception"))
                .when(engineContext)
                .setPersistenceStore(ArgumentMatchers.any(ApiStatePersistenceStore.class));

        engine.doStart(initContext, startable);

        Mockito.verify(createdCruiseControl).shutdown();
    }

    /**
     * Builds an ON_FAILOVER initialization context for the given config, with a mocked broker
     * removal metric registry and a function argument that always yields a fresh
     * {@link AtomicReference}.
     */
    private EngineInitializationContext initializationContext(KafkaConfig kafkaConfig) {
        KafkaDataBalanceManager.BrokerRemovalMetricRegistry removalMetricRegistry =
                Mockito.mock(KafkaDataBalanceManager.BrokerRemovalMetricRegistry.class);
        Mockito.when(removalMetricRegistry.registerBrokerRemovalMetric((Set) ArgumentMatchers.any()))
                .thenReturn(new AtomicReference("Test removal metric"));
        return new EngineInitializationContext(
                kafkaConfig,
                EngineInitializationContext.EngineStartupType.ON_FAILOVER,
                Optional.empty(),
                removalMetricRegistry,
                brokerId -> new AtomicReference("Test addition metric"),
                (BalancerStatusTracker) null);
    }

    /**
     * Activates the engine with the executor's {@code submit} stubbed out and checks that one task
     * was submitted and the engine reports it can accept requests.
     */
    @Test
    public void testOnActivation() throws InterruptedException {
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        // Swallow the submitted start-up task so it does not run.
        Mockito.doReturn((Object) null).when(executorService).submit(ArgumentMatchers.any(Runnable.class));

        engine.onActivation(initializationContext(config));

        Mockito.verify(executorService).submit(ArgumentMatchers.any(Runnable.class));
        Assertions.assertTrue(engine.canAcceptRequests, "DatabalanceEngine is not started");
    }

    /**
     * Removes brokers 1 and 2 on an activated engine and follows the removal future through the
     * context: it is stored once Cruise Control returns it, executed, and only dropped after the
     * captured completion callback fires. Also checks the persisted removal record and the tracker
     * count.
     */
    @Test
    public void testRemoveBroker() throws Throwable {
        HashMap brokersToRemove = new HashMap();
        brokersToRemove.put(1, Optional.of(1L));
        brokersToRemove.put(2, Optional.of(1L));
        ImmutableSet brokerIdSet = new ImmutableSet(brokersToRemove.keySet());
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));
        BrokerRemovalFuture removalFuture = Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(mockCruiseControl.removeBrokers(
                        ArgumentMatchers.eq(brokersToRemove),
                        ArgumentMatchers.eq(false),
                        ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                        ArgumentMatchers.any(BrokerRemovalCallback.class),
                        ArgumentMatchers.anyString()))
                .thenAnswer(invocation -> {
                    removalFuture.execute(Duration.ofSeconds(10L));
                    return removalFuture;
                });
        ArgumentCaptor<BalanceOpExecutionCompletionCallback> completionCaptor =
                ArgumentCaptor.forClass(BalanceOpExecutionCompletionCallback.class);
        BrokerRemovalStateRecord expectedRecord = new BrokerRemovalStateRecord(
                brokersToRemove.keySet(),
                BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED,
                (Exception) null,
                false);

        engine.removeBrokers(brokersToRemove, false, "uid");

        Assertions.assertTrue(engine.canAcceptRequests, "DatabalanceEngine is not started");
        Mockito.verify(executorService, VerificationModeFactory.times(2)).submit(ArgumentMatchers.any(Runnable.class));
        Mockito.verify(mockCruiseControl).removeBrokers(
                ArgumentMatchers.eq(brokersToRemove),
                ArgumentMatchers.eq(false),
                completionCaptor.capture(),
                Mockito.any(BrokerRemovalCallback.class),
                ArgumentMatchers.anyString());
        Mockito.verify(engine.context).putBrokerRemovalFuture(brokerIdSet, removalFuture);
        Mockito.verify(removalFuture).execute(Duration.ofSeconds(10L));
        Mockito.verify(engine.context, Mockito.never()).removeBrokerRemovalFuture(brokerIdSet);

        // Completing the operation is what releases the stored future.
        completionCaptor.getValue().accept(true, (Throwable) null);
        Mockito.verify(engine.context).removeBrokerRemovalFuture(brokerIdSet);

        Assertions.assertEquals(1, brokerRemovalStateRecordMap.size(), "Expected to have a persisted broker removal state");
        Assertions.assertEquals(expectedRecord.brokerIds(), brokerRemovalStateRecordMap.get(brokerIdSet).brokerIds());
        Assertions.assertEquals(
                Boolean.valueOf(expectedRecord.shouldShutdown()),
                Boolean.valueOf(brokerRemovalStateRecordMap.get(brokerIdSet).shouldShutdown()));
        Assertions.assertEquals(1, engine.context.brokerRemovalsStateTrackers.size(), "Expected to have one broker removal state tracker in the context object");
    }

    /**
     * Registers a state tracker for brokers 1 and 2, signals a terminal state for the same ids via
     * the engine's termination listener, and checks that the tracker has been removed.
     */
    @Test
    public void testBrokerRemovalTerminationListener() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        List brokerIds = Arrays.asList(1, 2);
        engine.context.brokerRemovalsStateTrackers.put(new ImmutableSet(brokerIds), Mockito.mock(BrokerRemovalStateTracker.class));

        engine.removalTerminationListener.onTerminalState(new HashSet(brokerIds), (BalancerOperationState) null, (Exception) null);

        Assertions.assertEquals(0, engine.context.brokerRemovalsStateTrackers.size(), "Expected to have removed the broker removal state tracker in the context object");
    }

    /**
     * With a persisted PLAN_EXECUTION_SUCCEEDED record already present for brokers 1 and 2, a new
     * removal request for the same brokers must still reach Cruise Control and store/execute the
     * returned future.
     */
    @Test
    public void testRemoveBroker_InitiatesRemovalOnPreviouslyRemovedBroker() throws Throwable {
        HashMap brokersToRemove = new HashMap();
        brokersToRemove.put(1, Optional.of(1L));
        brokersToRemove.put(2, Optional.of(1L));
        ImmutableSet brokerIdSet = new ImmutableSet(brokersToRemove.keySet());
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));
        Assertions.assertTrue(engine.canAcceptRequests, "DatabalanceEngine is not started");
        BrokerRemovalFuture removalFuture = Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(mockCruiseControl.removeBrokers(
                        ArgumentMatchers.eq(brokersToRemove),
                        ArgumentMatchers.eq(true),
                        ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                        ArgumentMatchers.any(BrokerRemovalCallback.class),
                        ArgumentMatchers.anyString()))
                .thenAnswer(invocation -> {
                    removalFuture.execute(Duration.ofSeconds(10L));
                    return removalFuture;
                });
        // A finished removal for the same broker set is already on record.
        Mockito.when(persistenceStore.getBrokerRemovalStateRecord(brokerIdSet))
                .thenReturn(new BrokerRemovalStateRecord(
                        brokersToRemove.keySet(),
                        BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_SUCCEEDED,
                        (Exception) null,
                        false));

        engine.removeBrokers(brokersToRemove, true, "uid");

        Mockito.verify(executorService, VerificationModeFactory.times(2)).submit(ArgumentMatchers.any(Runnable.class));
        Mockito.verify(mockCruiseControl).removeBrokers(
                ArgumentMatchers.eq(brokersToRemove),
                ArgumentMatchers.eq(true),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.any(BrokerRemovalCallback.class),
                ArgumentMatchers.anyString());
        Mockito.verify(engine.context).putBrokerRemovalFuture(brokerIdSet, removalFuture);
        Mockito.verify(removalFuture).execute(Duration.ofSeconds(10L));
        Assertions.assertTrue(engine.canAcceptRequests, "DatabalanceEngine is not started");
    }

    /**
     * Requests removal of brokers 1 and 2 while the persistence store already reports an in-progress
     * removal record for exactly those brokers; the call must complete without adding a state
     * tracker to the context.
     */
    @Test
    public void testRemoveBrokerTwiceSucceeds() throws InterruptedException {
        HashMap brokersToRemove = new HashMap();
        brokersToRemove.put(1, Optional.of(1L));
        brokersToRemove.put(2, Optional.of(1L));
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));
        BrokerRemovalStateRecord inProgressRecord = new BrokerRemovalStateRecord(
                brokersToRemove.keySet(),
                BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED,
                (Exception) null,
                true);
        Mockito.when(persistenceStore.getAllBrokerRemovalStateRecords())
                .thenReturn(Collections.singletonMap(inProgressRecord.brokerIds(), inProgressRecord));

        engine.removeBrokers(brokersToRemove, true, "uid");

        Assertions.assertEquals(0, engine.context.brokerRemovalsStateTrackers.size(), "Expected to not have a broker removal state tracker in the context object due to the way this test was mocked");
    }

    /**
     * Requests removal of brokers 1 and 2 while the persistence store reports an in-progress removal
     * for brokers 2, 3 and 4, and expects the request to be rejected with
     * {@link BrokerRemovalInProgressException}.
     */
    @Test
    public void testRemoveBrokerFailsIfExistingRemovalsAreNotASubset() throws InterruptedException {
        HashMap brokersToRemove = new HashMap();
        brokersToRemove.put(1, Optional.of(1L));
        brokersToRemove.put(2, Optional.of(1L));
        ImmutableSet requestedIds = new ImmutableSet(brokersToRemove.keySet());
        HashSet overlappingIds = new HashSet(Arrays.asList(2, 3, 4));
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));
        BrokerRemovalStateRecord inProgressRecord = new BrokerRemovalStateRecord(
                overlappingIds,
                BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED,
                (Exception) null,
                true);
        // No record exists for the exact requested set. The null is cast to the stubbed return
        // type so the thenReturn call type-checks (a cast to Object does not).
        Mockito.when(persistenceStore.getBrokerRemovalStateRecord(requestedIds))
                .thenReturn((BrokerRemovalStateRecord) null);
        Mockito.when(persistenceStore.getAllBrokerRemovalStateRecords())
                .thenReturn(Collections.singletonMap(inProgressRecord.brokerIds(), inProgressRecord));

        Assertions.assertThrows(BrokerRemovalInProgressException.class, () -> {
            engine.removeBrokers(brokersToRemove, true, "uid");
        });
    }

    /**
     * With an in-progress removal recorded for brokers 1-4, requests removal of only the first two
     * entries of that map and expects the call to complete without adding a state tracker to the
     * context.
     */
    @Test
    public void testRemoveBrokerIdempotencyWhenRemovalInProgress() throws InterruptedException {
        // The map is typed so the stream below has a concrete entry type; over a raw map the
        // key/value extractors would be applied to Object and not compile.
        Map<Integer, Optional<Long>> inProgressBrokers = new HashMap<>();
        inProgressBrokers.put(1, Optional.of(1L));
        inProgressBrokers.put(2, Optional.of(1L));
        inProgressBrokers.put(3, Optional.of(1L));
        inProgressBrokers.put(4, Optional.of(1L));
        Map<Integer, Optional<Long>> requestedBrokers = inProgressBrokers.entrySet().stream()
                .limit(2L)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));
        BrokerRemovalStateRecord inProgressRecord = new BrokerRemovalStateRecord(
                inProgressBrokers.keySet(),
                BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED,
                (Exception) null,
                true);
        Mockito.when(persistenceStore.getAllBrokerRemovalStateRecords())
                .thenReturn(Collections.singletonMap(inProgressRecord.brokerIds(), inProgressRecord));
        Mockito.when(mockCruiseControl.removeBrokers(
                        ArgumentMatchers.eq(requestedBrokers),
                        ArgumentMatchers.eq(true),
                        ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                        ArgumentMatchers.any(BrokerRemovalCallback.class),
                        ArgumentMatchers.anyString()))
                .thenReturn(Mockito.mock(BrokerRemovalFuture.class));

        engine.removeBrokers(requestedBrokers, true, "uid");

        Assertions.assertEquals(0, engine.context.brokerRemovalsStateTrackers.size(), "Expected to not have a broker removal state tracker in the context object due to the no-op and the way this test was mocked");
    }

    /**
     * Requests a broker removal on an engine that was never activated and expects
     * {@link BalancerOfflineException}.
     */
    @Test
    public void testRemoveBrokerThrowsBalancerOfflineExceptionIfNoActiveDatabalancer() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        HashMap brokersToRemove = new HashMap();
        brokersToRemove.put(1, Optional.of(1L));
        brokersToRemove.put(2, Optional.of(1L));

        Assertions.assertThrows(
                BalancerOfflineException.class,
                () -> engine.removeBrokers(brokersToRemove, true, "uid"));
    }

    /**
     * Stores two removal futures (one whose {@code cancel()} returns true, one false) and checks
     * that {@code cancelBrokerRemoval} relays each future's result and returns false for a broker
     * set that has no stored future.
     */
    @Test
    public void testCancelBrokerRemovals_CallsFutureCancel() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        BrokerRemovalFuture cancellableFuture = Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(cancellableFuture.cancel()).thenReturn(true);
        BrokerRemovalFuture stubbornFuture = Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(stubbornFuture.cancel()).thenReturn(false);
        ImmutableSet firstBrokers = new ImmutableSet(Arrays.asList(1, 2));
        ImmutableSet secondBrokers = new ImmutableSet(Arrays.asList(2, 3));
        ImmutableSet unknownBrokers = new ImmutableSet(Collections.singletonList(3));
        engine.context.putBrokerRemovalFuture(firstBrokers, cancellableFuture);
        engine.context.putBrokerRemovalFuture(secondBrokers, stubbornFuture);

        boolean firstCancelled = engine.cancelBrokerRemoval(firstBrokers);
        boolean secondCancelled = engine.cancelBrokerRemoval(secondBrokers);
        boolean unknownCancelled = engine.cancelBrokerRemoval(unknownBrokers);

        Assertions.assertTrue(firstCancelled, "Expected first set of brokers to be cancelled successfully");
        Assertions.assertFalse(secondCancelled, "Expected second set of brokers to not be cancelled successfully");
        Assertions.assertFalse(unknownCancelled, "Expected third set of brokers to not be cancelled successfully because it doesn't exist");
        Mockito.verify(cancellableFuture).cancel();
        Mockito.verify(stubbornFuture).cancel();
    }

    /**
     * Updates the throttle on a freshly built engine and checks that the value is passed to Cruise
     * Control and that the engine's {@code updateThrottleHelper} is invoked with it.
     */
    @Test
    public void testUpdateThrottleWhileRunning() throws InterruptedException {
        ConfluentDataBalanceEngine engine = Mockito.spy(getTestDataBalanceEngine());

        engine.updateThrottle(100L);

        Mockito.verify(mockCruiseControl).updateThrottle(100L);
        Mockito.verify(engine).updateThrottleHelper(100L);
    }

    /**
     * Deactivates the engine, then updates the throttle, and checks that the update is never
     * forwarded to Cruise Control even though {@code updateThrottle} was called on the engine.
     */
    @Test
    public void testUpdateThrottleWhileStopped() throws InterruptedException {
        ConfluentDataBalanceEngine engine = Mockito.spy(getTestDataBalanceEngine());
        // Pass a real event: an argument matcher is only valid inside stubbing or verification,
        // and used as a plain call argument it evaluates to null.
        engine.onDeactivation(BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED);

        engine.updateThrottle(100L);

        Mockito.verify(mockCruiseControl, Mockito.never()).updateThrottle(ArgumentMatchers.anyLong());
        Mockito.verify(engine).updateThrottle(100L);
    }

    /**
     * Switches auto-heal on and then off, checking after each switch that the flag reaches Cruise
     * Control's goal-violation self-healing setting and the engine's {@code updateAutoHealHelper}.
     */
    @Test
    public void testUpdateAutoHeal() throws InterruptedException {
        ConfluentDataBalanceEngine engine = Mockito.spy(getTestDataBalanceEngine());

        // Enable.
        engine.setAutoHealMode(true);
        Mockito.verify(mockCruiseControl).setGoalViolationSelfHealing(true);
        Mockito.verify(engine).updateAutoHealHelper(true);

        // Disable.
        engine.setAutoHealMode(false);
        Mockito.verify(mockCruiseControl).setGoalViolationSelfHealing(false);
        Mockito.verify(engine).updateAutoHealHelper(false);
    }

    /**
     * Deactivates the engine, then enables auto-heal, and checks that the setting is never
     * forwarded to Cruise Control even though {@code setAutoHealMode} was called on the engine.
     */
    @Test
    public void testUpdateAutoHealWhenStopped() throws InterruptedException {
        ConfluentDataBalanceEngine engine = Mockito.spy(getTestDataBalanceEngine());
        // Pass a real event: an argument matcher is only valid inside stubbing or verification,
        // and used as a plain call argument it evaluates to null.
        engine.onDeactivation(BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED);

        engine.setAutoHealMode(true);

        Mockito.verify(mockCruiseControl, Mockito.never()).setGoalViolationSelfHealing(true);
        Mockito.verify(engine).setAutoHealMode(true);
    }

    /**
     * Activates the engine, requests the addition of broker 10 with an empty alive-brokers
     * snapshot, and checks via {@link #verifyDoAddBrokers} that the addition was carried out.
     */
    @Test
    public void testAddBroker_success() throws Exception {
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onActivation(initializationContext(config));

        Set<Integer> requestedBrokers = brokersToAdd(10).brokerIds();
        engineSpy.addBrokers(requestedBrokers, "testUid", AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        verifyDoAddBrokers(engineSpy, requestedBrokers, "testUid");
    }

    /**
     * Asserts that a broker addition for {@code set} went through the engine and reached Cruise Control.
     *
     * <p>Captures the operation passed to the engine's {@code doAddBrokers} and to the mocked
     * Cruise Control's {@code addBrokers}, checks that each carries exactly the expected broker
     * ids, and finally checks that the engine context tracks those brokers as being added.
     *
     * @param confluentDataBalanceEngine spied engine on which the addition was requested
     * @param set broker ids that were requested to be added
     * @param str operation uid expected on both calls
     */
    public void verifyDoAddBrokers(ConfluentDataBalanceEngine confluentDataBalanceEngine, Set<Integer> set, String str) throws Exception {
        // Operation handed to the engine's own doAddBrokers.
        ArgumentCaptor<MultiBrokerAdditionOperation> engineOpCaptor = ArgumentCaptor.forClass(MultiBrokerAdditionOperation.class);
        Mockito.verify(confluentDataBalanceEngine).doAddBrokers(
                engineOpCaptor.capture(),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.eq(str));
        Set<Integer> engineBrokerIds = engineOpCaptor.getValue().brokerIds();
        Assertions.assertEquals(set.size(), engineBrokerIds.size());
        Assertions.assertTrue(set.containsAll(engineBrokerIds), String.format("Expected %s to contain all of %s", set, engineBrokerIds));

        // Operation forwarded to Cruise Control.
        ArgumentCaptor<MultiBrokerAdditionOperation> cruiseControlOpCaptor = ArgumentCaptor.forClass(MultiBrokerAdditionOperation.class);
        Mockito.verify(this.mockCruiseControl).addBrokers(
                cruiseControlOpCaptor.capture(),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.eq(str));
        Set<Integer> cruiseControlBrokerIds = cruiseControlOpCaptor.getValue().brokerIds();
        Assertions.assertEquals(set.size(), cruiseControlBrokerIds.size());
        Assertions.assertTrue(set.containsAll(cruiseControlBrokerIds), String.format("Expected %s to contain all of %s", set, cruiseControlBrokerIds));

        // Bookkeeping kept in the engine context.
        Assertions.assertEquals(set, confluentDataBalanceEngine.context.brokerAdditionsStateManagers.keySet());
        Assertions.assertEquals(set, confluentDataBalanceEngine.context.brokersBeingAdded());
    }

    /**
     * Requests the addition of broker 10 with a snapshot whose second constructor argument is
     * {@code {1}} and checks that {@code doAddBrokers} is never invoked.
     */
    @Test
    public void testAddBroker_activeReplicaExclusion() throws InterruptedException {
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onActivation(initializationContext(config));
        // No persisted broker removals.
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(new HashMap<>());

        Set<Integer> requestedBrokers = Collections.singleton(10);
        // NOTE(review): presumably the singleton(1) argument is the set of brokers with an active
        // replica exclusion - confirm against AliveBrokersSnapshot.
        engineSpy.addBrokers(requestedBrokers, "testUid", new AliveBrokersSnapshot(Collections.emptyMap(), Collections.singleton(1)));

        Mockito.verify(engineSpy, Mockito.never()).doAddBrokers(
                (MultiBrokerAdditionOperation) ArgumentMatchers.any(),
                (BalanceOpExecutionCompletionCallback) ArgumentMatchers.any(),
                (String) ArgumentMatchers.any());
    }

    /**
     * Stubs the persistence store with a removal record for brokers 1 and 2 in state
     * {@code INITIAL_PLAN_COMPUTATION_INITIATED}, requests the addition of broker 10, and checks
     * that {@code doAddBrokers} is never invoked.
     */
    @Test
    public void testAddBroker_pendingRemove() throws InterruptedException {
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onActivation(initializationContext(config));

        Set<Integer> brokersBeingRemoved = new HashSet<>(Arrays.asList(1, 2));
        BrokerRemovalStateRecord inProgressRemoval = new BrokerRemovalStateRecord(
                brokersBeingRemoved,
                BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED,
                (Exception) null,
                true);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords())
                .thenReturn(Collections.singletonMap(new ImmutableSet(brokersBeingRemoved), inProgressRemoval));

        Set<Integer> requestedBrokers = Collections.singleton(10);
        engineSpy.addBrokers(requestedBrokers, "testUid", AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Mockito.verify(engineSpy, Mockito.never()).doAddBrokers(
                (MultiBrokerAdditionOperation) ArgumentMatchers.any(),
                (BalanceOpExecutionCompletionCallback) ArgumentMatchers.any(),
                (String) ArgumentMatchers.any());
    }

    /**
     * Stubs the persistence store with a removal record for brokers 1 and 2 in state
     * {@code PLAN_EXECUTION_SUCCEEDED}, requests the addition of broker 10, and checks via
     * {@link #verifyDoAddBrokers} that the addition still goes ahead.
     */
    @Test
    public void testAddBroker_successfulRemove() throws Exception {
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onActivation(initializationContext(config));

        Set<Integer> removedBrokers = new HashSet<>(Arrays.asList(1, 2));
        BrokerRemovalStateRecord finishedRemoval = new BrokerRemovalStateRecord(
                removedBrokers,
                BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_SUCCEEDED,
                (Exception) null,
                true);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords())
                .thenReturn(Collections.singletonMap(new ImmutableSet(removedBrokers), finishedRemoval));

        Set<Integer> requestedBrokers = brokersToAdd(10).brokerIds();
        engineSpy.addBrokers(requestedBrokers, "testUid", AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        verifyDoAddBrokers(engineSpy, requestedBrokers, "testUid");
    }

    /**
     * Builds a mocked {@link MultiBrokerAdditionOperation} whose {@code brokerIds()} returns the
     * given ids as a set.
     *
     * @param numArr broker ids the mocked operation should report
     * @return the stubbed mock
     */
    private MultiBrokerAdditionOperation brokersToAdd(Integer... numArr) {
        Set<Integer> brokerIds = new HashSet<>(Arrays.asList(numArr));
        MultiBrokerAdditionOperation operation = Mockito.mock(MultiBrokerAdditionOperation.class);
        Mockito.when(operation.brokerIds()).thenReturn(brokerIds);
        return operation;
    }

    /**
     * Stubs the persistence store with a removal record for brokers 1 and 2 in state
     * {@code PLAN_EXECUTION_CANCELED}, requests the addition of broker 10, and checks via
     * {@link #verifyDoAddBrokers} that the addition still goes ahead.
     */
    @Test
    public void testAddBroker_failedRemove() throws Exception {
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onActivation(initializationContext(config));

        Set<Integer> removedBrokers = new HashSet<>(Arrays.asList(1, 2));
        BrokerRemovalStateRecord canceledRemoval = new BrokerRemovalStateRecord(
                removedBrokers,
                BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_CANCELED,
                (Exception) null,
                true);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords())
                .thenReturn(Collections.singletonMap(new ImmutableSet(removedBrokers), canceledRemoval));

        Set<Integer> requestedBrokers = brokersToAdd(10).brokerIds();
        engineSpy.addBrokers(requestedBrokers, "testUid", AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        verifyDoAddBrokers(engineSpy, requestedBrokers, "testUid");
    }

    /**
     * Starts a removal of brokers 1 and 2 and checks that the persisted removal record follows the
     * state tracker: first {@code INITIAL_PLAN_COMPUTATION_INITIATED}, then, after two events are
     * registered on the tracker, {@code PLAN_COMPUTATION_INITIATED}.
     */
    @Test
    public void testRemoveBrokerStateTrackerContinuouslyUpdatesStatus() throws Throwable {
        Map<Integer, Optional<Long>> brokersToRemove = new HashMap<>();
        brokersToRemove.put(1, Optional.of(1L));
        brokersToRemove.put(2, Optional.of(1L));
        ImmutableSet brokerIds = new ImmutableSet(brokersToRemove.keySet());

        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));

        BrokerRemovalFuture removalFuture = Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(this.mockCruiseControl.removeBrokers(
                ArgumentMatchers.eq(brokersToRemove),
                ArgumentMatchers.eq(true),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.any(BrokerRemovalCallback.class),
                ArgumentMatchers.anyString())).thenReturn(removalFuture);

        BrokerRemovalStateRecord expectedInitialRecord = new BrokerRemovalStateRecord(
                brokerIds,
                BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED,
                (Exception) null,
                true);

        engine.removeBrokers(brokersToRemove, true, "uid");

        // The removal is persisted and tracked as soon as it is requested.
        Assertions.assertEquals(1, this.brokerRemovalStateRecordMap.size(), "Expected to have a persisted broker removal state");
        Assertions.assertEquals(expectedInitialRecord, this.brokerRemovalStateRecordMap.get(new ImmutableSet(brokerIds)));
        Assertions.assertEquals(1, engine.context.brokerRemovalsStateTrackers.size(), "Expected to have one broker removal state tracker in the context object");

        // Drive the tracker forward and expect the persisted record to follow.
        BrokerRemovalStateTracker tracker = (BrokerRemovalStateTracker) engine.context.brokerRemovalsStateTrackers.get(new ImmutableSet(brokerIds));
        tracker.registerEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.INITIAL_PLAN_COMPUTATION_SUCCESS);
        tracker.registerEvent(BrokerRemovalStateMachine.BrokerRemovalEvent.EXCLUSION_SUCCESS);

        BrokerRemovalStateRecord expectedAdvancedRecord = new BrokerRemovalStateRecord(
                brokerIds,
                BrokerRemovalStateMachine.BrokerRemovalState.PLAN_COMPUTATION_INITIATED,
                (Exception) null,
                true);
        Assertions.assertEquals(expectedAdvancedRecord, this.brokerRemovalStateRecordMap.get(new ImmutableSet(brokerIds)));
    }

    /**
     * Checks that a task handed to {@code submitToCcRunner} has run by the time the call returns.
     */
    @Test
    public void testSubmitToCcRunner() throws Exception {
        MutableBoolean taskRan = new MutableBoolean(false);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        engine.submitToCcRunner(() -> taskRan.setValue(true), "", this.logger);

        Assertions.assertTrue(taskRan.booleanValue(), "Task was not invoked");
    }

    /**
     * Checks that {@code submitToCcRunner} does not run the task once the context's Cruise Control
     * reference has been set to {@code null}.
     */
    @Test
    public void testSubmitToCcRunnerCCNotInitialized() throws Exception {
        MutableBoolean taskRan = new MutableBoolean(false);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.context.setCruiseControl((KafkaCruiseControl) null);

        engine.submitToCcRunner(() -> taskRan.setValue(true), "", this.logger);

        Assertions.assertFalse(taskRan.booleanValue(), "Task was invoked even though CC is not initialized.");
    }

    /**
     * Checks that a {@link Throwable} thrown by a task passed to {@code submitToCcRunner} is
     * reported through the logger's {@code error(String, Throwable)} rather than propagated.
     */
    @Test
    public void testSubmitToCcRunnerCatchesException() throws Exception {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        engine.submitToCcRunner(() -> {
            throw new Error("Test error");
        }, "", this.logger);

        Mockito.verify(this.logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.any(Throwable.class));
    }

    /**
     * Checks that {@code submitToCcRunnerOrElse} runs the primary task, and not the fallback, on
     * the default test engine.
     */
    @Test
    public void testSubmitToCcRunnerOrElse() throws Exception {
        MutableBoolean primaryRan = new MutableBoolean(false);
        MutableBoolean fallbackRan = new MutableBoolean(false);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        engine.submitToCcRunnerOrElse(
                () -> primaryRan.setValue(true),
                () -> fallbackRan.setValue(true),
                this.logger);

        Assertions.assertTrue(primaryRan.booleanValue(), "Expected task invocation failed!");
        Assertions.assertFalse(fallbackRan.booleanValue(), "Unexpected task invocation occured!");
    }

    /**
     * Checks that {@code submitToCcRunnerOrElse} runs the fallback, and not the primary task, once
     * the context's Cruise Control reference has been set to {@code null}.
     */
    @Test
    public void testSubmitToCcRunnerOrElseCCNotInitialized() throws Exception {
        MutableBoolean primaryRan = new MutableBoolean(false);
        MutableBoolean fallbackRan = new MutableBoolean(false);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.context.setCruiseControl((KafkaCruiseControl) null);

        engine.submitToCcRunnerOrElse(
                () -> primaryRan.setValue(true),
                () -> fallbackRan.setValue(true),
                this.logger);

        Assertions.assertFalse(primaryRan.booleanValue(), "Unexpected task invocation occured when CC is not initialized!");
        Assertions.assertTrue(fallbackRan.booleanValue(), "Expected task invocation failed when CC is not initialized!");
    }

    /**
     * Checks that when the primary task of {@code submitToCcRunnerOrElse} throws, the failure is
     * logged through {@code error(String, Throwable)} and the fallback is not run.
     */
    @Test
    public void testSubmitToCcRunnerOrElseCatchesException() throws Exception {
        MutableBoolean fallbackRan = new MutableBoolean(false);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        engine.submitToCcRunnerOrElse(() -> {
            throw new Error("Test error!");
        }, () -> fallbackRan.setValue(true), this.logger);

        Mockito.verify(this.logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.any(Throwable.class));
        Assertions.assertFalse(fallbackRan.booleanValue(), "Unexpected task invoked during an exception");
    }

    /**
     * Follows the {@code BrokerAddCount} metric through the engine lifecycle: absent before
     * activation, present with value 0 after activation, 1 after one {@code addBrokers} call, and
     * absent again after the context is closed.
     */
    @Test
    public void testBrokerAddMetric() throws InterruptedException {
        final String brokerAddCount = "BrokerAddCount";
        MetricsRegistry registry = KafkaYammerMetrics.defaultRegistry();
        DataBalancerMetricsRegistry balancerMetrics = new DataBalancerMetricsRegistry(registry, Collections.emptySet());
        balancerMetrics.clearShortLivedMetrics();
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine(balancerMetrics);

        // Before activation.
        Assertions.assertFalse(metricExists(registry, brokerAddCount), "Expected metric to not exist because the balance engine was not activated");

        // After activation.
        engine.onActivation(initializationContext(config));
        Assertions.assertTrue(metricExists(registry, brokerAddCount), "Expected metric to exist because the balance engine was activated");
        verifyMetricValue(registry, brokerAddCount, 0);

        // After one broker addition.
        engine.addBrokers(brokersToAdd(10).brokerIds(), "test", AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        verifyMetricValue(registry, brokerAddCount, 1);

        // After shutdown of the context.
        engine.context.closeAndClearState();
        Assertions.assertFalse(metricExists(registry, brokerAddCount), "Expected metric to not exist because the balance engine was closed");
    }

    /**
     * With the engine accepting requests, Cruise Control reported as uninitialized, and the
     * uneven-load trigger configured as {@code EMPTY_BROKER}, expects the even-cluster-load status
     * to be {@code DISABLED}.
     */
    @Test
    public void testEvenClusterLoadStatusDisabled() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.canAcceptRequests = true;
        ConfluentDataBalanceEngineContext engineContext = engine.getDataBalanceEngineContext();
        Mockito.doReturn(false).when(engineContext).isCruiseControlInitialized();

        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        Mockito.doReturn("EMPTY_BROKER").when(config).getString(ArgumentMatchers.eq("confluent.balancer.heal.uneven.load.trigger"));

        Assertions.assertEquals(EvenClusterLoadStatusDescriptionInternal.DISABLED, engine.evenClusterLoadStatus(config));
    }

    /**
     * With the engine accepting requests, Cruise Control reported as uninitialized, and the
     * uneven-load trigger configured as {@code ANY_UNEVEN_LOAD}, expects the even-cluster-load
     * status to be {@code STARTING}.
     */
    @Test
    public void testEvenClusterLoadStatusStarting() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.canAcceptRequests = true;
        ConfluentDataBalanceEngineContext engineContext = engine.getDataBalanceEngineContext();
        Mockito.doReturn(false).when(engineContext).isCruiseControlInitialized();

        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        Mockito.doReturn("ANY_UNEVEN_LOAD").when(config).getString(ArgumentMatchers.eq("confluent.balancer.heal.uneven.load.trigger"));

        Assertions.assertEquals(EvenClusterLoadStatusDescriptionInternal.STARTING, engine.evenClusterLoadStatus(config));
    }

    /**
     * Stubs the context's even-cluster-load state manager to report a fixed description and
     * expects {@code evenClusterLoadStatus} to return that same description.
     */
    @Test
    public void testEvenClusterLoadStatusBalanced() throws InterruptedException {
        EvenClusterLoadStatusDescriptionInternal expectedDescription = new EvenClusterLoadStatusDescriptionInternal(
                EvenClusterLoadStatus.BALANCED, 205L, EvenClusterLoadStatus.BALANCING, 105L, (Exception) null);
        SelfHealingEvenClusterLoadStateManager stateManager = Mockito.mock(SelfHealingEvenClusterLoadStateManager.class);
        Mockito.when(stateManager.evenClusterLoadStatusDescription()).thenReturn(expectedDescription);

        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.canAcceptRequests = true;
        DataBalanceEngineContext engineContext = engine.getDataBalanceEngineContext();
        Mockito.doReturn(stateManager).when(engineContext).getEvenClusterLoadStateManager();

        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        Assertions.assertEquals(expectedDescription, engine.evenClusterLoadStatus(config));
    }

    /**
     * Expects {@code evenClusterLoadStatus} to throw {@code BalancerOfflineException} on a test
     * engine that has been neither activated nor flagged as accepting requests.
     */
    @Test
    public void testEvenClusterLoadStatusThrowsBalancerOfflineExceptionWhenEngineNotActivated() throws InterruptedException {
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        Assertions.assertThrows(BalancerOfflineException.class, () -> engine.evenClusterLoadStatus(config));
    }

    /**
     * Reports whether the registry currently holds at least one metric with the given name.
     *
     * @param metricsRegistry registry to inspect
     * @param str metric name to look for
     * @return {@code true} if any registered metric name equals {@code str}
     */
    private boolean metricExists(MetricsRegistry metricsRegistry, String str) {
        for (MetricName candidate : metricsRegistry.allMetrics().keySet()) {
            if (candidate.getName().equals(str)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Asserts that exactly one metric named {@code str} is registered, that it belongs to the
     * {@code kafka.databalancer} group, and that its gauge currently reports {@code num}.
     *
     * @param metricsRegistry registry to inspect
     * @param str metric name to look up
     * @param num expected gauge value
     */
    private void verifyMetricValue(MetricsRegistry metricsRegistry, String str, Integer num) {
        Map<MetricName, ?> allMetrics = metricsRegistry.allMetrics();
        List<MetricName> matchingNames = allMetrics.keySet().stream()
                .filter(candidate -> candidate.getName().equals(str))
                .collect(Collectors.toList());
        // Check the count before dereferencing, so a missing metric is reported as an assertion
        // failure instead of a NoSuchElementException from an unchecked Optional.get().
        Assertions.assertEquals(1, matchingNames.size(), "Expected exactly one metric named " + str);

        MetricName metricName = matchingNames.get(0);
        Assertions.assertEquals("kafka.databalancer", metricName.getGroup());
        Assertions.assertEquals(num, ((Gauge<?>) allMetrics.get(metricName)).value());
    }
}
