package io.confluent.databalancer;

import com.yammer.metrics.core.Gauge;
import com.yammer.metrics.core.MetricName;
import com.yammer.metrics.core.MetricsRegistry;
import io.confluent.databalancer.EngineInitializationContext;
import io.confluent.databalancer.KafkaDataBalanceManager;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalancerStatusStateMachine;
import io.confluent.databalancer.operation.BalancerStatusTracker;
import io.confluent.databalancer.operation.BrokerAdditionStateMachine;
import io.confluent.databalancer.operation.BrokerAdditionStateManager;
import io.confluent.databalancer.operation.BrokerRemovalCancellationMode;
import io.confluent.databalancer.operation.BrokerRemovalCancellationProposal;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.databalancer.operation.BrokerRemovalStateTracker;
import io.confluent.databalancer.operation.SelfHealingEvenClusterLoadStateManager;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import io.confluent.databalancer.persistence.BrokerRemovalStateRecord;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.common.AliveBrokersSnapshot;
import kafka.common.BrokerAdditionDescriptionInternal;
import kafka.common.BrokerRemovalDescriptionInternal;
import kafka.common.BrokerRemovalRequest;
import kafka.common.EvenClusterLoadStatusDescriptionInternal;
import kafka.controller.DataBalanceManager;
import kafka.metrics.KafkaYammerMetrics;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.TestUtils;
import kafka.utils.TestUtils$;
import org.apache.kafka.clients.admin.EvenClusterLoadStatus;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.BalancerBrokerExcludedForReplicaPlacementException;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalCanceledException;
import org.apache.kafka.common.errors.InvalidBrokerRemovalException;
import org.apache.kafka.common.protocol.BalancerOperationOverriddenException;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/databalancer/KafkaDataBalanceManagerTest.class */
public class KafkaDataBalanceManagerTest {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaDataBalanceManagerTest.class);
    // Mutable broker properties; individual tests add overrides here to derive an updated config.
    private Properties brokerProps;
    // Config built by initConfig(Properties): base properties plus balancer-enable and task-history retention.
    private KafkaConfig initConfig;
    // Config built in setUp() from the base properties BEFORE "confluent.balancer.enable" is put into them.
    private KafkaConfig disabledConfig;
    // Per-test config built after a test has modified brokerProps.
    private KafkaConfig updatedConfig;
    // Manager under test; created by each test.
    private DataBalanceManager dataBalancer;
    // Flag toggled by the stubbed onActivation/onDeactivation answers and returned by the stubbed isActive().
    private AtomicReference<Boolean> mockIsActiveFlag;

    // Factory mock; setUp() stubs it to hand out the active and inactive engine mocks below.
    @Mock
    private KafkaDataBalanceManager.DataBalanceEngineFactory mockDataBalanceEngineFactory;

    @Mock
    private DataBalanceEngine mockActiveDataBalanceEngine;

    @Mock
    private DataBalanceEngine mockInactiveDataBalanceEngine;

    @Mock
    private DataBalancerMetricsRegistry mockDbMetrics;

    // Mocked clock, so un-stubbed calls return Mockito defaults.
    @Mock
    private Time time;

    // Returned by mockDbeContext.getPersistenceStore() (see setupMockDbe()).
    @Mock
    private ApiStatePersistenceStore mockPersistenceStore;

    // Returned by mockActiveDataBalanceEngine.getDataBalanceEngineContext() (see setupMockDbe()).
    @Mock
    private ConfluentDataBalanceEngineContext mockDbeContext;

    @Mock
    private BalancerStatusTracker mockBalancerStatusTracker;

    // Captures the EngineInitializationContext passed to DataBalanceEngine.onActivation.
    @Captor
    private ArgumentCaptor<EngineInitializationContext> initializationContext;
    // Backing map returned by mockDbeContext.getBrokerRemovalsStateTrackers(); recreated empty in setUp().
    private Map<ImmutableSet<Integer>, BrokerRemovalStateTracker> stateTrackers;

    /**
     * Rebuilds the configs, the tracker map and the active flag, initialises the Mockito
     * annotations and wires the engine factory mock to the two engine mocks before every test.
     */
    @BeforeEach
    public void setUp() {
        brokerProps = baseBrokerProps();
        // Snapshot the config first: initConfig(...) below adds the balancer-enable property to brokerProps.
        disabledConfig = new KafkaConfig(brokerProps);
        initConfig = initConfig(brokerProps);

        stateTrackers = new HashMap<>();
        mockIsActiveFlag = new AtomicReference<>(Boolean.FALSE);

        MockitoAnnotations.openMocks(this);
        Mockito.when(mockDataBalanceEngineFactory.getActiveDataBalanceEngine())
            .thenReturn(mockActiveDataBalanceEngine);
        Mockito.when(mockDataBalanceEngineFactory.getInactiveDataBalanceEngine())
            .thenReturn(mockInactiveDataBalanceEngine);
        setupMockDbe();
    }

    /**
     * Creates the minimal broker properties shared by all tests: the mock ZooKeeper connect
     * string and a balancer throttle of 200 bytes per second.
     *
     * @return a fresh, mutable {@link Properties} instance
     */
    private Properties baseBrokerProps() {
        final Properties baseProps = new Properties();
        final String zkConnectKey = KafkaConfig$.MODULE$.ZkConnectProp();
        baseProps.put(zkConnectKey, TestUtils$.MODULE$.MockZkConnect());
        baseProps.put("confluent.balancer.throttle.bytes.per.second", 200L);
        return baseProps;
    }

    /**
     * Enables the balancer and sets a two-day task-history retention on the given properties,
     * then builds a {@link KafkaConfig} from them.
     *
     * @param properties broker properties; mutated in place
     * @return the config built from the updated properties
     */
    private KafkaConfig initConfig(Properties properties) {
        properties.put("confluent.balancer.enable", true);
        properties.put("confluent.balancer.task.history.retention.days", 2);
        final KafkaConfig enabledConfig = new KafkaConfig(properties);
        return enabledConfig;
    }

    /**
     * Stubs the active engine mock so that activation and deactivation flip
     * {@code mockIsActiveFlag}, {@code isActive()} reports that flag, and the engine context
     * exposes the persistence store, removal state trackers and balancer status tracker mocks.
     */
    private void setupMockDbe() {
        // Activation: mark the engine active and register BALANCER_ENABLED on the status tracker mock.
        Mockito.doAnswer(invocation -> {
            mockIsActiveFlag.set(true);
            mockBalancerStatusTracker.registerEvent(BalancerStatusStateMachine.BalancerEvent.BALANCER_ENABLED);
            LOG.info("Marked data balance engine as active");
            return null;
        }).when(mockActiveDataBalanceEngine)
            .onActivation(ArgumentMatchers.any(EngineInitializationContext.class));

        // Deactivation: mark the engine inactive.
        Mockito.doAnswer(invocation -> {
            mockIsActiveFlag.set(false);
            LOG.info("Marked data balance engine as inactive");
            return null;
        }).when(mockActiveDataBalanceEngine)
            .onDeactivation(ArgumentMatchers.any(BalancerStatusStateMachine.BalancerEvent.class));

        // isActive() reflects whatever the two answers above last recorded.
        Mockito.when(mockActiveDataBalanceEngine.isActive()).thenAnswer(invocation -> {
            LOG.info("DBE checking on active state: {}", mockIsActiveFlag.get());
            return mockIsActiveFlag.get();
        });

        Mockito.when(mockActiveDataBalanceEngine.getDataBalanceEngineContext()).thenReturn(mockDbeContext);
        Mockito.when(mockDbeContext.getPersistenceStore()).thenReturn(mockPersistenceStore);
        Mockito.when(mockDbeContext.getBrokerRemovalsStateTrackers()).thenReturn(stateTrackers);
        Mockito.when(mockDbeContext.getBalancerStatusTracker()).thenReturn(mockBalancerStatusTracker);
    }

    /**
     * With the balancer enabled, election must activate the active engine with the initial
     * config and an ON_FAILOVER startup type.
     */
    @Test
    public void testElectionWhenEnabled() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
            new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
            new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, mockBalancerStatusTracker);
        manager.setBalanceEngine(mockActiveDataBalanceEngine);

        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        EngineInitializationContext captured = initializationContext.getValue();
        Assertions.assertEquals(initConfig, captured.kafkaConfig, captured.kafkaConfig.toString());
        Assertions.assertEquals(EngineInitializationContext.EngineStartupType.ON_FAILOVER, captured.howStarted);
    }

    /**
     * On an elected node, disabling the balancer must deactivate the active engine and
     * re-enabling it must activate it again with an ON_ENABLE startup type. A no-op config
     * update must not touch the engine, and neither engine may be shut down.
     */
    @Test
    public void testUpdateConfigBalancerEnable() throws InterruptedException {
        dataBalancer = new KafkaDataBalanceManager(
            initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time, mockBalancerStatusTracker);
        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        // enabled -> disabled
        dataBalancer.updateConfig(initConfig, disabledConfig);
        Mockito.verify(mockActiveDataBalanceEngine)
            .onDeactivation(ArgumentMatchers.any(BalancerStatusStateMachine.BalancerEvent.class));

        // Clear recorded interactions so the activation below is the only one verified.
        Mockito.reset(mockActiveDataBalanceEngine);

        // disabled -> enabled
        dataBalancer.updateConfig(disabledConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        EngineInitializationContext captured = initializationContext.getValue();
        Assertions.assertEquals(initConfig, captured.kafkaConfig, captured.kafkaConfig.toString());
        Assertions.assertEquals(EngineInitializationContext.EngineStartupType.ON_ENABLE, captured.howStarted);

        // Identical old/new config: nothing further may happen on the active engine.
        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);

        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never())
            .onActivation(ArgumentMatchers.any(EngineInitializationContext.class));
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never())
            .onDeactivation(ArgumentMatchers.any(BalancerStatusStateMachine.BalancerEvent.class));
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).shutdown();
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never()).shutdown();
    }

    /**
     * On a node that was never elected, toggling the balancer off and on must be routed to the
     * inactive engine only; the active engine must see no activation, deactivation or shutdown.
     */
    @Test
    public void testUpdateConfigBalancerEnableOnNonEligibleNode() throws InterruptedException {
        dataBalancer = new KafkaDataBalanceManager(
            initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time, mockBalancerStatusTracker);

        // enabled -> disabled, without a preceding election
        dataBalancer.updateConfig(initConfig, disabledConfig);
        Mockito.verify(mockInactiveDataBalanceEngine)
            .onDeactivation(ArgumentMatchers.any(BalancerStatusStateMachine.BalancerEvent.class));

        // NOTE(review): the active mock is reset here although the inactive one was just verified - confirm intent.
        Mockito.reset(mockActiveDataBalanceEngine);

        // disabled -> enabled
        dataBalancer.updateConfig(disabledConfig, initConfig);
        Mockito.verify(mockInactiveDataBalanceEngine).onActivation(initializationContext.capture());
        EngineInitializationContext captured = initializationContext.getValue();
        Assertions.assertEquals(initConfig, captured.kafkaConfig, captured.kafkaConfig.toString());

        // Identical old/new config: the active engine must remain untouched.
        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);

        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never())
            .onActivation(ArgumentMatchers.any(EngineInitializationContext.class));
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never())
            .onDeactivation(ArgumentMatchers.any(BalancerStatusStateMachine.BalancerEvent.class));
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).shutdown();
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never()).shutdown();
    }

    /**
     * A change of the throttle property must be forwarded to the active engine via
     * {@code updateThrottle}, in both directions, and an unchanged config must cause no calls.
     */
    @Test
    public void testUpdateConfigBalancerThrottle() {
        brokerProps.put("confluent.balancer.throttle.bytes.per.second", 100L);
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(
            initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time, mockBalancerStatusTracker);

        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        EngineInitializationContext captured = initializationContext.getValue();
        Assertions.assertEquals(initConfig, captured.kafkaConfig, captured.kafkaConfig.toString());

        // 200 -> 100
        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verify(mockActiveDataBalanceEngine).updateThrottle(100L);

        // 100 -> 200
        dataBalancer.updateConfig(updatedConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine).updateThrottle(200L);

        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * Switching the self-heal trigger to ANY_UNEVEN_LOAD must call {@code setAutoHealMode(true)}
     * on the active engine, switching back must call {@code setAutoHealMode(false)}, and an
     * unchanged config must cause no calls.
     */
    @Test
    public void testUpdateConfigAutoHealMode() {
        String anyUnevenLoad = ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString();
        brokerProps.put("confluent.balancer.heal.uneven.load.trigger", anyUnevenLoad);
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(
            initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time, mockBalancerStatusTracker);

        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        EngineInitializationContext captured = initializationContext.getValue();
        Assertions.assertEquals(initConfig, captured.kafkaConfig, captured.kafkaConfig.toString());

        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verify(mockActiveDataBalanceEngine).setAutoHealMode(true);

        dataBalancer.updateConfig(updatedConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine).setAutoHealMode(false);

        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * Changing the excluded topic names/prefixes must push the regenerated exclusion regex to
     * the active engine through {@code updateConfigPermanently}, in both directions, and an
     * unchanged config must cause no calls.
     */
    @Test
    public void testUpdateConfigExcludedTopics() {
        final String exclusionKey = "topics.excluded.from.partition.movement";
        brokerProps.put("confluent.balancer.exclude.topic.names", Arrays.asList("topic-1", "topic-2"));
        brokerProps.put("confluent.balancer.exclude.topic.prefixes", Arrays.asList("a", "b"));
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(
            initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time, mockBalancerStatusTracker);

        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        EngineInitializationContext captured = initializationContext.getValue();
        Assertions.assertEquals(initConfig, captured.kafkaConfig, captured.kafkaConfig.toString());

        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verify(mockActiveDataBalanceEngine)
            .updateConfigPermanently(exclusionKey, DatabalancerUtils.generateCcTopicExclusionRegex(updatedConfig));

        dataBalancer.updateConfig(updatedConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine)
            .updateConfigPermanently(exclusionKey, DatabalancerUtils.generateCcTopicExclusionRegex(initConfig));

        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * When self-heal mode, throttle and topic exclusions all change in one config update, each
     * change must reach the active engine once; the reverse update must do the same, and an
     * unchanged config must cause no calls.
     */
    @Test
    public void testUpdateConfigMultipleProperties() {
        final String exclusionKey = "topics.excluded.from.partition.movement";
        brokerProps.put("confluent.balancer.heal.uneven.load.trigger",
            ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString());
        brokerProps.put("confluent.balancer.throttle.bytes.per.second", 100L);
        brokerProps.put("confluent.balancer.exclude.topic.names", Arrays.asList("topic-1", "topic-2"));
        brokerProps.put("confluent.balancer.exclude.topic.prefixes", Arrays.asList("a", "b"));
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(
            initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time, mockBalancerStatusTracker);

        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        EngineInitializationContext captured = initializationContext.getValue();
        Assertions.assertEquals(initConfig, captured.kafkaConfig, captured.kafkaConfig.toString());

        // initial -> updated
        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verify(mockActiveDataBalanceEngine).setAutoHealMode(true);
        Mockito.verify(mockActiveDataBalanceEngine).updateThrottle(100L);
        Mockito.verify(mockActiveDataBalanceEngine)
            .updateConfigPermanently(exclusionKey, DatabalancerUtils.generateCcTopicExclusionRegex(updatedConfig));

        // updated -> initial
        dataBalancer.updateConfig(updatedConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine).setAutoHealMode(false);
        Mockito.verify(mockActiveDataBalanceEngine).updateThrottle(200L);
        Mockito.verify(mockActiveDataBalanceEngine)
            .updateConfigPermanently(exclusionKey, DatabalancerUtils.generateCcTopicExclusionRegex(initConfig));

        dataBalancer.updateConfig(initConfig, initConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * A config rebuilt from unchanged broker properties must not cause any call on the active
     * engine beyond the initial activation.
     */
    @Test
    public void testUpdateConfigNoPropsUpdated() {
        updatedConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(
            initConfig, mockDataBalanceEngineFactory, mockDbMetrics, time, mockBalancerStatusTracker);

        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        EngineInitializationContext captured = initializationContext.getValue();
        Assertions.assertEquals(initConfig, captured.kafkaConfig, captured.kafkaConfig.toString());

        dataBalancer.updateConfig(initConfig, updatedConfig);
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * Starting with the balancer explicitly disabled, election must not activate the engine;
     * enabling it afterwards must activate it with the initial config and ON_ENABLE.
     */
    @Test
    public void testEnableFromOff() {
        brokerProps.put("confluent.balancer.enable", false);
        KafkaConfig startDisabledConfig = new KafkaConfig(brokerProps);
        dataBalancer = new KafkaDataBalanceManager(
            startDisabledConfig, mockDataBalanceEngineFactory, mockDbMetrics, time, mockBalancerStatusTracker);

        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).onActivation(initializationContext.capture());

        dataBalancer.updateConfig(startDisabledConfig, initConfig);
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        EngineInitializationContext captured = initializationContext.getValue();
        Assertions.assertEquals(initConfig, captured.kafkaConfig, captured.kafkaConfig.toString());
        Assertions.assertEquals(EngineInitializationContext.EngineStartupType.ON_ENABLE, captured.howStarted);
    }

    /**
     * The ActiveBalancerCount gauge must read 0 after construction, 1 after election and 0
     * again after resignation.
     */
    @Test
    public void testConfluentBalancerEnabledMetric() {
        final String gaugeName = "ActiveBalancerCount";
        MetricsRegistry registry = KafkaYammerMetrics.defaultRegistry();
        cleanMetrics(registry);
        DataBalancerMetricsRegistry balancerMetrics =
            new DataBalancerMetricsRegistry(registry, KafkaDataBalanceManager.getMetricsAllowList());
        dataBalancer = new KafkaDataBalanceManager(
            initConfig, mockDataBalanceEngineFactory, balancerMetrics, time, mockBalancerStatusTracker);
        verifyMetricValue(registry, gaugeName, 0);

        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        verifyMetricValue(registry, gaugeName, 1);

        dataBalancer.onResignation();
        verifyMetricValue(registry, gaugeName, 0);

        cleanMetrics(registry);
    }

    /**
     * A manager constructed with the disabled config must not register the
     * ActiveBalancerCount gauge.
     */
    @Test
    public void testConfluentBalancerEnableMetricNotPresentOnDisabled() {
        MetricsRegistry registry = KafkaYammerMetrics.defaultRegistry();
        cleanMetrics(registry);
        DataBalancerMetricsRegistry balancerMetrics =
            new DataBalancerMetricsRegistry(registry, KafkaDataBalanceManager.getMetricsAllowList());
        dataBalancer = new KafkaDataBalanceManager(
            disabledConfig, mockDataBalanceEngineFactory, balancerMetrics, time, mockBalancerStatusTracker);

        verifyMetricNotPresent(registry, "ActiveBalancerCount");
    }

    /**
     * On a node that was never elected, disabling the balancer must remove the
     * ActiveBalancerCount gauge that was present (with value 0) after construction.
     */
    @Test
    public void testConfluentBalancerMetricDisableOnIdleNode() {
        final String gaugeName = "ActiveBalancerCount";
        MetricsRegistry registry = KafkaYammerMetrics.defaultRegistry();
        cleanMetrics(registry);
        DataBalancerMetricsRegistry balancerMetrics =
            new DataBalancerMetricsRegistry(registry, KafkaDataBalanceManager.getMetricsAllowList());
        dataBalancer = new KafkaDataBalanceManager(
            initConfig, mockDataBalanceEngineFactory, balancerMetrics, time, mockBalancerStatusTracker);
        verifyMetricValue(registry, gaugeName, 0);

        dataBalancer.updateConfig(initConfig, disabledConfig);
        verifyMetricNotPresent(registry, gaugeName);

        cleanMetrics(registry);
    }

    /**
     * On an elected node, disabling the balancer must remove the ActiveBalancerCount gauge and
     * re-enabling it must bring the gauge back with value 1.
     */
    @Test
    public void testConfluentBalancerMetricDisableOnActiveNode() {
        final String gaugeName = "ActiveBalancerCount";
        MetricsRegistry registry = KafkaYammerMetrics.defaultRegistry();
        cleanMetrics(registry);
        DataBalancerMetricsRegistry balancerMetrics =
            new DataBalancerMetricsRegistry(registry, KafkaDataBalanceManager.getMetricsAllowList());
        dataBalancer = new KafkaDataBalanceManager(
            initConfig, mockDataBalanceEngineFactory, balancerMetrics, time, mockBalancerStatusTracker);
        verifyMetricValue(registry, gaugeName, 0);

        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        verifyMetricValue(registry, gaugeName, 1);

        dataBalancer.updateConfig(initConfig, disabledConfig);
        verifyMetricNotPresent(registry, gaugeName);

        dataBalancer.updateConfig(disabledConfig, initConfig);
        verifyMetricValue(registry, gaugeName, 1);

        cleanMetrics(registry);
    }

    /**
     * Asserts that the registry holds exactly one metric named {@code str}, that it belongs to
     * the {@code kafka.databalancer} group, and that its gauge value equals {@code num}.
     *
     * <p>The count is asserted before the metric is dereferenced, so a missing or duplicated
     * metric fails with a descriptive assertion message instead of a bare
     * {@link java.util.NoSuchElementException} from {@code Optional.get()}.
     *
     * @param metricsRegistry registry to inspect
     * @param str             metric name to look for (compared against {@code MetricName.getName()})
     * @param num             expected gauge value
     */
    private void verifyMetricValue(MetricsRegistry metricsRegistry, String str, Integer num) {
        // Wildcard value type keeps the keys typed without needing an extra import for the metric type.
        Map<MetricName, ?> allMetrics = metricsRegistry.allMetrics();
        List<MetricName> matches = allMetrics.keySet().stream()
            .filter(candidate -> candidate.getName().equals(str))
            .collect(Collectors.toList());
        Assertions.assertEquals(1, matches.size(),
            String.format("Expected exactly one metric named %s but found %s", str, matches));

        MetricName metricName = matches.get(0);
        Assertions.assertEquals("kafka.databalancer", metricName.getGroup());
        Assertions.assertEquals(num, ((Gauge<?>) allMetrics.get(metricName)).value());
    }

    /**
     * Asserts that no metric in the registry has the name {@code str}.
     *
     * @param metricsRegistry registry to inspect
     * @param str             metric name that must be absent
     */
    private void verifyMetricNotPresent(MetricsRegistry metricsRegistry, String str) {
        boolean present = metricsRegistry.allMetrics().keySet().stream()
            .anyMatch(registered -> registered.getName().equals(str));
        String failureMessage = String.format("Metric %s present when not expected", str);
        Assertions.assertFalse(present, failureMessage);
    }

    /**
     * Resets metric state around a metrics test: calls {@code TestUtils.clearYammerMetrics()}
     * and then shuts down the supplied registry.
     *
     * @param metricsRegistry registry to shut down (callers in this class pass the default Yammer registry)
     */
    public void cleanMetrics(MetricsRegistry metricsRegistry) {
        // presumably removes every metric from the default Yammer registry - TODO confirm against TestUtils
        TestUtils.clearYammerMetrics();
        metricsRegistry.shutdown();
    }

    /**
     * After election, resignation and shutdown, both engines must have been shut down, and the
     * inactive engine must never have been activated or deactivated while the node was active.
     */
    @Test
    public void testShutdownOnActive() throws InterruptedException {
        brokerProps.put("confluent.balancer.enable", true);
        updatedConfig = new KafkaConfig(brokerProps);
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
            new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, mockBalancerStatusTracker);

        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(mockActiveDataBalanceEngine).onActivation(initializationContext.capture());
        EngineInitializationContext captured = initializationContext.getValue();
        Assertions.assertEquals(initConfig, captured.kafkaConfig, captured.kafkaConfig.toString());
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never())
            .onActivation(ArgumentMatchers.any(EngineInitializationContext.class));
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never())
            .onDeactivation(ArgumentMatchers.any(BalancerStatusStateMachine.BalancerEvent.class));

        dataBalancer.onResignation();
        dataBalancer.shutdown();

        Mockito.verify(mockActiveDataBalanceEngine).shutdown();
        Mockito.verify(mockInactiveDataBalanceEngine).shutdown();
    }

    /**
     * Shutting down a manager that was never elected must shut down both engines without ever
     * activating the active engine or deactivating either engine.
     */
    @Test
    public void testShutdownOnInactive() throws InterruptedException {
        brokerProps.put("confluent.balancer.enable", true);
        updatedConfig = new KafkaConfig(brokerProps);
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
            new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, mockBalancerStatusTracker);

        // Before shutdown: construction alone must not drive either engine.
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).onActivation(initializationContext.capture());
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never())
            .onDeactivation(ArgumentMatchers.any(BalancerStatusStateMachine.BalancerEvent.class));

        dataBalancer.shutdown();

        // After shutdown: still no deactivation, but both engines are shut down.
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never())
            .onDeactivation(ArgumentMatchers.any(BalancerStatusStateMachine.BalancerEvent.class));
        Mockito.verify(mockInactiveDataBalanceEngine, Mockito.never())
            .onDeactivation(ArgumentMatchers.any(BalancerStatusStateMachine.BalancerEvent.class));
        Mockito.verify(mockActiveDataBalanceEngine).shutdown();
        Mockito.verify(mockInactiveDataBalanceEngine).shutdown();
    }

    /**
     * Builds a {@link BrokerRemovalRequest} for the given broker ids with an empty second list.
     *
     * @param z      flag passed as the request's third constructor argument
     *               (presumably "should shut down" - confirm against BrokerRemovalRequest)
     * @param numArr broker ids passed as the request's first constructor argument
     * @return the assembled request
     */
    private BrokerRemovalRequest removalRequest(boolean z, Integer... numArr) {
        List<Integer> requestedBrokerIds = Arrays.stream(numArr).collect(Collectors.toList());
        return new BrokerRemovalRequest(requestedBrokerIds, Collections.emptyList(), z);
    }

    /**
     * Scheduling a broker removal on a manager that was never elected must fail with
     * {@link BalancerOfflineException}.
     */
    @Test
    public void testRemoveBroker_NotActiveThrowsBalancerOfflineException() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
            new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, mockBalancerStatusTracker);

        Assertions.assertThrows(
            BalancerOfflineException.class,
            () -> dataBalancer.scheduleBrokerRemoval(removalRequest(false, 2), AliveBrokersSnapshot.EMPTY_SNAPSHOT));
    }

    /**
     * Removing broker 2 while the snapshot's second argument lists brokers 1 and 2 must fail
     * with {@link BalancerBrokerExcludedForReplicaPlacementException}.
     */
    @Test
    public void testRemoveBrokers_ExtraActiveExclusionsThrowsException() {
        Set<Integer> excludedBrokers = new HashSet<>();
        excludedBrokers.add(1);
        excludedBrokers.add(2);
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
            new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, mockBalancerStatusTracker);
        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Assertions.assertThrows(
            BalancerBrokerExcludedForReplicaPlacementException.class,
            () -> dataBalancer.scheduleBrokerRemoval(
                removalRequest(false, 2),
                new AliveBrokersSnapshot(Collections.emptyMap(), excludedBrokers)));
    }

    /**
     * Removing brokers 1 and 2 while only broker 1 is in the snapshot's second argument must
     * succeed and call {@code removeBrokers} with both brokers, each value wrapped in an Optional.
     */
    @Test
    public void testRemoveBroker_ActiveExclusionSubsetOfBrokersToRemoveDoesNotThrowException() {
        Map<Integer, Long> aliveBrokers = new HashMap<>();
        aliveBrokers.put(1, 15L);
        aliveBrokers.put(2, 20L);
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
            new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, mockBalancerStatusTracker);
        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        dataBalancer.scheduleBrokerRemoval(
            removalRequest(false, 1, 2),
            new AliveBrokersSnapshot(aliveBrokers, Collections.singleton(1)));

        // The engine is expected to receive every requested broker with its value wrapped in an Optional.
        Map<Integer, Optional<Long>> expectedBrokers = aliveBrokers.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> Optional.of(entry.getValue())));
        Mockito.verify(mockActiveDataBalanceEngine).removeBrokers(
            ArgumentMatchers.eq(expectedBrokers),
            ArgumentMatchers.eq(false),
            ArgumentMatchers.any(String.class));
    }

    /**
     * When the request's second list names broker 3 and the persistence store already holds a
     * removal record for broker 3, only brokers 1 and 2 must be handed to {@code removeBrokers}.
     */
    @Test
    public void testRemoveBroker_FiltersOutNonExistentBrokersWhoWereRemoved() {
        Map<Integer, Long> aliveBrokers = new HashMap<>();
        aliveBrokers.put(1, 15L);
        aliveBrokers.put(2, 20L);
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
            new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, mockBalancerStatusTracker);
        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        BrokerRemovalRequest request =
            new BrokerRemovalRequest(Arrays.asList(1, 2), Collections.singletonList(3), false);

        // Persisted removal record for broker 3, returned by the mocked persistence store.
        BrokerRemovalStateRecord priorRemoval = new BrokerRemovalStateRecord(
            new HashSet<>(Collections.singletonList(3)),
            BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED,
            (Exception) null,
            false);
        priorRemoval.setLastUpdateTime(time.hiResClockMs());
        Mockito.when(mockPersistenceStore.getAllBrokerRemovalStateRecords())
            .thenReturn(Collections.singletonMap(priorRemoval.brokerIds(), priorRemoval));

        dataBalancer.scheduleBrokerRemoval(request, new AliveBrokersSnapshot(aliveBrokers, Collections.emptySet()));

        Map<Integer, Optional<Long>> expectedBrokers = aliveBrokers.entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey, entry -> Optional.of(entry.getValue())));
        Mockito.verify(mockActiveDataBalanceEngine).removeBrokers(
            ArgumentMatchers.eq(expectedBrokers),
            ArgumentMatchers.eq(false),
            ArgumentMatchers.any(String.class));
    }

    /**
     * Builds a removal request from the lists [1, 2] and [3] while the persistence
     * store is stubbed to return no removal records. Expects
     * {@code InvalidBrokerRemovalException} and no
     * {@code removeBrokers(..., true, ...)} call on the active engine.
     */
    @Test
    public void testRemoveBroker_ThrowsIfNonExistentBrokersWereGiven() {
        Map<Integer, Long> aliveBrokers = new HashMap<>();
        aliveBrokers.put(1, 15L);
        aliveBrokers.put(2, 20L);

        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, mockBalancerStatusTracker);
        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.when(mockPersistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(Collections.emptyMap());

        BrokerRemovalRequest request = new BrokerRemovalRequest(Arrays.asList(1, 2), Collections.singletonList(3), true);
        Assertions.assertThrows(InvalidBrokerRemovalException.class,
                () -> dataBalancer.scheduleBrokerRemoval(request, new AliveBrokersSnapshot(aliveBrokers, Collections.emptySet())));

        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).removeBrokers(ArgumentMatchers.any(), ArgumentMatchers.eq(true), ArgumentMatchers.any());
    }

    /**
     * Builds a removal request from an empty list and [1, 2] while the persistence
     * store already holds a removal record for broker set {1, 2}, and schedules it
     * against an empty snapshot. Verifies that the active engine is never asked to
     * remove brokers.
     */
    @Test
    public void testRemoveBroker_ReturnsSuccessIfAllBrokersWereRemoved() {
        this.dataBalancer = new KafkaDataBalanceManager(this.initConfig, new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine), this.mockDbMetrics, this.time, this.mockBalancerStatusTracker);
        this.dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        BrokerRemovalStateRecord existingRecord = new BrokerRemovalStateRecord(new HashSet<>(Arrays.asList(1, 2)), BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, (Exception) null, false);
        existingRecord.setLastUpdateTime(this.time.hiResClockMs());
        Mockito.when(this.mockPersistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(Collections.singletonMap(existingRecord.brokerIds(), existingRecord));

        this.dataBalancer.scheduleBrokerRemoval(new BrokerRemovalRequest(Collections.emptyList(), Arrays.asList(1, 2), true), new AliveBrokersSnapshot(Collections.emptyMap(), Collections.emptySet()));

        // The request carries the flag `true`, so matching only eq(false) would let an
        // erroneous removeBrokers(..., true, ...) call slip through unnoticed. Match any
        // flag value so the never() check actually guards against engine calls.
        Mockito.verify(this.mockActiveDataBalanceEngine, Mockito.never()).removeBrokers(ArgumentMatchers.any(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.any());
    }

    /**
     * Schedules the removal of brokers 1 and 3 while two broker-addition state
     * managers are registered: broker 1 in a terminal (COMPLETED) state and
     * broker 2 mid-REASSIGNMENT. Verifies that only the non-terminal addition
     * receives BROKER_REMOVAL_REQUEST_OVERRIDES and that the engine is asked to
     * remove brokers 1 and 3 with the boolean flag {@code true}.
     */
    @Test
    public void testRemoveBrokerAccepted() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, mockBalancerStatusTracker);
        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Map<Integer, Long> aliveBrokers = new HashMap<>();
        aliveBrokers.put(1, 15L);
        aliveBrokers.put(2, 20L);
        aliveBrokers.put(3, 25L);

        Map<Integer, Optional<Long>> expectedRemovals = new HashMap<>();
        expectedRemovals.put(1, Optional.of(15L));
        expectedRemovals.put(3, Optional.of(25L));

        BrokerAdditionStateManager completedAddition = Mockito.mock(BrokerAdditionStateManager.class);
        Mockito.when(completedAddition.brokerId()).thenReturn(1);
        Mockito.when(completedAddition.currentState()).thenReturn(BrokerAdditionStateMachine.BrokerAdditionState.COMPLETED);
        Mockito.when(completedAddition.isAtATerminalState()).thenReturn(true);

        BrokerAdditionStateManager ongoingAddition = Mockito.mock(BrokerAdditionStateManager.class);
        Mockito.when(ongoingAddition.brokerId()).thenReturn(2);
        Mockito.when(ongoingAddition.currentState()).thenReturn(BrokerAdditionStateMachine.BrokerAdditionState.REASSIGNMENT);
        Mockito.when(ongoingAddition.isAtATerminalState()).thenReturn(false);

        Map<Integer, BrokerAdditionStateManager> additionManagers = new HashMap<>();
        additionManagers.put(1, completedAddition);
        additionManagers.put(2, ongoingAddition);
        Mockito.when(mockDbeContext.getBrokerAdditionsStateManagers()).thenReturn(additionManagers);

        dataBalancer.scheduleBrokerRemoval(removalRequest(true, 1, 3), new AliveBrokersSnapshot(aliveBrokers, Collections.emptySet()));

        // The terminal addition is only inspected, never notified.
        Mockito.verify(completedAddition).isAtATerminalState();
        Mockito.verify(completedAddition).brokerId();
        Mockito.verifyNoMoreInteractions(completedAddition);
        Mockito.verify(ongoingAddition).registerEvent(ArgumentMatchers.eq(BrokerAdditionStateMachine.BrokerAdditionEvent.BROKER_REMOVAL_REQUEST_OVERRIDES), ArgumentMatchers.any(BalancerOperationOverriddenException.class));
        Mockito.verify(mockActiveDataBalanceEngine).removeBrokers(ArgumentMatchers.eq(expectedRemovals), ArgumentMatchers.eq(true), ArgumentMatchers.any(String.class));
    }

    /**
     * Schedules the removal of broker 1 against the empty snapshot and verifies
     * that the engine receives broker 1 mapped to {@link Optional#empty()}.
     */
    @Test
    public void testRemoveNotAliveBroker() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        dataBalancer = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, mockBalancerStatusTracker);
        dataBalancer.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        dataBalancer.scheduleBrokerRemoval(removalRequest(false, 1), AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Map<Integer, Optional<Long>> expectedRemovals = Collections.singletonMap(1, Optional.empty());
        Mockito.verify(mockActiveDataBalanceEngine).removeBrokers(ArgumentMatchers.eq(expectedRemovals), ArgumentMatchers.eq(false), ArgumentMatchers.any(String.class));
    }

    /**
     * Stubs the persistence store with three overlapping removal records ({1,2},
     * {1..5} and {1..7}) whose last-update times are now, now-2 and now-3. Checks
     * that {@code brokerRemovals()} yields seven descriptions, with brokers 1-2
     * described by the first record, 3-5 by the second and 6-7 by the third.
     */
    @Test
    public void testBrokerRemovals() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, (BalancerStatusTracker) null);
        manager.setBalanceEngine(mockActiveDataBalanceEngine);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        long now = time.hiResClockMs();
        BrokerRemovalStateRecord latestRecord = new BrokerRemovalStateRecord(new HashSet<>(Arrays.asList(1, 2)), BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_SUCCEEDED, (Exception) null, true);
        latestRecord.setLastUpdateTime(now);
        BrokerRemovalStateRecord middleRecord = new BrokerRemovalStateRecord(new HashSet<>(Arrays.asList(1, 2, 3, 4, 5)), BrokerRemovalStateMachine.BrokerRemovalState.BROKER_SHUTDOWN_FAILED, (Exception) null, false);
        middleRecord.setLastUpdateTime(now - 2);
        BrokerRemovalStateRecord oldestRecord = new BrokerRemovalStateRecord(new HashSet<>(Arrays.asList(1, 2, 3, 4, 5, 6, 7)), BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_FAILED, (Exception) null, true);
        oldestRecord.setLastUpdateTime(now - 3);

        // Left raw: the key type declared by the persistence store is not visible here.
        HashMap persistedRecords = new HashMap();
        persistedRecords.put(new ImmutableSet(oldestRecord.brokerIds()), oldestRecord);
        persistedRecords.put(new ImmutableSet(middleRecord.brokerIds()), middleRecord);
        persistedRecords.put(new ImmutableSet(latestRecord.brokerIds()), latestRecord);
        Mockito.when(mockPersistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(persistedRecords);

        Assertions.assertEquals(7, manager.brokerRemovals().size());
        Map<Integer, BrokerRemovalDescriptionInternal> descriptionsByBroker = manager.brokerRemovals().stream()
                .collect(Collectors.toMap(BrokerRemovalDescriptionInternal::brokerId, description -> description));

        assertDescriptionMatchesRecord(oldestRecord, descriptionsByBroker, 6, 7);
        assertDescriptionMatchesRecord(middleRecord, descriptionsByBroker, 3, 4, 5);
        assertDescriptionMatchesRecord(latestRecord, descriptionsByBroker, 1, 2);
    }

    /**
     * Stubs the persistence store with two mocked removal records, one last
     * updated five days ago and one a day ago. Checks that
     * {@code brokerRemovals()} returns only the description of the one-day-old
     * record.
     */
    @Test
    public void testRemoveBrokerHistory() {
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, mockDataBalanceEngineFactory, mockDbMetrics, new SystemTime(), (BalancerStatusTracker) null);
        manager.setBalanceEngine(mockActiveDataBalanceEngine);
        mockIsActiveFlag.set(true);

        BrokerRemovalStateRecord expiredRecord = Mockito.mock(BrokerRemovalStateRecord.class);
        BrokerRemovalDescriptionInternal expiredDescription = Mockito.mock(BrokerRemovalDescriptionInternal.class);
        Mockito.when(expiredRecord.toString()).thenReturn("Test expired removal record");
        Mockito.when(expiredRecord.lastUpdateTime()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(5L));
        Mockito.when(expiredRecord.toRemovalDescriptions()).thenReturn(Collections.singletonList(expiredDescription));

        BrokerRemovalStateRecord recentRecord = Mockito.mock(BrokerRemovalStateRecord.class);
        BrokerRemovalDescriptionInternal recentDescription = Mockito.mock(BrokerRemovalDescriptionInternal.class);
        Mockito.when(recentRecord.toString()).thenReturn("Test removal record");
        Mockito.when(recentRecord.lastUpdateTime()).thenReturn(System.currentTimeMillis() - TimeUnit.DAYS.toMillis(1L));
        Mockito.when(recentRecord.toRemovalDescriptions()).thenReturn(Collections.singletonList(recentDescription));

        // Left raw: the key type declared by the persistence store is not visible here.
        HashMap persistedRecords = new HashMap(2);
        persistedRecords.put(new ImmutableSet(Collections.singleton(1)), expiredRecord);
        persistedRecords.put(new ImmutableSet(Collections.singleton(2)), recentRecord);
        Mockito.when(mockPersistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(persistedRecords);

        List<BrokerRemovalDescriptionInternal> removals = manager.brokerRemovals();
        Assertions.assertEquals(1, removals.size());
        Assertions.assertEquals(recentDescription, removals.get(0));
    }

    /**
     * Asserts that, for every given broker id, the description map has an entry,
     * the record's broker ids contain that id, and the description's statuses,
     * exception, timestamps and shutdown flag equal those of the record.
     *
     * @param brokerRemovalStateRecord the record the descriptions are expected to mirror
     * @param map descriptions keyed by broker id
     * @param numArr the broker ids to check
     */
    private void assertDescriptionMatchesRecord(BrokerRemovalStateRecord brokerRemovalStateRecord, Map<Integer, BrokerRemovalDescriptionInternal> map, Integer... numArr) {
        for (Integer brokerId : numArr) {
            String missingDescription = String.format("expected the received descriptions to contain broker %s but they did not. Descriptions: %s", brokerId, map);
            Assertions.assertTrue(map.containsKey(brokerId), missingDescription);
            BrokerRemovalDescriptionInternal description = map.get(brokerId);

            boolean recordHasBroker = brokerRemovalStateRecord.brokerIds().contains(brokerId);
            String missingBroker = String.format("The expected record did not contain the broker id %s - it contained %s", brokerId, brokerRemovalStateRecord.brokerIds());
            Assertions.assertTrue(recordHasBroker, missingBroker);

            Assertions.assertEquals(brokerRemovalStateRecord.brokerShutdownStatus(), description.brokerShutdownStatus());
            Assertions.assertEquals(brokerRemovalStateRecord.replicaExclusionStatus(), description.replicaExclusionStatus());
            Assertions.assertEquals(brokerRemovalStateRecord.partitionReassignmentsStatus(), description.partitionReassignmentsStatus());
            Assertions.assertEquals(brokerRemovalStateRecord.exception(), description.exception());
            Assertions.assertEquals(brokerRemovalStateRecord.startTime(), description.createTimeMs());
            Assertions.assertEquals(brokerRemovalStateRecord.lastUpdateTime(), description.lastUpdateTimeMs());
            Assertions.assertEquals(Boolean.valueOf(brokerRemovalStateRecord.shouldShutdown()), Boolean.valueOf(description.shutdownScheduled()));
        }
    }

    /**
     * After election against the empty snapshot, with no additions registered by
     * this test, {@code brokerAdditions()} is expected to equal an empty list.
     */
    @Test
    public void testBrokerAdditions() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, (BalancerStatusTracker) null);
        manager.setBalanceEngine(mockActiveDataBalanceEngine);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Assertions.assertEquals(Collections.emptyList(), manager.brokerAdditions());
    }

    /**
     * Reports newly excluded brokers {1, 2} and removed exclusions {3, 4} while
     * two broker additions are registered: broker 1 in a terminal (COMPLETED)
     * state and broker 2 mid-REASSIGNMENT. Verifies both engine notifications and
     * that only the non-terminal addition receives BROKER_EXCLUSION_DETECTED.
     */
    @Test
    public void testNewReplicaExclusionCancelsBrokerAddition() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        time = new MockTime(50L, System.currentTimeMillis(), System.nanoTime());
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> newExclusions = new HashSet<>(Arrays.asList(1, 2));
        Set<Integer> removedExclusions = new HashSet<>(Arrays.asList(3, 4));

        BrokerAdditionStateManager completedAddition = Mockito.mock(BrokerAdditionStateManager.class);
        Mockito.when(completedAddition.brokerId()).thenReturn(1);
        Mockito.when(completedAddition.currentState()).thenReturn(BrokerAdditionStateMachine.BrokerAdditionState.COMPLETED);
        Mockito.when(completedAddition.isAtATerminalState()).thenReturn(true);

        BrokerAdditionStateManager ongoingAddition = Mockito.mock(BrokerAdditionStateManager.class);
        Mockito.when(ongoingAddition.brokerId()).thenReturn(2);
        Mockito.when(ongoingAddition.currentState()).thenReturn(BrokerAdditionStateMachine.BrokerAdditionState.REASSIGNMENT);
        Mockito.when(ongoingAddition.isAtATerminalState()).thenReturn(false);

        Map<Integer, BrokerAdditionStateManager> additionManagers = new HashMap<>();
        additionManagers.put(1, completedAddition);
        additionManagers.put(2, ongoingAddition);
        Mockito.when(mockDbeContext.getBrokerAdditionsStateManagers()).thenReturn(additionManagers);

        manager.onAlteredExclusions(newExclusions, removedExclusions);

        Mockito.verify(mockActiveDataBalanceEngine).notifyBrokerChange(ArgumentMatchers.anySet(), ArgumentMatchers.eq(BrokerChangeEvent.EXCLUDED_FOR_REPLICA_PLACEMENT));
        Mockito.verify(mockActiveDataBalanceEngine).notifyBrokerChange(removedExclusions, BrokerChangeEvent.REMOVED_REPLICA_EXCLUSION);
        // The terminal addition is only inspected, never notified.
        Mockito.verify(completedAddition).isAtATerminalState();
        Mockito.verify(completedAddition).brokerId();
        Mockito.verifyNoMoreInteractions(completedAddition);
        Mockito.verify(ongoingAddition).registerEvent(ArgumentMatchers.eq(BrokerAdditionStateMachine.BrokerAdditionEvent.BROKER_EXCLUSION_DETECTED), ArgumentMatchers.any(BalancerOperationOverriddenException.class));
    }

    /**
     * Reports no new exclusions and removed exclusions {3, 4} while two broker
     * additions are registered. Verifies that only the REMOVED_REPLICA_EXCLUSION
     * notification reaches the engine and that neither addition state manager is
     * touched.
     */
    @Test
    public void testRemovedReplicaExclusionsDoNotCancelBrokerAddition() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        time = new MockTime(50L, System.currentTimeMillis(), System.nanoTime());
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> newExclusions = new HashSet<>();
        Set<Integer> removedExclusions = new HashSet<>(Arrays.asList(3, 4));

        BrokerAdditionStateManager completedAddition = Mockito.mock(BrokerAdditionStateManager.class);
        Mockito.when(completedAddition.brokerId()).thenReturn(1);
        Mockito.when(completedAddition.currentState()).thenReturn(BrokerAdditionStateMachine.BrokerAdditionState.COMPLETED);
        Mockito.when(completedAddition.isAtATerminalState()).thenReturn(true);

        BrokerAdditionStateManager ongoingAddition = Mockito.mock(BrokerAdditionStateManager.class);
        Mockito.when(ongoingAddition.brokerId()).thenReturn(2);
        Mockito.when(ongoingAddition.currentState()).thenReturn(BrokerAdditionStateMachine.BrokerAdditionState.REASSIGNMENT);
        Mockito.when(ongoingAddition.isAtATerminalState()).thenReturn(false);

        Map<Integer, BrokerAdditionStateManager> additionManagers = new HashMap<>();
        additionManagers.put(1, completedAddition);
        additionManagers.put(2, ongoingAddition);
        Mockito.when(mockDbeContext.getBrokerAdditionsStateManagers()).thenReturn(additionManagers);

        manager.onAlteredExclusions(newExclusions, removedExclusions);

        Mockito.verify(mockActiveDataBalanceEngine).notifyBrokerChange(removedExclusions, BrokerChangeEvent.REMOVED_REPLICA_EXCLUSION);
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).notifyBrokerChange(ArgumentMatchers.anySet(), ArgumentMatchers.eq(BrokerChangeEvent.EXCLUDED_FOR_REPLICA_PLACEMENT));
        Mockito.verifyNoMoreInteractions(completedAddition);
        Mockito.verifyNoMoreInteractions(ongoingAddition);
    }

    /**
     * Registers three mocked removal trackers (broker sets {1}, {2}, {3}) and
     * reports new exclusions {1, 2}. Verifies that each tracker receives a
     * persistent EXCLUSION_ADDED cancellation proposal and that the engine's
     * {@code cancelBrokerRemoval} is invoked only for the trackers stubbed to
     * accept the cancellation ({1} and {3}).
     */
    @Test
    public void testNewReplicaExclusionCancelsBrokerRemoval() {
        BrokerRemovalCancellationMode mode = BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION;
        BrokerRemovalStateMachine.BrokerRemovalEvent event = BrokerRemovalStateMachine.BrokerRemovalEvent.EXCLUSION_ADDED;

        ImmutableSet<Integer> broker1 = new ImmutableSet<>(Collections.singletonList(1));
        ImmutableSet<Integer> broker2 = new ImmutableSet<>(Collections.singletonList(2));
        ImmutableSet<Integer> broker3 = new ImmutableSet<>(Collections.singletonList(3));
        BrokerRemovalStateTracker cancellableTracker1 = mockRemovalStateTracker(broker1, mode, event, true);
        BrokerRemovalStateTracker nonCancellableTracker2 = mockRemovalStateTracker(broker2, mode, event, false);
        BrokerRemovalStateTracker cancellableTracker3 = mockRemovalStateTracker(broker3, mode, event, true);

        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        // Left raw: the map's declared key/value types are not visible here.
        Map trackers = manager.getBalanceEngine().getDataBalanceEngineContext().getBrokerRemovalsStateTrackers();
        trackers.put(broker1, cancellableTracker1);
        trackers.put(broker2, nonCancellableTracker2);
        trackers.put(broker3, cancellableTracker3);

        manager.onAlteredExclusions(new HashSet<>(Arrays.asList(1, 2)), new HashSet<>());

        verifyMockTrackerCancelCall(cancellableTracker1, event, mode);
        verifyMockTrackerCancelCall(nonCancellableTracker2, event, mode);
        verifyMockTrackerCancelCall(cancellableTracker3, event, mode);
        Mockito.verify(mockActiveDataBalanceEngine).cancelBrokerRemoval(broker1);
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).cancelBrokerRemoval(broker2);
        Mockito.verify(mockActiveDataBalanceEngine).cancelBrokerRemoval(broker3);
    }

    /**
     * Registers three mocked removal trackers (broker sets {1}, {2}, {3}) and
     * reports removed exclusions {3, 4} with no new exclusions. Verifies the
     * REMOVED_REPLICA_EXCLUSION engine notification, that each tracker receives a
     * persistent EXCLUSION_REMOVED cancellation proposal, and that the engine's
     * {@code cancelBrokerRemoval} is invoked only for the trackers stubbed to
     * accept the cancellation ({1} and {3}).
     */
    @Test
    public void testRemovedReplicaExclusionsCancelBrokerRemoval() {
        BrokerRemovalCancellationMode mode = BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION;
        BrokerRemovalStateMachine.BrokerRemovalEvent event = BrokerRemovalStateMachine.BrokerRemovalEvent.EXCLUSION_REMOVED;

        ImmutableSet<Integer> broker1 = new ImmutableSet<>(Collections.singletonList(1));
        ImmutableSet<Integer> broker2 = new ImmutableSet<>(Collections.singletonList(2));
        ImmutableSet<Integer> broker3 = new ImmutableSet<>(Collections.singletonList(3));
        BrokerRemovalStateTracker cancellableTracker1 = mockRemovalStateTracker(broker1, mode, event, true);
        BrokerRemovalStateTracker nonCancellableTracker2 = mockRemovalStateTracker(broker2, mode, event, false);
        BrokerRemovalStateTracker cancellableTracker3 = mockRemovalStateTracker(broker3, mode, event, true);

        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        // Left raw: the map's declared key/value types are not visible here.
        Map trackers = manager.getBalanceEngine().getDataBalanceEngineContext().getBrokerRemovalsStateTrackers();
        trackers.put(broker1, cancellableTracker1);
        trackers.put(broker2, nonCancellableTracker2);
        trackers.put(broker3, cancellableTracker3);

        Set<Integer> removedExclusions = new HashSet<>(Arrays.asList(3, 4));
        manager.onAlteredExclusions(new HashSet<>(), removedExclusions);

        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).notifyBrokerChange(ArgumentMatchers.anySet(), ArgumentMatchers.eq(BrokerChangeEvent.EXCLUDED_FOR_REPLICA_PLACEMENT));
        Mockito.verify(mockActiveDataBalanceEngine).notifyBrokerChange(removedExclusions, BrokerChangeEvent.REMOVED_REPLICA_EXCLUSION);
        verifyMockTrackerCancelCall(cancellableTracker1, event, mode);
        verifyMockTrackerCancelCall(nonCancellableTracker2, event, mode);
        verifyMockTrackerCancelCall(cancellableTracker3, event, mode);
        Mockito.verify(mockActiveDataBalanceEngine).cancelBrokerRemoval(broker1);
        Mockito.verify(mockActiveDataBalanceEngine, Mockito.never()).cancelBrokerRemoval(broker2);
        Mockito.verify(mockActiveDataBalanceEngine).cancelBrokerRemoval(broker3);
    }

    /**
     * Reports new exclusions {1, 2} and removed exclusions {3, 4} with no
     * additions or removals set up by this test. Verifies that the engine is
     * notified of both changes with the exact sets passed in.
     */
    @Test
    public void testNewReplicaExclusionWithNoRemovalOrAdditionCancelsOngoingExecution() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        time = new MockTime(50L, System.currentTimeMillis(), System.nanoTime());
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> newExclusions = new HashSet<>(Arrays.asList(1, 2));
        Set<Integer> removedExclusions = new HashSet<>(Arrays.asList(3, 4));
        manager.onAlteredExclusions(newExclusions, removedExclusions);

        Mockito.verify(mockActiveDataBalanceEngine).notifyBrokerChange(newExclusions, BrokerChangeEvent.EXCLUDED_FOR_REPLICA_PLACEMENT);
        Mockito.verify(mockActiveDataBalanceEngine).notifyBrokerChange(removedExclusions, BrokerChangeEvent.REMOVED_REPLICA_EXCLUSION);
    }

    /**
     * Builds a mocked {@code BrokerRemovalStateTracker} for the given broker set
     * and stubs the active engine's {@code cancelBrokerRemoval} for the same set.
     *
     * @param immutableSet the broker ids the tracker reports via {@code brokerIds()}
     * @param brokerRemovalCancellationMode cancellation mode a proposal must carry to match the stub
     * @param brokerRemovalEvent cancellation event a proposal must carry to match the stub
     * @param z value returned by both {@code maybeCancel} (for matching proposals)
     *          and the engine's {@code cancelBrokerRemoval} for this broker set
     * @return the stubbed tracker mock
     */
    public BrokerRemovalStateTracker mockRemovalStateTracker(ImmutableSet<Integer> immutableSet, BrokerRemovalCancellationMode brokerRemovalCancellationMode, BrokerRemovalStateMachine.BrokerRemovalEvent brokerRemovalEvent, boolean z) {
        BrokerRemovalStateTracker tracker = Mockito.mock(BrokerRemovalStateTracker.class);
        // Explicitly typed lambda parameter so the matcher type is inferred without a cast.
        Mockito.when(tracker.maybeCancel(ArgumentMatchers.argThat((BrokerRemovalCancellationProposal proposal) ->
                proposal.cancellationMode().equals(brokerRemovalCancellationMode)
                        && proposal.cancellationEvent().equals(brokerRemovalEvent)))).thenReturn(z);
        Mockito.when(tracker.brokerIds()).thenReturn(immutableSet);
        Mockito.when(this.mockActiveDataBalanceEngine.cancelBrokerRemoval(ArgumentMatchers.eq(immutableSet))).thenReturn(z);
        // The previous `if (z)` branch re-assigned this same state, so the reported
        // state never depended on `z`; the no-op branch has been dropped.
        Mockito.when(tracker.currentState()).thenReturn(BrokerRemovalStateMachine.BrokerRemovalState.PLAN_COMPUTATION_FAILED);
        return tracker;
    }

    /**
     * Verifies that the tracker's {@code maybeCancel} was invoked once and that
     * the captured proposal carries the expected mode and event, plus a
     * {@code BrokerRemovalCanceledException} as its event exception.
     *
     * @param brokerRemovalStateTracker the mocked tracker to inspect
     * @param brokerRemovalEvent the cancellation event the proposal must carry
     * @param brokerRemovalCancellationMode the cancellation mode the proposal must carry
     */
    private void verifyMockTrackerCancelCall(BrokerRemovalStateTracker brokerRemovalStateTracker, BrokerRemovalStateMachine.BrokerRemovalEvent brokerRemovalEvent, BrokerRemovalCancellationMode brokerRemovalCancellationMode) {
        ArgumentCaptor<BrokerRemovalCancellationProposal> proposalCaptor = ArgumentCaptor.forClass(BrokerRemovalCancellationProposal.class);
        Mockito.verify(brokerRemovalStateTracker).maybeCancel(proposalCaptor.capture());

        BrokerRemovalCancellationProposal proposal = proposalCaptor.getValue();
        Assertions.assertEquals(brokerRemovalCancellationMode, proposal.cancellationMode());
        Assertions.assertEquals(brokerRemovalEvent, proposal.cancellationEvent());
        Assertions.assertTrue(proposal.eventException() instanceof BrokerRemovalCanceledException);
    }

    /**
     * With the active engine set directly (no election), reports new exclusions
     * {1, 2} and removed exclusions {3, 4}. Verifies both engine notifications
     * and that, beyond the context and activity lookups, nothing else was called
     * on the engine.
     */
    @Test
    public void testReplicaExclusionNotifications() {
        time = new MockTime(50L, System.currentTimeMillis(), System.nanoTime());
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, (BalancerStatusTracker) null);
        manager.setBalanceEngine(mockActiveDataBalanceEngine);

        Set<Integer> newExclusions = new HashSet<>(Arrays.asList(1, 2));
        Set<Integer> removedExclusions = new HashSet<>(Arrays.asList(3, 4));
        manager.onAlteredExclusions(newExclusions, removedExclusions);

        Mockito.verify(mockActiveDataBalanceEngine).notifyBrokerChange(newExclusions, BrokerChangeEvent.EXCLUDED_FOR_REPLICA_PLACEMENT);
        Mockito.verify(mockActiveDataBalanceEngine).notifyBrokerChange(removedExclusions, BrokerChangeEvent.REMOVED_REPLICA_EXCLUSION);
        verifyExclusionNotificationInteractions();
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * Verifies that the active engine's {@code getDataBalanceEngineContext()} and
     * {@code isActive()} were each called at least once, so that a following
     * {@code verifyNoMoreInteractions} does not trip over them.
     */
    private void verifyExclusionNotificationInteractions() {
        DataBalanceEngine engine = mockActiveDataBalanceEngine;
        Mockito.verify(engine, Mockito.atLeastOnce()).getDataBalanceEngineContext();
        Mockito.verify(engine, Mockito.atLeastOnce()).isActive();
    }

    /**
     * With the active engine set directly (no election), reports new exclusions
     * {1, 2} and an empty set of removed exclusions. Verifies that only the
     * EXCLUDED_FOR_REPLICA_PLACEMENT notification (plus the context and activity
     * lookups) reaches the engine.
     */
    @Test
    public void testReplicaExclusionNotificationsNoExclusionRemovals() {
        time = new MockTime(50L, System.currentTimeMillis(), System.nanoTime());
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, (BalancerStatusTracker) null);
        manager.setBalanceEngine(mockActiveDataBalanceEngine);

        Set<Integer> newExclusions = new HashSet<>(Arrays.asList(1, 2));
        manager.onAlteredExclusions(newExclusions, new HashSet<>());

        Mockito.verify(mockActiveDataBalanceEngine).notifyBrokerChange(newExclusions, BrokerChangeEvent.EXCLUDED_FOR_REPLICA_PLACEMENT);
        verifyExclusionNotificationInteractions();
        Mockito.verifyNoMoreInteractions(mockActiveDataBalanceEngine);
    }

    /**
     * After election, reports an empty set of new exclusions and removed
     * exclusions {1, 2}. Verifies that the engine receives the
     * REMOVED_REPLICA_EXCLUSION notification for the removed set.
     */
    @Test
    public void testReplicaExclusionNotificationsNoNewExclusions() {
        time = new MockTime(50L, System.currentTimeMillis(), System.nanoTime());
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory = new KafkaDataBalanceManager.DataBalanceEngineFactory(mockActiveDataBalanceEngine, mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(initConfig, engineFactory, mockDbMetrics, time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> newExclusions = new HashSet<>();
        Set<Integer> removedExclusions = new HashSet<>(Arrays.asList(1, 2));
        manager.onAlteredExclusions(newExclusions, removedExclusions);

        Mockito.verify(mockActiveDataBalanceEngine).notifyBrokerChange(removedExclusions, BrokerChangeEvent.REMOVED_REPLICA_EXCLUSION);
    }

    /**
     * {@code brokerAdditions()} must drop addition tasks that fall outside the task-history
     * retention window. Two state managers are stubbed: broker 1 (COMPLETED, last updated
     * {@code retention + 1} days ago) and broker 2 (REASSIGNMENT, last updated
     * {@code retention - 1} days ago). Only broker 2 is expected in the result.
     */
    @Test
    public void testBrokerAdditionsTaskHistoryFiltering() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        this.time = new MockTime(50L, System.currentTimeMillis(), System.nanoTime());
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.setBalanceEngine(this.mockActiveDataBalanceEngine);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        int retentionDays = this.initConfig.getInt("confluent.balancer.task.history.retention.days");
        long nowMs = this.time.milliseconds();

        // Addition whose timestamps are older than the retention window.
        BrokerAdditionStateManager expiredAddition = Mockito.mock(BrokerAdditionStateManager.class);
        Mockito.when(expiredAddition.brokerId()).thenReturn(1);
        Mockito.when(expiredAddition.currentState()).thenReturn(BrokerAdditionStateMachine.BrokerAdditionState.COMPLETED);
        Mockito.when(expiredAddition.creationTimeMs()).thenReturn(nowMs - TimeUnit.DAYS.toMillis(retentionDays + 2));
        Mockito.when(expiredAddition.lastUpdateTimeMs()).thenReturn(nowMs - TimeUnit.DAYS.toMillis(retentionDays + 1));
        Mockito.when(expiredAddition.exception()).thenReturn(Optional.empty());

        // Addition that was last updated inside the retention window.
        BrokerAdditionStateManager recentAddition = Mockito.mock(BrokerAdditionStateManager.class);
        Mockito.when(recentAddition.brokerId()).thenReturn(2);
        Mockito.when(recentAddition.currentState()).thenReturn(BrokerAdditionStateMachine.BrokerAdditionState.REASSIGNMENT);
        Mockito.when(recentAddition.creationTimeMs()).thenReturn(nowMs - TimeUnit.DAYS.toMillis(retentionDays));
        Mockito.when(recentAddition.lastUpdateTimeMs()).thenReturn(nowMs - TimeUnit.DAYS.toMillis(retentionDays - 1));
        Mockito.when(recentAddition.exception()).thenReturn(Optional.empty());

        Map<Integer, BrokerAdditionStateManager> additionsByBroker = new HashMap<>();
        additionsByBroker.put(1, expiredAddition);
        additionsByBroker.put(2, recentAddition);
        Mockito.when(this.mockDbeContext.getBrokerAdditionsStateManagers()).thenReturn(additionsByBroker);

        Assertions.assertEquals(1, manager.brokerAdditions().size());
        Assertions.assertEquals(
                recentAddition.brokerId(),
                ((BrokerAdditionDescriptionInternal) manager.brokerAdditions().get(0)).brokerId());
    }

    /**
     * {@code brokerRemovals()} must drop removal records that fall outside the task-history
     * retention window. Two persisted records are stubbed: brokers {1..5} last updated
     * {@code retention + 1} days ago, and brokers {2, 3, 6} last updated about half the
     * retention period ago. The result is expected to hold one description per broker of the
     * second record only, each matching that record.
     */
    @Test
    public void testBrokerRemovalsTaskHistoryFiltering() {
        this.time = new MockTime(50L, System.currentTimeMillis(), System.nanoTime());
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, this.mockDataBalanceEngineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.setBalanceEngine(this.mockActiveDataBalanceEngine);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        int retentionDays = this.initConfig.getInt("confluent.balancer.task.history.retention.days");
        long nowMs = this.time.milliseconds();
        long expiredUpdateMs = nowMs - TimeUnit.DAYS.toMillis(retentionDays + 1);
        long retainedUpdateMs = nowMs - TimeUnit.DAYS.toMillis(retentionDays - (retentionDays / 2));

        BrokerRemovalStateRecord expiredRecord = new BrokerRemovalStateRecord(
                new HashSet<>(Arrays.asList(1, 2, 3, 4, 5)),
                BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_SUCCEEDED, (Exception) null, false);
        expiredRecord.setLastUpdateTime(expiredUpdateMs);

        List<Integer> retainedBrokerIds = Arrays.asList(2, 3, 6);
        BrokerRemovalStateRecord retainedRecord = new BrokerRemovalStateRecord(
                new HashSet<>(retainedBrokerIds),
                BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_FAILED, (Exception) null, false);
        retainedRecord.setLastUpdateTime(retainedUpdateMs);

        // Left raw on purpose: the exact generic signature of getAllBrokerRemovalStateRecords()
        // is not visible from this test.
        HashMap recordsByBrokerSet = new HashMap();
        recordsByBrokerSet.put(new ImmutableSet(expiredRecord.brokerIds()), expiredRecord);
        recordsByBrokerSet.put(new ImmutableSet(retainedRecord.brokerIds()), retainedRecord);
        Mockito.when(this.mockPersistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(recordsByBrokerSet);

        // Typed list so the key extractor resolves brokerId(); on a raw List the lambda
        // parameter would be Object and the call would not compile.
        List<BrokerRemovalDescriptionInternal> removals = manager.brokerRemovals();
        Assertions.assertEquals(retainedBrokerIds.size(), removals.size());

        Map<Integer, BrokerRemovalDescriptionInternal> removalsByBrokerId = removals.stream()
                .collect(Collectors.toMap(BrokerRemovalDescriptionInternal::brokerId, Function.identity()));
        assertDescriptionMatchesRecord(retainedRecord, removalsByBrokerId, 2, 3, 6);
    }

    /**
     * After election, a startup notification for a single empty broker (id 10) must be
     * forwarded to the active engine as an {@code addBrokers} request for that broker set.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfOneSucceeds() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> startedBrokers = new HashSet<>();
        startedBrokers.add(10);
        // The same set is passed as both the empty-broker set and the new-broker set.
        manager.onBrokersStartup(startedBrokers, startedBrokers, AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(startedBrokers),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));
    }

    /**
     * After election, a startup notification for a single empty broker (id 10) must reach
     * the active engine as an {@code addBrokers} request.
     *
     * <p>NOTE(review): the body is identical to
     * {@code testOnBrokersStartup_AddBroker_AddOfOneSucceeds}; nothing here configures or
     * asserts a "pause" outcome. Confirm whether extra stubbing was intended.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfOnePauses() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> brokerTen = new HashSet<>();
        brokerTen.add(10);
        manager.onBrokersStartup(brokerTen, brokerTen, AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(brokerTen),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));
    }

    /**
     * After election, a startup notification for a single empty broker (id 10) must reach
     * the active engine as an {@code addBrokers} request.
     *
     * <p>NOTE(review): despite the name, no exception is stubbed or asserted here; the body
     * matches {@code testOnBrokersStartup_AddBroker_AddOfOneSucceeds}. Confirm the intended
     * scenario.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfOneThrowsException() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> addedBrokers = new HashSet<>();
        addedBrokers.add(10);
        manager.onBrokersStartup(addedBrokers, addedBrokers, AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(addedBrokers),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));
    }

    /**
     * Two consecutive startup notifications: first broker 10, then brokers 11 and 12 while
     * the engine context reports broker 10 as still being added. The second
     * {@code addBrokers} request must carry the union {10, 11, 12}.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfMultipleSucceeds() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        // First wave: a single broker.
        Set<Integer> firstWave = new HashSet<>();
        firstWave.add(10);
        manager.onBrokersStartup(firstWave, firstWave, AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(firstWave),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));

        // Second wave arrives while the first addition is reported as in progress.
        Mockito.when(this.mockDbeContext.brokersBeingAdded()).thenReturn(firstWave);
        Set<Integer> secondWave = Stream.of(11, 12).collect(Collectors.toSet());
        manager.onBrokersStartup(secondWave, secondWave, AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> mergedBrokers = new HashSet<>(firstWave);
        mergedBrokers.addAll(secondWave);
        Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(mergedBrokers),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));
    }

    /**
     * Same two-wave scenario as the plain "multiple succeeds" case (broker 10, then brokers
     * 11 and 12 merged into {10, 11, 12}), with the extra check that the engine context's
     * {@code brokersBeingAdded()} was consulted exactly twice across both notifications.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfMultipleSucceedsWithRace() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> initialBrokers = new HashSet<>();
        initialBrokers.add(10);
        manager.onBrokersStartup(initialBrokers, initialBrokers, AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(initialBrokers),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));

        Mockito.when(this.mockDbeContext.brokersBeingAdded()).thenReturn(initialBrokers);
        Set<Integer> laterBrokers = Stream.of(11, 12).collect(Collectors.toSet());
        manager.onBrokersStartup(laterBrokers, laterBrokers, AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> expectedUnion = new HashSet<>(initialBrokers);
        expectedUnion.addAll(laterBrokers);
        Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(expectedUnion),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));
        // One lookup per startup notification.
        Mockito.verify(this.mockDbeContext, Mockito.times(2)).brokersBeingAdded();
    }

    /**
     * A startup notification delivered before any election (so no engine was activated) must
     * not trigger any broker addition.
     *
     * <p>The standalone removal-tracker mock is never registered with the manager, so
     * checking it alone cannot fail. The test therefore also asserts that the engine mock
     * never receives an {@code addBrokers} request.
     */
    @Test
    public void testOnBrokersStartup_DoesntTakeActionIfEngineInactive() {
        Set<Integer> emptyBrokers = new HashSet<>();
        emptyBrokers.add(1);
        Set<Integer> startedBrokers = new HashSet<>();
        startedBrokers.add(1);

        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        BrokerRemovalStateTracker removalTracker = Mockito.mock(BrokerRemovalStateTracker.class);
        Mockito.when(this.mockActiveDataBalanceEngine.isActive()).thenReturn(false);

        manager.onBrokersStartup(emptyBrokers, startedBrokers, AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Mockito.verifyNoInteractions(removalTracker);
        // Meaningful check: nothing may have been scheduled for addition on the engine.
        Mockito.verify(this.mockActiveDataBalanceEngine, Mockito.never()).addBrokers(
                ArgumentMatchers.<Set<Integer>>any(),
                ArgumentMatchers.<String>any(),
                ArgumentMatchers.<AliveBrokersSnapshot>any());
    }

    /**
     * A broker startup must attempt to cancel the registered removal operations. Two
     * trackers are registered, for broker sets {1, 10} and {3, 4}; the first is built to
     * report a successful cancellation and the second a failed one (the last argument of
     * {@code mockRemovalStateTracker}). Expected outcome: both trackers get a
     * {@code BROKER_RESTARTED} persistent cancellation, the engine cancels only the first
     * removal, the first tracker is dropped from the registry, and empty broker 1 is added.
     */
    @Test
    public void testOnBrokersStartup_CancelsRemoval() {
        Set<Integer> emptyBrokers = new HashSet<>();
        emptyBrokers.add(1);
        Set<Integer> startedBrokers = new HashSet<>();
        startedBrokers.add(1);
        startedBrokers.add(2);
        startedBrokers.add(3);

        ImmutableSet<Integer> cancellableRemoval = new ImmutableSet<>(Arrays.asList(1, 10));
        ImmutableSet<Integer> stuckRemoval = new ImmutableSet<>(Arrays.asList(3, 4));
        BrokerRemovalCancellationMode cancellationMode = BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION;
        BrokerRemovalStateMachine.BrokerRemovalEvent restartEvent = BrokerRemovalStateMachine.BrokerRemovalEvent.BROKER_RESTARTED;
        BrokerRemovalStateTracker cancellableTracker = mockRemovalStateTracker(cancellableRemoval, cancellationMode, restartEvent, true);
        BrokerRemovalStateTracker stuckTracker = mockRemovalStateTracker(stuckRemoval, cancellationMode, restartEvent, false);

        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        // Register both trackers directly in the engine context's live registry.
        Map trackerRegistry = manager.getBalanceEngine().getDataBalanceEngineContext().getBrokerRemovalsStateTrackers();
        trackerRegistry.put(cancellableRemoval, cancellableTracker);
        trackerRegistry.put(stuckRemoval, stuckTracker);

        manager.onBrokersStartup(emptyBrokers, startedBrokers, AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        verifyMockTrackerCancelCall(cancellableTracker, BrokerRemovalStateMachine.BrokerRemovalEvent.BROKER_RESTARTED, BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION);
        verifyMockTrackerCancelCall(stuckTracker, BrokerRemovalStateMachine.BrokerRemovalEvent.BROKER_RESTARTED, BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION);
        Mockito.verify(this.mockActiveDataBalanceEngine).cancelBrokerRemoval(cancellableRemoval);
        Mockito.verify(this.mockActiveDataBalanceEngine, Mockito.never()).cancelBrokerRemoval(stuckRemoval);
        Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(emptyBrokers),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));

        // Only the removal that could not be cancelled stays registered.
        Assertions.assertEquals(1, trackerRegistry.size());
        Assertions.assertNull(trackerRegistry.get(cancellableRemoval));
        Assertions.assertEquals(stuckTracker, trackerRegistry.get(stuckRemoval));
    }

    /**
     * Disabling the balancer through a config update must deactivate the engine with
     * {@code BALANCER_DISABLED} and send a persistent cancellation to every registered
     * removal tracker. The engine cancels only the removal whose tracker was built to report
     * a successful cancellation.
     *
     * <p>NOTE(review): the second tracker is created for broker set {2} but registered
     * under the key {3}, while the final {@code never()} verification targets {2}. Confirm
     * whether the {3} key is intentional; if not, that verification may be weaker than
     * intended.
     */
    @Test
    public void testBalancerDisabled_CancelsRemovals() {
        BrokerRemovalCancellationMode cancellationMode = BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION;
        BrokerRemovalStateMachine.BrokerRemovalEvent disabledEvent = BrokerRemovalStateMachine.BrokerRemovalEvent.BALANCER_DISABLED;
        ImmutableSet<Integer> brokerOne = new ImmutableSet<>(Collections.singletonList(1));
        ImmutableSet<Integer> brokerTwo = new ImmutableSet<>(Collections.singletonList(2));
        ImmutableSet<Integer> brokerThree = new ImmutableSet<>(Collections.singletonList(3));
        BrokerRemovalStateTracker cancellableTracker = mockRemovalStateTracker(brokerOne, cancellationMode, disabledEvent, true);
        BrokerRemovalStateTracker stuckTracker = mockRemovalStateTracker(brokerTwo, cancellationMode, disabledEvent, false);

        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Map trackerRegistry = manager.getBalanceEngine().getDataBalanceEngineContext().getBrokerRemovalsStateTrackers();
        trackerRegistry.put(brokerOne, cancellableTracker);
        trackerRegistry.put(brokerThree, stuckTracker);

        // Switch from the enabled config to the disabled one.
        manager.updateConfig(this.initConfig, this.disabledConfig);

        Mockito.verify(this.mockActiveDataBalanceEngine)
                .onDeactivation(ArgumentMatchers.eq(BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED));
        verifyMockTrackerCancelCall(cancellableTracker, BrokerRemovalStateMachine.BrokerRemovalEvent.BALANCER_DISABLED, BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION);
        verifyMockTrackerCancelCall(stuckTracker, BrokerRemovalStateMachine.BrokerRemovalEvent.BALANCER_DISABLED, BrokerRemovalCancellationMode.PERSISTENT_CANCELLATION);
        Mockito.verify(this.mockActiveDataBalanceEngine).cancelBrokerRemoval(brokerOne);
        Mockito.verify(this.mockActiveDataBalanceEngine, Mockito.never()).cancelBrokerRemoval(brokerTwo);
    }

    /**
     * When brokers 1, 2 and 3 start up but none of them is reported as empty, the engine
     * must never receive an {@code addBrokers} request.
     */
    @Test
    public void testOnBrokersStartup_DoesntAddNonEmptyBrokers() {
        Set<Integer> noEmptyBrokers = new HashSet<>();
        Set<Integer> startedBrokers = new HashSet<>();
        startedBrokers.add(1);
        startedBrokers.add(2);
        startedBrokers.add(3);

        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        manager.onBrokersStartup(noEmptyBrokers, startedBrokers, AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Mockito.verify(this.mockActiveDataBalanceEngine, Mockito.never()).addBrokers(
                ArgumentMatchers.<Set<Integer>>any(),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));
    }

    /**
     * Three startup notifications in sequence:
     * <ol>
     *   <li>empty broker 10 starts and is added;</li>
     *   <li>brokers 11 and 12 start but are not empty, so nothing is added;</li>
     *   <li>brokers 13 and 14 start, only 14 is empty, and broker 10 is reported as still
     *       being added, so the request must carry exactly {10, 14}.</li>
     * </ol>
     * The engine mock is reset after step 1 so the later verifications see only the later
     * calls.
     */
    @Test
    public void testOnBrokersStartup_AddBroker_AddOfMultipleOnlyMergesEmpty() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        // Step 1: a single empty broker.
        Set<Integer> firstEmptyBroker = new HashSet<>();
        firstEmptyBroker.add(10);
        manager.onBrokersStartup(firstEmptyBroker, firstEmptyBroker, AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(firstEmptyBroker),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));

        // Forget recorded interactions, then restore the default engine stubbing.
        Mockito.reset(this.mockActiveDataBalanceEngine);
        setupMockDbe();

        // Step 2: only non-empty brokers start.
        Set<Integer> nonEmptyStarted = Stream.of(11, 12).collect(Collectors.toSet());
        manager.onBrokersStartup(new HashSet<>(), nonEmptyStarted, AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Mockito.verify(this.mockActiveDataBalanceEngine, Mockito.never()).addBrokers(
                ArgumentMatchers.<Set<Integer>>any(),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));

        // Step 3: a mixed batch while broker 10 is still being added.
        Set<Integer> mixedStarted = Stream.of(13, 14).collect(Collectors.toSet());
        Set<Integer> secondEmptyBroker = new HashSet<>();
        Mockito.when(this.mockDbeContext.brokersBeingAdded()).thenReturn(firstEmptyBroker);
        secondEmptyBroker.add(14);
        manager.onBrokersStartup(secondEmptyBroker, mixedStarted, AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Set<Integer> expectedAddition = new HashSet<>(firstEmptyBroker);
        expectedAddition.addAll(secondEmptyBroker);
        Mockito.verify(this.mockActiveDataBalanceEngine).addBrokers(
                ArgumentMatchers.eq(expectedAddition),
                ArgumentMatchers.anyString(),
                ArgumentMatchers.eq(AliveBrokersSnapshot.EMPTY_SNAPSHOT));
    }

    /**
     * Election with the default test config must create the balancer status tracker and
     * leave it in the {@code STARTING} state.
     */
    @Test
    public void testOnElection_InstantiatesBalancerTracker() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);

        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Assertions.assertEquals(
                BalancerStatusStateMachine.BalancerState.STARTING,
                manager.balancerStatusTracker.currentState());
    }

    /**
     * Election with {@code confluent.balancer.enable} set to {@code false} must leave the
     * balancer status tracker in the {@code DISABLED} state.
     */
    @Test
    public void testBalancerTracker_OnElection_BalancerDisabled() {
        this.brokerProps.put("confluent.balancer.enable", false);
        this.updatedConfig = new KafkaConfig(this.brokerProps);

        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.updatedConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Assertions.assertEquals(
                BalancerStatusStateMachine.BalancerState.DISABLED,
                manager.balancerStatusTracker.currentState());
    }

    /**
     * Election with a comma-separated, multi-entry log directory setting must leave the
     * balancer status tracker in the {@code JBOD_ENABLED_ERROR} state.
     */
    @Test
    public void testBalancerTracker_OnElection_JbodEnabledError() {
        // Two comma-separated entries in the log-dir property.
        this.brokerProps.put(KafkaConfig.LogDirProp(), "/path/to/logs1, /path/to/logs1");
        this.updatedConfig = new KafkaConfig(this.brokerProps);

        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.updatedConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        Assertions.assertEquals(
                BalancerStatusStateMachine.BalancerState.JBOD_ENABLED_ERROR,
                manager.balancerStatusTracker.currentState());
    }

    /**
     * Tracks the lifecycle of the balancer status tracker: absent before election,
     * {@code STARTING} after election, cleared again on resignation, and still absent after
     * shutdown.
     */
    @Test
    public void testBalancerState_OnMultipleElectionResignations() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        Assertions.assertNull(manager.balancerStatusTracker);

        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Assertions.assertEquals(
                BalancerStatusStateMachine.BalancerState.STARTING,
                manager.balancerStatusTracker.currentState());

        manager.onResignation();
        Assertions.assertNull(manager.balancerStatusTracker);

        manager.shutdown();
        Assertions.assertNull(manager.balancerStatusTracker);
    }

    /**
     * Toggles the balancer off and on again through config updates after an election and
     * checks the tracker state at each step: {@code STARTING} after election,
     * {@code DISABLED} after disabling, {@code STARTING} again after re-enabling (two engine
     * activations in total).
     *
     * <p>The engine mock's {@code onDeactivation} is stubbed to forward the event to the
     * manager's status tracker, because a plain mock would not register it.
     */
    @Test
    public void testBalancerState_WithMultipleEnableDisableBalancer() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);

        Mockito.doAnswer(invocation -> {
            BalancerStatusStateMachine.BalancerEvent event =
                    (BalancerStatusStateMachine.BalancerEvent) invocation.getArguments()[0];
            manager.balancerStatusTracker.registerEvent(event);
            return null;
        }).when(this.mockActiveDataBalanceEngine)
                .onDeactivation(ArgumentMatchers.any(BalancerStatusStateMachine.BalancerEvent.class));

        Assertions.assertNull(manager.balancerStatusTracker);

        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Assertions.assertEquals(
                BalancerStatusStateMachine.BalancerState.STARTING,
                manager.balancerStatusTracker.currentState());

        // Enabled -> disabled.
        manager.updateConfig(this.initConfig, this.disabledConfig);
        Mockito.verify(this.mockActiveDataBalanceEngine)
                .onDeactivation(ArgumentMatchers.eq(BalancerStatusStateMachine.BalancerEvent.BALANCER_DISABLED));
        Assertions.assertEquals(
                BalancerStatusStateMachine.BalancerState.DISABLED,
                manager.balancerStatusTracker.currentState());

        // Disabled -> enabled.
        manager.updateConfig(this.disabledConfig, this.initConfig);
        Mockito.verify(this.mockActiveDataBalanceEngine, Mockito.times(2))
                .onActivation(ArgumentMatchers.<EngineInitializationContext>any());
        Assertions.assertEquals(
                BalancerStatusStateMachine.BalancerState.STARTING,
                manager.balancerStatusTracker.currentState());
    }

    /**
     * After the manager has resigned, toggling the balancer through config updates must not
     * recreate the balancer status tracker: it stays {@code null} after both the disable and
     * the re-enable update.
     */
    @Test
    public void testBalancerState_EnableDisableBalancerAfterResignation() {
        KafkaDataBalanceManager.DataBalanceEngineFactory engineFactory =
                new KafkaDataBalanceManager.DataBalanceEngineFactory(this.mockActiveDataBalanceEngine, this.mockInactiveDataBalanceEngine);
        KafkaDataBalanceManager manager =
                new KafkaDataBalanceManager(this.initConfig, engineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        Assertions.assertNull(manager.balancerStatusTracker);

        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);
        Assertions.assertEquals(
                BalancerStatusStateMachine.BalancerState.STARTING,
                manager.balancerStatusTracker.currentState());

        manager.onResignation();

        manager.updateConfig(this.initConfig, this.disabledConfig);
        Assertions.assertNull(manager.balancerStatusTracker);

        manager.updateConfig(this.disabledConfig, this.initConfig);
        Assertions.assertNull(manager.balancerStatusTracker);
    }

    /**
     * With the factory handing out a {@code NoOpDataBalanceEngine} as the inactive engine
     * and no election performed, querying the even-cluster-load status must fail with
     * {@code BalancerOfflineException}.
     */
    @Test
    public void testEvenClusterLoadStatus_ThrowsBalancerOfflineExceptionWhenBalanceEngineIsNotActive() {
        Mockito.when(this.mockDataBalanceEngineFactory.getInactiveDataBalanceEngine())
                .thenReturn(new NoOpDataBalanceEngine());
        this.dataBalancer = new KafkaDataBalanceManager(
                this.initConfig, this.mockDataBalanceEngineFactory, this.mockDbMetrics, this.time, this.mockBalancerStatusTracker);

        DataBalanceManager balancer = this.dataBalancer;
        // The bound method reference null-checks its receiver when it is created.
        Assertions.assertThrows(BalancerOfflineException.class, balancer::evenClusterLoadStatus);
    }

    /**
     * With the uneven-load heal trigger set to {@code ANY_UNEVEN_LOAD} and the engine
     * accepting requests, but the engine context returning no even-cluster-load state
     * manager yet, the reported status must be {@code STARTING}.
     *
     * <p>The engine is a spy whose {@code onActivation} is replaced by a stub that only
     * flips {@code canAcceptRequests}.
     */
    @Test
    public void testEvenClusterLoadStatus_ReturnsStartingStatusWhenEvenClusterLoadStateManagerIsNotInitializedYet() {
        ConfluentDataBalanceEngine engineSpy =
                Mockito.spy(new ConfluentDataBalanceEngine(Mockito.mock(ExecutorService.class), this.mockDbeContext));
        Mockito.doAnswer(invocation -> {
            engineSpy.canAcceptRequests = true;
            return true;
        }).when(engineSpy).onActivation(ArgumentMatchers.<EngineInitializationContext>any());
        Mockito.when(this.mockDataBalanceEngineFactory.getActiveDataBalanceEngine()).thenReturn(engineSpy);

        Properties brokerProperties = baseBrokerProps();
        brokerProperties.put("confluent.balancer.heal.uneven.load.trigger", ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString());
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(
                initConfig(brokerProperties), this.mockDataBalanceEngineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        // Plain null: an Object-typed null does not match thenReturn(T) for the stubbed
        // method's return type.
        Mockito.when(this.mockDbeContext.getEvenClusterLoadStateManager()).thenReturn(null);

        Assertions.assertEquals(EvenClusterLoadStatusDescriptionInternal.STARTING, manager.evenClusterLoadStatus());
    }

    /**
     * With the shared {@code initConfig} (no uneven-load heal trigger is set in this test;
     * presumably the fixture leaves it off, to be confirmed against the setup) and no
     * even-cluster-load state manager in the engine context, the reported status must be
     * {@code DISABLED}.
     *
     * <p>The engine is a spy whose {@code onActivation} is replaced by a stub that only
     * flips {@code canAcceptRequests}.
     */
    @Test
    public void testEvenClusterLoadStatus_ReturnsDisabledStatusWhenEvenClusterLoadIsDisabled() {
        ConfluentDataBalanceEngine engineSpy =
                Mockito.spy(new ConfluentDataBalanceEngine(Mockito.mock(ExecutorService.class), this.mockDbeContext));
        Mockito.doAnswer(invocation -> {
            engineSpy.canAcceptRequests = true;
            return true;
        }).when(engineSpy).onActivation(ArgumentMatchers.<EngineInitializationContext>any());
        Mockito.when(this.mockDataBalanceEngineFactory.getActiveDataBalanceEngine()).thenReturn(engineSpy);

        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(
                this.initConfig, this.mockDataBalanceEngineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        // Plain null: an Object-typed null does not match thenReturn(T) for the stubbed
        // method's return type.
        Mockito.when(this.mockDbeContext.getEvenClusterLoadStateManager()).thenReturn(null);

        Assertions.assertEquals(EvenClusterLoadStatusDescriptionInternal.DISABLED, manager.evenClusterLoadStatus());
    }

    /**
     * With {@code confluent.balancer.enable} set to {@code false}, querying the
     * even-cluster-load status after election must fail with
     * {@code BalancerOfflineException}.
     *
     * <p>The engine is a spy whose {@code onActivation} is replaced by a stub that only
     * flips {@code canAcceptRequests}.
     */
    @Test
    public void testEvenClusterLoadStatus_ThrowsExceptionWhenSBCIsDisabled() {
        Properties brokerProperties = baseBrokerProps();
        brokerProperties.put("confluent.balancer.enable", false);
        KafkaConfig disabledBalancerConfig = new KafkaConfig(brokerProperties);

        ConfluentDataBalanceEngine engineSpy =
                Mockito.spy(new ConfluentDataBalanceEngine(Mockito.mock(ExecutorService.class), this.mockDbeContext));
        Mockito.doAnswer(invocation -> {
            engineSpy.canAcceptRequests = true;
            return true;
        }).when(engineSpy).onActivation(ArgumentMatchers.<EngineInitializationContext>any());
        Mockito.when(this.mockDataBalanceEngineFactory.getActiveDataBalanceEngine()).thenReturn(engineSpy);

        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(
                disabledBalancerConfig, this.mockDataBalanceEngineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        // Plain null: an Object-typed null does not match thenReturn(T) for the stubbed
        // method's return type.
        Mockito.when(this.mockDbeContext.getEvenClusterLoadStateManager()).thenReturn(null);

        Assertions.assertThrows(BalancerOfflineException.class, manager::evenClusterLoadStatus);
    }

    /**
     * With the uneven-load heal trigger set to {@code ANY_UNEVEN_LOAD}, a state manager
     * present in the engine context and Cruise Control reported as initialized, the manager
     * must return exactly the status description supplied by that state manager.
     *
     * <p>The engine is a spy whose {@code onActivation} is replaced by a stub that only
     * flips {@code canAcceptRequests}.
     */
    @Test
    public void testEvenClusterLoadStatus_ReturnsStatus() {
        ConfluentDataBalanceEngine engineSpy =
                Mockito.spy(new ConfluentDataBalanceEngine(Mockito.mock(ExecutorService.class), this.mockDbeContext));
        Mockito.doAnswer(invocation -> {
            engineSpy.canAcceptRequests = true;
            return true;
        }).when(engineSpy).onActivation(ArgumentMatchers.<EngineInitializationContext>any());
        Mockito.when(this.mockDataBalanceEngineFactory.getActiveDataBalanceEngine()).thenReturn(engineSpy);

        Properties brokerProperties = baseBrokerProps();
        brokerProperties.put("confluent.balancer.heal.uneven.load.trigger", ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString());
        KafkaDataBalanceManager manager = new KafkaDataBalanceManager(
                initConfig(brokerProperties), this.mockDataBalanceEngineFactory, this.mockDbMetrics, this.time, (BalancerStatusTracker) null);
        manager.onElection(AliveBrokersSnapshot.EMPTY_SNAPSHOT);

        EvenClusterLoadStatusDescriptionInternal expectedStatus = new EvenClusterLoadStatusDescriptionInternal(
                EvenClusterLoadStatus.BALANCED, 205L, EvenClusterLoadStatus.BALANCING, 105L, (Exception) null);
        SelfHealingEvenClusterLoadStateManager stateManager = Mockito.mock(SelfHealingEvenClusterLoadStateManager.class);
        Mockito.when(stateManager.evenClusterLoadStatusDescription()).thenReturn(expectedStatus);
        Mockito.when(this.mockDbeContext.getEvenClusterLoadStateManager()).thenReturn(stateManager);
        Mockito.when(this.mockDbeContext.isCruiseControlInitialized()).thenReturn(true);

        Assertions.assertEquals(expectedStatus, manager.evenClusterLoadStatus());
    }
}
