package io.confluent.databalancer.startup;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.CpuUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.DiskUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderBytesInDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.LeaderReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkInboundUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.NetworkOutboundUsageDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.RackAwareGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaCapacityGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.ReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.analyzer.goals.TopicReplicaDistributionGoal;
import com.linkedin.kafka.cruisecontrol.common.TestConstants;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.cruisecontrol.analyzer.goals.ReplicaPlacementGoal;
import io.confluent.cruisecontrol.analyzer.goals.SequentialReplicaMovementGoal;
import io.confluent.telemetry.ConfluentTelemetryConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import kafka.cluster.EndPoint;
import kafka.server.Defaults;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.MockitoAnnotations;

/* loaded from: input_file:io/confluent/databalancer/startup/CruiseControlStartableTest.class */
/**
 * Unit tests for {@link CruiseControlStartable}.
 *
 * <p>The tests build a broker-side {@link KafkaConfig} from a mutable {@link Properties} fixture and
 * check what {@code generateCruiseControlConfig}, {@code generateCcTopicExclusionRegex} and
 * {@code populateClientConfigs} derive from it.
 */
public class CruiseControlStartableTest {
    /** ZooKeeper connect string (two hosts plus a chroot) used to check that the value is passed through. */
    private static final String ZK_CONNECT =
            "zookeeper-1-internal.pzkc-ldqwz.svc.cluster.local:2181,zookeeper-2-internal.pzkc-ldqwz.svc.cluster.local:2181/testKafkaCluster";

    /** Exact topic names to exclude; {@code top.c2} contains a regex metacharacter on purpose. */
    private static final String EXCLUDED_TOPIC_NAMES = "topic1, top.c2, test-topic";

    /** Topic prefixes to exclude; {@code pref*x2} contains a regex metacharacter on purpose. */
    private static final String EXCLUDED_TOPIC_PREFIXES = "prefix1, pref*x2";

    /** Mutable broker properties; individual tests add or remove entries before building a config. */
    private Properties brokerProps;

    /** Broker config built from the untouched fixture properties in {@link #setUp()}. */
    private KafkaConfig initConfig;

    /** Creates the baseline broker properties (balancer enabled, one PLAINTEXT listener). */
    @Before
    public void setUp() {
        brokerProps = new Properties();
        brokerProps.put(KafkaConfig.ZkConnectProp(), TestUtils.MockZkConnect());
        brokerProps.put("confluent.balancer.enable", true);
        brokerProps.put("confluent.balancer.throttle.bytes.per.second", 200L);
        brokerProps.put(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:9092");
        initConfig = new KafkaConfig(brokerProps);
        // No @Mock fields are declared in this class; the call is kept as-is from the original set-up.
        MockitoAnnotations.initMocks(this);
    }

    /** Checks the generated config when only defaults plus a sampler class and an unrelated key are set. */
    @Test
    public void testGenerateCruiseControlConfig() {
        final String nonBalancerKey = "confluent.non_balancer_property_key";
        brokerProps.put(KafkaConfig.ZkConnectProp(), ZK_CONNECT);
        brokerProps.put("confluent.balancer.metric.sampler.class", "io.confluent.cruisecontrol.metricsreporter.ConfluentTelemetryReporterSampler");
        brokerProps.put(nonBalancerKey, "nonBalancerPropertyValue");

        KafkaConfig brokerConfig = new KafkaConfig(brokerProps);
        Assert.assertEquals("More than one listeners found: " + brokerConfig.listeners(), 1L, brokerConfig.listeners().length());
        Endpoint listener = ((EndPoint) brokerConfig.listeners().head()).toJava();
        String expectedBootstrap = listener.host() + ":" + listener.port();

        KafkaCruiseControlConfig ccConfig = CruiseControlStartable.generateCruiseControlConfig(brokerConfig);

        Assert.assertTrue("expected bootstrap servers " + expectedBootstrap + " not set, got " + ccConfig.getList("bootstrap.servers"),
                ccConfig.getList("bootstrap.servers").contains(expectedBootstrap));
        Assert.assertEquals(ZK_CONNECT, ccConfig.getString("zookeeper.connect"));
        Assert.assertNotNull("balancer n/w input capacity property not present", ccConfig.getDouble("network.inbound.capacity.threshold"));
        Assert.assertNotNull("balancer metrics sampler class property not present", ccConfig.getClass("metric.sampler.class"));
        Assert.assertEquals("network inbound capacity should be at default",
                ConfluentConfigs.BALANCER_NETWORK_IN_CAPACITY_DEFAULT, ccConfig.getLong("network.in.max.bytes.per.second"));
        Assert.assertEquals("network outbound capacity should be at default",
                ConfluentConfigs.BALANCER_NETWORK_OUT_CAPACITY_DEFAULT, ccConfig.getLong("network.out.max.bytes.per.second"));
        // A key without the balancer prefix must not be defined in the generated config.
        Assert.assertThrows("nonBalancerPropertyValue present", ConfigException.class, () -> ccConfig.getString(nonBalancerKey));
        Assert.assertEquals(defaultGoalsConfig(), ccConfig.getList("goals"));
        Assert.assertFalse("Expected goal-violation self-healing to be disabled", ccConfig.getBoolean("self.healing.goal.violation.enabled"));

        Map<String, ?> ccOriginals = ccConfig.originals();
        Assert.assertFalse("KafkaExporterConfig.TOPIC_NAME_CONFIG found in config", ccOriginals.containsKey(localExporterTopicNameKey()));
        Assert.assertFalse("ConfluentConfigs.BALANCER_TOPICS_REPLICATION_FACTOR_CONFIG found in config",
                ccOriginals.containsKey("confluent.balancer.topic.replication.factor"));
    }

    /** Checks that explicitly configured broker properties override the generated defaults. */
    @Test
    public void testGeneratedConfigWithOverrides() {
        List<String> anomalyDetectionGoals = Arrays.asList(
                ReplicaCapacityGoal.class.getName(),
                ReplicaDistributionGoal.class.getName(),
                DiskCapacityGoal.class.getName());
        // Boxed on purpose: they are compared against the Long returned by getLong via assertEquals(Object, Object).
        Long expectedNetworkIn = 1200L;
        Long expectedNetworkOut = 780L;

        brokerProps.put(KafkaConfig.ZkConnectProp(), ZK_CONNECT);
        brokerProps.put(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:9095");
        brokerProps.put("confluent.balancer.topic.replication.factor", "2");
        brokerProps.put("confluent.balancer.network.in.max.bytes.per.second", expectedNetworkIn.toString());
        brokerProps.put("confluent.balancer.network.out.max.bytes.per.second", expectedNetworkOut.toString());
        brokerProps.put(localExporterTopicNameKey(), "testMetricsTopic");
        brokerProps.put("confluent.balancer.anomaly.detection.goals", String.join(",", anomalyDetectionGoals));
        brokerProps.put("confluent.balancer.heal.uneven.load.trigger", ConfluentConfigs.BalancerSelfHealMode.EMPTY_BROKER.toString());

        KafkaCruiseControlConfig ccConfig = CruiseControlStartable.generateCruiseControlConfig(new KafkaConfig(brokerProps));

        Assert.assertTrue("KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG doesn't contain localhost:9095",
                ccConfig.getList("bootstrap.servers").contains("localhost:9095"));
        Assert.assertFalse("Expected goal-violation self-healing to be disabled", ccConfig.getBoolean("self.healing.goal.violation.enabled"));
        Assert.assertEquals("Network inbound capacity should have been overridden", expectedNetworkIn, ccConfig.getLong("network.in.max.bytes.per.second"));
        Assert.assertEquals("Network outbound capacity should have been overridden", expectedNetworkOut, ccConfig.getLong("network.out.max.bytes.per.second"));

        Map<String, ?> ccOriginals = ccConfig.originals();
        Assert.assertEquals("testMetricsTopic", ccOriginals.get("confluent.telemetry.reporter.topic"));
        Object replicationFactor = ccOriginals.get("topic.replication.factor");
        Assert.assertEquals("Balancer topic RF config of " + replicationFactor + " is not same as expected 2", "2", replicationFactor);
        Assert.assertEquals(defaultGoalsConfig(), ccConfig.getList("goals"));
        Assert.assertEquals(anomalyDetectionGoals, ccConfig.getList("anomaly.detection.goals"));
    }

    /** Checks the exclusion regex as exposed through the generated Cruise Control config. */
    @Test
    public void testGenerateCruiseControlExclusionConfig() {
        brokerProps.put("confluent.balancer.exclude.topic.names", EXCLUDED_TOPIC_NAMES);
        brokerProps.put("confluent.balancer.exclude.topic.prefixes", EXCLUDED_TOPIC_PREFIXES);
        String exclusionRegex = CruiseControlStartable.generateCruiseControlConfig(new KafkaConfig(brokerProps))
                .getString("topics.excluded.from.partition.movement");
        assertExclusionRegexBehavior(exclusionRegex);
    }

    /** Checks that SSL settings of the inter-broker listener reach the admin-client config. */
    @Test
    public void testGeneratedEncryptedInterBrokerConfig() {
        List<String> cipherSuites = Collections.singletonList("TLS_DHE_DSS_WITH_3DES_EDE_CBC_SHA");
        Properties sslBrokerProps = new Properties();
        sslBrokerProps.put(KafkaConfig.ZkConnectProp(), "localhost:9095");
        sslBrokerProps.put(KafkaConfig.ListenersProp(), "INTERNAL://localhost:9075");
        sslBrokerProps.put(KafkaConfig.ListenerSecurityProtocolMapProp(), "INTERNAL:SSL");
        sslBrokerProps.put("ssl.protocol", "TLSv1.2");
        sslBrokerProps.put("ssl.truststore.location", "test.truststore.jks");
        sslBrokerProps.put("ssl.keystore.location", "test.keystore.jks");
        sslBrokerProps.put("ssl.cipher.suites", cipherSuites);
        sslBrokerProps.put("ssl.provider", "JVM");
        // Listener-prefixed value; the assertions below expect it to win over the plain ssl.keystore.location.
        sslBrokerProps.put("listener.name.internal.ssl.keystore.location", "listener.keystore.jks");
        sslBrokerProps.put("inter.broker.listener.name", "INTERNAL");

        KafkaCruiseControlConfig ccConfig = CruiseControlStartable.generateCruiseControlConfig(new KafkaConfig(sslBrokerProps));

        Assert.assertTrue("AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG doesn't contain localhost:9075",
                ccConfig.getList("bootstrap.servers").contains("localhost:9075"));
        Map<String, ?> adminClientConfigs = KafkaCruiseControlUtils.filterAdminClientConfigs(ccConfig.values());
        Assert.assertEquals("SSL", adminClientConfigs.get("security.protocol"));
        Assert.assertEquals("test.truststore.jks", adminClientConfigs.get("ssl.truststore.location"));
        Assert.assertEquals("listener.keystore.jks", adminClientConfigs.get("ssl.keystore.location"));
        Assert.assertEquals("TLSv1.2", adminClientConfigs.get("ssl.protocol"));
        Assert.assertEquals(cipherSuites, adminClientConfigs.get("ssl.cipher.suites"));
        Assert.assertEquals("JVM", adminClientConfigs.get("ssl.provider"));
    }

    /** Checks broker-failure self-healing with the default threshold and with the "disabled" threshold. */
    @Test
    public void testCruiseControlSelfHealingConfig() {
        KafkaCruiseControlConfig defaultCcConfig = CruiseControlStartable.generateCruiseControlConfig(initConfig);
        Assert.assertTrue("expected self healing for broker failure to be enabled",
                defaultCcConfig.getBoolean("self.healing.broker.failure.enabled").booleanValue());
        Assert.assertEquals("expected broker failure threshold to be passed through",
                ConfluentConfigs.BALANCER_BROKER_FAILURE_THRESHOLD_DEFAULT, defaultCcConfig.getLong("broker.failure.self.healing.threshold.ms"));

        brokerProps.put("confluent.balancer.heal.broker.failure.threshold.ms", ConfluentConfigs.BALANCER_BROKER_FAILURE_THRESHOLD_DISABLED);
        KafkaCruiseControlConfig disabledCcConfig = CruiseControlStartable.generateCruiseControlConfig(new KafkaConfig(brokerProps));
        Assert.assertFalse("expected self healing for broker failure to be disabled",
                disabledCcConfig.getBoolean("self.healing.broker.failure.enabled").booleanValue());
        Assert.assertEquals("cannot pass through negative self healing threshold to SelfHealingNotifier",
                KafkaCruiseControlConfig.DEFAULT_BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS, disabledCcConfig.getLong("broker.failure.self.healing.threshold.ms"));
    }

    /** Checks that an unknown uneven-load trigger value is rejected when the broker config is built. */
    @Test
    public void testInvalidSelfHealingConfig() {
        brokerProps.put("confluent.balancer.heal.uneven.load.trigger", "disabled");
        Assert.assertThrows("Expected invalid self-healing config to throw ConfigException", ConfigException.class,
                () -> new KafkaConfig(brokerProps));
    }

    /** With no excluded names or prefixes configured the generated regex is the empty string. */
    @Test
    public void testGenerateRegexNoTopicsOrPrefixes() {
        Assert.assertEquals("Unexpected regex generated", "", CruiseControlStartable.generateCcTopicExclusionRegex(initConfig));
    }

    /** A single excluded topic name yields an anchored, quoted exact-match regex. */
    @Test
    public void testGenerateRegexOnlyTopics() {
        String topicWithExtraSuffix = TestConstants.TOPIC1 + "2";
        brokerProps.put("confluent.balancer.exclude.topic.names", TestConstants.TOPIC1);

        String exclusionRegex = CruiseControlStartable.generateCcTopicExclusionRegex(new KafkaConfig(brokerProps));

        Assert.assertEquals("Unexpected regex generated", "^" + Pattern.quote(TestConstants.TOPIC1) + "$", exclusionRegex);
        Assert.assertTrue("Expected exact topic name to match", TestConstants.TOPIC1.matches(exclusionRegex));
        Assert.assertFalse("Expected topic with exact topic name as prefix not to match", topicWithExtraSuffix.matches(exclusionRegex));
        Assert.assertFalse("Expected non-matching topic name not to match", "a".matches(exclusionRegex));
    }

    /** A single excluded prefix yields a quoted prefix followed by a match-anything tail. */
    @Test
    public void testGenerateRegexOnlyPrefixes() {
        brokerProps.put("confluent.balancer.exclude.topic.prefixes", "prefix1");

        String exclusionRegex = CruiseControlStartable.generateCcTopicExclusionRegex(new KafkaConfig(brokerProps));

        Assert.assertEquals("Unexpected regex generated", "^" + Pattern.quote("prefix1") + ".*", exclusionRegex);
        Assert.assertTrue("Expected exact topic prefix to match", "prefix1".matches(exclusionRegex));
        Assert.assertTrue("Expected topic with prefix to match", "prefix1hello".matches(exclusionRegex));
        Assert.assertFalse("Expected suffix not to match", "notprefix1".matches(exclusionRegex));
    }

    /** Checks the exact regex text and its matching behavior when both names and prefixes are configured. */
    @Test
    public void testGenerateRegex() {
        brokerProps.put("confluent.balancer.exclude.topic.names", EXCLUDED_TOPIC_NAMES);
        brokerProps.put("confluent.balancer.exclude.topic.prefixes", EXCLUDED_TOPIC_PREFIXES);

        String exclusionRegex = CruiseControlStartable.generateCcTopicExclusionRegex(new KafkaConfig(brokerProps));

        Assert.assertEquals("Unexpected regex generated",
                "^\\Qtopic1\\E$|^\\Qtop.c2\\E$|^\\Qtest-topic\\E$|^\\Qprefix1\\E.*|^\\Qpref*x2\\E.*", exclusionRegex);
        assertExclusionRegexBehavior(exclusionRegex);
    }

    /** With no listeners configured, the bootstrap server is taken from the advertised host and port. */
    @Test
    public void testPopulateClientConfigs_FallsBackToAdvertisedListeners() {
        // NOTE(review): left raw because the map parameter type of populateClientConfigs is not visible here.
        Map clientConfigs = new HashMap();
        brokerProps.remove(KafkaConfig.ListenersProp());
        brokerProps.put(KafkaConfig.AdvertisedHostNameProp(), "host");
        brokerProps.put(KafkaConfig.AdvertisedPortProp(), "9092");
        KafkaConfig brokerConfig = new KafkaConfig(brokerProps);

        Assert.assertEquals("More than one listeners found: " + brokerConfig.listeners(), 1L, brokerConfig.listeners().length());
        Assert.assertNull("Expected the normal listener to have a null host", ((EndPoint) brokerConfig.listeners().head()).host());
        // The message reports the advertised listeners, i.e. the collection actually being counted.
        Assert.assertEquals("More than one advertised listeners found: " + brokerConfig.advertisedListeners(), 1L, brokerConfig.advertisedListeners().length());

        CruiseControlStartable.populateClientConfigs(brokerConfig, clientConfigs);

        Assert.assertEquals("host:9092", clientConfigs.get("bootstrap.servers"));
    }

    /** With neither listeners nor advertised host configured, the bootstrap server has an empty host part. */
    @Test
    public void testPopulateClientConfigs_DefaultsToEmptyStringHost() {
        // NOTE(review): left raw because the map parameter type of populateClientConfigs is not visible here.
        Map clientConfigs = new HashMap();
        brokerProps.remove(KafkaConfig.ListenersProp());
        KafkaConfig brokerConfig = new KafkaConfig(brokerProps);

        Assert.assertEquals("More than one listeners found: " + brokerConfig.listeners(), 1L, brokerConfig.listeners().length());
        Assert.assertNull("Expected the normal listener to have a null host", ((EndPoint) brokerConfig.listeners().head()).host());
        Assert.assertEquals("Expected the normal listener to have a default port", Defaults.Port(), ((EndPoint) brokerConfig.listeners().head()).port());
        // The message reports the advertised listeners, i.e. the collection actually being counted.
        Assert.assertEquals("More than one advertised listeners found: " + brokerConfig.advertisedListeners(), 1L, brokerConfig.advertisedListeners().length());
        Assert.assertNull("Expected the advertised listener to have a null host", ((EndPoint) brokerConfig.advertisedListeners().head()).host());

        CruiseControlStartable.populateClientConfigs(brokerConfig, clientConfigs);

        Assert.assertEquals(":9092", clientConfigs.get("bootstrap.servers"));
    }

    /**
     * Asserts the matching behavior expected from a regex generated for {@link #EXCLUDED_TOPIC_NAMES}
     * and {@link #EXCLUDED_TOPIC_PREFIXES}: exact names and prefixes match, partial or suffix
     * occurrences do not, and regex metacharacters in the configured values are treated literally.
     *
     * <p>NOTE(review): presumably {@code TestConstants.TOPIC1} is "topic1" and
     * {@code TestConstants.TOPIC2} is "topic2" -- confirm against {@code TestConstants}.
     *
     * @param exclusionRegex the generated exclusion regex under test
     */
    private static void assertExclusionRegexBehavior(String exclusionRegex) {
        Assert.assertTrue("Expected exact topic name to match", TestConstants.TOPIC1.matches(exclusionRegex));
        Assert.assertTrue("Expected exact topic name to match", "test-topic".matches(exclusionRegex));
        Assert.assertTrue("Expected exact topic name with metadata characters to match", "top.c2".matches(exclusionRegex));
        Assert.assertTrue("Expected prefix to match topic name", "prefix1-xyz".matches(exclusionRegex));
        Assert.assertTrue("Expected prefix to match exact topic name", "prefix1".matches(exclusionRegex));
        Assert.assertFalse("Expected partial topic name not to match", "topic1-name".matches(exclusionRegex));
        Assert.assertFalse("Expected topicPrefix value present in middle of topic name not to match", "abc-prefix1-xyz".matches(exclusionRegex));
        Assert.assertFalse("Expected topicPrefix value as suffix not to match", "abc-prefix1".matches(exclusionRegex));
        Assert.assertFalse("Expected topicName value as suffix in topic name not to match", "abc-topic1".matches(exclusionRegex));
        Assert.assertFalse("Expected topicName with regex metacharacters to be treated as a literal", TestConstants.TOPIC2.matches(exclusionRegex));
        Assert.assertFalse("Expected topicPrefix with regex metacharacters to be treated as a literal", "prefix2".matches(exclusionRegex));
    }

    /** Returns the broker property key for the "_local" telemetry exporter's topic name. */
    private static String localExporterTopicNameKey() {
        return ConfluentTelemetryConfig.exporterPrefixForName("_local") + "topic.name";
    }

    /** Returns the goal class names, in order, that the tests expect under the generated "goals" config. */
    private List<String> defaultGoalsConfig() {
        return new ArrayList<>(Arrays.asList(
                ReplicaPlacementGoal.class.getName(),
                RackAwareGoal.class.getName(),
                SequentialReplicaMovementGoal.class.getName(),
                ReplicaCapacityGoal.class.getName(),
                DiskCapacityGoal.class.getName(),
                NetworkInboundCapacityGoal.class.getName(),
                NetworkOutboundCapacityGoal.class.getName(),
                ReplicaDistributionGoal.class.getName(),
                DiskUsageDistributionGoal.class.getName(),
                LeaderReplicaDistributionGoal.class.getName(),
                NetworkInboundUsageDistributionGoal.class.getName(),
                NetworkOutboundUsageDistributionGoal.class.getName(),
                CpuUsageDistributionGoal.class.getName(),
                TopicReplicaDistributionGoal.class.getName(),
                LeaderBytesInDistributionGoal.class.getName()));
    }
}
