package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import io.confluent.databalancer.TestConstants;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeConfigsOptions;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

@Tag(TestConstants.INTEGRATION_TEST)
/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.class */
public class ReplicationThrottleHelperTest extends CCKafkaIntegrationTestHarness {
    ConfluentAdmin admin;
    Properties props;
    KafkaCruiseControlConfig cconfig;

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness
    public int clusterSize() {
        return 4;
    }

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness, com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCAbstractZookeeperTestHarness
    @BeforeEach
    public void setUp() {
        super.setUp();
        this.admin = KafkaCruiseControlUtils.createAdmin(Collections.singletonMap("bootstrap.servers", bootstrapServers()));
        this.admin.createTopics(Arrays.asList(new NewTopic(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 2, (short) 2), new NewTopic(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC1, 2, (short) 2)));
        this.props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
    }

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness, com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCAbstractZookeeperTestHarness
    @AfterEach
    public void tearDown() {
        KafkaCruiseControlUtils.closeAdminClientWithTimeout(this.admin);
        super.tearDown();
    }

    private ExecutionTask inProgressTaskForProposal(long j, ExecutionProposal executionProposal) {
        ExecutionTask executionTask = new ExecutionTask(j, executionProposal, ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
        executionTask.inProgress(0L);
        return executionTask;
    }

    private ExecutionTask completedTaskForProposal(long j, ExecutionProposal executionProposal) {
        ExecutionTask inProgressTaskForProposal = inProgressTaskForProposal(j, executionProposal);
        inProgressTaskForProposal.completed(1L);
        return inProgressTaskForProposal;
    }

    @Test
    public void isNoOpWhenThrottleIsNull() throws InterruptedException {
        ConfluentAdmin confluentAdmin = (ConfluentAdmin) Mockito.spy(this.admin);
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(confluentAdmin, this.cconfig);
        replicationThrottleHelper.setThrottleRate((Long) null);
        ExecutionProposal executionProposal = new ExecutionProposal(new TopicPartition(ExecutorTestUtils.TOPIC, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        replicationThrottleHelper.setThrottles(Collections.singletonList(executionProposal), (LoadMonitor) null, Collections.emptySet());
        ((ConfluentAdmin) Mockito.verify(confluentAdmin, Mockito.never())).describeConfigs(Mockito.anyCollection(), (DescribeConfigsOptions) Mockito.any());
        ((ConfluentAdmin) Mockito.verify(confluentAdmin, Mockito.never())).incrementalAlterConfigs((Map) Mockito.any());
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal(0L, executionProposal)), Collections.emptyList(), Collections.emptySet());
        ((ConfluentAdmin) Mockito.verify(confluentAdmin, Mockito.never())).describeConfigs(Mockito.anyCollection(), (DescribeConfigsOptions) Mockito.any());
        ((ConfluentAdmin) Mockito.verify(confluentAdmin, Mockito.never())).incrementalAlterConfigs((Map) Mockito.any());
    }

    private void assertExpectedThrottledRateForBroker(int i, Long l) {
        Optional<String> empty = l == null ? Optional.empty() : Optional.of(String.valueOf(l));
        waitForBrokerThrottle(this.admin, i, empty);
        Config entityConfigs = KafkaCruiseControlUtils.getEntityConfigs(this.admin, ConfigResource.Type.BROKER, String.valueOf(i));
        Assertions.assertEquals(empty, KafkaCruiseControlUtils.getConfigEntry(entityConfigs, ReplicationThrottleHelper.LEADER_THROTTLED_RATE).filter(configEntry -> {
            return !configEntry.isDefault();
        }).map((v0) -> {
            return v0.value();
        }), "Broker Id: " + i);
        Assertions.assertEquals(empty, KafkaCruiseControlUtils.getConfigEntry(entityConfigs, ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE).filter(configEntry2 -> {
            return !configEntry2.isDefault();
        }).map((v0) -> {
            return v0.value();
        }), "Broker Id: " + i);
    }

    private void assertExpectedThrottledReplicas(String str, String str2) {
        Config entityConfigs = KafkaCruiseControlUtils.getEntityConfigs(this.admin, ConfigResource.Type.TOPIC, str);
        Assertions.assertEquals(Optional.ofNullable(str2), KafkaCruiseControlUtils.getConfigEntry(entityConfigs, ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS).filter(configEntry -> {
            return !configEntry.isDefault();
        }).map((v0) -> {
            return v0.value();
        }), "Topic: " + str);
        Assertions.assertEquals(Optional.ofNullable(str2), KafkaCruiseControlUtils.getConfigEntry(entityConfigs, ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS).filter(configEntry2 -> {
            return !configEntry2.isDefault();
        }).map((v0) -> {
            return v0.value();
        }), "Topic: " + str);
    }

    @Test
    public void addingThrottlesWithNoPreExistingThrottles() throws InterruptedException {
        List asList = Arrays.asList(0, 1, 2);
        this.props.put("throttle.bytes.per.second", 100L);
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(this.admin, this.cconfig);
        ExecutionProposal executionProposal = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        ExecutionTask completedTaskForProposal = completedTaskForProposal(0L, executionProposal);
        replicationThrottleHelper.setThrottles(Collections.singletonList(executionProposal), (LoadMonitor) null, Collections.emptySet());
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            assertExpectedThrottledRateForBroker(((Integer) it.next()).intValue(), 100L);
        }
        assertExpectedThrottledRateForBroker(3, null);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "0:0,0:1,0:2");
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal), Collections.emptyList(), Collections.emptySet());
        Arrays.asList(0, 1, 2, 3).forEach(num -> {
            assertExpectedThrottledRateForBroker(num.intValue(), null);
        });
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, null);
    }

    @Test
    public void testExceptionInAdminClientStopsThrottling() {
        ConfluentAdmin confluentAdmin = (ConfluentAdmin) Mockito.spy(this.admin);
        List asList = Arrays.asList(0, 1, 2);
        Mockito.when(confluentAdmin.describeConfigs(Mockito.anyCollection(), (DescribeConfigsOptions) Mockito.any())).thenThrow(new Throwable[]{new KafkaException("!!!")}).thenCallRealMethod();
        ExecutionProposal executionProposal = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        this.props.put("throttle.bytes.per.second", 100L);
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(confluentAdmin, this.cconfig);
        Assertions.assertThrows(RuntimeException.class, () -> {
            replicationThrottleHelper.setThrottles(Collections.singletonList(executionProposal), (LoadMonitor) null, Collections.emptySet());
        });
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            assertExpectedThrottledRateForBroker(((Integer) it.next()).intValue(), null);
        }
        assertExpectedThrottledRateForBroker(3, null);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, null);
    }

    @Test
    public void testNoThrottledReplicasAreSetIfAllBrokersHaveStaticThrottledReplicas() throws InterruptedException {
        this.props.put("throttle.bytes.per.second", 100L);
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(this.admin, this.cconfig);
        List asList = Arrays.asList(0, 1, 2);
        List list = (List) asList.stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.toList());
        HashMap hashMap = new HashMap();
        hashMap.put(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS, "*");
        hashMap.put(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS, "*");
        KafkaCruiseControlUtils.setEntityConfigs(this.admin, ConfigResource.Type.BROKER, list, AlterConfigOp.OpType.SET, hashMap);
        List list2 = (List) Collections.singletonList(3).stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.toList());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS, null);
        hashMap2.put(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS, null);
        KafkaCruiseControlUtils.setEntityConfigs(this.admin, ConfigResource.Type.BROKER, list2, AlterConfigOp.OpType.DELETE, hashMap2);
        ExecutionProposal executionProposal = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        ExecutionTask completedTaskForProposal = completedTaskForProposal(0L, executionProposal);
        replicationThrottleHelper.setThrottles(Collections.singletonList(executionProposal), (LoadMonitor) null, Collections.emptySet());
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            assertExpectedThrottledRateForBroker(((Integer) it.next()).intValue(), 100L);
        }
        assertExpectedThrottledRateForBroker(3, null);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, null);
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal), Collections.emptyList(), Collections.emptySet());
        Arrays.asList(0, 1, 2, 3).forEach(num -> {
            assertExpectedThrottledRateForBroker(num.intValue(), null);
        });
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, null);
    }

    @Test
    public void testRemovedBrokersNotDescribed() throws InterruptedException {
        List asList = Arrays.asList(0, 2);
        this.props.put("throttle.bytes.per.second", 100L);
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(this.admin, this.cconfig);
        ExecutionProposal executionProposal = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        ExecutionTask completedTaskForProposal = completedTaskForProposal(0L, executionProposal);
        replicationThrottleHelper.setThrottles(Collections.singletonList(executionProposal), (LoadMonitor) null, Collections.singleton(1));
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            assertExpectedThrottledRateForBroker(((Integer) it.next()).intValue(), 100L);
        }
        assertExpectedThrottledRateForBroker(3, null);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "0:0,0:1,0:2");
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal), Collections.emptyList(), Collections.emptySet());
        Arrays.asList(0, 1, 2, 3).forEach(num -> {
            assertExpectedThrottledRateForBroker(num.intValue(), null);
        });
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, null);
    }

    @Test
    public void testNoThrottledRateIsSetForBrokersThatHaveStaticThrottleRateSet() throws InterruptedException {
        this.props.put("throttle.bytes.per.second", 100L);
        this.props.put("static.throttle.rate.override.enabled", false);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(this.admin, this.cconfig);
        List asList = Arrays.asList(0, 1, 2);
        List list = (List) asList.stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.toList());
        HashMap hashMap = new HashMap();
        hashMap.put(ReplicationThrottleHelper.LEADER_THROTTLED_RATE, Long.toString(300L));
        hashMap.put(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE, Long.toString(300L));
        KafkaCruiseControlUtils.setEntityConfigs(this.admin, ConfigResource.Type.BROKER, list, AlterConfigOp.OpType.SET, hashMap);
        List list2 = (List) Collections.singletonList(3).stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.toList());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(ReplicationThrottleHelper.LEADER_THROTTLED_RATE, null);
        hashMap2.put(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE, null);
        KafkaCruiseControlUtils.setEntityConfigs(this.admin, ConfigResource.Type.BROKER, list2, AlterConfigOp.OpType.DELETE, hashMap2);
        replicationThrottleHelper.setThrottles(Collections.singletonList(new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList())), (LoadMonitor) null, Collections.emptySet());
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            assertExpectedThrottledRateForBroker(((Integer) it.next()).intValue(), 300L);
        }
        assertExpectedThrottledRateForBroker(3, null);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "0:0,0:1,0:2");
    }

    @Test
    public void testTopicThrottlesAreSetIfNotAllBrokersHaveStaticThrottles() throws InterruptedException {
        this.props.put("throttle.bytes.per.second", 100L);
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(this.admin, this.cconfig);
        List asList = Arrays.asList(0, 1);
        List list = (List) asList.stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.toList());
        HashMap hashMap = new HashMap();
        hashMap.put(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS, "*");
        hashMap.put(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS, "*");
        KafkaCruiseControlUtils.setEntityConfigs(this.admin, ConfigResource.Type.BROKER, list, AlterConfigOp.OpType.SET, hashMap);
        List list2 = (List) Arrays.asList(2, 3).stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.toList());
        HashMap hashMap2 = new HashMap();
        hashMap2.put(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS, null);
        hashMap2.put(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS, null);
        KafkaCruiseControlUtils.setEntityConfigs(this.admin, ConfigResource.Type.BROKER, list2, AlterConfigOp.OpType.DELETE, hashMap2);
        ExecutionProposal executionProposal = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        ExecutionTask completedTaskForProposal = completedTaskForProposal(0L, executionProposal);
        replicationThrottleHelper.setThrottles(Collections.singletonList(executionProposal), (LoadMonitor) null, Collections.emptySet());
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            assertExpectedThrottledRateForBroker(((Integer) it.next()).intValue(), 100L);
        }
        assertExpectedThrottledRateForBroker(2, 100L);
        assertExpectedThrottledRateForBroker(3, null);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "0:0,0:1,0:2");
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal), Collections.emptyList(), Collections.emptySet());
        Arrays.asList(0, 1, 2, 3).forEach(num -> {
            assertExpectedThrottledRateForBroker(num.intValue(), null);
        });
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, null);
    }

    @Test
    public void addingThrottlesWithPreExistingThrottles() throws InterruptedException {
        this.props.put("throttle.bytes.per.second", 100L);
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(this.admin, this.cconfig);
        ExecutionProposal executionProposal = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        ExecutionTask completedTaskForProposal = completedTaskForProposal(0L, executionProposal);
        HashMap hashMap = new HashMap();
        hashMap.put(ReplicationThrottleHelper.LEADER_THROTTLED_RATE, String.valueOf(200L));
        hashMap.put(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE, String.valueOf(200L));
        KafkaCruiseControlUtils.setEntityConfigs(this.admin, ConfigResource.Type.BROKER, Collections.singleton("0"), AlterConfigOp.OpType.SET, hashMap);
        HashMap hashMap2 = new HashMap();
        hashMap2.put(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS, "1:0,1:1");
        hashMap2.put(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS, "1:0,1:1");
        KafkaCruiseControlUtils.setEntityConfigs(this.admin, ConfigResource.Type.TOPIC, Collections.singleton(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0), AlterConfigOp.OpType.SET, hashMap2);
        HashMap hashMap3 = new HashMap();
        hashMap3.put(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS, "1:1");
        hashMap3.put(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS, "1:1");
        KafkaCruiseControlUtils.setEntityConfigs(this.admin, ConfigResource.Type.TOPIC, Collections.singleton(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC1), AlterConfigOp.OpType.SET, hashMap3);
        replicationThrottleHelper.setThrottles(Collections.singletonList(executionProposal), (LoadMonitor) null, Collections.emptySet());
        assertExpectedThrottledRateForBroker(0, 200L);
        assertExpectedThrottledRateForBroker(1, 100L);
        assertExpectedThrottledRateForBroker(2, 100L);
        assertExpectedThrottledRateForBroker(3, null);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "1:0,1:1,0:0,0:1,0:2");
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC1, "1:1");
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal), Collections.emptyList(), Collections.emptySet());
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal), Collections.emptyList(), Collections.emptySet());
        Arrays.asList(0, 1, 2, 3).forEach(num -> {
            assertExpectedThrottledRateForBroker(num.intValue(), null);
        });
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "1:0,1:1");
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC1, "1:1");
    }

    @Test
    public void testRemovesThrottlesForTasksToBeRetried() throws InterruptedException {
        List asList = Arrays.asList(0, 1, 2, 3);
        this.props.put("throttle.bytes.per.second", 100L);
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(this.admin, this.cconfig);
        ExecutionProposal executionProposal = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        ExecutionProposal executionProposal2 = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 1), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(3)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        replicationThrottleHelper.setThrottles(Arrays.asList(executionProposal, executionProposal2), (LoadMonitor) null, Collections.emptySet());
        ExecutionTask completedTaskForProposal = completedTaskForProposal(0L, executionProposal);
        ExecutionTask inProgressTaskForProposal = inProgressTaskForProposal(1L, executionProposal2);
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            assertExpectedThrottledRateForBroker(((Integer) it.next()).intValue(), 100L);
        }
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "0:0,0:1,0:2,1:0,1:2,1:3");
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal), Collections.singletonList(inProgressTaskForProposal), Collections.emptySet());
        assertExpectedThrottledRateForBroker(0, 100L);
        assertExpectedThrottledRateForBroker(1, null);
        assertExpectedThrottledRateForBroker(2, 100L);
        assertExpectedThrottledRateForBroker(3, 100L);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "1:0,1:2,1:3");
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal), Collections.singletonList(inProgressTaskForProposal), Collections.emptySet());
        assertExpectedThrottledRateForBroker(0, 100L);
        assertExpectedThrottledRateForBroker(1, null);
        assertExpectedThrottledRateForBroker(2, 100L);
        assertExpectedThrottledRateForBroker(3, 100L);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "1:0,1:2,1:3");
        inProgressTaskForProposal.toBeRetried();
        replicationThrottleHelper.clearThrottles(Arrays.asList(completedTaskForProposal, inProgressTaskForProposal), Collections.emptyList(), Collections.emptySet());
        Arrays.asList(0, 1, 2, 3).forEach(num -> {
            assertExpectedThrottledRateForBroker(num.intValue(), null);
        });
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, null);
    }

    @Test
    public void doNotRemoveThrottlesForInProgressTasks() throws InterruptedException {
        List asList = Arrays.asList(0, 1, 2, 3);
        this.props.put("throttle.bytes.per.second", 100L);
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(this.admin, this.cconfig);
        ExecutionProposal executionProposal = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        ExecutionProposal executionProposal2 = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 1), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(3)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        replicationThrottleHelper.setThrottles(Arrays.asList(executionProposal, executionProposal2), (LoadMonitor) null, Collections.emptySet());
        ExecutionTask completedTaskForProposal = completedTaskForProposal(0L, executionProposal);
        ExecutionTask inProgressTaskForProposal = inProgressTaskForProposal(1L, executionProposal2);
        Iterator it = asList.iterator();
        while (it.hasNext()) {
            assertExpectedThrottledRateForBroker(((Integer) it.next()).intValue(), 100L);
        }
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "0:0,0:1,0:2,1:0,1:2,1:3");
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal), Collections.singletonList(inProgressTaskForProposal), Collections.emptySet());
        assertExpectedThrottledRateForBroker(0, 100L);
        assertExpectedThrottledRateForBroker(1, null);
        assertExpectedThrottledRateForBroker(2, 100L);
        assertExpectedThrottledRateForBroker(3, 100L);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "1:0,1:2,1:3");
        replicationThrottleHelper.clearThrottles(Collections.singletonList(completedTaskForProposal), Collections.singletonList(inProgressTaskForProposal), Collections.emptySet());
        assertExpectedThrottledRateForBroker(0, 100L);
        assertExpectedThrottledRateForBroker(1, null);
        assertExpectedThrottledRateForBroker(2, 100L);
        assertExpectedThrottledRateForBroker(3, 100L);
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, "1:0,1:2,1:3");
        inProgressTaskForProposal.completed(3L);
        replicationThrottleHelper.clearThrottles(Arrays.asList(completedTaskForProposal, inProgressTaskForProposal), Collections.emptyList(), Collections.emptySet());
        Arrays.asList(0, 1, 2, 3).forEach(num -> {
            assertExpectedThrottledRateForBroker(num.intValue(), null);
        });
        assertExpectedThrottledReplicas(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, null);
    }

    @Test
    public void removeReplicasFromConfigTest() {
        LinkedHashSet linkedHashSet = new LinkedHashSet();
        linkedHashSet.add("foo");
        linkedHashSet.add("bar");
        linkedHashSet.add("baz");
        Assertions.assertEquals(ReplicationThrottleHelper.removeReplicasFromConfig("foo,bar,qux,qaz,baz", linkedHashSet), "qux,qaz");
    }

    @Test
    public void testDoesNotSetThrottleRate() {
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(this.admin, this.cconfig);
        replicationThrottleHelper.setThrottleRate((Long) null);
        Assertions.assertNull(replicationThrottleHelper.throttleRate, "Expected throttle rate to be set to null");
        Assertions.assertFalse(replicationThrottleHelper.autoThrottleEnabled, "Auto throttling should not be on with a null throttle");
        this.props.put("throttle.bytes.per.second", 101L);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper2 = new ReplicationThrottleHelper(this.admin, this.cconfig);
        replicationThrottleHelper2.setThrottleRate(100L);
        Assertions.assertEquals(100L, replicationThrottleHelper2.throttleRate.longValue());
        Assertions.assertFalse(replicationThrottleHelper2.autoThrottleEnabled, "Auto throttling should not be on");
        this.props.put("throttle.bytes.per.second", 100L);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper3 = new ReplicationThrottleHelper(this.admin, this.cconfig);
        replicationThrottleHelper3.setThrottleRate((Long) null);
        Assertions.assertNull(replicationThrottleHelper3.throttleRate, "Expected throttle rate to be set to null");
        Assertions.assertFalse(replicationThrottleHelper3.autoThrottleEnabled, "Auto throttling should not be on");
        ReplicationThrottleHelper replicationThrottleHelper4 = new ReplicationThrottleHelper(this.admin, this.cconfig);
        replicationThrottleHelper4.setThrottleRate(101L);
        Assertions.assertEquals(101L, replicationThrottleHelper4.throttleRate.longValue());
        Assertions.assertFalse(replicationThrottleHelper4.autoThrottleEnabled, "Auto throttling should not be on");
        replicationThrottleHelper4.setThrottleRate(100L);
        Assertions.assertEquals(100L, replicationThrottleHelper4.throttleRate.longValue());
        Assertions.assertFalse(replicationThrottleHelper4.autoThrottleEnabled, "Auto throttling should not be on");
        ReplicationThrottleHelper replicationThrottleHelper5 = new ReplicationThrottleHelper(this.admin, this.cconfig);
        Assertions.assertFalse(replicationThrottleHelper5.autoThrottleEnabled, "Auto throttling should not be on");
        replicationThrottleHelper5.setThrottleRate(Long.valueOf(KafkaCruiseControlConfig.AUTO_THROTTLE));
        Assertions.assertTrue(replicationThrottleHelper5.autoThrottleEnabled, "Auto throttling should be on since we set it");
        replicationThrottleHelper5.setThrottleRate(1L);
        Assertions.assertFalse(replicationThrottleHelper5.autoThrottleEnabled, "Auto throttling should be on since we set it to 1");
    }

    @Test
    public void testAutomaticThrottleComputation() throws InterruptedException {
        LoadMonitor loadMonitor = (LoadMonitor) EasyMock.createMock(LoadMonitor.class);
        EasyMock.expect(Long.valueOf(loadMonitor.computeThrottle())).andReturn(50000000L).once();
        EasyMock.expect(Long.valueOf(loadMonitor.computeThrottle())).andReturn(30000000L).once();
        EasyMock.replay(new Object[]{loadMonitor});
        this.props.put("throttle.bytes.per.second", Long.valueOf(KafkaCruiseControlConfig.AUTO_THROTTLE));
        this.props.put("static.throttle.rate.override.enabled", true);
        this.cconfig = new KafkaCruiseControlConfig(this.props);
        ReplicationThrottleHelper replicationThrottleHelper = new ReplicationThrottleHelper(this.admin, this.cconfig);
        ExecutionProposal executionProposal = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        ExecutionProposal executionProposal2 = new ExecutionProposal(new TopicPartition(com.linkedin.kafka.cruisecontrol.common.TestConstants.TOPIC0, 1), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(3)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
        replicationThrottleHelper.setThrottles(Arrays.asList(executionProposal, executionProposal2), loadMonitor, Collections.emptySet());
        assertExpectedThrottledRateForBroker(0, 50000000L);
        assertExpectedThrottledRateForBroker(1, 50000000L);
        assertExpectedThrottledRateForBroker(2, 50000000L);
        assertExpectedThrottledRateForBroker(3, 50000000L);
        replicationThrottleHelper.removeAllThrottles();
        List list = (List) this.brokers.keySet().stream().map((v0) -> {
            return String.valueOf(v0);
        }).collect(Collectors.toList());
        TestUtils.waitForCondition(() -> {
            return KafkaCruiseControlUtils.getEntityConfigs(this.admin, ConfigResource.Type.BROKER, list).values().stream().map(config -> {
                return KafkaCruiseControlUtils.getConfigEntry(config, ReplicationThrottleHelper.LEADER_THROTTLED_RATE);
            }).filter(optional -> {
                return !optional.isPresent() || ((ConfigEntry) optional.get()).isDefault();
            }).count() == ((long) list.size());
        }, 10000L, "Waiting for throttles to be removed.");
        replicationThrottleHelper.setThrottles(Arrays.asList(executionProposal, executionProposal2), loadMonitor, Collections.emptySet());
        assertExpectedThrottledRateForBroker(0, 30000000L);
        assertExpectedThrottledRateForBroker(1, 30000000L);
        assertExpectedThrottledRateForBroker(2, 30000000L);
        assertExpectedThrottledRateForBroker(3, 30000000L);
        EasyMock.verify(new Object[]{loadMonitor});
    }
}
