package io.confluent.databalancer;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.brokerremoval.BrokerRemovalCallback;
import com.linkedin.kafka.cruisecontrol.brokerremoval.BrokerRemovalFuture;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.databalancer.operation.BrokerRemovalStateTracker;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import io.confluent.databalancer.persistence.BrokerRemovalStateRecord;
import io.confluent.databalancer.startup.CruiseControlStartable;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import kafka.server.KafkaConfig;
import kafka.utils.MockTime;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalInProgressException;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.internal.verification.VerificationModeFactory;
import org.slf4j.Logger;

/* loaded from: input_file:io/confluent/databalancer/ConfluentDataBalanceEngineTest.class */
public class ConfluentDataBalanceEngineTest {
    // Time source passed to every ConfluentDataBalanceEngineContext constructed by these tests.
    private Time mockTime = new MockTime();

    // Cruise Control mock wrapped by the engine context; created by initMocks() in setUp().
    @Mock
    private KafkaCruiseControl mockCruiseControl;

    // Metrics registry mock; tests verify clearShortLivedMetrics() against it.
    @Mock
    private DataBalancerMetricsRegistry mockMetricsRegistry;

    // Persistence store mock; stubbed in getTestDataBalanceEngine() and reset in cleanUp().
    @Mock
    private ApiStatePersistenceStore persistenceStore;

    // Completion callback mock handed to addBrokers() in the add-broker tests.
    @Mock
    BalanceOpExecutionCompletionCallback mockExecCompletionCb;

    // Logger mock passed to submitToCcRunner() so error logging can be verified.
    @Mock
    Logger logger;
    // Records captured by the stubbed persistenceStore.save(), keyed by broker id.
    private Map<Integer, BrokerRemovalStateRecord> brokerRemovalStateRecordMap;
    // Spy around the caller-runs executor; (re)assigned by getTestDataBalanceEngine().
    private ExecutorService executorService;

    /* loaded from: input_file:io/confluent/databalancer/ConfluentDataBalanceEngineTest$RejectedExecutorService.class */
    /**
     * A {@link ThreadPoolExecutor} whose {@link #execute(Runnable)} never queues or schedules
     * work itself: every task is handed directly to the configured
     * {@link RejectedExecutionHandler}.
     */
    public static class RejectedExecutorService extends ThreadPoolExecutor {
        public RejectedExecutorService(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
        }

        /** Delegates the task straight to the rejection handler instead of the pool. */
        @Override
        public void execute(Runnable task) {
            final RejectedExecutionHandler handler = getRejectedExecutionHandler();
            handler.rejectedExecution(task, this);
        }
    }

    /**
     * Builds an executor that runs every task on the calling thread.
     *
     * <p>{@link RejectedExecutorService} routes each task to its rejection handler, and the
     * handler supplied here is {@link ThreadPoolExecutor.CallerRunsPolicy}, which runs the task
     * in the thread that invoked {@code execute}.
     *
     * @return a new caller-runs executor service
     */
    private static ExecutorService currentThreadExecutorService() {
        return new RejectedExecutorService(0, 1, 0L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Initialises the {@code @Mock} fields and stubs {@code userTriggeredStopExecution()} as a no-op. */
    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this);
        Mockito.doNothing()
                .when(this.mockCruiseControl)
                .userTriggeredStopExecution();
    }

    /** Resets the persistence-store mock after each test. */
    @After
    public void cleanUp() {
        Mockito.reset(this.persistenceStore);
    }

    /**
     * Creates the engine under test, wired to spies and mocks.
     *
     * <p>The executor is a spy around the caller-runs executor, and the context is a spy whose
     * {@code getPersistenceStore()} returns the persistence-store mock. Every record passed to
     * the mocked {@code save(...)} is stored in {@link #brokerRemovalStateRecordMap} under its
     * broker id, and {@code getAllBrokerRemovalStateRecords()} returns that same map.
     *
     * @return a new engine backed by the spied executor and context
     */
    private ConfluentDataBalanceEngine getTestDataBalanceEngine() throws InterruptedException {
        this.executorService = Mockito.spy(currentThreadExecutorService());

        ConfluentDataBalanceEngineContext engineContext = Mockito.spy(
                new ConfluentDataBalanceEngineContext(this.mockMetricsRegistry, this.mockCruiseControl, this.mockTime));
        Mockito.when(engineContext.getPersistenceStore()).thenReturn(this.persistenceStore);

        // Typed map (the original used a raw ConcurrentHashMap) that stands in for persisted state.
        this.brokerRemovalStateRecordMap = new ConcurrentHashMap<>();
        Mockito.doAnswer(invocation -> {
            BrokerRemovalStateRecord savedRecord = (BrokerRemovalStateRecord) invocation.getArguments()[0];
            this.brokerRemovalStateRecordMap.put(Integer.valueOf(savedRecord.brokerId()), savedRecord);
            return null;
        }).when(this.persistenceStore)
                .save(ArgumentMatchers.any(BrokerRemovalStateRecord.class), ArgumentMatchers.anyBoolean());
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords())
                .thenReturn(this.brokerRemovalStateRecordMap);

        return new ConfluentDataBalanceEngine(this.executorService, engineContext);
    }

    /**
     * Deactivating an engine whose context was built with a {@code null} Cruise Control must
     * neither shut Cruise Control down nor clear short-lived metrics.
     */
    @Test
    public void testStopCruiseControlNotInitialized() {
        ConfluentDataBalanceEngineContext engineContext = Mockito.spy(
                new ConfluentDataBalanceEngineContext(this.mockMetricsRegistry, (KafkaCruiseControl) null, this.mockTime));
        Mockito.when(engineContext.getPersistenceStore()).thenReturn(this.persistenceStore);

        ConfluentDataBalanceEngine engine =
                new ConfluentDataBalanceEngine(currentThreadExecutorService(), engineContext);
        engine.onDeactivation();

        Mockito.verify(this.mockCruiseControl, Mockito.never()).shutdown();
        Mockito.verify(this.mockMetricsRegistry, Mockito.never()).clearShortLivedMetrics();
    }

    /**
     * A second deactivation must not shut Cruise Control down again: {@code shutdown()} is
     * still expected exactly once after {@code onDeactivation()} has been called twice.
     */
    @Test
    public void testStopCruiseControlAfterShutdownd() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        engine.onDeactivation();
        // Public Mockito.times(...) replaces the internal VerificationModeFactory API.
        Mockito.verify(this.mockCruiseControl, Mockito.times(1)).shutdown();
        Mockito.verify(this.mockMetricsRegistry, Mockito.times(1)).clearShortLivedMetrics();

        engine.onDeactivation();
        Mockito.verify(this.mockCruiseControl, Mockito.times(1)).shutdown();
    }

    /**
     * {@code stopCruiseControl()} should stop any user-triggered execution, shut Cruise Control
     * down and clear the short-lived metrics.
     */
    @Test
    public void testStopCruiseControl() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        engine.stopCruiseControl();

        Mockito.verify(this.mockCruiseControl).shutdown();
        Mockito.verify(this.mockCruiseControl).userTriggeredStopExecution();
        Mockito.verify(this.mockMetricsRegistry).clearShortLivedMetrics();
    }

    /**
     * Deactivation should shut Cruise Control down, clear short-lived metrics and flip
     * {@code canAcceptRequests} back to {@code false}.
     */
    @Test
    public void testDeactivation() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        // Pretend the engine is currently accepting requests.
        engine.canAcceptRequests = true;

        engine.onDeactivation();

        Mockito.verify(this.mockCruiseControl).shutdown();
        Mockito.verify(this.mockCruiseControl).userTriggeredStopExecution();
        Mockito.verify(this.mockMetricsRegistry).clearShortLivedMetrics();
        Assert.assertFalse("DatabalanceEngine is not stopped", engine.canAcceptRequests);
    }

    /**
     * {@code doStart} on an engine whose context already holds a Cruise Control instance must
     * leave that instance in place.
     */
    @Test
    public void testStartCruiseControlNoOp() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        engine.doStart((EngineInitializationContext) null);

        // JUnit's assertSame takes (expected, actual); the original passed them reversed.
        Assert.assertSame(this.mockCruiseControl, engine.context.getCruiseControl());
    }

    /**
     * When {@code setPersistenceStore} throws during {@code doStart}, the Cruise Control
     * instance produced by the startable must be shut down.
     */
    @Test
    public void testStartCruiseControlPeristenceThrowsError() throws Exception {
        KafkaCruiseControl startedCruiseControl = Mockito.mock(KafkaCruiseControl.class);
        CruiseControlStartable startable = Mockito.mock(CruiseControlStartable.class);
        Mockito.when(startable.startUp(ArgumentMatchers.any(KafkaConfig.class), ArgumentMatchers.any(Semaphore.class)))
                .thenReturn(startedCruiseControl);
        EngineInitializationContext initContext =
                new EngineInitializationContext(Mockito.mock(KafkaConfig.class), (Map) null, (Function) null);

        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        ConfluentDataBalanceEngineContext engineContext = engine.getDataBalanceEngineContext();
        // Clear Cruise Control on the context before starting.
        engineContext.setCruiseControl((KafkaCruiseControl) null);
        Mockito.doThrow(new RuntimeException("Test exception"))
                .when(engineContext)
                .setPersistenceStore(ArgumentMatchers.any(ApiStatePersistenceStore.class));

        engine.doStart(initContext, startable);

        Mockito.verify(startedCruiseControl).shutdown();
    }

    /**
     * Builds an initialization context for the given config with an empty map and a handler
     * that returns a fresh {@link AtomicReference} holding {@code "Test metric"} for any input.
     *
     * @param kafkaConfig the (typically mocked) broker config to wrap
     * @return a new initialization context
     */
    private EngineInitializationContext initializationContext(KafkaConfig kafkaConfig) {
        // Diamond replaces the raw AtomicReference from the original.
        return new EngineInitializationContext(
                kafkaConfig,
                Collections.emptyMap(),
                brokerId -> new AtomicReference<>("Test metric"));
    }

    /**
     * Activation should submit exactly one task to the executor and leave the engine accepting
     * requests. {@code submit} is stubbed to return {@code null} on the spied executor.
     */
    @Test
    public void testOnActivation() throws InterruptedException {
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        Mockito.doReturn(null)
                .when(this.executorService)
                .submit(ArgumentMatchers.any(Runnable.class));

        engine.onActivation(initializationContext(config));

        Mockito.verify(this.executorService).submit(ArgumentMatchers.any(Runnable.class));
        Assert.assertTrue("DatabalanceEngine is not started", engine.canAcceptRequests);
    }

    /**
     * Happy-path broker removal on an activated engine.
     *
     * <p>Checks that the removal is forwarded to Cruise Control, that the returned future is
     * registered on the context and executed with a 60-minute duration, that the future is only
     * unregistered once the captured completion callback fires, and that an
     * {@code INITIAL_PLAN_COMPUTATION_INITIATED} record is persisted alongside one state tracker.
     */
    @Test
    public void testRemoveBroker() throws Throwable {
        Optional<Long> brokerEpoch = Optional.of(1L);
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));

        BrokerRemovalFuture removalFuture = Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(this.mockCruiseControl.removeBroker(
                        ArgumentMatchers.eq(1),
                        ArgumentMatchers.eq(brokerEpoch),
                        ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                        ArgumentMatchers.any(BrokerRemovalCallback.class),
                        ArgumentMatchers.anyString()))
                .thenReturn(removalFuture);
        ArgumentCaptor<BalanceOpExecutionCompletionCallback> completionCbCaptor =
                ArgumentCaptor.forClass(BalanceOpExecutionCompletionCallback.class);
        BrokerRemovalStateRecord expectedRecord = new BrokerRemovalStateRecord(
                1, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, (Exception) null);

        engine.removeBroker(1, brokerEpoch, "uid");

        Assert.assertTrue("DatabalanceEngine is not started", engine.canAcceptRequests);
        // Two submissions in total are expected on the executor by this point.
        Mockito.verify(this.executorService, Mockito.times(2)).submit(ArgumentMatchers.any(Runnable.class));
        Mockito.verify(this.mockCruiseControl).removeBroker(
                ArgumentMatchers.eq(1),
                ArgumentMatchers.eq(brokerEpoch),
                completionCbCaptor.capture(),
                ArgumentMatchers.any(BrokerRemovalCallback.class),
                ArgumentMatchers.anyString());
        Mockito.verify(engine.context).putBrokerRemovalFuture(1, removalFuture);
        Mockito.verify(removalFuture).execute(Duration.ofMinutes(60L));

        // The future stays registered until the execution-completion callback is invoked.
        Mockito.verify(engine.context, Mockito.never()).removeBrokerRemovalFuture(1);
        completionCbCaptor.getValue().accept(true, (Throwable) null);
        Mockito.verify(engine.context).removeBrokerRemovalFuture(1);

        Assert.assertEquals("Expected to have a persisted broker removal state", 1L, this.brokerRemovalStateRecordMap.size());
        Assert.assertEquals(expectedRecord, this.brokerRemovalStateRecordMap.get(1));
        Assert.assertEquals("Expected to have one broker removal state tracker in the context object", 1L, engine.context.brokerRemovalsStateTrackers.size());
    }

    /**
     * Signalling a terminal state for a broker through the engine's termination listener should
     * remove that broker's state tracker from the context.
     */
    @Test
    public void testBrokerRemovalTerminationListener() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        BrokerRemovalStateTracker tracker = Mockito.mock(BrokerRemovalStateTracker.class);
        engine.context.brokerRemovalsStateTrackers.put(1, tracker);

        engine.removalTerminationListener.onTerminalState(
                1, (BrokerRemovalStateMachine.BrokerRemovalState) null, (Exception) null);

        Assert.assertEquals("Expected to have removed the broker removal state tracker in the context object", 0L, engine.context.brokerRemovalsStateTrackers.size());
    }

    /**
     * A broker whose persisted removal state is {@code PLAN_EXECUTION_SUCCEEDED} can be removed
     * again: the request reaches Cruise Control and the returned future is registered and executed.
     */
    @Test
    public void testRemoveBroker_InitiatesRemovalOnPreviouslyRemovedBroker() throws Throwable {
        Optional<Long> brokerEpoch = Optional.of(1L);
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));

        BrokerRemovalFuture removalFuture = Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(this.mockCruiseControl.removeBroker(
                        ArgumentMatchers.eq(1),
                        ArgumentMatchers.eq(brokerEpoch),
                        ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                        ArgumentMatchers.any(BrokerRemovalCallback.class),
                        ArgumentMatchers.anyString()))
                .thenReturn(removalFuture);
        // The store reports an earlier removal of the same broker that already succeeded.
        Mockito.when(this.persistenceStore.getBrokerRemovalStateRecord(1))
                .thenReturn(new BrokerRemovalStateRecord(
                        1, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_SUCCEEDED, (Exception) null));

        engine.removeBroker(1, brokerEpoch, "uid");

        Mockito.verify(this.executorService, Mockito.times(2)).submit(ArgumentMatchers.any(Runnable.class));
        Mockito.verify(this.mockCruiseControl).removeBroker(
                ArgumentMatchers.eq(1),
                ArgumentMatchers.eq(brokerEpoch),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.any(BrokerRemovalCallback.class),
                ArgumentMatchers.anyString());
        Mockito.verify(engine.context).putBrokerRemovalFuture(1, removalFuture);
        Mockito.verify(removalFuture).execute(Duration.ofMinutes(60L));
        Assert.assertTrue("DatabalanceEngine is not started", engine.canAcceptRequests);
    }

    /**
     * Removing a broker whose persisted removal state is still
     * {@code INITIAL_PLAN_COMPUTATION_INITIATED} must fail with
     * {@link BrokerRemovalInProgressException}.
     */
    @Test(expected = BrokerRemovalInProgressException.class)
    public void testRemoveBrokerTwiceFailure() throws InterruptedException {
        Optional<Long> brokerEpoch = Optional.of(1L);
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));
        Mockito.when(this.persistenceStore.getBrokerRemovalStateRecord(1))
                .thenReturn(new BrokerRemovalStateRecord(
                        1, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, (Exception) null));

        engine.removeBroker(1, brokerEpoch, "uid");
    }

    /**
     * Removing broker 1 must fail with {@link BrokerRemovalInProgressException} when the store
     * has no record for broker 1 itself but lists a removal of another broker (id 2) in state
     * {@code INITIAL_PLAN_COMPUTATION_INITIATED}.
     */
    @Test(expected = BrokerRemovalInProgressException.class)
    public void testRemoveBrokerFailureForExistingRemovals() throws InterruptedException {
        final int brokerToRemove = 1;
        Optional<Long> brokerEpoch = Optional.of(1L);
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));

        // A different broker already has a removal in flight.
        BrokerRemovalStateRecord otherBrokerRecord = new BrokerRemovalStateRecord(
                brokerToRemove + 1,
                BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED,
                (Exception) null);
        // Plain null: the original's (Object) null does not match the typed thenReturn(T).
        Mockito.when(this.persistenceStore.getBrokerRemovalStateRecord(brokerToRemove)).thenReturn(null);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords())
                .thenReturn(Collections.singletonMap(Integer.valueOf(otherBrokerRecord.brokerId()), otherBrokerRecord));

        engine.removeBroker(brokerToRemove, brokerEpoch, "uid");
    }

    /**
     * Calling {@code removeBroker} on an engine that was never activated must fail with
     * {@link BalancerOfflineException}.
     */
    @Test(expected = BalancerOfflineException.class)
    public void testRemoveBrokerThrowsBalancerOfflineExceptionIfNoActiveDatabalancer() throws InterruptedException {
        ConfluentDataBalanceEngine inactiveEngine = getTestDataBalanceEngine();
        inactiveEngine.removeBroker(1, Optional.of(1L), "uid");
    }

    /**
     * {@code cancelBrokerRemoval} should return whatever the registered future's
     * {@code cancel()} returns, and {@code false} for a broker with no registered future.
     */
    @Test
    public void testCancelBrokerRemovals_CallsFutureCancel() throws InterruptedException {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        BrokerRemovalFuture cancellableFuture = Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(cancellableFuture.cancel()).thenReturn(true);
        BrokerRemovalFuture uncancellableFuture = Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(uncancellableFuture.cancel()).thenReturn(false);
        engine.context.putBrokerRemovalFuture(1, cancellableFuture);
        engine.context.putBrokerRemovalFuture(2, uncancellableFuture);

        boolean firstCancelled = engine.cancelBrokerRemoval(1);
        boolean secondCancelled = engine.cancelBrokerRemoval(2);
        // Broker 3 was never registered on the context.
        boolean unknownCancelled = engine.cancelBrokerRemoval(3);

        Assert.assertTrue("Expected broker 1 to be cancelled successfully", firstCancelled);
        Assert.assertFalse("Expected broker 2 to not be cancelled successfully", secondCancelled);
        Assert.assertFalse("Expected broker 3 to not be cancelled successfully because it doesn't exist", unknownCancelled);
        Mockito.verify(cancellableFuture).cancel();
        Mockito.verify(uncancellableFuture).cancel();
    }

    /**
     * With Cruise Control present, {@code updateThrottle} should go through
     * {@code updateThrottleHelper} and reach Cruise Control with the same value.
     */
    @Test
    public void testUpdateThrottleWhileRunning() throws InterruptedException {
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());

        engineSpy.updateThrottle(100L);

        Mockito.verify(this.mockCruiseControl).updateThrottle(100L);
        Mockito.verify(engineSpy).updateThrottleHelper(100L);
    }

    /**
     * After deactivation, {@code updateThrottle} must not be forwarded to Cruise Control.
     *
     * <p>NOTE(review): the final verification only confirms the {@code updateThrottle(100L)}
     * call made by this test itself.
     */
    @Test
    public void testUpdateThrottleWhileStopped() throws InterruptedException {
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onDeactivation();

        engineSpy.updateThrottle(100L);

        Mockito.verify(this.mockCruiseControl, Mockito.never()).updateThrottle(ArgumentMatchers.anyLong());
        Mockito.verify(engineSpy).updateThrottle(100L);
    }

    /**
     * Toggling auto-heal on and then off should each go through {@code updateAutoHealHelper}
     * and reach Cruise Control's {@code setGoalViolationSelfHealing} with the matching flag.
     */
    @Test
    public void testUpdateAutoHeal() throws InterruptedException {
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());

        // Enable.
        engineSpy.setAutoHealMode(true);
        Mockito.verify(this.mockCruiseControl).setGoalViolationSelfHealing(true);
        Mockito.verify(engineSpy).updateAutoHealHelper(true);

        // Disable.
        engineSpy.setAutoHealMode(false);
        Mockito.verify(this.mockCruiseControl).setGoalViolationSelfHealing(false);
        Mockito.verify(engineSpy).updateAutoHealHelper(false);
    }

    /**
     * After deactivation, enabling auto-heal must not reach Cruise Control.
     *
     * <p>NOTE(review): the final verification only confirms the {@code setAutoHealMode(true)}
     * call made by this test itself.
     */
    @Test
    public void testUpdateAutoHealWhenStopped() throws InterruptedException {
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onDeactivation();

        engineSpy.setAutoHealMode(true);

        Mockito.verify(this.mockCruiseControl, Mockito.never()).setGoalViolationSelfHealing(true);
        Mockito.verify(engineSpy).setAutoHealMode(true);
    }

    /**
     * With no removal records in the store, {@code addBrokers} should call {@code doAddBrokers}
     * and forward the same broker set, callback and uid to Cruise Control.
     */
    @Test
    public void testAddBroker_success() throws KafkaCruiseControlException, InterruptedException {
        Set<Integer> brokersToAdd = new HashSet<>(Collections.singletonList(10));
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onActivation(initializationContext(config));

        engineSpy.addBrokers(brokersToAdd, this.mockExecCompletionCb, "testUid");

        Mockito.verify(engineSpy).doAddBrokers(
                ArgumentMatchers.eq(brokersToAdd),
                ArgumentMatchers.eq(this.mockExecCompletionCb),
                ArgumentMatchers.eq("testUid"));
        Mockito.verify(this.mockCruiseControl).addBrokers(
                ArgumentMatchers.eq(brokersToAdd),
                ArgumentMatchers.eq(this.mockExecCompletionCb),
                ArgumentMatchers.eq("testUid"));
    }

    /**
     * While the store lists a removal in state {@code INITIAL_PLAN_COMPUTATION_INITIATED},
     * {@code addBrokers} must not reach {@code doAddBrokers}.
     */
    @Test
    public void testAddBroker_pendingRemove() throws InterruptedException {
        Set<Integer> brokersToAdd = new HashSet<>(Collections.singletonList(10));
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onActivation(initializationContext(config));

        // Broker 1 has a removal that has started but not finished.
        BrokerRemovalStateRecord pendingRemoval = new BrokerRemovalStateRecord(
                1, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, (Exception) null);
        Map<Integer, BrokerRemovalStateRecord> removalRecords = new HashMap<>();
        removalRecords.put(1, pendingRemoval);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(removalRecords);

        engineSpy.addBrokers(brokersToAdd, this.mockExecCompletionCb, "testUid");

        Mockito.verify(engineSpy, Mockito.never()).doAddBrokers(
                ArgumentMatchers.<Set<Integer>>any(),
                ArgumentMatchers.<BalanceOpExecutionCompletionCallback>any(),
                ArgumentMatchers.<String>any());
    }

    /**
     * A stored removal in state {@code PLAN_EXECUTION_SUCCEEDED} must not block
     * {@code addBrokers}: the request reaches {@code doAddBrokers} and Cruise Control.
     */
    @Test
    public void testAddBroker_successfulRemove() throws KafkaCruiseControlException, InterruptedException {
        Set<Integer> brokersToAdd = new HashSet<>(Collections.singletonList(10));
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onActivation(initializationContext(config));

        // Broker 1's removal has already completed successfully.
        BrokerRemovalStateRecord finishedRemoval = new BrokerRemovalStateRecord(
                1, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_SUCCEEDED, (Exception) null);
        Map<Integer, BrokerRemovalStateRecord> removalRecords = new HashMap<>();
        removalRecords.put(1, finishedRemoval);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(removalRecords);

        engineSpy.addBrokers(brokersToAdd, this.mockExecCompletionCb, "testUid");

        Mockito.verify(engineSpy).doAddBrokers(
                ArgumentMatchers.eq(brokersToAdd),
                ArgumentMatchers.eq(this.mockExecCompletionCb),
                ArgumentMatchers.eq("testUid"));
        Mockito.verify(this.mockCruiseControl).addBrokers(
                ArgumentMatchers.eq(brokersToAdd),
                ArgumentMatchers.eq(this.mockExecCompletionCb),
                ArgumentMatchers.eq("testUid"));
    }

    /**
     * A stored removal in state {@code PLAN_EXECUTION_CANCELED} must not block
     * {@code addBrokers}: the request reaches {@code doAddBrokers} and Cruise Control.
     */
    @Test
    public void testAddBroker_failedRemove() throws KafkaCruiseControlException, InterruptedException {
        Set<Integer> brokersToAdd = new HashSet<>(Collections.singletonList(10));
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engineSpy = Mockito.spy(getTestDataBalanceEngine());
        engineSpy.onActivation(initializationContext(config));

        // Broker 1's removal was cancelled during plan execution.
        BrokerRemovalStateRecord cancelledRemoval = new BrokerRemovalStateRecord(
                1, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_CANCELED, (Exception) null);
        Map<Integer, BrokerRemovalStateRecord> removalRecords = new HashMap<>();
        removalRecords.put(1, cancelledRemoval);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(removalRecords);

        engineSpy.addBrokers(brokersToAdd, this.mockExecCompletionCb, "testUid");

        Mockito.verify(engineSpy).doAddBrokers(
                ArgumentMatchers.eq(brokersToAdd),
                ArgumentMatchers.eq(this.mockExecCompletionCb),
                ArgumentMatchers.eq("testUid"));
        Mockito.verify(this.mockCruiseControl).addBrokers(
                ArgumentMatchers.eq(brokersToAdd),
                ArgumentMatchers.eq(this.mockExecCompletionCb),
                ArgumentMatchers.eq("testUid"));
    }

    /**
     * Events registered on the state tracker created by {@code removeBroker} should be persisted.
     *
     * <p>After the initial {@code INITIAL_PLAN_COMPUTATION_INITIATED} record, registering
     * {@code INITIAL_PLAN_COMPUTATION_SUCCESS} followed by {@code BROKER_SHUTDOWN_SUCCESS} is
     * expected to leave a stored record matching a {@code PLAN_COMPUTATION_INITIATED} record
     * field by field.
     */
    @Test
    public void testRemoveBrokerStateTrackerContinuouslyUpdatesStatus() throws Throwable {
        Optional<Long> brokerEpoch = Optional.of(1L);
        KafkaConfig config = Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.onActivation(initializationContext(config));
        Mockito.when(this.mockCruiseControl.removeBroker(
                        ArgumentMatchers.eq(1),
                        ArgumentMatchers.eq(brokerEpoch),
                        ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                        ArgumentMatchers.any(BrokerRemovalCallback.class),
                        ArgumentMatchers.anyString()))
                .thenReturn(Mockito.mock(BrokerRemovalFuture.class));
        BrokerRemovalStateRecord initialRecord = new BrokerRemovalStateRecord(
                1, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, (Exception) null);

        engine.removeBroker(1, brokerEpoch, "uid");

        Assert.assertEquals("Expected to have a persisted broker removal state", 1L, this.brokerRemovalStateRecordMap.size());
        Assert.assertEquals(initialRecord, this.brokerRemovalStateRecordMap.get(1));
        Assert.assertEquals("Expected to have one broker removal state tracker in the context object", 1L, engine.context.brokerRemovalsStateTrackers.size());

        // Advance the removal through its tracker and check what was persisted.
        BrokerRemovalStateTracker tracker =
                (BrokerRemovalStateTracker) engine.context.brokerRemovalsStateTrackers.get(1);
        tracker.registerEvent(BrokerRemovalCallback.BrokerRemovalEvent.INITIAL_PLAN_COMPUTATION_SUCCESS);
        tracker.registerEvent(BrokerRemovalCallback.BrokerRemovalEvent.BROKER_SHUTDOWN_SUCCESS);

        BrokerRemovalStateRecord expectedRecord = new BrokerRemovalStateRecord(
                1, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_COMPUTATION_INITIATED, (Exception) null);
        BrokerRemovalStateRecord persistedRecord = this.brokerRemovalStateRecordMap.get(1);
        Assert.assertEquals(expectedRecord.brokerShutdownStatus(), persistedRecord.brokerShutdownStatus());
        Assert.assertEquals(expectedRecord.partitionReassignmentsStatus(), persistedRecord.partitionReassignmentsStatus());
        Assert.assertEquals(expectedRecord.exception(), persistedRecord.exception());
        Assert.assertEquals(expectedRecord.brokerId(), persistedRecord.brokerId());
    }

    /** A task submitted through {@code submitToCcRunner} should run when Cruise Control is present. */
    @Test
    public void testSubmitToCcRunner() throws Exception {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        MutableBoolean taskRan = new MutableBoolean(false);

        engine.submitToCcRunner(() -> taskRan.setValue(true), "", this.logger);

        Assert.assertTrue("Task was not invoked", taskRan.booleanValue());
    }

    /**
     * A task submitted through {@code submitToCcRunner} must not run once the context's
     * Cruise Control has been set to {@code null}.
     */
    @Test
    public void testSubmitToCcRunnerCCNotInitialized() throws Exception {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();
        engine.context.setCruiseControl((KafkaCruiseControl) null);
        MutableBoolean taskRan = new MutableBoolean(false);

        engine.submitToCcRunner(() -> taskRan.setValue(true), "", this.logger);

        Assert.assertFalse("Task was invoked even though CC is not initialized.", taskRan.booleanValue());
    }

    /**
     * An {@link Error} thrown by a submitted task should be reported through the supplied
     * logger's {@code error(String, Throwable)} rather than propagating to the caller.
     */
    @Test
    public void testSubmitToCcRunnerCatchesException() throws Exception {
        ConfluentDataBalanceEngine engine = getTestDataBalanceEngine();

        engine.submitToCcRunner(() -> {
            throw new Error("Test error");
        }, "", this.logger);

        Mockito.verify(this.logger).error(ArgumentMatchers.anyString(), ArgumentMatchers.any(Throwable.class));
    }
}
