package com.linkedin.kafka.cruisecontrol.executor;

import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.TestConstants;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.executor.ExecutionTask;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.model.ReplicaPlacementInfo;
import com.linkedin.kafka.cruisecontrol.monitor.LoadMonitor;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import kafka.server.ConfigType;
import kafka.server.KafkaConfig;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigResource;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Option;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/executor/ReplicationThrottleHelperTest.class */
public class ReplicationThrottleHelperTest extends CCKafkaIntegrationTestHarness {
    /**
     * Reports a cluster size of four; the tests in this class refer to broker ids 0 through 3.
     *
     * @return the broker count used by this test class
     */
    @Override
    public int clusterSize() {
        final int brokerCount = 4;
        return brokerCount;
    }

    /**
     * Per-test initialization; delegates entirely to the parent harness's {@code setUp()}.
     */
    @Before
    @Override
    public void setUp() {
        super.setUp();
    }

    /**
     * Per-test cleanup; delegates entirely to the parent harness's {@code tearDown()}.
     */
    @After
    @Override
    public void tearDown() {
        super.tearDown();
    }

    /**
     * Records describe-configs expectations on {@code kafkaAdminClient} through
     * {@code KafkaCruiseControlUnitTestUtils.mockDescribeConfigs}, pairing the string form of every
     * broker id in {@code list} with an empty list of config entries.
     *
     * @param kafkaAdminClient the admin client mock to program
     * @param list             the broker ids to build config resources and empty configs for
     */
    private void mockDescribeConfigs(KafkaAdminClient kafkaAdminClient, List<Integer> list) throws ExecutionException, InterruptedException {
        Collection<ConfigResource> brokerResources = KafkaCruiseControlUnitTestUtils.configResourcesForBrokers(list);
        // Left raw because the exact map type expected by the helper is not visible from this file.
        HashMap emptyConfigsByBroker = new HashMap();
        for (Integer brokerId : list) {
            emptyConfigsByBroker.put(Integer.toString(brokerId.intValue()), Collections.emptyList());
        }
        KafkaCruiseControlUnitTestUtils.mockDescribeConfigs(kafkaAdminClient, brokerResources, emptyConfigsByBroker);
    }

    /**
     * Requests creation of {@code TestConstants.TOPIC0} and {@code TestConstants.TOPIC1}, each with
     * 2 partitions and replication factor 2, through an admin client connected to broker 0.
     * The admin client is always closed afterwards.
     *
     * NOTE(review): the result of {@code createTopics} is discarded, so this method does not itself
     * wait for the topics to exist - confirm that closing the client is sufficient for the callers.
     */
    private void createTopics() {
        ConfluentAdmin admin = KafkaCruiseControlUtils.createAdmin(
            Collections.singletonMap("bootstrap.servers", broker(0).plaintextAddr()));
        try {
            List<NewTopic> topicsToCreate = Arrays.asList(
                new NewTopic(TestConstants.TOPIC0, 2, (short) 2),
                new NewTopic(TestConstants.TOPIC1, 2, (short) 2));
            admin.createTopics(topicsToCreate);
        } finally {
            KafkaCruiseControlUtils.closeAdminClientWithTimeout(admin);
        }
    }

    /**
     * Builds an inter-broker replica action task for the proposal and marks it in progress at time 0.
     *
     * @param j                 the id handed to the {@link ExecutionTask} constructor
     * @param executionProposal the proposal the task wraps
     * @return the task, after {@code inProgress(0L)} has been called on it
     */
    private ExecutionTask inProgressTaskForProposal(long j, ExecutionProposal executionProposal) {
        ExecutionTask task = new ExecutionTask(j, executionProposal, ExecutionTask.TaskType.INTER_BROKER_REPLICA_ACTION);
        task.inProgress(0L);
        return task;
    }

    /**
     * Builds an in-progress task via {@link #inProgressTaskForProposal} and then marks it completed at time 1.
     *
     * @param j                 the id handed to the {@link ExecutionTask} constructor
     * @param executionProposal the proposal the task wraps
     * @return the task, after {@code completed(1L)} has been called on it
     */
    private ExecutionTask completedTaskForProposal(long j, ExecutionProposal executionProposal) {
        ExecutionTask task = inProgressTaskForProposal(j, executionProposal);
        task.completed(1L);
        return task;
    }

    /**
     * With a {@code null} throttle rate, setting and clearing throttles must not touch ZooKeeper:
     * the ZooKeeper client is a strict mock with no recorded expectations, so any call on it
     * would fail the test.
     */
    @Test
    public void isNoOpWhenThrottleIsNull() throws InterruptedException, ExecutionException {
        KafkaZkClient strictZkClient = EasyMock.strictMock(KafkaZkClient.class);
        KafkaAdminClient adminClient = EasyMock.mock(KafkaAdminClient.class);
        KafkaCruiseControlUnitTestUtils.mockDescribeConfigs(adminClient, Collections.emptyList(), Collections.emptyMap());
        EasyMock.replay(strictZkClient, adminClient);

        ReplicationThrottleHelper helper = new ReplicationThrottleHelper(strictZkClient, adminClient, (Long) null, true);
        ExecutionProposal proposal = new ExecutionProposal(
            new TopicPartition("topic", 0),
            100L,
            new ReplicaPlacementInfo(0),
            Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)),
            Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)),
            Collections.emptyList(),
            Collections.emptyList());
        ExecutionTask finishedTask = completedTaskForProposal(0L, proposal);

        helper.setThrottles(Collections.singletonList(proposal), (LoadMonitor) null, Collections.emptySet());
        helper.clearThrottles(Collections.singletonList(finishedTask), Collections.emptyList());
    }

    /**
     * Asserts that the broker's leader and follower throttled-rate configs read from ZooKeeper both
     * equal the expected rate, or are both absent when the expected rate is {@code null}.
     *
     * @param kafkaZkClient the client used to read the broker's entity configs
     * @param i             the broker id
     * @param l             the expected rate, or {@code null} if no rate should be set
     */
    private void assertExpectedThrottledRateForBroker(KafkaZkClient kafkaZkClient, int i, Long l) {
        Properties brokerConfig = kafkaZkClient.getEntityConfigs(ConfigType.Broker(), String.valueOf(i));
        String expectedRate;
        if (l == null) {
            expectedRate = null;
        } else {
            expectedRate = String.valueOf(l);
        }
        Assert.assertEquals(expectedRate, brokerConfig.getProperty(ReplicationThrottleHelper.LEADER_THROTTLED_RATE));
        Assert.assertEquals(expectedRate, brokerConfig.getProperty(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE));
    }

    /**
     * Asserts that the topic's leader and follower throttled-replica configs read from ZooKeeper
     * both equal {@code str2} (a {@code null} expectation means the property must be absent).
     *
     * @param kafkaZkClient the client used to read the topic's entity configs
     * @param str           the topic name
     * @param str2          the expected throttled-replicas value, or {@code null}
     */
    private void assertExpectedThrottledReplicas(KafkaZkClient kafkaZkClient, String str, String str2) {
        Properties topicConfig = kafkaZkClient.getEntityConfigs(ConfigType.Topic(), str);
        // Leader is checked first, then follower, so the first failure reported matches the original order.
        for (String throttledReplicasKey : Arrays.asList(
                ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS,
                ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS)) {
            Assert.assertEquals(str2, topicConfig.getProperty(throttledReplicasKey));
        }
    }

    /**
     * Starting from a cluster with no throttle configs, sets throttles for one proposal on
     * TOPIC0 partition 0 and checks that brokers 0, 1 and 2 get rate 100 while broker 3 gets none,
     * and that the topic's throttled replicas become {@code "0:0,0:1,0:2"}. Clearing throttles for
     * the completed task must then remove every rate and the topic-level replica config.
     */
    @Test
    public void addingThrottlesWithNoPreExistingThrottles() throws InterruptedException, ExecutionException {
        createTopics();
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ReplicationThrottleHelperTestMetricGroup", "AddingThrottlesWithNoPreExistingThrottles", false);
        KafkaAdminClient adminClient = EasyMock.mock(KafkaAdminClient.class);
        List<Integer> participatingBrokers = Arrays.asList(0, 1, 2);
        mockDescribeConfigs(adminClient, participatingBrokers);
        EasyMock.replay(adminClient);
        try {
            ReplicationThrottleHelper helper = new ReplicationThrottleHelper(zkClient, adminClient, 100L, true);
            ExecutionProposal proposal = new ExecutionProposal(
                new TopicPartition(TestConstants.TOPIC0, 0),
                100L,
                new ReplicaPlacementInfo(0),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)),
                Collections.emptyList(),
                Collections.emptyList());
            ExecutionTask finishedTask = completedTaskForProposal(0L, proposal);

            helper.setThrottles(Collections.singletonList(proposal), (LoadMonitor) null, Collections.emptySet());
            for (Integer brokerId : participatingBrokers) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), 100L);
            }
            // Broker 3 appears in neither replica list of the proposal.
            assertExpectedThrottledRateForBroker(zkClient, 3, null);
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, "0:0,0:1,0:2");

            helper.clearThrottles(Collections.singletonList(finishedTask), Collections.emptyList());
            for (Integer brokerId : Arrays.asList(0, 1, 2, 3)) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), null);
            }
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, null);
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * When the admin client's {@code describeConfigs} call throws, {@code setThrottles} must fail
     * with a {@link RuntimeException} and leave ZooKeeper untouched: no throttled rate on any of
     * the four brokers and no throttled replicas on TOPIC0.
     *
     * The ZooKeeper client is closed in a {@code finally} block so that a failing assertion cannot
     * leak the connection, matching the other integration tests in this class.
     */
    @Test
    public void testExceptionInAdminClientStopsThrottling() {
        createTopics();
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ReplicationThrottleHelperTestMetricGroup", "AddingThrottlesWithNoPreExistingThrottles", false);
        try {
            KafkaAdminClient adminClient = EasyMock.mock(KafkaAdminClient.class);
            List<Integer> participatingBrokers = Arrays.asList(0, 1, 2);
            // The single expected describeConfigs call fails.
            EasyMock.expect(adminClient.describeConfigs(KafkaCruiseControlUnitTestUtils.configResourcesForBrokers(participatingBrokers)))
                .andThrow(new KafkaException("!!!"))
                .times(1);
            EasyMock.replay(adminClient);

            ExecutionProposal proposal = new ExecutionProposal(
                new TopicPartition(TestConstants.TOPIC0, 0),
                100L,
                new ReplicaPlacementInfo(0),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)),
                Collections.emptyList(),
                Collections.emptyList());
            ReplicationThrottleHelper helper = new ReplicationThrottleHelper(zkClient, adminClient, 100L, true);

            Assert.assertThrows(RuntimeException.class, () -> {
                helper.setThrottles(Collections.singletonList(proposal), (LoadMonitor) null, Collections.emptySet());
            });

            // Nothing may have been written to ZooKeeper before the failure.
            for (Integer brokerId : participatingBrokers) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), null);
            }
            assertExpectedThrottledRateForBroker(zkClient, 3, null);
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, null);
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * When the mocked describe-configs response reports {@code "*"} for both throttled-replicas
     * properties on every participating broker (0, 1, 2), {@code setThrottles} must still set the
     * rate of 100 on those brokers but must not write throttled replicas for the topic. Clearing
     * throttles afterwards must remove the rates again.
     *
     * The ZooKeeper client is closed in a {@code finally} block so that a failing assertion cannot
     * leak the connection, matching the other integration tests in this class.
     */
    @Test
    public void testNoThrottledReplicasAreSetIfAllBrokersHaveStaticThrottledReplicas() throws InterruptedException, ExecutionException {
        createTopics();
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ReplicationThrottleHelperTestMetricGroup", "AddingThrottlesWithNoPreExistingThrottles", false);
        try {
            KafkaAdminClient adminClient = EasyMock.mock(KafkaAdminClient.class);
            List<Integer> participatingBrokers = Arrays.asList(0, 1, 2);
            List<Integer> nonParticipatingBrokers = Collections.singletonList(3);

            // Left raw because the exact map type expected by the mocking helper is not visible from this file.
            HashMap configsByBroker = new HashMap();
            for (Integer brokerId : participatingBrokers) {
                configsByBroker.put(brokerId.toString(), Arrays.asList(
                    new ConfigEntry(KafkaConfig.FollowerReplicationThrottledReplicasProp(), "*"),
                    new ConfigEntry(KafkaConfig.LeaderReplicationThrottledReplicasProp(), "*")));
            }
            for (Integer brokerId : nonParticipatingBrokers) {
                configsByBroker.put(brokerId.toString(), Arrays.asList(
                    new ConfigEntry(KafkaConfig.FollowerReplicationThrottledReplicasProp(), (String) null),
                    new ConfigEntry(KafkaConfig.LeaderReplicationThrottledReplicasProp(), (String) null)));
            }
            KafkaCruiseControlUnitTestUtils.mockDescribeConfigs(adminClient, KafkaCruiseControlUnitTestUtils.configResourcesForBrokers(participatingBrokers), configsByBroker);
            EasyMock.replay(adminClient);

            ReplicationThrottleHelper helper = new ReplicationThrottleHelper(zkClient, adminClient, 100L, true);
            ExecutionProposal proposal = new ExecutionProposal(
                new TopicPartition(TestConstants.TOPIC0, 0),
                100L,
                new ReplicaPlacementInfo(0),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)),
                Collections.emptyList(),
                Collections.emptyList());
            ExecutionTask finishedTask = completedTaskForProposal(0L, proposal);

            helper.setThrottles(Collections.singletonList(proposal), (LoadMonitor) null, Collections.emptySet());
            for (Integer brokerId : participatingBrokers) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), 100L);
            }
            assertExpectedThrottledRateForBroker(zkClient, 3, null);
            // Rates are applied, but no topic-level throttled replicas are written.
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, null);

            helper.clearThrottles(Collections.singletonList(finishedTask), Collections.emptyList());
            for (Integer brokerId : Arrays.asList(0, 1, 2, 3)) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), null);
            }
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, null);
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Passes broker 1 as the third argument of {@code setThrottles} and mocks describe-configs for
     * brokers 0 and 2 only. Brokers 0, 1 and 2 must all still receive rate 100, and TOPIC0 must get
     * throttled replicas {@code "0:0,0:1,0:2"}; clearing throttles must then remove everything.
     */
    @Test
    public void testRemovedBrokersNotDescribed() throws InterruptedException, ExecutionException {
        createTopics();
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ReplicationThrottleHelperTestMetricGroup", "AddingThrottlesWithNoPreExistingThrottles", false);
        KafkaAdminClient adminClient = EasyMock.mock(KafkaAdminClient.class);
        List<Integer> describedBrokers = Arrays.asList(0, 2);
        List<Integer> throttledBrokers = Arrays.asList(0, 1, 2);
        mockDescribeConfigs(adminClient, describedBrokers);
        EasyMock.replay(adminClient);
        try {
            ReplicationThrottleHelper helper = new ReplicationThrottleHelper(zkClient, adminClient, 100L, true);
            ExecutionProposal proposal = new ExecutionProposal(
                new TopicPartition(TestConstants.TOPIC0, 0),
                100L,
                new ReplicaPlacementInfo(0),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)),
                Collections.emptyList(),
                Collections.emptyList());
            ExecutionTask finishedTask = completedTaskForProposal(0L, proposal);

            helper.setThrottles(Collections.singletonList(proposal), (LoadMonitor) null, Collections.singleton(1));
            for (Integer brokerId : throttledBrokers) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), 100L);
            }
            assertExpectedThrottledRateForBroker(zkClient, 3, null);
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, "0:0,0:1,0:2");

            helper.clearThrottles(Collections.singletonList(finishedTask), Collections.emptyList());
            for (Integer brokerId : Arrays.asList(0, 1, 2, 3)) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), null);
            }
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, null);
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Creates a replayed EasyMock {@link ConfigEntry} whose {@code name()}, {@code value()} and
     * {@code source()} return the given values any number of times.
     *
     * @param str          the value returned by {@code name()}
     * @param str2         the value returned by {@code value()}
     * @param configSource the value returned by {@code source()}
     * @return the mock, already switched to replay mode
     */
    private ConfigEntry configEntryMock(String str, String str2, ConfigEntry.ConfigSource configSource) {
        ConfigEntry entry = EasyMock.mock(ConfigEntry.class);
        EasyMock.expect(entry.name()).andReturn(str).anyTimes();
        EasyMock.expect(entry.value()).andReturn(str2).anyTimes();
        EasyMock.expect(entry.source()).andReturn(configSource).anyTimes();
        EasyMock.replay(entry);
        return entry;
    }

    /**
     * When the mocked describe-configs response reports leader and follower throttled rates of 300
     * with source {@code STATIC_BROKER_CONFIG} for brokers 0, 1 and 2, {@code setThrottles} must
     * not write a throttled rate to ZooKeeper for any broker, while TOPIC0 still gets throttled
     * replicas {@code "0:0,0:1,0:2"}. Note that the helper here is constructed with {@code false}
     * as its last argument, unlike the other tests in this class.
     *
     * The ZooKeeper client is closed in a {@code finally} block so that a failing assertion cannot
     * leak the connection, matching the other integration tests in this class.
     */
    @Test
    public void testNoThrottledRateIsSetForBrokersThatHaveStaticThrottleRateSet() throws InterruptedException, ExecutionException {
        createTopics();
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ReplicationThrottleHelperTestMetricGroup", "AddingThrottlesWithNoPreExistingThrottles", false);
        try {
            KafkaAdminClient adminClient = EasyMock.mock(KafkaAdminClient.class);
            List<Integer> participatingBrokers = Arrays.asList(0, 1, 2);
            List<Integer> nonParticipatingBrokers = Collections.singletonList(3);

            // Left raw because the exact map type expected by the mocking helper is not visible from this file.
            HashMap configsByBroker = new HashMap();
            for (Integer brokerId : participatingBrokers) {
                configsByBroker.put(brokerId.toString(), Arrays.asList(
                    configEntryMock(ReplicationThrottleHelper.LEADER_THROTTLED_RATE, Long.toString(300L), ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG),
                    configEntryMock(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE, Long.toString(300L), ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG)));
            }
            for (Integer brokerId : nonParticipatingBrokers) {
                configsByBroker.put(brokerId.toString(), Arrays.asList(
                    new ConfigEntry(ReplicationThrottleHelper.LEADER_THROTTLED_RATE, (String) null),
                    new ConfigEntry(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE, (String) null)));
            }
            KafkaCruiseControlUnitTestUtils.mockDescribeConfigs(adminClient, KafkaCruiseControlUnitTestUtils.configResourcesForBrokers(participatingBrokers), configsByBroker);
            EasyMock.replay(adminClient);

            ReplicationThrottleHelper helper = new ReplicationThrottleHelper(zkClient, adminClient, 100L, false);
            ExecutionProposal proposal = new ExecutionProposal(
                new TopicPartition(TestConstants.TOPIC0, 0),
                100L,
                new ReplicaPlacementInfo(0),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)),
                Collections.emptyList(),
                Collections.emptyList());
            helper.setThrottles(Collections.singletonList(proposal), (LoadMonitor) null, Collections.emptySet());

            // No rate is written for the brokers whose rate comes from static config.
            for (Integer brokerId : participatingBrokers) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), null);
            }
            assertExpectedThrottledRateForBroker(zkClient, 3, null);
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, "0:0,0:1,0:2");
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * When only some participating brokers report {@code "*"} throttled replicas (brokers 0 and 1,
     * but not broker 2), {@code setThrottles} must set rate 100 on brokers 0, 1 and 2 and must
     * write throttled replicas {@code "0:0,0:1,0:2"} for TOPIC0. Clearing throttles afterwards
     * must remove all of them.
     *
     * The ZooKeeper client is closed in a {@code finally} block so that a failing assertion cannot
     * leak the connection, matching the other integration tests in this class.
     */
    @Test
    public void testTopicThrottlesAreSetIfNotAllBrokersHaveStaticThrottles() throws InterruptedException, ExecutionException {
        createTopics();
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ReplicationThrottleHelperTestMetricGroup", "TopicThrottlesAreSetIfNotAllBrokersHaveStaticThrottles", false);
        try {
            KafkaAdminClient adminClient = EasyMock.mock(KafkaAdminClient.class);
            List<Integer> brokersWithWildcardReplicas = Arrays.asList(0, 1);
            List<Integer> brokersWithoutReplicaConfig = Arrays.asList(2, 3);

            // Left raw because the exact map type expected by the mocking helper is not visible from this file.
            HashMap configsByBroker = new HashMap();
            for (Integer brokerId : brokersWithWildcardReplicas) {
                configsByBroker.put(brokerId.toString(), Arrays.asList(
                    new ConfigEntry(KafkaConfig.FollowerReplicationThrottledReplicasProp(), "*"),
                    new ConfigEntry(KafkaConfig.LeaderReplicationThrottledReplicasProp(), "*")));
            }
            for (Integer brokerId : brokersWithoutReplicaConfig) {
                configsByBroker.put(brokerId.toString(), Arrays.asList(
                    new ConfigEntry(KafkaConfig.FollowerReplicationThrottledReplicasProp(), (String) null),
                    new ConfigEntry(KafkaConfig.LeaderReplicationThrottledReplicasProp(), (String) null)));
            }
            KafkaCruiseControlUnitTestUtils.mockDescribeConfigs(adminClient, KafkaCruiseControlUnitTestUtils.configResourcesForBrokers(Arrays.asList(0, 1, 2)), configsByBroker);
            EasyMock.replay(adminClient);

            ReplicationThrottleHelper helper = new ReplicationThrottleHelper(zkClient, adminClient, 100L, true);
            ExecutionProposal proposal = new ExecutionProposal(
                new TopicPartition(TestConstants.TOPIC0, 0),
                100L,
                new ReplicaPlacementInfo(0),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)),
                Collections.emptyList(),
                Collections.emptyList());
            ExecutionTask finishedTask = completedTaskForProposal(0L, proposal);

            helper.setThrottles(Collections.singletonList(proposal), (LoadMonitor) null, Collections.emptySet());
            for (Integer brokerId : brokersWithWildcardReplicas) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), 100L);
            }
            assertExpectedThrottledRateForBroker(zkClient, 2, 100L);
            assertExpectedThrottledRateForBroker(zkClient, 3, null);
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, "0:0,0:1,0:2");

            helper.clearThrottles(Collections.singletonList(finishedTask), Collections.emptyList());
            for (Integer brokerId : Arrays.asList(0, 1, 2, 3)) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), null);
            }
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, null);
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Seeds ZooKeeper with throttles that exist before the helper runs: rate 200 on broker 0,
     * throttled replicas {@code "1:0,1:1"} on TOPIC0 and {@code "1:1"} on TOPIC1. Setting throttles
     * must keep broker 0 at 200, put 100 on brokers 1 and 2, and merge the new replicas into
     * TOPIC0's list. After clearing, TOPIC0 and TOPIC1 must be back at their seeded replica lists
     * and no broker may have a rate.
     */
    @Test
    public void addingThrottlesWithPreExistingThrottles() throws InterruptedException, ExecutionException {
        createTopics();
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ReplicationThrottleHelperTestMetricGroup", "addingThrottlesWithPreExistingThrottles", false);
        KafkaAdminClient adminClient = EasyMock.mock(KafkaAdminClient.class);
        mockDescribeConfigs(adminClient, Arrays.asList(0, 1, 2));
        EasyMock.replay(adminClient);
        try {
            ReplicationThrottleHelper helper = new ReplicationThrottleHelper(zkClient, adminClient, 100L, true);
            ExecutionProposal proposal = new ExecutionProposal(
                new TopicPartition(TestConstants.TOPIC0, 0),
                100L,
                new ReplicaPlacementInfo(0),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)),
                Collections.emptyList(),
                Collections.emptyList());
            ExecutionTask finishedTask = completedTaskForProposal(0L, proposal);

            // Seed broker 0 with a rate of 200.
            Properties seededBrokerConfig = new Properties();
            seededBrokerConfig.setProperty(ReplicationThrottleHelper.LEADER_THROTTLED_RATE, String.valueOf(200L));
            seededBrokerConfig.setProperty(ReplicationThrottleHelper.FOLLOWER_THROTTLED_RATE, String.valueOf(200L));
            new AdminZkClient(zkClient).changeBrokerConfig(Option.apply(0), seededBrokerConfig);

            // Seed TOPIC0 with throttled replicas for partition 1.
            Properties topic0Config = zkClient.getEntityConfigs(ConfigType.Topic(), TestConstants.TOPIC0);
            topic0Config.setProperty(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS, "1:0,1:1");
            topic0Config.setProperty(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS, "1:0,1:1");
            new AdminZkClient(zkClient).changeTopicConfig(TestConstants.TOPIC0, topic0Config);

            // Seed TOPIC1, which the proposal does not touch.
            Properties topic1Config = zkClient.getEntityConfigs(ConfigType.Topic(), TestConstants.TOPIC1);
            topic1Config.setProperty(ReplicationThrottleHelper.LEADER_THROTTLED_REPLICAS, "1:1");
            topic1Config.setProperty(ReplicationThrottleHelper.FOLLOWER_THROTTLED_REPLICAS, "1:1");
            new AdminZkClient(zkClient).changeTopicConfig(TestConstants.TOPIC1, topic1Config);

            helper.setThrottles(Collections.singletonList(proposal), (LoadMonitor) null, Collections.emptySet());
            assertExpectedThrottledRateForBroker(zkClient, 0, 200L);
            assertExpectedThrottledRateForBroker(zkClient, 1, 100L);
            assertExpectedThrottledRateForBroker(zkClient, 2, 100L);
            assertExpectedThrottledRateForBroker(zkClient, 3, null);
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, "0:0,0:1,0:2,1:0,1:1");
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC1, "1:1");

            // Cleared twice with identical arguments; presumably to show a repeated clear is harmless.
            helper.clearThrottles(Collections.singletonList(finishedTask), Collections.emptyList());
            helper.clearThrottles(Collections.singletonList(finishedTask), Collections.emptyList());
            for (Integer brokerId : Arrays.asList(0, 1, 2, 3)) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), null);
            }
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, "1:0,1:1");
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC1, "1:1");
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Sets throttles for two proposals on TOPIC0 (partitions 0 and 1), then clears throttles while
     * only the first task is completed and the second is still in progress. The brokers and
     * replicas of the in-progress task (brokers 0, 2, 3; replicas {@code "1:0,1:2,1:3"}) must stay
     * throttled; only broker 1 loses its rate. Once the second task completes, a final clear must
     * remove everything.
     */
    @Test
    public void doNoteRemoveThrottlesForInProgressTasks() throws InterruptedException, ExecutionException {
        createTopics();
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ReplicationThrottleHelperTestMetricGroup", "doNoteRemoveThrottlesForInProgressTasks", false);
        KafkaAdminClient adminClient = EasyMock.mock(KafkaAdminClient.class);
        List<Integer> allBrokers = Arrays.asList(0, 1, 2, 3);
        mockDescribeConfigs(adminClient, allBrokers);
        EasyMock.replay(adminClient);
        try {
            ReplicationThrottleHelper helper = new ReplicationThrottleHelper(zkClient, adminClient, 100L, true);
            ExecutionProposal firstProposal = new ExecutionProposal(
                new TopicPartition(TestConstants.TOPIC0, 0),
                100L,
                new ReplicaPlacementInfo(0),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)),
                Collections.emptyList(),
                Collections.emptyList());
            ExecutionProposal secondProposal = new ExecutionProposal(
                new TopicPartition(TestConstants.TOPIC0, 1),
                100L,
                new ReplicaPlacementInfo(0),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(3)),
                Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)),
                Collections.emptyList(),
                Collections.emptyList());

            helper.setThrottles(Arrays.asList(firstProposal, secondProposal), (LoadMonitor) null, Collections.emptySet());
            ExecutionTask finishedTask = completedTaskForProposal(0L, firstProposal);
            ExecutionTask runningTask = inProgressTaskForProposal(1L, secondProposal);

            for (Integer brokerId : allBrokers) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), 100L);
            }
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, "0:0,0:1,0:2,1:0,1:2,1:3");

            // The same clear is issued twice; the expected state after each call is identical.
            for (int attempt = 0; attempt < 2; attempt++) {
                helper.clearThrottles(Collections.singletonList(finishedTask), Collections.singletonList(runningTask));
                assertExpectedThrottledRateForBroker(zkClient, 0, 100L);
                assertExpectedThrottledRateForBroker(zkClient, 1, null);
                assertExpectedThrottledRateForBroker(zkClient, 2, 100L);
                assertExpectedThrottledRateForBroker(zkClient, 3, 100L);
                assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, "1:0,1:2,1:3");
            }

            // Finish the second task; now nothing should remain throttled.
            runningTask.completed(3L);
            helper.clearThrottles(Arrays.asList(finishedTask, runningTask), Collections.emptyList());
            for (Integer brokerId : Arrays.asList(0, 1, 2, 3)) {
                assertExpectedThrottledRateForBroker(zkClient, brokerId.intValue(), null);
            }
            assertExpectedThrottledReplicas(zkClient, TestConstants.TOPIC0, null);
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Checks that {@code ReplicationThrottleHelper.removeReplicasFromConfig} drops the entries
     * {@code foo}, {@code bar} and {@code baz} from the comma-separated string and keeps the
     * remaining entries in their original order.
     */
    @Test
    public void removeReplicasFromConfigTest() {
        LinkedHashSet<String> replicasToRemove = new LinkedHashSet<>(Arrays.asList("foo", "bar", "baz"));
        // JUnit's assertEquals takes (expected, actual); the expected value goes first so that a
        // failure message reports the two values the right way round.
        Assert.assertEquals("qux,qaz", ReplicationThrottleHelper.removeReplicasFromConfig("foo,bar,qux,qaz,baz", replicasToRemove));
    }

    /**
     * Exercises {@code setThrottleRate} on freshly constructed helpers and checks the resulting
     * {@code _throttleRate} and {@code _autoThrottleEnabled} fields: null over null, a new value
     * over an existing one, null over an existing value, successive updates, and switching auto
     * throttling on via {@code KafkaCruiseControlConfig.AUTO_THROTTLE} and off again with an
     * explicit rate.
     */
    @Test
    public void testDoesNotSetThrottleRate() throws ExecutionException, InterruptedException {
        KafkaZkClient zkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ReplicationThrottleHelperTestMetricGroup", "DoesNotSetThrottleRate", false);
        KafkaAdminClient adminClient = EasyMock.mock(KafkaAdminClient.class);
        mockDescribeConfigs(adminClient, Arrays.asList(0, 1, 2, 3));
        EasyMock.replay(adminClient);
        try {
            // null -> null
            ReplicationThrottleHelper nullToNull = new ReplicationThrottleHelper(zkClient, adminClient, (Long) null, true);
            nullToNull.setThrottleRate((Long) null);
            Assert.assertNull("Expected throttle rate to be set to null", nullToNull._throttleRate);
            Assert.assertFalse("Auto throttling should not be on with a null throttle", nullToNull._autoThrottleEnabled);

            // 101 -> 100
            ReplicationThrottleHelper lowered = new ReplicationThrottleHelper(zkClient, adminClient, 101L, true);
            lowered.setThrottleRate(100L);
            Assert.assertEquals(100L, lowered._throttleRate.longValue());
            Assert.assertFalse("Auto throttling should not be on", lowered._autoThrottleEnabled);

            // 100 -> null
            ReplicationThrottleHelper valueToNull = new ReplicationThrottleHelper(zkClient, adminClient, 100L, true);
            valueToNull.setThrottleRate((Long) null);
            Assert.assertNull("Expected throttle rate to be set to null", valueToNull._throttleRate);
            Assert.assertFalse("Auto throttling should not be on", valueToNull._autoThrottleEnabled);

            // 100 -> 101 -> 100
            ReplicationThrottleHelper updatedTwice = new ReplicationThrottleHelper(zkClient, adminClient, 100L, true);
            updatedTwice.setThrottleRate(101L);
            Assert.assertEquals(101L, updatedTwice._throttleRate.longValue());
            Assert.assertFalse("Auto throttling should not be on", updatedTwice._autoThrottleEnabled);
            updatedTwice.setThrottleRate(100L);
            Assert.assertEquals(100L, updatedTwice._throttleRate.longValue());
            Assert.assertFalse("Auto throttling should not be on", updatedTwice._autoThrottleEnabled);

            // 100 -> AUTO_THROTTLE -> 1
            ReplicationThrottleHelper autoToggled = new ReplicationThrottleHelper(zkClient, adminClient, 100L, true);
            Assert.assertFalse("Auto throttling should not be on", autoToggled._autoThrottleEnabled);
            autoToggled.setThrottleRate(Long.valueOf(KafkaCruiseControlConfig.AUTO_THROTTLE));
            Assert.assertTrue("Auto throttling should be on since we set it", autoToggled._autoThrottleEnabled);
            autoToggled.setThrottleRate(1L);
            // The previous message claimed auto throttling "should be on", contradicting assertFalse.
            Assert.assertFalse("Auto throttling should be off since we set the rate to 1", autoToggled._autoThrottleEnabled);
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(zkClient);
        }
    }

    /**
     * Checks that a helper constructed with {@code KafkaCruiseControlConfig.AUTO_THROTTLE} applies
     * the rate returned by {@code LoadMonitor.computeThrottle()} when throttles are set.
     *
     * <p>The mocked load monitor returns 50000000 on the first call and 30000000 on the second.
     * After each {@code setThrottles} call, brokers 0 through 3 are asserted to carry the
     * corresponding rate. The two proposals reference brokers 0, 1, 2 and 3 as old or new replicas.
     */
    @Test
    public void testAutomaticThrottleComputation() throws ExecutionException, InterruptedException {
        KafkaZkClient kafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "ReplicationThrottleHelperTestMetricGroup", "testAutomaticThrottleComputation", false);
        // Close the ZK client on every exit path, matching how the other tests in this class
        // release it via closeKafkaZkClientWithTimeout.
        try {
            KafkaAdminClient mockAdminClient = (KafkaAdminClient) EasyMock.mock(KafkaAdminClient.class);
            mockDescribeConfigs(mockAdminClient, Arrays.asList(0, 1, 2, 3));
            EasyMock.replay(mockAdminClient);

            // computeThrottle() is expected exactly twice, yielding a different rate each time.
            LoadMonitor mockLoadMonitor = (LoadMonitor) EasyMock.createMock(LoadMonitor.class);
            EasyMock.expect(Long.valueOf(mockLoadMonitor.computeThrottle())).andReturn(50000000L).once();
            EasyMock.expect(Long.valueOf(mockLoadMonitor.computeThrottle())).andReturn(30000000L).once();
            EasyMock.replay(mockLoadMonitor);

            ReplicationThrottleHelper throttleHelper = new ReplicationThrottleHelper(kafkaZkClient, mockAdminClient, Long.valueOf(KafkaCruiseControlConfig.AUTO_THROTTLE), true);
            // Partition 0 moves a replica from broker 1 to broker 2; partition 1 from broker 3 to broker 2.
            ExecutionProposal proposalForPartition0 = new ExecutionProposal(new TopicPartition(TestConstants.TOPIC0, 0), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(1)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());
            ExecutionProposal proposalForPartition1 = new ExecutionProposal(new TopicPartition(TestConstants.TOPIC0, 1), 100L, new ReplicaPlacementInfo(0), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(3)), Arrays.asList(new ReplicaPlacementInfo(0), new ReplicaPlacementInfo(2)), Collections.emptyList(), Collections.emptyList());

            // First round: the first computed rate is expected on brokers 0..3.
            throttleHelper.setThrottles(Arrays.asList(proposalForPartition0, proposalForPartition1), mockLoadMonitor, Collections.emptySet());
            for (int brokerId = 0; brokerId <= 3; brokerId++) {
                assertExpectedThrottledRateForBroker(kafkaZkClient, brokerId, 50000000L);
            }

            // Second round after clearing throttles: the second computed rate is expected.
            throttleHelper.removeAllThrottles();
            throttleHelper.setThrottles(Arrays.asList(proposalForPartition0, proposalForPartition1), mockLoadMonitor, Collections.emptySet());
            for (int brokerId = 0; brokerId <= 3; brokerId++) {
                assertExpectedThrottledRateForBroker(kafkaZkClient, brokerId, 30000000L);
            }

            EasyMock.verify(mockLoadMonitor);
        } finally {
            KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(kafkaZkClient);
        }
    }
}
