package io.confluent.databalancer;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.brokerremoval.BrokerRemovalCallback;
import com.linkedin.kafka.cruisecontrol.brokerremoval.BrokerRemovalFuture;
import com.linkedin.kafka.cruisecontrol.common.TestConstants;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.exception.KafkaCruiseControlException;
import io.confluent.cruisecontrol.analyzer.goals.ReplicaPlacementGoal;
import io.confluent.cruisecontrol.analyzer.goals.SequentialReplicaMovementGoal;
import io.confluent.databalancer.ConfluentDataBalanceEngine;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.operation.BalanceOpExecutionCompletionCallback;
import io.confluent.databalancer.operation.BrokerRemovalStateMachine;
import io.confluent.databalancer.operation.BrokerRemovalStateTracker;
import io.confluent.databalancer.persistence.ApiStatePersistenceStore;
import io.confluent.databalancer.persistence.BrokerRemovalStateRecord;
import io.confluent.telemetry.ConfluentTelemetryConfig;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import kafka.cluster.EndPoint;
import kafka.server.Defaults$;
import kafka.server.KafkaConfig;
import kafka.server.KafkaConfig$;
import kafka.utils.MockTime;
import kafka.utils.TestUtils$;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.BalancerOfflineException;
import org.apache.kafka.common.errors.BrokerRemovalInProgressException;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.internal.verification.VerificationModeFactory;
import scala.collection.JavaConverters;

/* loaded from: input_file:io/confluent/databalancer/ConfluentDataBalanceEngineTest.class */
public class ConfluentDataBalanceEngineTest {
    private Properties brokerProps;
    private KafkaConfig initConfig;
    private Time mockTime = new MockTime();

    @Mock
    private KafkaCruiseControl mockCruiseControl;

    @Mock
    private DataBalancerMetricsRegistry mockMetricsRegistry;

    @Mock
    private ApiStatePersistenceStore persistenceStore;

    @Mock
    BalanceOpExecutionCompletionCallback mockExecCompletionCb;
    private Map<Integer, BrokerRemovalStateRecord> brokerRemovalStateRecordMap;
    private ExecutorService executorService;

    /* loaded from: input_file:io/confluent/databalancer/ConfluentDataBalanceEngineTest$MockDatabalancerStartupComponent.class */
    private static class MockDatabalancerStartupComponent {
        public static boolean block = false;
        public static boolean checkupMethodCalled = false;
        public static final Semaphore TEST_SYNC_SEMAPHORE = new Semaphore(0);

        private MockDatabalancerStartupComponent() {
        }

        public static void checkStartupCondition(KafkaCruiseControlConfig kafkaCruiseControlConfig, Semaphore semaphore) {
            checkupMethodCalled = true;
            if (block) {
                TEST_SYNC_SEMAPHORE.release();
                try {
                    semaphore.acquire();
                    throw new StartupCheckInterruptedException();
                } catch (InterruptedException e) {
                    throw new StartupCheckInterruptedException(e);
                }
            }
        }
    }

    /* loaded from: input_file:io/confluent/databalancer/ConfluentDataBalanceEngineTest$RejectedExecutorService.class */
    public static class RejectedExecutorService extends ThreadPoolExecutor {
        public RejectedExecutorService(int i, int i2, long j, TimeUnit timeUnit, BlockingQueue<Runnable> blockingQueue, RejectedExecutionHandler rejectedExecutionHandler) {
            super(i, i2, j, timeUnit, blockingQueue, rejectedExecutionHandler);
        }

        @Override // java.util.concurrent.ThreadPoolExecutor, java.util.concurrent.Executor
        public void execute(Runnable runnable) {
            getRejectedExecutionHandler().rejectedExecution(runnable, this);
        }
    }

    private static ExecutorService currentThreadExecutorService() {
        return new RejectedExecutorService(0, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue(), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    @Before
    public void setUp() {
        this.brokerProps = new Properties();
        this.brokerProps.put(KafkaConfig$.MODULE$.ZkConnectProp(), TestUtils$.MODULE$.MockZkConnect());
        this.brokerProps.put("confluent.balancer.enable", true);
        this.brokerProps.put("confluent.balancer.throttle.bytes.per.second", 200L);
        this.brokerProps.put(KafkaConfig$.MODULE$.ListenersProp(), "PLAINTEXT://localhost:9092");
        this.initConfig = new KafkaConfig(this.brokerProps);
        MockitoAnnotations.initMocks(this);
        ((KafkaCruiseControl) Mockito.doNothing().when(this.mockCruiseControl)).userTriggeredStopExecution();
    }

    @After
    public void cleanUp() {
        Mockito.reset(new ApiStatePersistenceStore[]{this.persistenceStore});
    }

    private ConfluentDataBalanceEngine getTestDataBalanceEngine() throws InterruptedException {
        this.executorService = (ExecutorService) Mockito.spy(currentThreadExecutorService());
        ConfluentDataBalanceEngineContext confluentDataBalanceEngineContext = (ConfluentDataBalanceEngineContext) Mockito.spy(new ConfluentDataBalanceEngineContext(this.mockMetricsRegistry, this.mockCruiseControl, this.mockTime));
        Mockito.when(confluentDataBalanceEngineContext.getPersistenceStore()).thenReturn(this.persistenceStore);
        this.brokerRemovalStateRecordMap = new ConcurrentHashMap();
        ((ApiStatePersistenceStore) Mockito.doAnswer(invocationOnMock -> {
            BrokerRemovalStateRecord brokerRemovalStateRecord = (BrokerRemovalStateRecord) invocationOnMock.getArguments()[0];
            this.brokerRemovalStateRecordMap.put(Integer.valueOf(brokerRemovalStateRecord.brokerId()), brokerRemovalStateRecord);
            return null;
        }).when(this.persistenceStore)).save((BrokerRemovalStateRecord) ArgumentMatchers.any(BrokerRemovalStateRecord.class), ArgumentMatchers.anyBoolean());
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(this.brokerRemovalStateRecordMap);
        return new ConfluentDataBalanceEngine(this.executorService, confluentDataBalanceEngineContext);
    }

    @Test
    public void testGenerateRegexNoTopicsOrPrefixes() {
        Assert.assertEquals("Unexpected regex generated", "", ConfluentDataBalanceEngine.generateCcTopicExclusionRegex(this.initConfig));
    }

    @Test
    public void testGenerateRegexOnlyTopics() {
        String str = TestConstants.TOPIC1 + "2";
        this.brokerProps.put("confluent.balancer.exclude.topic.names", TestConstants.TOPIC1);
        String generateCcTopicExclusionRegex = ConfluentDataBalanceEngine.generateCcTopicExclusionRegex(new KafkaConfig(this.brokerProps));
        Assert.assertEquals("Unexpected regex generated", "^\\Qtopic1\\E$", generateCcTopicExclusionRegex);
        Assert.assertTrue("Expected exact topic name to match", TestConstants.TOPIC1.matches(generateCcTopicExclusionRegex));
        Assert.assertFalse("Expected topic with exact topic name as prefix not to match", str.matches(generateCcTopicExclusionRegex));
        Assert.assertFalse("Expected non-matching topic name not to match", "a".matches(generateCcTopicExclusionRegex));
    }

    @Test
    public void testGenerateRegexOnlyPrefixes() {
        this.brokerProps.put("confluent.balancer.exclude.topic.prefixes", "prefix1");
        String generateCcTopicExclusionRegex = ConfluentDataBalanceEngine.generateCcTopicExclusionRegex(new KafkaConfig(this.brokerProps));
        Assert.assertEquals("Unexpected regex generated", "^\\Qprefix1\\E.*", generateCcTopicExclusionRegex);
        Assert.assertTrue("Expected exact topic prefix to match", "prefix1".matches(generateCcTopicExclusionRegex));
        Assert.assertTrue("Expected topic with prefix to match", ("prefix1hello").matches(generateCcTopicExclusionRegex));
        Assert.assertFalse("Expected suffix not to match", ("notprefix1").matches(generateCcTopicExclusionRegex));
    }

    @Test
    public void testGenerateRegex() {
        this.brokerProps.put("confluent.balancer.exclude.topic.names", "topic1, top.c2, test-topic");
        this.brokerProps.put("confluent.balancer.exclude.topic.prefixes", "prefix1, pref*x2");
        String generateCcTopicExclusionRegex = ConfluentDataBalanceEngine.generateCcTopicExclusionRegex(new KafkaConfig(this.brokerProps));
        Assert.assertEquals("Unexpected regex generated", "^\\Qtopic1\\E$|^\\Qtop.c2\\E$|^\\Qtest-topic\\E$|^\\Qprefix1\\E.*|^\\Qpref*x2\\E.*", generateCcTopicExclusionRegex);
        Assert.assertTrue("Expected exact topic name to match", TestConstants.TOPIC1.matches(generateCcTopicExclusionRegex));
        Assert.assertTrue("Expected exact topic name to match", "test-topic".matches(generateCcTopicExclusionRegex));
        Assert.assertTrue("Expected exact topic name with metadata characters to match", "top.c2".matches(generateCcTopicExclusionRegex));
        Assert.assertTrue("Expected prefix to match topic name", "prefix1-xyz".matches(generateCcTopicExclusionRegex));
        Assert.assertTrue("Expected prefix to match exact topic name", "prefix1".matches(generateCcTopicExclusionRegex));
        Assert.assertFalse("Expected partial topic name not to match", "topic1-name".matches(generateCcTopicExclusionRegex));
        Assert.assertFalse("Expected topicPrefix value present in middle of topic name not to match", "abc-prefix1-xyz".matches(generateCcTopicExclusionRegex));
        Assert.assertFalse("Expected topicPrefix value as suffix not to match", "abc-prefix1".matches(generateCcTopicExclusionRegex));
        Assert.assertFalse("Expected topicName value as suffix in topic name not to match", "abc-topic1".matches(generateCcTopicExclusionRegex));
        Assert.assertFalse("Expected topicName with regex metacharacters to be treated as a literal", TestConstants.TOPIC2.matches(generateCcTopicExclusionRegex));
        Assert.assertFalse("Expected topicPrefix with regex metacharacters to be treated as a literal", "prefix2".matches(generateCcTopicExclusionRegex));
    }

    @Test
    public void testGenerateCruiseControlConfig() {
        ArrayList arrayList = new ArrayList(Arrays.asList(ReplicaPlacementGoal.class.getName(), RackAwareGoal.class.getName(), SequentialReplicaMovementGoal.class.getName(), ReplicaCapacityGoal.class.getName(), DiskCapacityGoal.class.getName(), NetworkInboundCapacityGoal.class.getName(), NetworkOutboundCapacityGoal.class.getName(), ReplicaDistributionGoal.class.getName(), DiskUsageDistributionGoal.class.getName(), NetworkInboundUsageDistributionGoal.class.getName(), NetworkOutboundUsageDistributionGoal.class.getName(), CpuUsageDistributionGoal.class.getName(), TopicReplicaDistributionGoal.class.getName(), LeaderReplicaDistributionGoal.class.getName(), LeaderBytesInDistributionGoal.class.getName()));
        this.brokerProps.put(KafkaConfig.ZkConnectProp(), "zookeeper-1-internal.pzkc-ldqwz.svc.cluster.local:2181,zookeeper-2-internal.pzkc-ldqwz.svc.cluster.local:2181/testKafkaCluster");
        String str = "confluent.non_balancer_property_key";
        this.brokerProps.put("confluent.balancer.metric.sampler.class", "io.confluent.cruisecontrol.metricsreporter.ConfluentTelemetryReporterSampler");
        this.brokerProps.put("confluent.non_balancer_property_key", "nonBalancerPropertyValue");
        KafkaConfig kafkaConfig = new KafkaConfig(this.brokerProps);
        Assert.assertEquals("More than one listeners found: " + kafkaConfig.listeners(), 1L, kafkaConfig.listeners().length());
        Endpoint java = ((EndPoint) kafkaConfig.listeners().head()).toJava();
        String str2 = java.host() + ":" + java.port();
        KafkaCruiseControlConfig generateCruiseControlConfig = ConfluentDataBalanceEngine.generateCruiseControlConfig(kafkaConfig);
        Assert.assertTrue("expected bootstrap servers " + str2 + " not set, got " + generateCruiseControlConfig.getList("bootstrap.servers"), generateCruiseControlConfig.getList("bootstrap.servers").contains(str2));
        String string = generateCruiseControlConfig.getString("zookeeper.connect");
        Assert.assertEquals(string + " not same as expected zookeeper-1-internal.pzkc-ldqwz.svc.cluster.local:2181,zookeeper-2-internal.pzkc-ldqwz.svc.cluster.local:2181/testKafkaCluster", "zookeeper-1-internal.pzkc-ldqwz.svc.cluster.local:2181,zookeeper-2-internal.pzkc-ldqwz.svc.cluster.local:2181/testKafkaCluster", string);
        Assert.assertNotNull("balancer n/w input capacity property not present", generateCruiseControlConfig.getDouble("network.inbound.capacity.threshold"));
        Assert.assertNotNull("balancer metrics sampler class property not present", generateCruiseControlConfig.getClass("metric.sampler.class"));
        Assert.assertEquals("network inbound capacity should be at default", ConfluentConfigs.BALANCER_NETWORK_IN_CAPACITY_DEFAULT, generateCruiseControlConfig.getLong("network.in.max.bytes.per.second"));
        Assert.assertEquals("network outbound capacity should be at default", ConfluentConfigs.BALANCER_NETWORK_OUT_CAPACITY_DEFAULT, generateCruiseControlConfig.getLong("network.out.max.bytes.per.second"));
        Assert.assertThrows("nonBalancerPropertyValue present", ConfigException.class, () -> {
            generateCruiseControlConfig.getString(str);
        });
        List list = generateCruiseControlConfig.getList("goals");
        Assert.assertEquals(list + " not same as expected " + arrayList, arrayList, list);
        Assert.assertEquals("Goal config is not empty: " + list, Collections.emptyList(), generateCruiseControlConfig.getList("default.goals"));
        Assert.assertEquals("Expected goal-violation self-healing to be disabled", generateCruiseControlConfig.getBoolean("self.healing.goal.violation.enabled"), false);
        Map originals = generateCruiseControlConfig.originals();
        Assert.assertFalse("KafkaExporterConfig.TOPIC_NAME_CONFIG found in config", originals.containsKey(ConfluentTelemetryConfig.exporterPrefixForName("_local") + "topic.name"));
        Assert.assertFalse("ConfluentConfigs.BALANCER_TOPICS_REPLICATION_FACTOR_CONFIG found in config", originals.containsKey("confluent.balancer.topic.replication.factor"));
    }

    @Test
    public void testGeneratedConfigWithOverrides() {
        ArrayList arrayList = new ArrayList(Arrays.asList(ReplicaPlacementGoal.class.getName(), RackAwareGoal.class.getName(), SequentialReplicaMovementGoal.class.getName(), ReplicaCapacityGoal.class.getName(), DiskCapacityGoal.class.getName(), NetworkInboundCapacityGoal.class.getName(), NetworkOutboundCapacityGoal.class.getName(), ReplicaDistributionGoal.class.getName(), DiskUsageDistributionGoal.class.getName(), NetworkInboundUsageDistributionGoal.class.getName(), NetworkOutboundUsageDistributionGoal.class.getName(), CpuUsageDistributionGoal.class.getName(), TopicReplicaDistributionGoal.class.getName(), LeaderReplicaDistributionGoal.class.getName(), LeaderBytesInDistributionGoal.class.getName()));
        ArrayList arrayList2 = new ArrayList(Arrays.asList(ReplicaPlacementGoal.class.getName(), RackAwareGoal.class.getName(), ReplicaCapacityGoal.class.getName(), ReplicaDistributionGoal.class.getName(), DiskCapacityGoal.class.getName()));
        ArrayList arrayList3 = new ArrayList(Arrays.asList(ReplicaCapacityGoal.class.getName(), ReplicaDistributionGoal.class.getName(), DiskCapacityGoal.class.getName()));
        String join = String.join(",", arrayList2);
        String join2 = String.join(",", arrayList3);
        this.brokerProps.put(KafkaConfig.ZkConnectProp(), "zookeeper-1-internal.pzkc-ldqwz.svc.cluster.local:2181,zookeeper-2-internal.pzkc-ldqwz.svc.cluster.local:2181/testKafkaCluster");
        this.brokerProps.put(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:9095");
        Long l = 1200L;
        Long l2 = 780L;
        String str = ConfluentTelemetryConfig.exporterPrefixForName("_local") + "topic.name";
        this.brokerProps.put("confluent.balancer.topic.replication.factor", "2".toString());
        this.brokerProps.put("confluent.balancer.network.in.max.bytes.per.second", l.toString());
        this.brokerProps.put("confluent.balancer.network.out.max.bytes.per.second", l2.toString());
        this.brokerProps.put(str, "testMetricsTopic");
        this.brokerProps.put("confluent.balancer.default.goals", join);
        this.brokerProps.put("confluent.balancer.anomaly.detection.goals", join2);
        this.brokerProps.put("confluent.balancer.heal.uneven.load.trigger", ConfluentConfigs.BalancerSelfHealMode.EMPTY_BROKER.toString());
        KafkaCruiseControlConfig generateCruiseControlConfig = ConfluentDataBalanceEngine.generateCruiseControlConfig(new KafkaConfig(this.brokerProps));
        Assert.assertTrue("KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG doesn't contain localhost:9095", generateCruiseControlConfig.getList("bootstrap.servers").contains("localhost:9095"));
        Assert.assertEquals("Expected goal-violation self-healing to be disabled", generateCruiseControlConfig.getBoolean("self.healing.goal.violation.enabled"), false);
        Assert.assertEquals("Network inbound capacity should have been overridden", l, generateCruiseControlConfig.getLong("network.in.max.bytes.per.second"));
        Assert.assertEquals("Network outbound capacity should have been overridden", l2, generateCruiseControlConfig.getLong("network.out.max.bytes.per.second"));
        Map originals = generateCruiseControlConfig.originals();
        Object obj = originals.get("confluent.telemetry.reporter.topic");
        Assert.assertEquals(obj + " is not same as expected testMetricsTopic", "testMetricsTopic", obj);
        Object obj2 = originals.get("topic.replication.factor");
        Assert.assertEquals("Balancer topic RF config of " + obj2 + " is not same as expected 2", "2", obj2);
        List list = generateCruiseControlConfig.getList("goals");
        Assert.assertEquals(list + " is not same as expected " + arrayList, arrayList, list);
        List list2 = generateCruiseControlConfig.getList("default.goals");
        Assert.assertEquals(list2 + " is not same as expected " + arrayList2, arrayList2, list2);
    }

    @Test
    public void testPopulateClientConfigs_FallsBackToAdvertisedListeners() {
        HashMap hashMap = new HashMap();
        this.brokerProps.remove(KafkaConfig$.MODULE$.ListenersProp());
        this.brokerProps.put(KafkaConfig$.MODULE$.AdvertisedHostNameProp(), "host");
        this.brokerProps.put(KafkaConfig$.MODULE$.AdvertisedPortProp(), "9092");
        KafkaConfig kafkaConfig = new KafkaConfig(this.brokerProps);
        Assert.assertEquals("More than one listeners found: " + kafkaConfig.listeners(), 1L, kafkaConfig.listeners().length());
        Assert.assertNull("Expected the normal listener to have a null host", ((EndPoint) kafkaConfig.listeners().head()).host());
        Assert.assertEquals("More than one advertised listeners found: " + kafkaConfig.listeners(), 1L, kafkaConfig.advertisedListeners().length());
        ConfluentDataBalanceEngine.populateClientConfigs(kafkaConfig, hashMap);
        Assert.assertEquals("host:9092", (String) hashMap.get("bootstrap.servers"));
    }

    @Test
    public void testPopulateClientConfigs_DefaultsToEmptyStringHost() {
        HashMap hashMap = new HashMap();
        this.brokerProps.remove(KafkaConfig$.MODULE$.ListenersProp());
        KafkaConfig kafkaConfig = new KafkaConfig(this.brokerProps);
        Assert.assertEquals("More than one listeners found: " + kafkaConfig.listeners(), 1L, kafkaConfig.listeners().length());
        Assert.assertNull("Expected the normal listener to have a null host", ((EndPoint) kafkaConfig.listeners().head()).host());
        Assert.assertEquals("Expected the normal listener to have a default port", Defaults$.MODULE$.Port(), ((EndPoint) kafkaConfig.listeners().head()).port());
        Assert.assertEquals("More than one advertised listeners found: " + kafkaConfig.listeners(), 1L, kafkaConfig.advertisedListeners().length());
        Assert.assertNull("Expected the advertised listener to have a null host", ((EndPoint) kafkaConfig.advertisedListeners().head()).host());
        ConfluentDataBalanceEngine.populateClientConfigs(kafkaConfig, hashMap);
        Assert.assertEquals(":9092", (String) hashMap.get("bootstrap.servers"));
    }

    @Test
    public void testInvalidSelfHealingConfig() {
        this.brokerProps.put("confluent.balancer.heal.uneven.load.trigger", "disabled");
        Assert.assertThrows("Expected invalid self-healing config to throw ConfigException", ConfigException.class, () -> {
            new KafkaConfig(this.brokerProps);
        });
    }

    @Test
    public void testGenerateCruiseControlExclusionConfig() {
        this.brokerProps.put("confluent.balancer.exclude.topic.names", "topic1, top.c2, test-topic");
        this.brokerProps.put("confluent.balancer.exclude.topic.prefixes", "prefix1, pref*x2");
        String string = ConfluentDataBalanceEngine.generateCruiseControlConfig(new KafkaConfig(this.brokerProps)).getString("topics.excluded.from.partition.movement");
        Assert.assertTrue("Expected exact topic name to match", TestConstants.TOPIC1.matches(string));
        Assert.assertTrue("Expected exact topic name to match", "test-topic".matches(string));
        Assert.assertTrue("Expected exact topic name with metadata characters to match", "top.c2".matches(string));
        Assert.assertTrue("Expected prefix to match topic name", "prefix1-xyz".matches(string));
        Assert.assertTrue("Expected prefix to match exact topic name", "prefix1".matches(string));
        Assert.assertFalse("Expected partial topic name not to match", "topic1-name".matches(string));
        Assert.assertFalse("Expected topicPrefix value present in middle of topic name not to match", "abc-prefix1-xyz".matches(string));
        Assert.assertFalse("Expected topicPrefix value as suffix not to match", "abc-prefix1".matches(string));
        Assert.assertFalse("Expected topicName value as suffix in topic name not to match", "abc-topic1".matches(string));
        Assert.assertFalse("Expected topicName with regex metacharacters to be treated as a literal", TestConstants.TOPIC2.matches(string));
        Assert.assertFalse("Expected topicPrefix with regex metacharacters to be treated as a literal", "prefix2".matches(string));
    }

    @Test
    public void testGeneratedEncryptedInterBrokerConfig() {
        Properties properties = new Properties();
        properties.put(KafkaConfig$.MODULE$.ZkConnectProp(), "localhost:9095");
        properties.put(KafkaConfig$.MODULE$.ListenersProp(), "INTERNAL://localhost:9075");
        properties.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "INTERNAL:SSL");
        properties.put("ssl.protocol", "TLSv1.2");
        properties.put("ssl.truststore.location", "test.truststore.jks");
        properties.put("ssl.keystore.location", "test.keystore.jks");
        properties.put("ssl.cipher.suites", Collections.singletonList("TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA"));
        properties.put("ssl.provider", "JVM");
        properties.put("listener.name.internal.ssl.keystore.location", "listener.keystore.jks");
        properties.put("inter.broker.listener.name", "INTERNAL");
        KafkaCruiseControlConfig generateCruiseControlConfig = ConfluentDataBalanceEngine.generateCruiseControlConfig(new KafkaConfig(properties));
        Assert.assertTrue("AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG doesn't contain localhost:9075", generateCruiseControlConfig.getList("bootstrap.servers").contains("localhost:9075"));
        Map filterAdminClientConfigs = KafkaCruiseControlUtils.filterAdminClientConfigs(generateCruiseControlConfig.values());
        Assert.assertEquals("SSL", filterAdminClientConfigs.get("security.protocol"));
        Assert.assertEquals("test.truststore.jks", filterAdminClientConfigs.get("ssl.truststore.location"));
        Assert.assertEquals("listener.keystore.jks", filterAdminClientConfigs.get("ssl.keystore.location"));
        Assert.assertEquals("TLSv1.2", filterAdminClientConfigs.get("ssl.protocol"));
        Assert.assertEquals(Collections.singletonList("TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA"), filterAdminClientConfigs.get("ssl.cipher.suites"));
        Assert.assertEquals("JVM", filterAdminClientConfigs.get("ssl.provider"));
    }

    @Test
    public void testCruiseControlSelfHealingConfig() {
        KafkaCruiseControlConfig generateCruiseControlConfig = ConfluentDataBalanceEngine.generateCruiseControlConfig(this.initConfig);
        Assert.assertTrue("expected self healing for broker failure to be enabled", generateCruiseControlConfig.getBoolean("self.healing.broker.failure.enabled").booleanValue());
        Assert.assertEquals("expected broker failure threshold to be passed through", ConfluentConfigs.BALANCER_BROKER_FAILURE_THRESHOLD_DEFAULT, generateCruiseControlConfig.getLong("broker.failure.self.healing.threshold.ms"));
        this.brokerProps.put("confluent.balancer.heal.broker.failure.threshold.ms", ConfluentConfigs.BALANCER_BROKER_FAILURE_THRESHOLD_DISABLED);
        KafkaCruiseControlConfig generateCruiseControlConfig2 = ConfluentDataBalanceEngine.generateCruiseControlConfig(new KafkaConfig(this.brokerProps));
        Assert.assertFalse("expected self healing for broker failure to be disabled", generateCruiseControlConfig2.getBoolean("self.healing.broker.failure.enabled").booleanValue());
        Assert.assertEquals("cannot pass through negative self healing threshold to SelfHealingNotifier", KafkaCruiseControlConfig.DEFAULT_BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS, generateCruiseControlConfig2.getLong("broker.failure.self.healing.threshold.ms"));
    }

    @Test
    public void testStartupComponentsReadySuccessful() throws InterruptedException {
        List list = ConfluentDataBalanceEngine.STARTUP_COMPONENTS;
        try {
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.clear();
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.add(new ConfluentDataBalanceEngine.StartupComponent(MockDatabalancerStartupComponent.class.getName(), MockDatabalancerStartupComponent::checkStartupCondition));
            getTestDataBalanceEngine().checkStartupComponentsReady((KafkaCruiseControlConfig) Mockito.mock(KafkaCruiseControlConfig.class));
            Assert.assertTrue("Check startup method was not called.", MockDatabalancerStartupComponent.checkupMethodCalled);
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.clear();
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.addAll(list);
            MockDatabalancerStartupComponent.checkupMethodCalled = false;
        } catch (Throwable th) {
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.clear();
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.addAll(list);
            MockDatabalancerStartupComponent.checkupMethodCalled = false;
            throw th;
        }
    }

    @Test
    public void testStartupComponentsReadyAbort() throws Exception {
        List list = ConfluentDataBalanceEngine.STARTUP_COMPONENTS;
        try {
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.clear();
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.add(new ConfluentDataBalanceEngine.StartupComponent(MockDatabalancerStartupComponent.class.getName(), MockDatabalancerStartupComponent::checkStartupCondition));
            KafkaCruiseControlConfig kafkaCruiseControlConfig = (KafkaCruiseControlConfig) Mockito.mock(KafkaCruiseControlConfig.class);
            MockDatabalancerStartupComponent.block = true;
            ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
            AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            Thread thread = new Thread(() -> {
                try {
                    testDataBalanceEngine.checkStartupComponentsReady(kafkaCruiseControlConfig);
                } catch (StartupCheckInterruptedException e) {
                    atomicBoolean.set(true);
                }
            });
            thread.start();
            MockDatabalancerStartupComponent.TEST_SYNC_SEMAPHORE.acquire();
            testDataBalanceEngine.onDeactivation();
            thread.join();
            Assert.assertTrue("Check Startup method was not called.", MockDatabalancerStartupComponent.checkupMethodCalled);
            Assert.assertTrue("Startup method was not aborted.", atomicBoolean.get());
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.clear();
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.addAll(list);
            MockDatabalancerStartupComponent.checkupMethodCalled = false;
            MockDatabalancerStartupComponent.block = false;
        } catch (Throwable th) {
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.clear();
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.addAll(list);
            MockDatabalancerStartupComponent.checkupMethodCalled = false;
            MockDatabalancerStartupComponent.block = false;
            throw th;
        }
    }

    @Test
    public void testStopCruiseControlNotInitialized() {
        ConfluentDataBalanceEngineContext confluentDataBalanceEngineContext = (ConfluentDataBalanceEngineContext) Mockito.spy(new ConfluentDataBalanceEngineContext(this.mockMetricsRegistry, (KafkaCruiseControl) null, this.mockTime));
        Mockito.when(confluentDataBalanceEngineContext.getPersistenceStore()).thenReturn(this.persistenceStore);
        new ConfluentDataBalanceEngine(currentThreadExecutorService(), confluentDataBalanceEngineContext).onDeactivation();
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl, Mockito.never())).shutdown();
        ((DataBalancerMetricsRegistry) Mockito.verify(this.mockMetricsRegistry, Mockito.never())).clearShortLivedMetrics();
    }

    @Test
    public void testStopCruiseControlAfterShutdownd() throws InterruptedException {
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        testDataBalanceEngine.onDeactivation();
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl, VerificationModeFactory.times(1))).shutdown();
        ((DataBalancerMetricsRegistry) Mockito.verify(this.mockMetricsRegistry, VerificationModeFactory.times(1))).clearShortLivedMetrics();
        testDataBalanceEngine.onDeactivation();
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl, VerificationModeFactory.times(1))).shutdown();
    }

    @Test
    public void testStopCruiseControl() throws InterruptedException {
        getTestDataBalanceEngine().stopCruiseControl();
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).shutdown();
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).userTriggeredStopExecution();
        ((DataBalancerMetricsRegistry) Mockito.verify(this.mockMetricsRegistry)).clearShortLivedMetrics();
    }

    @Test
    public void testDeactivation() throws InterruptedException {
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        testDataBalanceEngine.canAcceptRequests = true;
        testDataBalanceEngine.onDeactivation();
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).shutdown();
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).userTriggeredStopExecution();
        ((DataBalancerMetricsRegistry) Mockito.verify(this.mockMetricsRegistry)).clearShortLivedMetrics();
        Assert.assertFalse("DatabalanceEngine is not stopped", testDataBalanceEngine.canAcceptRequests);
    }

    @Test
    public void testStartCruiseControlNoOp() throws InterruptedException {
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        testDataBalanceEngine.startCruiseControl((EngineInitializationContext) null, (Function) null);
        Assert.assertSame(testDataBalanceEngine.context.getCruiseControl(), this.mockCruiseControl);
    }

    @Test
    public void testStartCruiseControlSuccess() throws Exception {
        List list = ConfluentDataBalanceEngine.STARTUP_COMPONENTS;
        try {
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.clear();
            KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
            Mockito.when(kafkaConfig.logDirs()).thenReturn(JavaConverters.asScalaBuffer(Collections.singletonList("/log_dir")));
            Mockito.when(kafkaConfig.getString("confluent.balancer.heal.uneven.load.trigger")).thenReturn(ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString());
            Mockito.when(kafkaConfig.originalsWithPrefix(Mockito.anyString())).thenReturn(Collections.singletonMap("bootstrap.servers", "bootstrap_server"));
            ConfluentDataBalanceEngineContext confluentDataBalanceEngineContext = (ConfluentDataBalanceEngineContext) Mockito.spy(new ConfluentDataBalanceEngineContext(this.mockMetricsRegistry, (KafkaCruiseControl) null, this.mockTime));
            Mockito.when(confluentDataBalanceEngineContext.getPersistenceStore()).thenReturn(this.persistenceStore);
            new ConfluentDataBalanceEngine(currentThreadExecutorService(), confluentDataBalanceEngineContext).startCruiseControl(initializationContext(kafkaConfig), kafkaConfig2 -> {
                return this.mockCruiseControl;
            });
            ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).startUp();
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.addAll(list);
        } catch (Throwable th) {
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.addAll(list);
            throw th;
        }
    }

    @Test
    public void testStartCruiseControlFailed() throws Exception {
        List list = ConfluentDataBalanceEngine.STARTUP_COMPONENTS;
        try {
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.clear();
            KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
            ConfluentDataBalanceEngineContext confluentDataBalanceEngineContext = (ConfluentDataBalanceEngineContext) Mockito.spy(new ConfluentDataBalanceEngineContext(this.mockMetricsRegistry, (KafkaCruiseControl) null, this.mockTime));
            Mockito.when(confluentDataBalanceEngineContext.getPersistenceStore()).thenReturn(this.persistenceStore);
            new ConfluentDataBalanceEngine(currentThreadExecutorService(), confluentDataBalanceEngineContext).startCruiseControl(initializationContext(kafkaConfig), kafkaConfig2 -> {
                return null;
            });
            ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl, Mockito.never())).startUp();
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.addAll(list);
        } catch (Throwable th) {
            ConfluentDataBalanceEngine.STARTUP_COMPONENTS.addAll(list);
            throw th;
        }
    }

    private EngineInitializationContext initializationContext(KafkaConfig kafkaConfig) {
        return new EngineInitializationContext(kafkaConfig, Collections.emptyMap(), num -> {
            return new AtomicReference("Test metric");
        });
    }

    @Test
    public void testOnActivation() throws InterruptedException {
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        ((ExecutorService) Mockito.doReturn((Object) null).when(this.executorService)).submit((Runnable) ArgumentMatchers.any(Runnable.class));
        testDataBalanceEngine.onActivation(initializationContext(kafkaConfig));
        ((ExecutorService) Mockito.verify(this.executorService)).submit((Runnable) ArgumentMatchers.any(Runnable.class));
        Assert.assertTrue("DatabalanceEngine is not started", testDataBalanceEngine.canAcceptRequests);
    }

    @Test
    public void testRemoveBroker() throws Throwable {
        Optional of = Optional.of(1L);
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        testDataBalanceEngine.onActivation(initializationContext(kafkaConfig));
        BrokerRemovalFuture brokerRemovalFuture = (BrokerRemovalFuture) Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(this.mockCruiseControl.removeBroker(ArgumentMatchers.eq(1), (Optional) ArgumentMatchers.eq(of), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class), (BrokerRemovalCallback) ArgumentMatchers.any(BrokerRemovalCallback.class), ArgumentMatchers.anyString())).thenReturn(brokerRemovalFuture);
        ArgumentCaptor forClass = ArgumentCaptor.forClass(BalanceOpExecutionCompletionCallback.class);
        BrokerRemovalStateRecord brokerRemovalStateRecord = new BrokerRemovalStateRecord(1, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, (Exception) null);
        testDataBalanceEngine.removeBroker(1, of, "uid");
        Assert.assertTrue("DatabalanceEngine is not started", testDataBalanceEngine.canAcceptRequests);
        ((ExecutorService) Mockito.verify(this.executorService, VerificationModeFactory.times(2))).submit((Runnable) ArgumentMatchers.any(Runnable.class));
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).removeBroker(ArgumentMatchers.eq(1), (Optional) ArgumentMatchers.eq(of), (BalanceOpExecutionCompletionCallback) forClass.capture(), (BrokerRemovalCallback) Mockito.any(BrokerRemovalCallback.class), ArgumentMatchers.anyString());
        ((ConfluentDataBalanceEngineContext) Mockito.verify(testDataBalanceEngine.context)).putBrokerRemovalFuture(1, brokerRemovalFuture);
        ((BrokerRemovalFuture) Mockito.verify(brokerRemovalFuture)).execute(Duration.ofMinutes(60L));
        ((ConfluentDataBalanceEngineContext) Mockito.verify(testDataBalanceEngine.context, Mockito.never())).removeBrokerRemovalFuture(1);
        ((BalanceOpExecutionCompletionCallback) forClass.getValue()).accept(true, (Throwable) null);
        ((ConfluentDataBalanceEngineContext) Mockito.verify(testDataBalanceEngine.context)).removeBrokerRemovalFuture(1);
        Assert.assertEquals("Expected to have a persisted broker removal state", 1L, this.brokerRemovalStateRecordMap.size());
        Assert.assertEquals(brokerRemovalStateRecord, this.brokerRemovalStateRecordMap.get(1));
        Assert.assertEquals("Expected to have one broker removal state tracker in the context object", 1L, testDataBalanceEngine.context.brokerRemovalsStateTrackers.size());
    }

    @Test
    public void testBrokerRemovalTerminationListener() throws InterruptedException {
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        testDataBalanceEngine.context.brokerRemovalsStateTrackers.put(1, Mockito.mock(BrokerRemovalStateTracker.class));
        testDataBalanceEngine.removalTerminationListener.onTerminalState(1, (BrokerRemovalStateMachine.BrokerRemovalState) null, (Exception) null);
        Assert.assertEquals("Expected to have removed the broker removal state tracker in the context object", 0L, testDataBalanceEngine.context.brokerRemovalsStateTrackers.size());
    }

    @Test
    public void testRemoveBroker_InitiatesRemovalOnPreviouslyRemovedBroker() throws Throwable {
        Optional of = Optional.of(1L);
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        testDataBalanceEngine.onActivation(initializationContext(kafkaConfig));
        BrokerRemovalFuture brokerRemovalFuture = (BrokerRemovalFuture) Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(this.mockCruiseControl.removeBroker(ArgumentMatchers.eq(1), (Optional) ArgumentMatchers.eq(of), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class), (BrokerRemovalCallback) ArgumentMatchers.any(BrokerRemovalCallback.class), ArgumentMatchers.anyString())).thenReturn(brokerRemovalFuture);
        Mockito.when(this.persistenceStore.getBrokerRemovalStateRecord(1)).thenReturn(new BrokerRemovalStateRecord(1, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_SUCCEEDED, (Exception) null));
        testDataBalanceEngine.removeBroker(1, of, "uid");
        ((ExecutorService) Mockito.verify(this.executorService, VerificationModeFactory.times(2))).submit((Runnable) ArgumentMatchers.any(Runnable.class));
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).removeBroker(ArgumentMatchers.eq(1), (Optional) ArgumentMatchers.eq(of), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class), (BrokerRemovalCallback) ArgumentMatchers.any(BrokerRemovalCallback.class), ArgumentMatchers.anyString());
        ((ConfluentDataBalanceEngineContext) Mockito.verify(testDataBalanceEngine.context)).putBrokerRemovalFuture(1, brokerRemovalFuture);
        ((BrokerRemovalFuture) Mockito.verify(brokerRemovalFuture)).execute(Duration.ofMinutes(60L));
        Assert.assertTrue("DatabalanceEngine is not started", testDataBalanceEngine.canAcceptRequests);
    }

    @Test(expected = BrokerRemovalInProgressException.class)
    public void testRemoveBrokerTwiceFailure() throws InterruptedException {
        Optional of = Optional.of(1L);
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        testDataBalanceEngine.onActivation(initializationContext(kafkaConfig));
        Mockito.when(this.persistenceStore.getBrokerRemovalStateRecord(1)).thenReturn(new BrokerRemovalStateRecord(1, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, (Exception) null));
        testDataBalanceEngine.removeBroker(1, of, "uid");
    }

    @Test(expected = BrokerRemovalInProgressException.class)
    public void testRemoveBrokerFailureForExistingRemovals() throws InterruptedException {
        Optional of = Optional.of(1L);
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        testDataBalanceEngine.onActivation(initializationContext(kafkaConfig));
        BrokerRemovalStateRecord brokerRemovalStateRecord = new BrokerRemovalStateRecord(1 + 1, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, (Exception) null);
        Mockito.when(this.persistenceStore.getBrokerRemovalStateRecord(1)).thenReturn((Object) null);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(Collections.singletonMap(Integer.valueOf(brokerRemovalStateRecord.brokerId()), brokerRemovalStateRecord));
        testDataBalanceEngine.removeBroker(1, of, "uid");
    }

    @Test(expected = BalancerOfflineException.class)
    public void testRemoveBrokerThrowsBalancerOfflineExceptionIfNoActiveDatabalancer() throws InterruptedException {
        getTestDataBalanceEngine().removeBroker(1, Optional.of(1L), "uid");
    }

    @Test
    public void testCancelBrokerRemovals_CallsFutureCancel() throws InterruptedException {
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        BrokerRemovalFuture brokerRemovalFuture = (BrokerRemovalFuture) Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(Boolean.valueOf(brokerRemovalFuture.cancel())).thenReturn(true);
        BrokerRemovalFuture brokerRemovalFuture2 = (BrokerRemovalFuture) Mockito.mock(BrokerRemovalFuture.class);
        Mockito.when(Boolean.valueOf(brokerRemovalFuture2.cancel())).thenReturn(false);
        testDataBalanceEngine.context.putBrokerRemovalFuture(1, brokerRemovalFuture);
        testDataBalanceEngine.context.putBrokerRemovalFuture(2, brokerRemovalFuture2);
        boolean cancelBrokerRemoval = testDataBalanceEngine.cancelBrokerRemoval(1);
        boolean cancelBrokerRemoval2 = testDataBalanceEngine.cancelBrokerRemoval(2);
        boolean cancelBrokerRemoval3 = testDataBalanceEngine.cancelBrokerRemoval(3);
        Assert.assertTrue("Expected broker 1 to be cancelled successfully", cancelBrokerRemoval);
        Assert.assertFalse("Expected broker 2 to not be cancelled successfully", cancelBrokerRemoval2);
        Assert.assertFalse("Expected broker 3 to not be cancelled successfully because it doesn't exist", cancelBrokerRemoval3);
        ((BrokerRemovalFuture) Mockito.verify(brokerRemovalFuture)).cancel();
        ((BrokerRemovalFuture) Mockito.verify(brokerRemovalFuture2)).cancel();
    }

    @Test
    public void testUpdateThrottleWhileRunning() throws InterruptedException {
        ConfluentDataBalanceEngine confluentDataBalanceEngine = (ConfluentDataBalanceEngine) Mockito.spy(getTestDataBalanceEngine());
        confluentDataBalanceEngine.updateThrottle(100L);
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).updateThrottle(100L);
        ((ConfluentDataBalanceEngine) Mockito.verify(confluentDataBalanceEngine)).updateThrottleHelper(100L);
    }

    @Test
    public void testUpdateThrottleWhileStopped() throws InterruptedException {
        ConfluentDataBalanceEngine confluentDataBalanceEngine = (ConfluentDataBalanceEngine) Mockito.spy(getTestDataBalanceEngine());
        confluentDataBalanceEngine.onDeactivation();
        confluentDataBalanceEngine.updateThrottle(100L);
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl, Mockito.never())).updateThrottle(ArgumentMatchers.anyLong());
        ((ConfluentDataBalanceEngine) Mockito.verify(confluentDataBalanceEngine)).updateThrottle(100L);
    }

    @Test
    public void testUpdateAutoHeal() throws InterruptedException {
        ConfluentDataBalanceEngine confluentDataBalanceEngine = (ConfluentDataBalanceEngine) Mockito.spy(getTestDataBalanceEngine());
        confluentDataBalanceEngine.setAutoHealMode(true);
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).setGoalViolationSelfHealing(true);
        ((ConfluentDataBalanceEngine) Mockito.verify(confluentDataBalanceEngine)).updateAutoHealHelper(true);
        confluentDataBalanceEngine.setAutoHealMode(false);
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).setGoalViolationSelfHealing(false);
        ((ConfluentDataBalanceEngine) Mockito.verify(confluentDataBalanceEngine)).updateAutoHealHelper(false);
    }

    @Test
    public void testUpdateAutoHealWhenStopped() throws InterruptedException {
        ConfluentDataBalanceEngine confluentDataBalanceEngine = (ConfluentDataBalanceEngine) Mockito.spy(getTestDataBalanceEngine());
        confluentDataBalanceEngine.onDeactivation();
        confluentDataBalanceEngine.setAutoHealMode(true);
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl, Mockito.never())).setGoalViolationSelfHealing(true);
        ((ConfluentDataBalanceEngine) Mockito.verify(confluentDataBalanceEngine)).setAutoHealMode(true);
    }

    @Test
    public void testAddBroker_success() throws KafkaCruiseControlException, InterruptedException {
        HashSet hashSet = new HashSet(Collections.singletonList(10));
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine confluentDataBalanceEngine = (ConfluentDataBalanceEngine) Mockito.spy(getTestDataBalanceEngine());
        confluentDataBalanceEngine.onActivation(initializationContext(kafkaConfig));
        confluentDataBalanceEngine.addBrokers(hashSet, this.mockExecCompletionCb, "testUid");
        ((ConfluentDataBalanceEngine) Mockito.verify(confluentDataBalanceEngine)).doAddBrokers((Set) ArgumentMatchers.eq(hashSet), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.eq(this.mockExecCompletionCb), (String) ArgumentMatchers.eq("testUid"));
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).addBrokers((Set) ArgumentMatchers.eq(hashSet), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.eq(this.mockExecCompletionCb), (String) ArgumentMatchers.eq("testUid"));
    }

    @Test
    public void testAddBroker_pendingRemove() throws InterruptedException {
        HashSet hashSet = new HashSet(Collections.singletonList(10));
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine confluentDataBalanceEngine = (ConfluentDataBalanceEngine) Mockito.spy(getTestDataBalanceEngine());
        confluentDataBalanceEngine.onActivation(initializationContext(kafkaConfig));
        BrokerRemovalStateRecord brokerRemovalStateRecord = new BrokerRemovalStateRecord(1, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, (Exception) null);
        HashMap hashMap = new HashMap();
        hashMap.put(1, brokerRemovalStateRecord);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(hashMap);
        confluentDataBalanceEngine.addBrokers(hashSet, this.mockExecCompletionCb, "testUid");
        ((ConfluentDataBalanceEngine) Mockito.verify(confluentDataBalanceEngine, Mockito.never())).doAddBrokers((Set) ArgumentMatchers.any(), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.any(), (String) ArgumentMatchers.any());
    }

    @Test
    public void testAddBroker_successfulRemove() throws KafkaCruiseControlException, InterruptedException {
        HashSet hashSet = new HashSet(Collections.singletonList(10));
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine confluentDataBalanceEngine = (ConfluentDataBalanceEngine) Mockito.spy(getTestDataBalanceEngine());
        confluentDataBalanceEngine.onActivation(initializationContext(kafkaConfig));
        BrokerRemovalStateRecord brokerRemovalStateRecord = new BrokerRemovalStateRecord(1, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_SUCCEEDED, (Exception) null);
        HashMap hashMap = new HashMap();
        hashMap.put(1, brokerRemovalStateRecord);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(hashMap);
        confluentDataBalanceEngine.addBrokers(hashSet, this.mockExecCompletionCb, "testUid");
        ((ConfluentDataBalanceEngine) Mockito.verify(confluentDataBalanceEngine)).doAddBrokers((Set) ArgumentMatchers.eq(hashSet), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.eq(this.mockExecCompletionCb), (String) ArgumentMatchers.eq("testUid"));
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).addBrokers((Set) ArgumentMatchers.eq(hashSet), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.eq(this.mockExecCompletionCb), (String) ArgumentMatchers.eq("testUid"));
    }

    @Test
    public void testAddBroker_failedRemove() throws KafkaCruiseControlException, InterruptedException {
        HashSet hashSet = new HashSet(Collections.singletonList(10));
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine confluentDataBalanceEngine = (ConfluentDataBalanceEngine) Mockito.spy(getTestDataBalanceEngine());
        confluentDataBalanceEngine.onActivation(initializationContext(kafkaConfig));
        BrokerRemovalStateRecord brokerRemovalStateRecord = new BrokerRemovalStateRecord(1, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_EXECUTION_CANCELED, (Exception) null);
        HashMap hashMap = new HashMap();
        hashMap.put(1, brokerRemovalStateRecord);
        Mockito.when(this.persistenceStore.getAllBrokerRemovalStateRecords()).thenReturn(hashMap);
        confluentDataBalanceEngine.addBrokers(hashSet, this.mockExecCompletionCb, "testUid");
        ((ConfluentDataBalanceEngine) Mockito.verify(confluentDataBalanceEngine)).doAddBrokers((Set) ArgumentMatchers.eq(hashSet), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.eq(this.mockExecCompletionCb), (String) ArgumentMatchers.eq("testUid"));
        ((KafkaCruiseControl) Mockito.verify(this.mockCruiseControl)).addBrokers((Set) ArgumentMatchers.eq(hashSet), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.eq(this.mockExecCompletionCb), (String) ArgumentMatchers.eq("testUid"));
    }

    @Test
    public void testRemoveBrokerStateTrackerContinuouslyUpdatesStatus() throws Throwable {
        Optional of = Optional.of(1L);
        KafkaConfig kafkaConfig = (KafkaConfig) Mockito.mock(KafkaConfig.class);
        ConfluentDataBalanceEngine testDataBalanceEngine = getTestDataBalanceEngine();
        testDataBalanceEngine.onActivation(initializationContext(kafkaConfig));
        Mockito.when(this.mockCruiseControl.removeBroker(ArgumentMatchers.eq(1), (Optional) ArgumentMatchers.eq(of), (BalanceOpExecutionCompletionCallback) ArgumentMatchers.any(BalanceOpExecutionCompletionCallback.class), (BrokerRemovalCallback) ArgumentMatchers.any(BrokerRemovalCallback.class), ArgumentMatchers.anyString())).thenReturn((BrokerRemovalFuture) Mockito.mock(BrokerRemovalFuture.class));
        BrokerRemovalStateRecord brokerRemovalStateRecord = new BrokerRemovalStateRecord(1, BrokerRemovalStateMachine.BrokerRemovalState.INITIAL_PLAN_COMPUTATION_INITIATED, (Exception) null);
        testDataBalanceEngine.removeBroker(1, of, "uid");
        Assert.assertEquals("Expected to have a persisted broker removal state", 1L, this.brokerRemovalStateRecordMap.size());
        Assert.assertEquals(brokerRemovalStateRecord, this.brokerRemovalStateRecordMap.get(1));
        Assert.assertEquals("Expected to have one broker removal state tracker in the context object", 1L, testDataBalanceEngine.context.brokerRemovalsStateTrackers.size());
        BrokerRemovalStateTracker brokerRemovalStateTracker = (BrokerRemovalStateTracker) testDataBalanceEngine.context.brokerRemovalsStateTrackers.get(1);
        brokerRemovalStateTracker.registerEvent(BrokerRemovalCallback.BrokerRemovalEvent.INITIAL_PLAN_COMPUTATION_SUCCESS);
        brokerRemovalStateTracker.registerEvent(BrokerRemovalCallback.BrokerRemovalEvent.BROKER_SHUTDOWN_SUCCESS);
        BrokerRemovalStateRecord brokerRemovalStateRecord2 = new BrokerRemovalStateRecord(1, BrokerRemovalStateMachine.BrokerRemovalState.PLAN_COMPUTATION_INITIATED, (Exception) null);
        BrokerRemovalStateRecord brokerRemovalStateRecord3 = this.brokerRemovalStateRecordMap.get(1);
        Assert.assertEquals(brokerRemovalStateRecord2.brokerShutdownStatus(), brokerRemovalStateRecord3.brokerShutdownStatus());
        Assert.assertEquals(brokerRemovalStateRecord2.partitionReassignmentsStatus(), brokerRemovalStateRecord3.partitionReassignmentsStatus());
        Assert.assertEquals(brokerRemovalStateRecord2.exception(), brokerRemovalStateRecord3.exception());
        Assert.assertEquals(brokerRemovalStateRecord2.brokerId(), brokerRemovalStateRecord3.brokerId());
    }
}
