package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.cruisecontrol.common.utils.Utils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import kafka.admin.AdminOperationException;
import kafka.cluster.Broker;
import kafka.server.ConfigType;
import kafka.server.KafkaConfig;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.collection.JavaConverters;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelper.class */
public class ReplicationThrottleHelper {
    private static final Logger LOG;
    static final String LEADER_THROTTLED_RATE;
    static final String FOLLOWER_THROTTLED_RATE;
    static final String LEADER_THROTTLED_REPLICAS;
    static final String FOLLOWER_THROTTLED_REPLICAS;
    private final KafkaZkClient _kafkaZkClient;
    private final AdminZkClient _adminZkClient;
    private Admin _adminClient;
    Long _throttleRate;
    private boolean _overrideStaticThrottleRate;
    boolean _autoThrottleEnabled;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    public ReplicationThrottleHelper(KafkaZkClient kafkaZkClient, Admin admin, Long l, boolean z) {
        this._kafkaZkClient = kafkaZkClient;
        this._adminZkClient = new AdminZkClient(kafkaZkClient);
        this._adminClient = admin;
        this._throttleRate = l;
        this._autoThrottleEnabled = l != null && l.longValue() == KafkaCruiseControlConfig.AUTO_THROTTLE;
        this._overrideStaticThrottleRate = z;
        LOG.info("Set throttle rate {}. Will " + (z ? "" : "not") + " override static throttles when setting the rate.", this._throttleRate);
    }

    Long getThrottleRate() {
        return this._autoThrottleEnabled ? Long.valueOf(KafkaCruiseControlConfig.AUTO_THROTTLE) : this._throttleRate;
    }

    public void setThrottleRate(Long l) {
        if (l != null) {
            this._autoThrottleEnabled = l.longValue() == KafkaCruiseControlConfig.AUTO_THROTTLE;
        }
        this._throttleRate = l;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void setThrottles(List<ExecutionProposal> list, LoadMonitor loadMonitor, Set<Integer> set) throws InterruptedException {
        if (!throttlingEnabled()) {
            LOG.info("Skipped setting rebalance throttle because it is not enabled");
            return;
        }
        try {
            doSetThrottles(list, loadMonitor, set);
        } catch (InterruptedException e) {
            LOG.error("Interrupted while setting rebalance throttle.", e);
            throw e;
        } catch (Exception e2) {
            LOG.error("Unexpected exception while setting rebalance throttle.", e2);
            throw new RuntimeException(e2);
        }
    }

    private void doSetThrottles(List<ExecutionProposal> list, LoadMonitor loadMonitor, Set<Integer> set) throws ExecutionException, InterruptedException {
        if (this._autoThrottleEnabled) {
            this._throttleRate = Long.valueOf(loadMonitor.computeThrottle());
        }
        Set<Integer> participatingBrokers = getParticipatingBrokers(list);
        TreeSet treeSet = new TreeSet(participatingBrokers);
        treeSet.removeAll(set);
        if (!set.isEmpty()) {
            LOG.info("Skipping fetching configs for brokers currently being removed: {}", set);
        }
        Map<ConfigResource, Config> fetchBrokerConfigs = fetchBrokerConfigs(treeSet);
        Set<Integer> filterBrokersWithStaticThrottles = filterBrokersWithStaticThrottles(fetchBrokerConfigs);
        Map<String, Set<String>> throttledReplicasByTopic = getThrottledReplicasByTopic(list, filterBrokersWithStaticThrottles);
        LOG.info("Setting a rebalance throttle of {} bytes/sec to {} brokers and {} topics. Brokers {} already have static throttles set", new Object[]{this._throttleRate, Utils.join(participatingBrokers, ", "), Utils.join(throttledReplicasByTopic.keySet(), ", "), Utils.join(filterBrokersWithStaticThrottles, ", ")});
        for (Integer num : participatingBrokers) {
            Config config = fetchBrokerConfigs.get(new ConfigResource(ConfigResource.Type.BROKER, Integer.toString(num.intValue())));
            if (config == null) {
                config = new Config(Collections.emptyList());
                LOG.warn("Setting throttle rates on broker {} despite not having been able to fetch its configs", num);
            }
            setLeaderThrottledRateIfUnset(num.intValue(), config);
            setFollowerThrottledRateIfUnset(num.intValue(), config);
        }
        throttledReplicasByTopic.forEach(this::setLeaderThrottledReplicas);
        throttledReplicasByTopic.forEach(this::setFollowerThrottledReplicas);
    }

    private Map<ConfigResource, Config> fetchBrokerConfigs(Set<Integer> set) throws ExecutionException, InterruptedException {
        LOG.info("Fetching configs for brokers {}", set);
        List list = (List) set.stream().map(num -> {
            return new ConfigResource(ConfigResource.Type.BROKER, num.toString());
        }).collect(Collectors.toList());
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : this._adminClient.describeConfigs(list).values().entrySet()) {
            String name = ((ConfigResource) entry.getKey()).name();
            try {
                hashMap.put(entry.getKey(), ((KafkaFuture) entry.getValue()).get());
            } catch (ExecutionException e) {
                if (!(e.getCause() instanceof TimeoutException)) {
                    throw e;
                }
                LOG.warn("Could not fetch broker configs for broker {} when setting replication throttles. This could be because the broker is offline. Ignoring it.", name);
            }
        }
        return hashMap;
    }

    private Set<Integer> filterBrokersWithStaticThrottles(Map<ConfigResource, Config> map) throws InterruptedException {
        return (Set) map.entrySet().stream().filter(entry -> {
            ConfigEntry configEntry = ((Config) entry.getValue()).get(KafkaConfig.LeaderReplicationThrottledReplicasProp());
            ConfigEntry configEntry2 = ((Config) entry.getValue()).get(KafkaConfig.FollowerReplicationThrottledReplicasProp());
            return (configEntry == null || configEntry.value() == null || !configEntry.value().equals("*") || configEntry2 == null || configEntry2.value() == null || !configEntry2.value().equals("*")) ? false : true;
        }).map(entry2 -> {
            return Integer.valueOf(Integer.parseInt(((ConfigResource) entry2.getKey()).name()));
        }).collect(Collectors.toSet());
    }

    boolean shouldRemoveThrottleForTask(ExecutionTask executionTask) {
        return (executionTask.state() == ExecutionTask.State.IN_PROGRESS || executionTask.state() == ExecutionTask.State.PENDING || executionTask.type() != ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION) ? false : true;
    }

    boolean taskIsInProgress(ExecutionTask executionTask) {
        return executionTask.state() == ExecutionTask.State.IN_PROGRESS && executionTask.type() == ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void clearThrottles(List<ExecutionTask> list, List<ExecutionTask> list2) {
        if (throttlingEnabled()) {
            List<ExecutionProposal> list3 = (List) list.stream().filter(this::shouldRemoveThrottleForTask).map((v0) -> {
                return v0.proposal();
            }).collect(Collectors.toList());
            Set<Integer> participatingBrokers = getParticipatingBrokers(list3);
            Set<Integer> participatingBrokers2 = getParticipatingBrokers((List) list2.stream().filter(this::taskIsInProgress).map((v0) -> {
                return v0.proposal();
            }).collect(Collectors.toList()));
            TreeSet treeSet = new TreeSet(participatingBrokers);
            treeSet.removeAll(participatingBrokers2);
            LOG.info("Removing replica movement throttles from brokers in the cluster: {}", treeSet);
            treeSet.forEach(this::removeThrottledRateFromBroker);
            getThrottledReplicasByTopic(list3, Collections.emptySet()).forEach(this::removeThrottledReplicasFromTopic);
        }
    }

    private boolean throttlingEnabled() {
        return (this._throttleRate == null || this._throttleRate == ConfluentConfigs.BALANCER_THROTTLE_NO_THROTTLE) ? false : true;
    }

    private Set<Integer> getParticipatingBrokers(List<ExecutionProposal> list) {
        TreeSet treeSet = new TreeSet();
        for (ExecutionProposal executionProposal : list) {
            treeSet.addAll((Collection) executionProposal.oldReplicas().stream().map((v0) -> {
                return v0.brokerId();
            }).collect(Collectors.toSet()));
            treeSet.addAll((Collection) executionProposal.newReplicas().stream().map((v0) -> {
                return v0.brokerId();
            }).collect(Collectors.toSet()));
        }
        return treeSet;
    }

    private Map<String, Set<String>> getThrottledReplicasByTopic(List<ExecutionProposal> list, Set<Integer> set) {
        HashMap hashMap = new HashMap();
        for (ExecutionProposal executionProposal : list) {
            String str = executionProposal.topic();
            int partitionId = executionProposal.partitionId();
            List list2 = (List) Stream.concat(executionProposal.oldReplicas().stream().map((v0) -> {
                return v0.brokerId();
            }), executionProposal.replicasToAdd().stream().map((v0) -> {
                return v0.brokerId();
            })).collect(Collectors.toList());
            if (!set.containsAll(list2)) {
                Set set2 = (Set) hashMap.computeIfAbsent(str, str2 -> {
                    return new TreeSet();
                });
                list2.forEach(num -> {
                    set2.add(partitionId + ":" + num);
                });
            }
        }
        return hashMap;
    }

    private void setLeaderThrottledRateIfUnset(int i, Config config) {
        setThrottledRateIfUnset(i, config, LEADER_THROTTLED_RATE);
    }

    private void setFollowerThrottledRateIfUnset(int i, Config config) {
        setThrottledRateIfUnset(i, config, FOLLOWER_THROTTLED_RATE);
    }

    private void setThrottledRateIfUnset(int i, Config config, String str) {
        ConfigEntry configEntry;
        if (!$assertionsDisabled && this._throttleRate == null) {
            throw new AssertionError();
        }
        if (!$assertionsDisabled && !str.equals(LEADER_THROTTLED_RATE) && !str.equals(FOLLOWER_THROTTLED_RATE)) {
            throw new AssertionError();
        }
        if (!this._overrideStaticThrottleRate && (configEntry = config.get(str)) != null && configEntry.source() == ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG && configEntry.value() != null && !configEntry.value().isEmpty()) {
            LOG.debug("Not setting {} for broker {} because pre-existing throttle of {} was already set statically.", new Object[]{str, Integer.valueOf(i), configEntry.value()});
            return;
        }
        Properties entityConfigs = this._kafkaZkClient.getEntityConfigs(ConfigType.Broker(), String.valueOf(i));
        Object property = entityConfigs.setProperty(str, String.valueOf(this._throttleRate));
        if (property != null) {
            LOG.debug("Not setting {} for broker {} because pre-existing throttle of {} was already set", new Object[]{str, Integer.valueOf(i), property});
        } else {
            LOG.debug("Setting {} to {} bytes/second for broker {}", new Object[]{str, this._throttleRate, Integer.valueOf(i)});
            this._adminZkClient.changeBrokerConfig(Option.apply(Integer.valueOf(i)), entityConfigs);
        }
    }

    private void setLeaderThrottledReplicas(String str, Set<String> set) {
        setThrottledReplicas(str, set, LEADER_THROTTLED_REPLICAS);
    }

    private void setFollowerThrottledReplicas(String str, Set<String> set) {
        setThrottledReplicas(str, set, FOLLOWER_THROTTLED_REPLICAS);
    }

    private void setThrottledReplicas(String str, Set<String> set, String str2) {
        if (!$assertionsDisabled && !str2.equals(LEADER_THROTTLED_REPLICAS) && !str2.equals(FOLLOWER_THROTTLED_REPLICAS)) {
            throw new AssertionError();
        }
        Properties entityConfigs = this._kafkaZkClient.getEntityConfigs(ConfigType.Topic(), str);
        TreeSet treeSet = new TreeSet(set);
        String property = entityConfigs.getProperty(str2);
        if (property != null && !property.isEmpty()) {
            treeSet.addAll(Arrays.asList(property.split(",")));
        }
        String join = String.join(",", treeSet);
        entityConfigs.setProperty(str2, join);
        try {
            this._adminZkClient.changeTopicConfig(str, entityConfigs);
        } catch (AdminOperationException | UnknownTopicOrPartitionException e) {
            LOG.info("Not setting throttled replicas {} for topic {} because it does not exist", join, str);
        }
    }

    static String removeReplicasFromConfig(String str, Set<String> set) {
        ArrayList arrayList = new ArrayList(Arrays.asList(str.split(",")));
        set.getClass();
        arrayList.removeIf((v1) -> {
            return r1.contains(v1);
        });
        return String.join(",", arrayList);
    }

    private void removeLeaderThrottledReplicasFromTopic(Properties properties, String str, Set<String> set) {
        String property = properties.getProperty(LEADER_THROTTLED_REPLICAS);
        if (property != null) {
            set.forEach(str2 -> {
                LOG.debug("Removing leader throttles for topic {} on replica {}", str, str2);
            });
            String removeReplicasFromConfig = removeReplicasFromConfig(property, set);
            if (removeReplicasFromConfig.isEmpty()) {
                properties.remove(LEADER_THROTTLED_REPLICAS);
            } else {
                properties.setProperty(LEADER_THROTTLED_REPLICAS, removeReplicasFromConfig);
            }
        }
    }

    private void removeFollowerThrottledReplicasFromTopic(Properties properties, String str, Set<String> set) {
        String property = properties.getProperty(FOLLOWER_THROTTLED_REPLICAS);
        if (property != null) {
            set.forEach(str2 -> {
                LOG.debug("Removing follower throttles for topic {} and replica {}", str, str2);
            });
            String removeReplicasFromConfig = removeReplicasFromConfig(property, set);
            if (removeReplicasFromConfig.isEmpty()) {
                properties.remove(FOLLOWER_THROTTLED_REPLICAS);
            } else {
                properties.setProperty(FOLLOWER_THROTTLED_REPLICAS, removeReplicasFromConfig);
            }
        }
    }

    private void removeThrottledReplicasFromTopic(String str, Set<String> set) {
        Properties entityConfigs = this._kafkaZkClient.getEntityConfigs(ConfigType.Topic(), str);
        removeLeaderThrottledReplicasFromTopic(entityConfigs, str, set);
        removeFollowerThrottledReplicasFromTopic(entityConfigs, str, set);
        try {
            this._adminZkClient.changeTopicConfig(str, entityConfigs);
        } catch (AdminOperationException | UnknownTopicOrPartitionException e) {
            LOG.warn("Skip removing throttled replicas {} for topic {} due to error {}", new Object[]{set, str, e});
        }
    }

    private void removeAllThrottledReplicasFromTopic(String str) {
        Properties entityConfigs = this._kafkaZkClient.getEntityConfigs(ConfigType.Topic(), str);
        Object remove = entityConfigs.remove(LEADER_THROTTLED_REPLICAS);
        Object remove2 = entityConfigs.remove(FOLLOWER_THROTTLED_REPLICAS);
        if (remove != null) {
            LOG.debug("Removing leader throttled replicas for topic {}", str);
        }
        if (remove2 != null) {
            LOG.debug("Removing follower throttled replicas for topic {}", str);
        }
        if (remove == null && remove2 == null) {
            return;
        }
        this._adminZkClient.changeTopicConfig(str, entityConfigs);
    }

    private void removeThrottledRateFromBroker(Integer num) {
        Properties entityConfigs = this._kafkaZkClient.getEntityConfigs(ConfigType.Broker(), String.valueOf(num));
        Object remove = entityConfigs.remove(LEADER_THROTTLED_RATE);
        Object remove2 = entityConfigs.remove(FOLLOWER_THROTTLED_RATE);
        if (remove != null) {
            LOG.debug("Removing leader throttle on broker {}", num);
        }
        if (remove2 != null) {
            LOG.debug("Removing follower throttle on broker {}", num);
        }
        if (remove == null && remove2 == null) {
            return;
        }
        this._adminZkClient.changeBrokerConfig(Option.apply(num), entityConfigs);
    }

    public int updateOrRemoveThrottleRate(Long l) {
        int i = 0;
        for (Broker broker : JavaConverters.asJavaCollection(this._kafkaZkClient.getAllBrokersInCluster())) {
            Properties properties = new Properties();
            properties.putAll(this._kafkaZkClient.getEntityConfigs(ConfigType.Broker(), String.valueOf(broker.id())));
            boolean z = false;
            boolean z2 = false;
            if (l == null) {
                z = properties.remove(KafkaConfig.LeaderReplicationThrottledRateProp()) != null;
                z2 = properties.remove(KafkaConfig.FollowerReplicationThrottledRateProp()) != null;
            } else {
                if (!properties.getProperty(KafkaConfig.LeaderReplicationThrottledRateProp(), "").equals(Long.toString(l.longValue()))) {
                    properties.setProperty(KafkaConfig.LeaderReplicationThrottledRateProp(), Long.toString(l.longValue()));
                    z = true;
                }
                if (!properties.getProperty(KafkaConfig.FollowerReplicationThrottledRateProp(), "").equals(Long.toString(l.longValue()))) {
                    properties.setProperty(KafkaConfig.FollowerReplicationThrottledRateProp(), Long.toString(l.longValue()));
                    z2 = true;
                }
            }
            if (z || z2) {
                this._kafkaZkClient.setOrCreateEntityConfigs(ConfigType.Broker(), String.valueOf(broker.id()), properties);
                i++;
            }
        }
        return i;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeAllThrottles() {
        int i = 0;
        for (String str : JavaConverters.asJavaCollection(this._kafkaZkClient.getAllTopicsInCluster(false))) {
            Properties properties = new Properties();
            properties.putAll(this._kafkaZkClient.getEntityConfigs(ConfigType.Topic(), str));
            boolean z = properties.remove(KafkaConfig.LeaderReplicationThrottledReplicasProp()) != null;
            boolean z2 = properties.remove(KafkaConfig.FollowerReplicationThrottledReplicasProp()) != null;
            if (z || z2) {
                this._kafkaZkClient.setOrCreateEntityConfigs(ConfigType.Topic(), str, properties);
                i++;
            }
        }
        LOG.info("Removed throttled replicas config from {} topics", Integer.valueOf(i));
        LOG.info("Removed throttle rate config from {} brokers", Integer.valueOf(updateOrRemoveThrottleRate(null)));
    }

    static {
        $assertionsDisabled = !ReplicationThrottleHelper.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(ReplicationThrottleHelper.class);
        LEADER_THROTTLED_RATE = KafkaConfig.LeaderReplicationThrottledRateProp();
        FOLLOWER_THROTTLED_RATE = KafkaConfig.FollowerReplicationThrottledRateProp();
        LEADER_THROTTLED_REPLICAS = KafkaConfig.LeaderReplicationThrottledReplicasProp();
        FOLLOWER_THROTTLED_REPLICAS = KafkaConfig.FollowerReplicationThrottledReplicasProp();
    }
}
