package io.confluent.databalancer.startup;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControl;
import com.linkedin.kafka.cruisecontrol.client.BlockingSendClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import io.confluent.cruisecontrol.metricsreporter.ConfluentMetricsSamplerBase;
import io.confluent.databalancer.metrics.DataBalancerMetricsRegistry;
import io.confluent.databalancer.startup.StartupComponents;
import io.confluent.telemetry.ConfluentTelemetryConfig;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.cluster.EndPoint;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

/* loaded from: input_file:io/confluent/databalancer/startup/CruiseControlStartable.class */
public class CruiseControlStartable {
    private static final Logger LOG = LoggerFactory.getLogger(CruiseControlStartable.class);
    private static final String SHUTDOWN_MANAGER_CLIENT_ID = "SBK-broker-shutdown-manager";
    private static final String START_ANCHOR = "^";
    private static final String END_ANCHOR = "$";
    private static final String WILDCARD_SUFFIX = ".*";
    private final Time time;
    private final DataBalancerMetricsRegistry dataBalancerMetricsRegistry;

    public CruiseControlStartable(Time time, DataBalancerMetricsRegistry dataBalancerMetricsRegistry) {
        this.time = time;
        this.dataBalancerMetricsRegistry = dataBalancerMetricsRegistry;
    }

    public KafkaCruiseControl startUp(KafkaConfig kafkaConfig, Semaphore semaphore) throws ExecutionException, InterruptedException {
        BlockingSendClient.Builder builder = new BlockingSendClient.Builder(kafkaConfig, this.time, SHUTDOWN_MANAGER_CLIENT_ID, new LogContext());
        Option zkClientConfigFromKafkaConfig = KafkaServer.zkClientConfigFromKafkaConfig(kafkaConfig, false);
        KafkaCruiseControlConfig generateCruiseControlConfig = generateCruiseControlConfig(kafkaConfig);
        StartupComponents build = new StartupComponents.Builder(semaphore).build(generateCruiseControlConfig);
        LOG.info("DataBalancer: Checking startup components");
        build.checkStartupCondition();
        LOG.info("DataBalancer: Starting CruiseControl");
        KafkaCruiseControl kafkaCruiseControl = new KafkaCruiseControl(generateCruiseControlConfig, zkClientConfigFromKafkaConfig, this.dataBalancerMetricsRegistry, builder);
        kafkaCruiseControl.startUp();
        return kafkaCruiseControl;
    }

    static KafkaCruiseControlConfig generateCruiseControlConfig(KafkaConfig kafkaConfig) {
        HashMap hashMap = new HashMap(kafkaConfig.originalsWithPrefix("confluent.balancer."));
        hashMap.putIfAbsent(KafkaCruiseControlConfig.ZOOKEEPER_CONNECT_CONFIG, kafkaConfig.get(KafkaConfig.ZkConnectProp()));
        hashMap.put(BrokerCapacityResolver.LOG_DIRS_CONFIG, getConfiguredLogDirs(kafkaConfig));
        hashMap.putIfAbsent(KafkaCruiseControlConfig.ZOOKEEPER_SECURITY_ENABLED_CONFIG, Boolean.valueOf(kafkaConfig.zkEnableSecureAcls()));
        configureCruiseControlSelfHealing(kafkaConfig, hashMap);
        if (hashMap.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG) == null) {
            populateClientConfigs(kafkaConfig, hashMap);
        }
        LOG.info("DataBalancer: BOOTSTRAP_SERVERS determined to be {}", hashMap.get(KafkaCruiseControlConfig.BOOTSTRAP_SERVERS_CONFIG));
        String str = (String) kafkaConfig.originals().get(ConfluentTelemetryConfig.exporterPrefixForName("_local") + "topic.name");
        if (str != null && str.length() > 0) {
            hashMap.putIfAbsent(ConfluentMetricsSamplerBase.TELEMETRY_REPORTER_TOPIC_PATTERN, str);
        }
        hashMap.put(KafkaCruiseControlConfig.TOPICS_EXCLUDED_FROM_PARTITION_MOVEMENT_CONFIG, generateCcTopicExclusionRegex(kafkaConfig));
        return new KafkaCruiseControlConfig(hashMap);
    }

    private static void configureCruiseControlSelfHealing(KafkaConfig kafkaConfig, Map<String, Object> map) {
        Long l = kafkaConfig.getLong("confluent.balancer.heal.broker.failure.threshold.ms");
        boolean z = !l.equals(ConfluentConfigs.BALANCER_BROKER_FAILURE_THRESHOLD_DISABLED);
        map.putIfAbsent(KafkaCruiseControlConfig.SELF_HEALING_BROKER_FAILURE_ENABLED_CONFIG, Boolean.valueOf(z));
        if (z) {
            map.putIfAbsent(KafkaCruiseControlConfig.BROKER_FAILURE_SELF_HEALING_THRESHOLD_MS_CONFIG, l);
        }
        if (kafkaConfig.getString("confluent.balancer.heal.uneven.load.trigger").equals(ConfluentConfigs.BalancerSelfHealMode.ANY_UNEVEN_LOAD.toString())) {
            map.putIfAbsent(KafkaCruiseControlConfig.SELF_HEALING_GOAL_VIOLATION_ENABLED_CONFIG, true);
        } else {
            map.putIfAbsent(KafkaCruiseControlConfig.SELF_HEALING_GOAL_VIOLATION_ENABLED_CONFIG, false);
        }
    }

    static void populateClientConfigs(KafkaConfig kafkaConfig, Map<String, Object> map) {
        Map<String, Object> generateClientConfigs = generateClientConfigs(kafkaConfig);
        LOG.info("Adding configs {} to config", generateClientConfigs);
        map.putAll(generateClientConfigs);
    }

    public static Map<String, Object> generateClientConfigs(KafkaConfig kafkaConfig) {
        Endpoint java = ((EndPoint) kafkaConfig.listeners().find(endPoint -> {
            return Boolean.valueOf(endPoint.listenerName().equals(kafkaConfig.interBrokerListenerName()));
        }).get()).toJava();
        if (java.host() == null || java.host().isEmpty()) {
            Endpoint java2 = ((EndPoint) kafkaConfig.advertisedListeners().find(endPoint2 -> {
                return Boolean.valueOf(endPoint2.listenerName().equals(kafkaConfig.interBrokerListenerName()));
            }).get()).toJava();
            if (java2.host() == null || java2.host().isEmpty()) {
                LOG.warn(String.format("Could not find a host in both the normal (%s) and advertised (%s) internal broker listener. This will default to localhost", java, java2));
            }
            java = java2;
        }
        LOG.info("DataBalancer: Listener endpoint is {}", java);
        return ConfluentConfigs.interBrokerClientConfigs(kafkaConfig, java);
    }

    static String generateCcTopicExclusionRegex(KafkaConfig kafkaConfig) {
        return (String) Stream.concat(kafkaConfig.getList("confluent.balancer.exclude.topic.names").stream().map(str -> {
            return START_ANCHOR + Pattern.quote(str) + END_ANCHOR;
        }), kafkaConfig.getList("confluent.balancer.exclude.topic.prefixes").stream().map(str2 -> {
            return START_ANCHOR + Pattern.quote(str2) + WILDCARD_SUFFIX;
        })).collect(Collectors.joining("|"));
    }

    private static String getConfiguredLogDirs(KafkaConfig kafkaConfig) {
        List seqAsJavaList = JavaConverters.seqAsJavaList(kafkaConfig.logDirs());
        if (seqAsJavaList == null || seqAsJavaList.size() == 0) {
            throw new ConfigException("Broker configured with null or empty log directory");
        }
        if (seqAsJavaList.size() > 1) {
            throw new ConfigException("SBK configured with multiple log directories");
        }
        return (String) seqAsJavaList.get(0);
    }
}
