package io.confluent.databalancer;

import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import io.confluent.databalancer.operation.BrokerRemovalCancellationMode;
import io.confluent.databalancer.operation.BrokerRemovalStateTracker;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.controller.DataBalanceManager;
import kafka.metrics.KafkaYammerMetrics;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalCanceledException;
import org.apache.kafka.common.utils.Time;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import scala.Option;

/* loaded from: input_file:io/confluent/databalancer/KafkaDataBalanceManagerTest.class */
/**
 * Unit tests for {@link KafkaDataBalanceManager}.
 *
 * <p>The manager is driven through its public callbacks (election, resignation, config updates,
 * broker startup, broker removal) while the balance engines it delegates to are Mockito mocks,
 * so each test asserts on the calls the manager forwards to those engines.
 */
public class KafkaDataBalanceManagerTest {
    // Mutable broker properties; setUp() fills in the baseline and tests tweak them to build updatedConfig.
    private Properties brokerProps;
    // Baseline config built in setUp(): balancer enabled, throttle of 200.
    private KafkaConfig initConfig;
    // Config built by individual tests from the (modified) brokerProps.
    private KafkaConfig updatedConfig;
    // Manager under test, for tests that only need the DataBalanceManager interface.
    private DataBalanceManager dataBalancer;

    // Factory stubbed in setUp() to hand out the two mock engines below.
    @Mock
    private KafkaDataBalanceManager.DataBalanceEngineFactory mockDataBalanceEngineFactory;

    // Engine returned by the factory's getActiveDataBalanceEngine(); stubbed by setupMockDbe().
    @Mock
    private DataBalanceEngine mockActiveDataBalanceEngine;

    // Engine returned by the factory's getInactiveDataBalanceEngine(); left unstubbed.
    @Mock
    private DataBalanceEngine mockInactiveDataBalanceEngine;

    // Metrics registry mock passed to the manager by tests that do not inspect metrics.
    @Mock
    private DataBalancerMetricsRegistry mockDbMetrics;

    @Mock
    private Time time;

    // Returned by mockDbeContext.getPersistenceStore().
    @Mock
    private ApiStatePersistenceStore mockPersistenceStore;

    // Context returned by the active engine mock's getDataBalanceEngineContext().
    @Mock
    private DataBalanceEngineContext mockDbeContext;

    // Captures the completion callback passed to DataBalanceEngine#addBrokers.
    @Captor
    private ArgumentCaptor<BalanceOpExecutionCompletionCallback> execCbCaptor;

    // Captures the context passed to DataBalanceEngine#onActivation.
    @Captor
    private ArgumentCaptor<EngineInitializationContext> initializationContext;
    // Map returned by mockDbeContext.getBrokerRemovalsStateTrackers(); recreated empty in setUp().
    private Map<Integer, BrokerRemovalStateTracker> stateTrackers;

    /**
     * Creates the annotated mocks, builds the baseline broker configuration (balancer enabled,
     * throttle of 200 bytes per second) and stubs the engine factory to return the mock engines.
     */
    @Before
    public void setUp() {
        // The mocks have to exist before any of the stubbing further down.
        MockitoAnnotations.initMocks(this);
        stateTrackers = new HashMap<>();

        brokerProps = new Properties();
        brokerProps.put(KafkaConfig$.MODULE$.ZkConnectProp(), TestUtils$.MODULE$.MockZkConnect());
        brokerProps.put("confluent.balancer.enable", true);
        brokerProps.put("confluent.balancer.throttle.bytes.per.second", 200L);
        initConfig = new KafkaConfig(brokerProps);

        Mockito.when(mockDataBalanceEngineFactory.getActiveDataBalanceEngine())
                .thenReturn(mockActiveDataBalanceEngine);
        Mockito.when(mockDataBalanceEngineFactory.getInactiveDataBalanceEngine())
                .thenReturn(mockInactiveDataBalanceEngine);
        setupMockDbe();
    }

    /**
     * Stubs the active engine mock so that it reports itself as active and exposes a context
     * backed by this class's persistence-store mock and removal state tracker map.
     * Also called by tests after they reset the active engine mock.
     */
    private void setupMockDbe() {
        Mockito.when(mockDbeContext.getPersistenceStore()).thenReturn(mockPersistenceStore);
        Mockito.when(mockDbeContext.getBrokerRemovalsStateTrackers()).thenReturn(stateTrackers);
        Mockito.when(mockActiveDataBalanceEngine.getDataBalanceEngineContext()).thenReturn(mockDbeContext);
        Mockito.when(mockActiveDataBalanceEngine.isActive()).thenReturn(true);
    }

    /**
     * After election, disabling and re-enabling the balancer via config deactivates and then
     * re-activates the active engine with the enabling config; the inactive engine is never
     * activated/deactivated and no engine is shut down.
     */
    @Test
    public void testUpdateConfigBalancerEnable() throws InterruptedException {
        brokerProps.put("confluent.balancer.enable", false);
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time);
        dataBalancer.onElection(Collections.emptyMap());

        // enabled -> disabled
        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verify(mockActiveDataBalanceEngine).onDeactivation();

        // disabled -> enabled, with the mock's recorded history cleared first
        Mockito.reset(mockActiveDataBalanceEngine);
        dataBalancer.updateConfig(updatedConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        KafkaConfig activationConfig = initializationContext.getValue().kafkaConfig;
        Assert.assertEquals(activationConfig.toString(), initConfig, activationConfig);

        // identical old/new config -> no further engine calls
        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);

        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never())
                .onActivation(ArgumentMatchers.any(EngineInitializationContext.class));
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never()).onDeactivation();
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).shutdown();
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never()).shutdown();
    }

    /**
     * Without an election, toggling the balancer enable flag is routed to the inactive engine
     * (deactivation, then activation with the enabling config); the active engine is never
     * activated/deactivated and no engine is shut down.
     */
    @Test
    public void testUpdateConfigBalancerEnableOnNonEligibleNode() throws InterruptedException {
        brokerProps.put("confluent.balancer.enable", false);
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time);

        // enabled -> disabled, no onElection() beforehand
        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verify(mockInactiveDataBalanceEngine).onDeactivation();

        // Same reset as in the elected-node variant: only the active engine mock is cleared.
        Mockito.reset(mockActiveDataBalanceEngine);
        dataBalancer.updateConfig(updatedConfig, initConfig);
        Mockito.verify(mockInactiveDataBalanceEngine).onActivation(initializationContext.capture());
        KafkaConfig activationConfig = initializationContext.getValue().kafkaConfig;
        Assert.assertEquals(activationConfig.toString(), initConfig, activationConfig);

        // identical old/new config -> no further engine calls
        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);

        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never())
                .onActivation(ArgumentMatchers.any(EngineInitializationContext.class));
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).onDeactivation();
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).shutdown();
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never()).shutdown();
    }

    /**
     * A change of {@code confluent.balancer.throttle.bytes.per.second} is forwarded to the active
     * engine as {@code updateThrottle} with the new value, in both directions (200 -> 100 -> 200).
     */
    @Test
    public void testUpdateConfigBalancerThrottle() {
        brokerProps.put("confluent.balancer.throttle.bytes.per.second", 100L);
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time);

        dataBalancer.onElection(Collections.emptyMap());
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        KafkaConfig activationConfig = initializationContext.getValue().kafkaConfig;
        Assert.assertEquals(activationConfig.toString(), initConfig, activationConfig);

        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verify(mockActiveDataBalanceEngine).updateThrottle(100L);

        dataBalancer.updateConfig(updatedConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine).updateThrottle(200L);

        // identical old/new config -> no further engine calls
        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * Setting {@code confluent.balancer.heal.uneven.load.trigger} to {@code ANY_UNEVEN_LOAD} and
     * reverting it is forwarded to the active engine as {@code setAutoHealMode(true)} followed
     * by {@code setAutoHealMode(false)}.
     */
    @Test
    public void testUpdateConfigAutoHealMode() {
        String anyUnevenLoad = ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString();
        brokerProps.put("confluent.balancer.heal.uneven.load.trigger", anyUnevenLoad);
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time);

        dataBalancer.onElection(Collections.emptyMap());
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        KafkaConfig activationConfig = initializationContext.getValue().kafkaConfig;
        Assert.assertEquals(activationConfig.toString(), initConfig, activationConfig);

        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verify(mockActiveDataBalanceEngine).setAutoHealMode(true);

        dataBalancer.updateConfig(updatedConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine).setAutoHealMode(false);

        // identical old/new config -> no further engine calls
        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * When both the self-heal trigger and the throttle change in one config update, each change
     * is forwarded to the active engine ({@code setAutoHealMode} and {@code updateThrottle}),
     * both when applying the update and when reverting it.
     */
    @Test
    public void testUpdateConfigMultipleProperties() {
        String anyUnevenLoad = ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString();
        brokerProps.put("confluent.balancer.heal.uneven.load.trigger", anyUnevenLoad);
        brokerProps.put("confluent.balancer.throttle.bytes.per.second", 100L);
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time);

        dataBalancer.onElection(Collections.emptyMap());
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        KafkaConfig activationConfig = initializationContext.getValue().kafkaConfig;
        Assert.assertEquals(activationConfig.toString(), initConfig, activationConfig);

        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verify(mockActiveDataBalanceEngine).setAutoHealMode(true);
        Mockito.verify(mockActiveDataBalanceEngine).updateThrottle(100L);

        dataBalancer.updateConfig(updatedConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine).setAutoHealMode(false);
        Mockito.verify(mockActiveDataBalanceEngine).updateThrottle(200L);

        // identical old/new config -> no further engine calls
        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * A config update built from unchanged broker properties triggers no engine calls beyond
     * the initial activation on election.
     */
    @Test
    public void testUpdateConfigNoPropsUpdated() {
        // Same properties as initConfig, just a distinct KafkaConfig instance.
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time);

        dataBalancer.onElection(Collections.emptyMap());
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        KafkaConfig activationConfig = initializationContext.getValue().kafkaConfig;
        Assert.assertEquals(activationConfig.toString(), initConfig, activationConfig);

        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * A manager created with the balancer disabled does not activate the engine on election;
     * the engine is activated, with the enabling config, once a config update turns the balancer on.
     */
    @Test
    public void testEnableFromOff() {
        brokerProps.put("confluent.balancer.enable", false);
        KafkaConfig disabledConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(disabledConfig, mockDataBalanceEngineFactory, mockDbMetrics, time);

        dataBalancer.onElection(Collections.emptyMap());
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).onActivation(initializationContext.capture());

        dataBalancer.updateConfig(disabledConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        KafkaConfig activationConfig = initializationContext.getValue().kafkaConfig;
        Assert.assertEquals(activationConfig.toString(), initConfig, activationConfig);
    }

    /**
     * The {@code ActiveBalancerCount} gauge in the default Yammer registry reads 1 after election
     * and 0 after resignation.
     *
     * <p>The default registry is shared across tests, so it is cleaned before the test and again
     * in a {@code finally} block, which keeps a failing assertion from leaking metrics into
     * later tests.
     */
    @Test
    public void testConfluentBalancerEnabledMetric() {
        MetricsRegistry defaultRegistry = KafkaYammerMetrics.defaultRegistry();
        cleanMetrics(defaultRegistry);
        try {
            DataBalancerMetricsRegistry dbMetrics =
                    new DataBalancerMetricsRegistry(defaultRegistry, KafkaDataBalanceManager.getMetricsWhiteList());
            this.dataBalancer = new KafkaDataBalanceManager(this.initConfig, this.mockDataBalanceEngineFactory, dbMetrics, this.time);

            this.dataBalancer.onElection(Collections.emptyMap());
            verifyMetricValue(defaultRegistry, "ActiveBalancerCount", 1);

            this.dataBalancer.onResignation();
            verifyMetricValue(defaultRegistry, "ActiveBalancerCount", 0);
        } finally {
            // Always restore the shared registry, even when an assertion above fails.
            cleanMetrics(defaultRegistry);
        }
    }

    /**
     * Asserts that the registry holds exactly one metric with the given name, that it belongs to
     * the {@code kafka.databalancer} group, and that its gauge currently reports the expected value.
     *
     * <p>Uniqueness is asserted before the metric is looked up, so a missing or duplicated metric
     * fails with a descriptive assertion message instead of a bare {@code NoSuchElementException}.
     *
     * @param metricsRegistry registry to inspect
     * @param str             metric name to look for (compared against {@code MetricName#getName()})
     * @param num             expected gauge value
     */
    private void verifyMetricValue(MetricsRegistry metricsRegistry, String str, Integer num) {
        Map<MetricName, ?> allMetrics = metricsRegistry.allMetrics();
        // Filter once and reuse the result for both the uniqueness check and the lookup.
        Set<MetricName> matches = allMetrics.keySet().stream()
                .filter(candidate -> candidate.getName().equals(str))
                .collect(Collectors.toSet());
        Assert.assertEquals("Expected exactly one metric named " + str, 1, matches.size());

        MetricName metricName = matches.iterator().next();
        Assert.assertEquals("kafka.databalancer", metricName.getGroup());
        Assert.assertEquals(num, ((Gauge<?>) allMetrics.get(metricName)).value());
    }

    /**
     * Resets metric state between tests: clears the Yammer metrics via
     * {@code TestUtils.clearYammerMetrics()} and then shuts down the given registry.
     *
     * @param metricsRegistry registry to shut down (the tests pass the Kafka default registry)
     */
    public void cleanMetrics(MetricsRegistry metricsRegistry) {
        TestUtils.clearYammerMetrics();
        metricsRegistry.shutdown();
    }

    /**
     * After election followed by resignation, shutting the manager down shuts down both the
     * active and the inactive engine; the inactive engine is never activated or deactivated
     * while the manager holds the election.
     */
    @Test
    public void testShutdownOnActive() throws InterruptedException {
        brokerProps.put("confluent.balancer.enable", true);
        updatedConfig = new KafkaConfig(brokerProps);
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);

        dataBalancer.onElection(Collections.emptyMap());
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        KafkaConfig activationConfig = initializationContext.getValue().kafkaConfig;
        Assert.assertEquals(activationConfig.toString(), initConfig, activationConfig);
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never())
                .onActivation(ArgumentMatchers.any(EngineInitializationContext.class));
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never()).onDeactivation();

        dataBalancer.onResignation();
        dataBalancer.shutdown();
        Mockito.verify(mockActiveDataBalanceEngine).shutdown();
        Mockito.verify(mockInactiveDataBalanceEngine).shutdown();
    }

    /**
     * Shutting down a manager that was never elected shuts down both engines without any
     * activation of the active engine or deactivation of either engine.
     */
    @Test
    public void testShutdownOnInactive() throws InterruptedException {
        brokerProps.put("confluent.balancer.enable", true);
        updatedConfig = new KafkaConfig(brokerProps);
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);

        // Nothing has happened yet: construction alone must not touch the engines' lifecycle.
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).onActivation(initializationContext.capture());
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never()).onDeactivation();

        dataBalancer.shutdown();

        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).onDeactivation();
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never()).onDeactivation();
        Mockito.verify(mockActiveDataBalanceEngine).shutdown();
        Mockito.verify(mockInactiveDataBalanceEngine).shutdown();
    }

    /**
     * Scheduling a broker removal on a manager that was never elected is expected to throw
     * {@link BalancerOfflineException}.
     */
    @Test(expected = BalancerOfflineException.class)
    public void testRemoveBroker_NotActiveThrowsBalancerOfflineException() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);

        dataBalancer.scheduleBrokerRemoval(2, Option.apply(25L));
    }

    /**
     * On an elected manager, a scheduled broker removal is forwarded to the active engine.
     */
    @Test
    public void testRemoveBrokerAccepted() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);

        actAndAssertRemoveBrokerCalled(1);
    }

    /**
     * Elects {@code dataBalancer}, schedules removal of the given broker with epoch 15 and
     * verifies the active engine's {@code removeBroker} was called with that broker id and epoch.
     *
     * @param i id of the broker to remove
     */
    private void actAndAssertRemoveBrokerCalled(int i) {
        Optional<Long> expectedEpoch = Optional.of(15L);

        dataBalancer.onElection(Collections.emptyMap());
        dataBalancer.scheduleBrokerRemoval(i, Option.apply(15L));

        Mockito.verify(mockActiveDataBalanceEngine).removeBroker(
                ArgumentMatchers.eq(i),
                ArgumentMatchers.eq(expectedEpoch),
                ArgumentMatchers.any(String.class));
    }

    /**
     * A removal scheduled without a broker epoch (empty Scala {@code Option}) reaches the active
     * engine's {@code removeBroker} with an empty {@link Optional}.
     */
    @Test
    public void testRemoveNotAliveBroker() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);
        dataBalancer.onElection(Collections.emptyMap());
        Optional<Long> noEpoch = Optional.empty();

        dataBalancer.scheduleBrokerRemoval(1, Option.empty());

        Mockito.verify(mockActiveDataBalanceEngine).removeBroker(
                ArgumentMatchers.eq(1),
                ArgumentMatchers.eq(noEpoch),
                ArgumentMatchers.any(String.class));
    }

    /**
     * With the active engine mock injected directly and no removals tracked,
     * {@code brokerRemovals()} returns an empty list.
     */
    @Test
    public void testBrokerRemovals() throws Exception {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);
        // Swap the engine in directly rather than going through onElection().
        manager.balanceEngine = mockActiveDataBalanceEngine;

        Assert.assertEquals(Collections.emptyList(), manager.brokerRemovals());
    }

    /**
     * A single empty broker starting up is handed to the engine's {@code addBrokers} and recorded
     * in {@code brokersToAdd}; a successful completion callback clears it again.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfOneSucceeds() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);
        manager.onElection(Collections.emptyMap());
        Set<Integer> addedBrokers = new HashSet<>(Collections.singleton(10));

        manager.onBrokersStartup(addedBrokers, addedBrokers);

        Mockito.verify(mockActiveDataBalanceEngine)
                .addBrokers(ArgumentMatchers.eq(addedBrokers), execCbCaptor.capture(), ArgumentMatchers.anyString());
        Assert.assertEquals("New brokers not present in DataBalancer", addedBrokers, manager.brokersToAdd);

        // Report the add operation as successfully completed.
        execCbCaptor.getValue().accept(true, (Throwable) null);
        Assert.assertTrue("Expected brokersToAdd to be cleared", manager.brokersToAdd.isEmpty());
    }

    /**
     * When the add operation completes unsuccessfully without an exception, the pending broker
     * stays in {@code brokersToAdd}.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfOnePauses() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);
        manager.onElection(Collections.emptyMap());
        Set<Integer> addedBrokers = new HashSet<>(Collections.singleton(10));

        manager.onBrokersStartup(addedBrokers, addedBrokers);

        Mockito.verify(mockActiveDataBalanceEngine)
                .addBrokers(ArgumentMatchers.eq(addedBrokers), execCbCaptor.capture(), ArgumentMatchers.anyString());
        Assert.assertEquals("New brokers not present in DataBalancer", addedBrokers, manager.brokersToAdd);

        // Unsuccessful completion, no error attached.
        execCbCaptor.getValue().accept(false, (Throwable) null);
        Assert.assertEquals("Expected brokersToAdd to not get cleared after unsuccessful completion",
                addedBrokers, manager.brokersToAdd);
    }

    /**
     * When the add operation completes unsuccessfully with an exception attached,
     * {@code brokersToAdd} is cleared.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfOneThrowsException() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);
        manager.onElection(Collections.emptyMap());
        Set<Integer> addedBrokers = new HashSet<>(Collections.singleton(10));

        manager.onBrokersStartup(addedBrokers, addedBrokers);

        Mockito.verify(mockActiveDataBalanceEngine)
                .addBrokers(ArgumentMatchers.eq(addedBrokers), execCbCaptor.capture(), ArgumentMatchers.anyString());
        Assert.assertEquals("New brokers not present in DataBalancer", addedBrokers, manager.brokersToAdd);

        // Unsuccessful completion carrying an error.
        execCbCaptor.getValue().accept(false, new KafkaCruiseControlException("boom"));
        Assert.assertTrue("Expected brokersToAdd to be cleared", manager.brokersToAdd.isEmpty());
    }

    /**
     * A second batch of starting brokers is merged with the still-pending first batch: the
     * engine is asked to add the union, and a successful completion of that request clears
     * {@code brokersToAdd}.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfMultipleSucceeds() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);
        manager.onElection(Collections.emptyMap());

        // First batch: broker 10.
        Set<Integer> firstBatch = new HashSet<>(Collections.singleton(10));
        manager.onBrokersStartup(firstBatch, firstBatch);
        Assert.assertEquals("New brokers not present in DataBalancer", firstBatch, manager.brokersToAdd);
        Mockito.verify(mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(firstBatch),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.anyString());

        // Second batch: brokers 11 and 12, arriving before the first add completed.
        Set<Integer> secondBatch = Stream.of(11, 12).collect(Collectors.toSet());
        manager.onBrokersStartup(secondBatch, secondBatch);
        Set<Integer> allPending = new HashSet<>(firstBatch);
        allPending.addAll(secondBatch);
        Mockito.verify(mockActiveDataBalanceEngine)
                .addBrokers(ArgumentMatchers.eq(allPending), execCbCaptor.capture(), ArgumentMatchers.anyString());
        Assert.assertEquals("New brokers not present in DataBalancer", allPending, manager.brokersToAdd);

        execCbCaptor.getValue().accept(true, (Throwable) null);
        Assert.assertTrue("Expected brokersToAdd to be cleared", manager.brokersToAdd.isEmpty());
    }

    /**
     * Two overlapping add requests complete in order: completing the first (broker 10 only)
     * leaves the second batch pending, and completing the merged request clears
     * {@code brokersToAdd}.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfMultipleSucceedsWithRace() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);
        manager.onElection(Collections.emptyMap());

        // First batch: broker 10; its callback is captured as value #0.
        Set<Integer> firstBatch = new HashSet<>(Collections.singleton(10));
        manager.onBrokersStartup(firstBatch, firstBatch);
        Assert.assertEquals("New brokers not present in DataBalancer", firstBatch, manager.brokersToAdd);
        Mockito.verify(mockActiveDataBalanceEngine)
                .addBrokers(ArgumentMatchers.eq(firstBatch), execCbCaptor.capture(), ArgumentMatchers.anyString());

        // Second batch: brokers 11 and 12; the merged request's callback is captured as value #1.
        Set<Integer> secondBatch = Stream.of(11, 12).collect(Collectors.toSet());
        manager.onBrokersStartup(secondBatch, secondBatch);
        Set<Integer> allPending = new HashSet<>(firstBatch);
        allPending.addAll(secondBatch);
        Mockito.verify(mockActiveDataBalanceEngine)
                .addBrokers(ArgumentMatchers.eq(allPending), execCbCaptor.capture(), ArgumentMatchers.anyString());
        Assert.assertEquals("New brokers not present in DataBalancer", allPending, manager.brokersToAdd);

        BalanceOpExecutionCompletionCallback firstCallback = execCbCaptor.getAllValues().get(0);
        firstCallback.accept(true, (Throwable) null);
        Assert.assertEquals("Expected second brokers to still be present", secondBatch, manager.brokersToAdd);

        BalanceOpExecutionCompletionCallback secondCallback = execCbCaptor.getAllValues().get(1);
        secondCallback.accept(true, (Throwable) null);
        Assert.assertTrue("Expected brokersToAdd to be cleared", manager.brokersToAdd.isEmpty());
    }

    /**
     * On a manager that was never elected, a broker-startup notification must not result in any
     * action: no removal tracker is touched and no engine is asked to add brokers or cancel a
     * removal.
     *
     * <p>The tracker mock is registered in the state tracker map exposed by the engine context.
     * Without that registration nothing could ever reach it, and {@code verifyNoInteractions}
     * would pass regardless of what the manager does.
     */
    @Test
    public void testOnBrokersStartup_DoesntTakeActionIfEngineInactive() {
        Set<Integer> emptyBrokers = new HashSet<>(Collections.singleton(1));
        Set<Integer> newBrokers = new HashSet<>(Collections.singleton(1));
        KafkaDataBalanceManager kafkaDataBalanceManager = new KafkaDataBalanceManager(this.initConfig, new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine), this.mockDbMetrics, this.time);
        BrokerRemovalStateTracker brokerRemovalStateTracker = Mockito.mock(BrokerRemovalStateTracker.class);
        // Make the tracker reachable for the restarted broker so the check below is meaningful.
        this.stateTrackers.put(1, brokerRemovalStateTracker);
        Mockito.when(this.mockActiveDataBalanceEngine.isActive()).thenReturn(false);

        kafkaDataBalanceManager.onBrokersStartup(emptyBrokers, newBrokers);

        Mockito.verifyNoInteractions(brokerRemovalStateTracker);
        Mockito.verify(this.mockActiveDataBalanceEngine, Mockito.never()).cancelBrokerRemoval(ArgumentMatchers.anyInt());
        Mockito.verify(this.mockActiveDataBalanceEngine, Mockito.never()).addBrokers(ArgumentMatchers.any(), ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class), ArgumentMatchers.anyString());
        Mockito.verify(this.mockInactiveDataBalanceEngine, Mockito.never()).addBrokers(ArgumentMatchers.any(), ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class), ArgumentMatchers.anyString());
    }

    /**
     * When brokers with tracked removals start up again, each tracker is asked to cancel
     * persistently. The engine's {@code cancelBrokerRemoval} is invoked only for the broker whose
     * tracker reported a successful cancel (broker 1, not broker 3), the empty broker is still
     * added, and both trackers are removed from the tracker map.
     */
    @Test
    public void testOnBrokersStartup_CancelsRemovalWhenShutdownStatePassed() {
        Set<Integer> emptyBrokers = new HashSet<>(Collections.singleton(1));
        Set<Integer> newBrokers = new HashSet<>();
        newBrokers.add(1);
        newBrokers.add(2);
        newBrokers.add(3);
        BrokerRemovalCancellationMode persistent = BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION;

        // Tracker for broker 1 accepts the cancellation ...
        BrokerRemovalStateTracker cancellableTracker = Mockito.mock(BrokerRemovalStateTracker.class);
        Mockito.when(cancellableTracker.cancel((Exception) ArgumentMatchers.any(), ArgumentMatchers.eq(persistent)))
                .thenReturn(true);
        Mockito.when(cancellableTracker.brokerId()).thenReturn(1);
        // ... while the one for broker 3 refuses it.
        BrokerRemovalStateTracker refusingTracker = Mockito.mock(BrokerRemovalStateTracker.class);
        Mockito.when(refusingTracker.brokerId()).thenReturn(3);
        Mockito.when(refusingTracker.cancel((Exception) ArgumentMatchers.any(), ArgumentMatchers.eq(persistent)))
                .thenReturn(false);

        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);
        manager.onElection(Collections.emptyMap());
        Map<Integer, BrokerRemovalStateTracker> trackers =
                manager.balanceEngine.getDataBalanceEngineContext().getBrokerRemovalsStateTrackers();
        trackers.put(1, cancellableTracker);
        trackers.put(3, refusingTracker);

        manager.onBrokersStartup(emptyBrokers, newBrokers);

        Mockito.verify(cancellableTracker).cancel(
                (Exception) ArgumentMatchers.any(BrokerRemovalCanceledException.class), ArgumentMatchers.eq(persistent));
        Mockito.verify(refusingTracker).cancel(
                (Exception) ArgumentMatchers.any(BrokerRemovalCanceledException.class), ArgumentMatchers.eq(persistent));
        Mockito.verify(mockActiveDataBalanceEngine).cancelBrokerRemoval(1);
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).cancelBrokerRemoval(3);
        Mockito.verify(mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(emptyBrokers),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.anyString());
        Assert.assertEquals(0L, trackers.size());
    }

    /**
     * When brokers start up but none of them is reported as empty, the engine is never asked
     * to add brokers.
     */
    @Test
    public void testOnBrokersStartup_DoesntAddNonEmptyBrokers() {
        Set<Integer> emptyBrokers = new HashSet<>();
        Set<Integer> newBrokers = new HashSet<>();
        newBrokers.add(1);
        newBrokers.add(2);
        newBrokers.add(3);
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);
        manager.onElection(Collections.emptyMap());

        manager.onBrokersStartup(emptyBrokers, newBrokers);

        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).addBrokers(
                ArgumentMatchers.any(),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.anyString());
    }

    /**
     * Only brokers reported as empty are merged into the pending add set: a startup of non-empty
     * brokers 11 and 12 causes no add request, and a later startup of 13 and 14 where only 14 is
     * empty results in an add request for the pending broker 10 plus 14.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfMultipleOnlyMergesEmpty() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time);
        manager.onElection(Collections.emptyMap());

        // Step 1: empty broker 10 starts up and is requested to be added.
        Set<Integer> firstBatch = new HashSet<>(Collections.singleton(10));
        manager.onBrokersStartup(firstBatch, firstBatch);
        Assert.assertEquals("New brokers not present in DataBalancer", firstBatch, manager.brokersToAdd);
        Mockito.verify(mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(firstBatch),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.anyString());

        // Forget the recorded calls, then restore the stubbing that reset() wiped.
        Mockito.reset(mockActiveDataBalanceEngine);
        setupMockDbe();

        // Step 2: brokers 11 and 12 start up, neither of them empty.
        Set<Integer> nonEmptyStartup = Stream.of(11, 12).collect(Collectors.toSet());
        manager.onBrokersStartup(new HashSet<>(), nonEmptyStartup);
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).addBrokers(
                ArgumentMatchers.any(),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.anyString());

        // Step 3: brokers 13 and 14 start up, only 14 is empty.
        Set<Integer> mixedStartup = Stream.of(13, 14).collect(Collectors.toSet());
        Set<Integer> emptyOnly = new HashSet<>(Collections.singleton(14));
        manager.onBrokersStartup(emptyOnly, mixedStartup);
        Set<Integer> expectedPending = new HashSet<>(firstBatch);
        expectedPending.addAll(emptyOnly);
        Mockito.verify(mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(expectedPending),
                ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class),
                ArgumentMatchers.anyString());
        Assert.assertEquals("New brokers not present in DataBalancer", expectedPending, manager.brokersToAdd);
    }

    /**
     * The {@code BrokerAddCount} gauge in the default Yammer registry reads 0 after election,
     * 1 while a broker add is pending, and 0 again once the add completes successfully.
     *
     * <p>The default registry is shared across tests, so it is cleaned before the test and again
     * in a {@code finally} block, matching {@code testConfluentBalancerEnabledMetric} and keeping
     * metrics registered here from leaking into later tests.
     */
    @Test
    public void testAddMetric() {
        MetricsRegistry defaultRegistry = KafkaYammerMetrics.defaultRegistry();
        cleanMetrics(defaultRegistry);
        try {
            KafkaDataBalanceManager kafkaDataBalanceManager = new KafkaDataBalanceManager(this.initConfig, new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine), new DataBalancerMetricsRegistry(defaultRegistry, KafkaDataBalanceManager.getMetricsWhiteList()), this.time);
            kafkaDataBalanceManager.onElection(Collections.emptyMap());
            verifyMetricValue(defaultRegistry, "BrokerAddCount", 0);

            Set<Integer> addedBrokers = new HashSet<>(Collections.singleton(10));
            kafkaDataBalanceManager.onBrokersStartup(addedBrokers, addedBrokers);
            verifyMetricValue(defaultRegistry, "BrokerAddCount", 1);

            Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(ArgumentMatchers.eq(addedBrokers), this.execCbCaptor.capture(), ArgumentMatchers.anyString());
            // Completing the add successfully should drop the gauge back to zero.
            this.execCbCaptor.getValue().accept(true, (Throwable) null);
            verifyMetricValue(defaultRegistry, "BrokerAddCount", 0);
        } finally {
            // Always restore the shared registry, even when an assertion above fails.
            cleanMetrics(defaultRegistry);
        }
    }
}
