package io.confluent.databalancer.startup;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.common.TestConstants;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.cruisecontrol.analyzer.goals.MaxReplicaMovementParallelismGoal;
import io.confluent.cruisecontrol.analyzer.goals.ReplicaPlacementGoal;
import io.confluent.databalancer.DatabalancerUtils;
import io.confluent.telemetry.ConfluentTelemetryConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import kafka.cluster.EndPoint;
import kafka.server.Defaults;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.BalancerJbodEnabledMisconfigurationException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:io/confluent/databalancer/startup/CruiseControlStartableTest.class */
/**
 * Unit tests for the translation of broker-level {@link KafkaConfig} settings into a
 * {@link KafkaCruiseControlConfig} by {@link CruiseControlStartable}, and for the topic
 * exclusion regex produced by {@link DatabalancerUtils#generateCcTopicExclusionRegex}.
 */
public class CruiseControlStartableTest {
    /** ZooKeeper connect string (with chroot) used to check that the value is passed through unchanged. */
    private static final String ZK_CONNECT =
        "zookeeper-1-internal.pzkc-ldqwz.svc.cluster.local:2181,zookeeper-2-internal.pzkc-ldqwz.svc.cluster.local:2181/testKafkaCluster";

    private static final String NON_BALANCER_PROPERTY_KEY = "confluent.non_balancer_property_key";
    private static final String EXCLUDE_TOPIC_NAMES_KEY = "confluent.balancer.exclude.topic.names";
    private static final String EXCLUDE_TOPIC_PREFIXES_KEY = "confluent.balancer.exclude.topic.prefixes";
    private static final String BOOTSTRAP_SERVERS_KEY = "bootstrap.servers";
    private static final String GOAL_VIOLATION_SELF_HEALING_KEY = "self.healing.goal.violation.enabled";

    /** Exclusion lists shared by the regex tests; both deliberately contain regex metacharacters. */
    private static final String EXCLUDED_TOPIC_NAMES = "topic1, top.c2, test-topic";
    private static final String EXCLUDED_TOPIC_PREFIXES = "prefix1, pref*x2";

    /** Mutable broker properties; individual tests add overrides before building a {@link KafkaConfig}. */
    private Properties brokerProps;

    /** Config built from the baseline {@link #brokerProps}, before any per-test overrides. */
    private KafkaConfig initConfig;

    /** Builds the baseline broker properties (balancer enabled, single PLAINTEXT listener). */
    @BeforeEach
    public void setUp() {
        brokerProps = new Properties();
        brokerProps.put(KafkaConfig.ZkConnectProp(), TestUtils.MockZkConnect());
        brokerProps.put("confluent.balancer.enable", true);
        brokerProps.put("confluent.balancer.throttle.bytes.per.second", 200L);
        brokerProps.put(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:9092");
        initConfig = new KafkaConfig(brokerProps);
        // NOTE(review): this class declares no Mockito-annotated fields and the returned
        // AutoCloseable is never closed; the call looks removable together with its import.
        MockitoAnnotations.openMocks(this);
    }

    /** Checks the generated config when only defaults (plus a sampler class) are supplied. */
    @Test
    public void testGenerateCruiseControlConfig() {
        List<String> expectedGoals = defaultGoalsConfig();
        brokerProps.put(KafkaConfig.ZkConnectProp(), ZK_CONNECT);
        brokerProps.put("confluent.balancer.metric.sampler.class", "io.confluent.cruisecontrol.metricsreporter.ConfluentTelemetryReporterSampler");
        brokerProps.put(NON_BALANCER_PROPERTY_KEY, "nonBalancerPropertyValue");

        KafkaConfig kafkaConfig = new KafkaConfig(brokerProps);
        Assertions.assertEquals(1, kafkaConfig.listeners().length(), "More than one listeners found: " + kafkaConfig.listeners());
        Endpoint listener = ((EndPoint) kafkaConfig.listeners().head()).toJava();
        String expectedBootstrapServer = listener.host() + ":" + listener.port();

        KafkaCruiseControlConfig ccConfig = CruiseControlStartable.generateCruiseControlConfig(kafkaConfig);

        Assertions.assertTrue(ccConfig.getList(BOOTSTRAP_SERVERS_KEY).contains(expectedBootstrapServer),
            "expected bootstrap servers " + expectedBootstrapServer + " not set, got " + ccConfig.getList(BOOTSTRAP_SERVERS_KEY));
        Assertions.assertEquals(ZK_CONNECT, ccConfig.getString("zookeeper.connect"));
        Assertions.assertNotNull(ccConfig.getDouble("network.inbound.capacity.threshold"), "balancer n/w input capacity property not present");
        Assertions.assertNotNull(ccConfig.getClass("metric.sampler.class"), "balancer metrics sampler class property not present");
        Assertions.assertEquals(ConfluentConfigs.BALANCER_NETWORK_IN_CAPACITY_DEFAULT, ccConfig.getLong("network.in.max.bytes.per.second"), "network inbound capacity should be at default");
        Assertions.assertEquals(ConfluentConfigs.BALANCER_NETWORK_OUT_CAPACITY_DEFAULT, ccConfig.getLong("network.out.max.bytes.per.second"), "network outbound capacity should be at default");
        // A key without the balancer prefix must not be visible through the generated config.
        Assertions.assertThrows(ConfigException.class, () -> {
            ccConfig.getString(NON_BALANCER_PROPERTY_KEY);
        }, "nonBalancerPropertyValue present");
        Assertions.assertEquals(expectedGoals, ccConfig.getList("goals"));
        Assertions.assertFalse(ccConfig.getBoolean(GOAL_VIOLATION_SELF_HEALING_KEY), "Expected goal-violation self-healing to be disabled");

        Map<String, ?> originals = ccConfig.originals();
        Assertions.assertFalse(originals.containsKey(ConfluentTelemetryConfig.exporterPrefixForName("_local") + "topic.name"), "KafkaExporterConfig.TOPIC_NAME_CONFIG found in config");
        Assertions.assertFalse(originals.containsKey("confluent.balancer.topic.replication.factor"), "ConfluentConfigs.BALANCER_TOPICS_REPLICATION_FACTOR_CONFIG found in config");
    }

    /** Expects config generation to be rejected when more than one log directory is configured. */
    @Test
    public void testGenerateCruiseControlConfigJbodEnabledThrowsException() {
        // NOTE(review): both entries name the same path; presumably two distinct
        // directories were intended - confirm before changing the value.
        brokerProps.put(KafkaConfig.LogDirProp(), "/path/to/logs1, /path/to/logs1");
        KafkaConfig kafkaConfig = new KafkaConfig(brokerProps);
        Assertions.assertThrows(BalancerJbodEnabledMisconfigurationException.class, () -> {
            CruiseControlStartable.generateCruiseControlConfig(kafkaConfig);
        });
    }

    /** Checks that explicit balancer overrides in the broker properties reach the generated config. */
    @Test
    public void testGeneratedConfigWithOverrides() {
        List<String> expectedGoals = defaultGoalsConfig();
        List<String> anomalyDetectionGoals = new ArrayList<>(Arrays.asList(
            ReplicaCapacityGoal.class.getName(),
            ReplicaDistributionGoal.class.getName(),
            DiskCapacityGoal.class.getName()));
        Long networkInCapacity = 1200L;
        Long networkOutCapacity = 780L;
        String telemetryTopicKey = ConfluentTelemetryConfig.exporterPrefixForName("_local") + "topic.name";

        brokerProps.put(KafkaConfig.ZkConnectProp(), ZK_CONNECT);
        brokerProps.put(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:9095");
        brokerProps.put("confluent.balancer.topic.replication.factor", "2");
        brokerProps.put("confluent.balancer.network.in.max.bytes.per.second", networkInCapacity.toString());
        brokerProps.put("confluent.balancer.network.out.max.bytes.per.second", networkOutCapacity.toString());
        brokerProps.put(telemetryTopicKey, "testMetricsTopic");
        brokerProps.put("confluent.balancer.anomaly.detection.goals", String.join(",", anomalyDetectionGoals));
        brokerProps.put("confluent.balancer.heal.uneven.load.trigger", ConfluentConfigs.BalancerSelfHealMode.EMPTY_BROKER.toString());

        KafkaCruiseControlConfig ccConfig = CruiseControlStartable.generateCruiseControlConfig(new KafkaConfig(brokerProps));

        Assertions.assertTrue(ccConfig.getList(BOOTSTRAP_SERVERS_KEY).contains("localhost:9095"), "KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG doesn't contain localhost:9095");
        Assertions.assertFalse(ccConfig.getBoolean(GOAL_VIOLATION_SELF_HEALING_KEY), "Expected goal-violation self-healing to be disabled");
        Assertions.assertEquals(networkInCapacity, ccConfig.getLong("network.in.max.bytes.per.second"), "Network inbound capacity should have been overridden");
        Assertions.assertEquals(networkOutCapacity, ccConfig.getLong("network.out.max.bytes.per.second"), "Network outbound capacity should have been overridden");

        Map<String, ?> originals = ccConfig.originals();
        Assertions.assertEquals("testMetricsTopic", originals.get("confluent.telemetry.reporter.topic"));
        Object replicationFactor = originals.get("topic.replication.factor");
        Assertions.assertEquals("2", replicationFactor, "Balancer topic RF config of " + replicationFactor + " is not same as expected 2");
        Assertions.assertEquals(expectedGoals, ccConfig.getList("goals"));
        Assertions.assertEquals(anomalyDetectionGoals, ccConfig.getList("anomaly.detection.goals"));
    }

    /** Checks the exclusion regex as exposed through the generated Cruise Control config. */
    @Test
    public void testGenerateCruiseControlExclusionConfig() {
        brokerProps.put(EXCLUDE_TOPIC_NAMES_KEY, EXCLUDED_TOPIC_NAMES);
        brokerProps.put(EXCLUDE_TOPIC_PREFIXES_KEY, EXCLUDED_TOPIC_PREFIXES);
        KafkaCruiseControlConfig ccConfig = CruiseControlStartable.generateCruiseControlConfig(new KafkaConfig(brokerProps));
        assertExclusionRegexMatching(ccConfig.getString("topics.excluded.from.partition.movement"));
    }

    /** Checks that SSL settings of the inter-broker listener are carried into the admin client configs. */
    @Test
    public void testGeneratedEncryptedInterBrokerConfig() {
        Properties sslBrokerProps = new Properties();
        sslBrokerProps.put(KafkaConfig.ZkConnectProp(), "localhost:9095");
        sslBrokerProps.put(KafkaConfig.ListenersProp(), "INTERNAL://localhost:9075");
        sslBrokerProps.put(KafkaConfig.ListenerSecurityProtocolMapProp(), "INTERNAL:SSL");
        sslBrokerProps.put("ssl.protocol", "TLSv1.2");
        sslBrokerProps.put("ssl.truststore.location", "test.truststore.jks");
        sslBrokerProps.put("ssl.keystore.location", "test.keystore.jks");
        sslBrokerProps.put("ssl.cipher.suites", Collections.singletonList("TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA"));
        sslBrokerProps.put("ssl.provider", "JVM");
        // Listener-scoped override; the assertions below expect it to win over the global keystore.
        sslBrokerProps.put("listener.name.internal.ssl.keystore.location", "listener.keystore.jks");
        sslBrokerProps.put("inter.broker.listener.name", "INTERNAL");

        KafkaCruiseControlConfig ccConfig = CruiseControlStartable.generateCruiseControlConfig(new KafkaConfig(sslBrokerProps));

        Assertions.assertTrue(ccConfig.getList(BOOTSTRAP_SERVERS_KEY).contains("localhost:9075"), "AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG doesn't contain localhost:9075");
        Map<String, ?> adminClientConfigs = KafkaCruiseControlUtils.filterAdminClientConfigs(ccConfig.values());
        Assertions.assertEquals("SSL", adminClientConfigs.get("security.protocol"));
        Assertions.assertEquals("test.truststore.jks", adminClientConfigs.get("ssl.truststore.location"));
        Assertions.assertEquals("listener.keystore.jks", adminClientConfigs.get("ssl.keystore.location"));
        Assertions.assertEquals("TLSv1.2", adminClientConfigs.get("ssl.protocol"));
        Assertions.assertEquals(Collections.singletonList("TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA"), adminClientConfigs.get("ssl.cipher.suites"));
        Assertions.assertEquals("JVM", adminClientConfigs.get("ssl.provider"));
    }

    /** Checks broker-failure self-healing with the default threshold and with the "disabled" threshold. */
    @Test
    public void testCruiseControlSelfHealingConfig() {
        KafkaCruiseControlConfig defaultConfig = CruiseControlStartable.generateCruiseControlConfig(initConfig);
        Assertions.assertTrue(defaultConfig.getBoolean("self.healing.broker.failure.enabled").booleanValue(), "expected self healing for broker failure to be enabled");
        Assertions.assertEquals(ConfluentConfigs.BALANCER_BROKER_FAILURE_THRESHOLD_DEFAULT, defaultConfig.getLong("broker.failure.self.healing.threshold.ms"), "expected broker failure threshold to be passed through");

        brokerProps.put("confluent.balancer.heal.broker.failure.threshold.ms", ConfluentConfigs.BALANCER_BROKER_FAILURE_THRESHOLD_DISABLED);
        KafkaCruiseControlConfig disabledConfig = CruiseControlStartable.generateCruiseControlConfig(new KafkaConfig(brokerProps));
        Assertions.assertFalse(disabledConfig.getBoolean("self.healing.broker.failure.enabled").booleanValue(), "expected self healing for broker failure to be disabled");
        Assertions.assertEquals(KafkaCruiseControlConfig.DEFAULT_BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS, disabledConfig.getLong("broker.failure.self.healing.threshold.ms"), "cannot pass through negative self healing threshold to SelfHealingNotifier");
    }

    /** Expects an unknown uneven-load trigger value to be rejected when the broker config is built. */
    @Test
    public void testInvalidSelfHealingConfig() {
        brokerProps.put("confluent.balancer.heal.uneven.load.trigger", "disabled");
        Assertions.assertThrows(ConfigException.class, () -> {
            new KafkaConfig(brokerProps);
        }, "Expected invalid self-healing config to throw ConfigException");
    }

    /** With no exclusions configured, the generated regex is the empty string. */
    @Test
    public void testGenerateRegexNoTopicsOrPrefixes() {
        Assertions.assertEquals("", DatabalancerUtils.generateCcTopicExclusionRegex(initConfig), "Unexpected regex generated");
    }

    /** With only an excluded topic name, the regex matches that exact name and nothing else. */
    @Test
    public void testGenerateRegexOnlyTopics() {
        String topicWithExcludedNameAsPrefix = TestConstants.TOPIC1 + "2";
        brokerProps.put(EXCLUDE_TOPIC_NAMES_KEY, TestConstants.TOPIC1);

        String regex = DatabalancerUtils.generateCcTopicExclusionRegex(new KafkaConfig(brokerProps));

        Assertions.assertEquals("^" + Pattern.quote(TestConstants.TOPIC1) + "$", regex, "Unexpected regex generated");
        Assertions.assertTrue(TestConstants.TOPIC1.matches(regex), "Expected exact topic name to match");
        Assertions.assertFalse(topicWithExcludedNameAsPrefix.matches(regex), "Expected topic with exact topic name as prefix not to match");
        Assertions.assertFalse("a".matches(regex), "Expected non-matching topic name not to match");
    }

    /** With only an excluded prefix, the regex matches the prefix itself and any name starting with it. */
    @Test
    public void testGenerateRegexOnlyPrefixes() {
        brokerProps.put(EXCLUDE_TOPIC_PREFIXES_KEY, "prefix1");

        String regex = DatabalancerUtils.generateCcTopicExclusionRegex(new KafkaConfig(brokerProps));

        Assertions.assertEquals("^" + Pattern.quote("prefix1") + ".*", regex, "Unexpected regex generated");
        Assertions.assertTrue("prefix1".matches(regex), "Expected exact topic prefix to match");
        Assertions.assertTrue("prefix1hello".matches(regex), "Expected topic with prefix to match");
        Assertions.assertFalse("notprefix1".matches(regex), "Expected suffix not to match");
    }

    /** Checks the exact regex text and its matching behavior when both names and prefixes are excluded. */
    @Test
    public void testGenerateRegex() {
        brokerProps.put(EXCLUDE_TOPIC_NAMES_KEY, EXCLUDED_TOPIC_NAMES);
        brokerProps.put(EXCLUDE_TOPIC_PREFIXES_KEY, EXCLUDED_TOPIC_PREFIXES);

        String regex = DatabalancerUtils.generateCcTopicExclusionRegex(new KafkaConfig(brokerProps));

        Assertions.assertEquals("^\\Qtopic1\\E$|^\\Qtop.c2\\E$|^\\Qtest-topic\\E$|^\\Qprefix1\\E.*|^\\Qpref*x2\\E.*", regex, "Unexpected regex generated");
        assertExclusionRegexMatching(regex);
    }

    /** Without an explicit listener, the bootstrap server is taken from the advertised listener. */
    @Test
    public void testPopulateClientConfigs_FallsBackToAdvertisedListeners() {
        Map<String, Object> clientConfigs = new HashMap<>();
        brokerProps.remove(KafkaConfig.ListenersProp());
        brokerProps.put(KafkaConfig.AdvertisedListenersProp(), "PLAINTEXT://host:9092");
        KafkaConfig kafkaConfig = new KafkaConfig(brokerProps);

        Assertions.assertEquals(1, kafkaConfig.listeners().length(), "More than one listeners found: " + kafkaConfig.listeners());
        Assertions.assertNull(((EndPoint) kafkaConfig.listeners().head()).host(), "Expected the normal listener to have a null host");
        Assertions.assertEquals(1, kafkaConfig.advertisedListeners().length(), "More than one advertised listeners found: " + kafkaConfig.advertisedListeners());

        CruiseControlStartable.populateClientConfigs(kafkaConfig, clientConfigs);

        Assertions.assertEquals("host:9092", clientConfigs.get(BOOTSTRAP_SERVERS_KEY));
    }

    /** With neither listeners nor advertised listeners set, the bootstrap server has an empty host. */
    @Test
    public void testPopulateClientConfigs_DefaultsToEmptyStringHost() {
        Map<String, Object> clientConfigs = new HashMap<>();
        brokerProps.remove(KafkaConfig.ListenersProp());
        KafkaConfig kafkaConfig = new KafkaConfig(brokerProps);

        Assertions.assertEquals(1, kafkaConfig.listeners().length(), "More than one listeners found: " + kafkaConfig.listeners());
        Assertions.assertNull(((EndPoint) kafkaConfig.listeners().head()).host(), "Expected the normal listener to have a null host");
        Assertions.assertEquals(Defaults.Listeners(), ((EndPoint) kafkaConfig.listeners().head()).connectionString(), "Expected the normal listener to have a default listener");
        Assertions.assertEquals(1, kafkaConfig.advertisedListeners().length(), "More than one advertised listeners found: " + kafkaConfig.advertisedListeners());
        Assertions.assertNull(((EndPoint) kafkaConfig.advertisedListeners().head()).host(), "Expected the advertised listener to have a null host");

        CruiseControlStartable.populateClientConfigs(kafkaConfig, clientConfigs);

        Assertions.assertEquals(":9092", clientConfigs.get(BOOTSTRAP_SERVERS_KEY));
    }

    /**
     * Asserts the matching behavior expected of an exclusion regex built from
     * {@link #EXCLUDED_TOPIC_NAMES} and {@link #EXCLUDED_TOPIC_PREFIXES}.
     *
     * @param regex the generated exclusion regex under test
     */
    private static void assertExclusionRegexMatching(String regex) {
        Assertions.assertTrue(TestConstants.TOPIC1.matches(regex), "Expected exact topic name to match");
        Assertions.assertTrue("test-topic".matches(regex), "Expected exact topic name to match");
        Assertions.assertTrue("top.c2".matches(regex), "Expected exact topic name with metadata characters to match");
        Assertions.assertTrue("prefix1-xyz".matches(regex), "Expected prefix to match topic name");
        Assertions.assertTrue("prefix1".matches(regex), "Expected prefix to match exact topic name");
        Assertions.assertFalse("topic1-name".matches(regex), "Expected partial topic name not to match");
        Assertions.assertFalse("abc-prefix1-xyz".matches(regex), "Expected topicPrefix value present in middle of topic name not to match");
        Assertions.assertFalse("abc-prefix1".matches(regex), "Expected topicPrefix value as suffix not to match");
        Assertions.assertFalse("abc-topic1".matches(regex), "Expected topicName value as suffix in topic name not to match");
        Assertions.assertFalse(TestConstants.TOPIC2.matches(regex), "Expected topicName with regex metacharacters to be treated as a literal");
        Assertions.assertFalse("prefix2".matches(regex), "Expected topicPrefix with regex metacharacters to be treated as a literal");
    }

    /**
     * Returns the goal class names, in order, that the tests expect in the generated
     * {@code goals} list when no goal override is configured.
     */
    private List<String> defaultGoalsConfig() {
        return new ArrayList<>(Arrays.asList(
            ReplicaPlacementGoal.class.getName(),
            RackAwareGoal.class.getName(),
            MaxReplicaMovementParallelismGoal.class.getName(),
            ReplicaCapacityGoal.class.getName(),
            DiskCapacityGoal.class.getName(),
            NetworkInboundCapacityGoal.class.getName(),
            NetworkOutboundCapacityGoal.class.getName(),
            ReplicaDistributionGoal.class.getName(),
            DiskUsageDistributionGoal.class.getName(),
            LeaderReplicaDistributionGoal.class.getName(),
            NetworkInboundUsageDistributionGoal.class.getName(),
            NetworkOutboundUsageDistributionGoal.class.getName(),
            CpuUsageDistributionGoal.class.getName(),
            TopicReplicaDistributionGoal.class.getName(),
            LeaderBytesInDistributionGoal.class.getName()));
    }
}
