package com.linkedin.kafka.cruisecontrol.monitor;

import com.linkedin.cruisecontrol.CruiseControlUnitTestUtils;
import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.common.TestConstants;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigFileResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.model.ClusterModel;
import com.linkedin.kafka.cruisecontrol.model.ModelParameters;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.NoopSampleStore;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.BrokerEntity;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionEntity;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import com.linkedin.kafka.cruisecontrol.monitor.task.LoadMonitorTaskRunner;
import com.yammer.metrics.core.MetricsRegistry;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import kafka.log.LogConfig;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/LoadMonitorTest.class */
public class LoadMonitorTest {
    // Partition id 1. NOTE(review): not referenced by name in the visible part of this file
    // (literal 1 is used instead) — confirm whether it is still needed.
    private static final int P1 = 1;
    // Window/sample sizing shared by the tests. Presumably also fed into the monitor
    // configuration by prepareContext(), which is outside this view — TODO confirm.
    private static final int NUM_WINDOWS = 2;
    private static final int MIN_SAMPLES_PER_WINDOW = 4;
    private static final long WINDOW_MS = 1000;
    // Mock clock handed to the code under test; constructed with argument 0
    // (presumably "no auto-tick" — confirm against MockTime).
    private final Time _time = new MockTime(0);
    // Created in setUp() and shut down in tearDown(), so every test gets its own registry.
    private MetricsRegistry metricsRegistry;
    // Partition id 0; also reused below as the integer argument of NODE_0.
    private static final int P0 = 0;
    // The four topic-partitions (two topics x two partitions) the tests operate on.
    private static final TopicPartition T0P0 = new TopicPartition(TestConstants.TOPIC0, P0);
    private static final TopicPartition T0P1 = new TopicPartition(TestConstants.TOPIC0, 1);
    private static final TopicPartition T1P0 = new TopicPartition(TestConstants.TOPIC1, P0);
    private static final TopicPartition T1P1 = new TopicPartition(TestConstants.TOPIC1, 1);
    // Partition entities wrapping the topic-partitions above; these are what gets passed to
    // CruiseControlUnitTestUtils.populateSampleAggregator.
    private static final PartitionEntity PE_T0P0 = new PartitionEntity(T0P0);
    private static final PartitionEntity PE_T0P1 = new PartitionEntity(T0P1);
    private static final PartitionEntity PE_T1P0 = new PartitionEntity(T1P0);
    private static final PartitionEntity PE_T1P1 = new PartitionEntity(T1P1);
    // Broker entities for "localhost" with integer arguments 0 and 1
    // (presumably the broker ids — confirm against BrokerEntity).
    private static final BrokerEntity NODE_0 = new BrokerEntity("localhost", P0);
    private static final BrokerEntity NODE_1 = new BrokerEntity("localhost", 1);
    // Metric definitions obtained from KafkaMetricDef: the common one and the broker one.
    private static final MetricDef COMMON_METRIC_DEF = KafkaMetricDef.commonMetricDef();
    private static final MetricDef BROKER_METRIC_DEF = KafkaMetricDef.brokerMetricDef();
    // Cleanup policy string taken from LogConfig.Delete().
    private static final String DEFAULT_CLEANUP_POLICY = LogConfig.Delete();

    /**
     * Immutable holder for the objects a single test works with: the load monitor, the
     * partition metric sample aggregator, the configuration and the cluster.
     */
    public static class TestContext {
        private final LoadMonitor _loadMonitor;
        private final KafkaPartitionMetricSampleAggregator _aggregator;
        private final KafkaCruiseControlConfig _config;
        private final Cluster _cluster;

        /**
         * @param monitor the load monitor to expose via {@link #loadmonitor()}
         * @param sampleAggregator the aggregator to expose via {@link #aggregator()}
         * @param cruiseControlConfig the configuration to expose via {@code config()}
         * @param kafkaCluster the cluster to expose via {@code cluster()}
         */
        private TestContext(LoadMonitor monitor,
                            KafkaPartitionMetricSampleAggregator sampleAggregator,
                            KafkaCruiseControlConfig cruiseControlConfig,
                            Cluster kafkaCluster) {
            _loadMonitor = monitor;
            _aggregator = sampleAggregator;
            _config = cruiseControlConfig;
            _cluster = kafkaCluster;
        }

        /** @return the load monitor held by this context. */
        public LoadMonitor loadmonitor() {
            return _loadMonitor;
        }

        /** @return the partition metric sample aggregator held by this context. */
        public KafkaPartitionMetricSampleAggregator aggregator() {
            return _aggregator;
        }

        /** @return the configuration held by this context. */
        private KafkaCruiseControlConfig config() {
            return _config;
        }

        /** @return the cluster held by this context. */
        private Cluster cluster() {
            return _cluster;
        }
    }

    /** Gives each test its own, freshly created metrics registry. */
    @Before
    public void setUp() {
        metricsRegistry = new MetricsRegistry();
    }

    /** Shuts down the metrics registry created in {@link #setUp()}. */
    @After
    public void tearDown() {
        metricsRegistry.shutdown();
    }

    /**
     * Populates the aggregator identically for all four partition entities and then checks that
     * the reported state has no valid partitions, no valid windows and no monitored windows.
     */
    @Test
    public void testStateWithOnlyActiveSnapshotWindow() {
        TestContext context = prepareContext();
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator sampleAggregator = context.aggregator();

        // Fifth argument (0) is presumably the starting window index — TODO confirm.
        PartitionEntity[] allEntities = {PE_T0P0, PE_T0P1, PE_T1P0, PE_T1P1};
        for (PartitionEntity entity : allEntities) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                1, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, 0, WINDOW_MS, COMMON_METRIC_DEF);
        }

        LoadMonitorState monitorState =
            monitor.state(new OperationProgress(), monitor.refreshClusterAndGeneration());
        Assert.assertEquals(0L, monitorState.numValidPartitions());
        Assert.assertEquals(0L, monitorState.numValidWindows());
        Assert.assertTrue(monitorState.monitoredWindows().isEmpty());
    }

    /**
     * Checks the monitor state in two steps: first with samples for every partition except
     * T1P1, then again after a single sample has been added for T1P1.
     */
    @Test
    public void testStateWithoutEnoughSnapshotWindows() {
        TestContext context = prepareContext();
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator sampleAggregator = context.aggregator();
        Long windowKey = Long.valueOf(WINDOW_MS);

        // Step 1: T1P1 receives nothing.
        // Fifth argument (0) is presumably the starting window index — TODO confirm.
        PartitionEntity[] sampledEntities = {PE_T0P0, PE_T0P1, PE_T1P0};
        for (PartitionEntity entity : sampledEntities) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                NUM_WINDOWS, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, 0, WINDOW_MS, COMMON_METRIC_DEF);
        }
        LoadMonitorState beforeBackfill =
            monitor.state(new OperationProgress(), monitor.refreshClusterAndGeneration());
        Assert.assertEquals(0L, beforeBackfill.numValidPartitions());
        Assert.assertEquals(0L, beforeBackfill.numValidWindows());
        Assert.assertEquals(1L, beforeBackfill.monitoredWindows().size());
        Float ratioBefore = (Float) beforeBackfill.monitoredWindows().get(windowKey);
        Assert.assertEquals(0.5d, ratioBefore.floatValue(), 0.0d);

        // Step 2: one sample for T1P1.
        CruiseControlUnitTestUtils.populateSampleAggregator(
            1, 1, sampleAggregator, PE_T1P1, 0, WINDOW_MS, COMMON_METRIC_DEF);
        LoadMonitorState afterBackfill =
            monitor.state(new OperationProgress(), monitor.refreshClusterAndGeneration());
        Assert.assertEquals(0L, afterBackfill.numValidPartitions());
        Assert.assertEquals(1L, afterBackfill.numValidWindows());
        Assert.assertEquals(1L, afterBackfill.monitoredWindows().size());
        Float ratioAfter = (Float) afterBackfill.monitoredWindows().get(windowKey);
        Assert.assertEquals(1.0d, ratioAfter.floatValue(), 0.0d);
    }

    /**
     * Checks the monitor state when T1P1 is populated more sparsely than the other partitions,
     * and again after additional T1P1 samples are supplied.
     */
    @Test
    public void testStateWithInvalidSnapshotWindows() {
        TestContext context = prepareContext();
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator sampleAggregator = context.aggregator();
        Long firstWindowKey = Long.valueOf(WINDOW_MS);
        Long secondWindowKey = Long.valueOf(2000L);

        // NOTE(review): argument roles are presumed to be (numWindows, samplesPerWindow, ...,
        // startingWindow, ...) — confirm against CruiseControlUnitTestUtils. The literal 2s
        // below have the same value as NUM_WINDOWS but sit in sample-count / window-index
        // positions, so the constant name would be misleading there.
        PartitionEntity[] fullySampled = {PE_T0P0, PE_T0P1, PE_T1P0};
        for (PartitionEntity entity : fullySampled) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                3, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, 0, WINDOW_MS, COMMON_METRIC_DEF);
        }
        CruiseControlUnitTestUtils.populateSampleAggregator(
            1, 2, sampleAggregator, PE_T1P1, 0, WINDOW_MS, COMMON_METRIC_DEF);
        CruiseControlUnitTestUtils.populateSampleAggregator(
            1, 2, sampleAggregator, PE_T1P1, 2, WINDOW_MS, COMMON_METRIC_DEF);

        LoadMonitorState sparseState =
            monitor.state(new OperationProgress(), monitor.refreshClusterAndGeneration());
        Assert.assertEquals(2L, sparseState.numValidPartitions());
        Assert.assertEquals(1L, sparseState.numValidWindows());
        Assert.assertEquals(2L, sparseState.monitoredWindows().size());
        Float sparseFirst = (Float) sparseState.monitoredWindows().get(firstWindowKey);
        Assert.assertEquals(1.0d, sparseFirst.floatValue(), 0.0d);
        Float sparseSecond = (Float) sparseState.monitoredWindows().get(secondWindowKey);
        Assert.assertEquals(0.5d, sparseSecond.floatValue(), 0.0d);

        // Supply three more T1P1 samples and re-check.
        CruiseControlUnitTestUtils.populateSampleAggregator(
            1, 3, sampleAggregator, PE_T1P1, 1, WINDOW_MS, COMMON_METRIC_DEF);
        LoadMonitorState filledState =
            monitor.state(new OperationProgress(), monitor.refreshClusterAndGeneration());
        Assert.assertEquals(4L, filledState.numValidPartitions());
        Assert.assertEquals(2L, filledState.numValidWindows());
        Assert.assertEquals(2L, filledState.monitoredWindows().size());
        Float filledFirst = (Float) filledState.monitoredWindows().get(firstWindowKey);
        Assert.assertEquals(1.0d, filledFirst.floatValue(), 0.0d);
        Float filledSecond = (Float) filledState.monitoredWindows().get(secondWindowKey);
        Assert.assertEquals(1.0d, filledSecond.floatValue(), 0.0d);
    }

    /**
     * Evaluates four completeness requirements against the monitor at four successive stages of
     * sample population and asserts which of them are met at each stage.
     */
    @Test
    public void testMeetCompletenessRequirements() {
        TestContext context = prepareContext();
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator sampleAggregator = context.aggregator();

        // Names mirror the first two constructor arguments (presumably minimum window count and
        // minimum monitored-partition ratio — confirm against ModelCompletenessRequirements).
        ModelCompletenessRequirements oneWindowFull = new ModelCompletenessRequirements(1, 1.0d, false);
        ModelCompletenessRequirements oneWindowHalf = new ModelCompletenessRequirements(1, 0.5d, false);
        ModelCompletenessRequirements twoWindowsFull = new ModelCompletenessRequirements(NUM_WINDOWS, 1.0d, false);
        ModelCompletenessRequirements twoWindowsHalf = new ModelCompletenessRequirements(NUM_WINDOWS, 0.5d, false);

        PartitionEntity[] allButT1P1 = {PE_T0P0, PE_T0P1, PE_T1P0};

        // Stage 1: samples for every partition except T1P1.
        // Fifth argument is presumably the starting window index — TODO confirm.
        for (PartitionEntity entity : allButT1P1) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                NUM_WINDOWS, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, 0, WINDOW_MS, COMMON_METRIC_DEF);
        }
        MetadataClient.ClusterAndGeneration stage1 = monitor.refreshClusterAndGeneration();
        Assert.assertFalse(monitor.meetCompletenessRequirements(stage1, oneWindowFull));
        Assert.assertTrue(monitor.meetCompletenessRequirements(stage1, oneWindowHalf));
        Assert.assertFalse(monitor.meetCompletenessRequirements(stage1, twoWindowsFull));
        Assert.assertFalse(monitor.meetCompletenessRequirements(stage1, twoWindowsHalf));

        // Stage 2: one more round at index 2 for those partitions, plus one T1P1 sample there.
        for (PartitionEntity entity : allButT1P1) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                1, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, 2, WINDOW_MS, COMMON_METRIC_DEF);
        }
        CruiseControlUnitTestUtils.populateSampleAggregator(
            1, 1, sampleAggregator, PE_T1P1, 2, WINDOW_MS, COMMON_METRIC_DEF);
        MetadataClient.ClusterAndGeneration stage2 = monitor.refreshClusterAndGeneration();
        Assert.assertFalse(monitor.meetCompletenessRequirements(stage2, oneWindowFull));
        Assert.assertTrue(monitor.meetCompletenessRequirements(stage2, oneWindowHalf));
        Assert.assertFalse(monitor.meetCompletenessRequirements(stage2, twoWindowsFull));
        Assert.assertTrue(monitor.meetCompletenessRequirements(stage2, twoWindowsHalf));

        // Stage 3: one T1P1 sample at index 0.
        CruiseControlUnitTestUtils.populateSampleAggregator(
            1, 1, sampleAggregator, PE_T1P1, 0, WINDOW_MS, COMMON_METRIC_DEF);
        MetadataClient.ClusterAndGeneration stage3 = monitor.refreshClusterAndGeneration();
        Assert.assertTrue(monitor.meetCompletenessRequirements(stage3, oneWindowFull));
        Assert.assertTrue(monitor.meetCompletenessRequirements(stage3, oneWindowHalf));
        Assert.assertFalse(monitor.meetCompletenessRequirements(stage3, twoWindowsFull));
        Assert.assertTrue(monitor.meetCompletenessRequirements(stage3, twoWindowsHalf));

        // Stage 4: three T1P1 samples at index 1; every requirement is now met.
        CruiseControlUnitTestUtils.populateSampleAggregator(
            1, 3, sampleAggregator, PE_T1P1, 1, WINDOW_MS, COMMON_METRIC_DEF);
        MetadataClient.ClusterAndGeneration stage4 = monitor.refreshClusterAndGeneration();
        Assert.assertTrue(monitor.meetCompletenessRequirements(stage4, oneWindowFull));
        Assert.assertTrue(monitor.meetCompletenessRequirements(stage4, oneWindowHalf));
        Assert.assertTrue(monitor.meetCompletenessRequirements(stage4, twoWindowsFull));
        Assert.assertTrue(monitor.meetCompletenessRequirements(stage4, twoWindowsHalf));
    }

    /**
     * Populates all four partitions identically, builds a cluster model and checks the expected
     * utilization of the T0P0 leader for every resource.
     *
     * @throws NotEnoughValidWindowsException if the monitor cannot satisfy the requested requirements
     */
    @Test
    public void testBasicClusterModel() throws NotEnoughValidWindowsException {
        TestContext context = prepareContext();
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator sampleAggregator = context.aggregator();

        // Fifth argument (0) is presumably the starting window index — TODO confirm.
        PartitionEntity[] allEntities = {PE_T0P0, PE_T0P1, PE_T1P0, PE_T1P1};
        for (PartitionEntity entity : allEntities) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                3, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, 0, WINDOW_MS, COMMON_METRIC_DEF);
        }

        ModelCompletenessRequirements requirements = new ModelCompletenessRequirements(NUM_WINDOWS, 1.0d, false);
        ClusterModel model = monitor.clusterModel(-1L, Long.MAX_VALUE, requirements, new OperationProgress());

        Assert.assertEquals(6.5d, model.partition(T0P0).leader().load().expectedUtilizationFor(Resource.CPU), 0.0d);
        Resource[] resourcesExpectingThirteen = {Resource.NW_IN, Resource.NW_OUT, Resource.DISK};
        for (Resource resource : resourcesExpectingThirteen) {
            Assert.assertEquals(13.0d, model.partition(T0P0).leader().load().expectedUtilizationFor(resource), 0.0d);
        }
    }

    /**
     * Same population as the basic cluster-model test, but with a context created with
     * {@code true} as its second argument and a cluster model requested with the extra
     * {@code true} flag; additionally checks the replica count on each broker disk.
     *
     * @throws NotEnoughValidWindowsException if the monitor cannot satisfy the requested requirements
     */
    @Test
    public void testJBODClusterModel() throws NotEnoughValidWindowsException {
        TestContext context = prepareContext(NUM_WINDOWS, true);
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator sampleAggregator = context.aggregator();

        // Fifth argument (0) is presumably the starting window index — TODO confirm.
        PartitionEntity[] allEntities = {PE_T0P0, PE_T0P1, PE_T1P0, PE_T1P1};
        for (PartitionEntity entity : allEntities) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                3, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, 0, WINDOW_MS, COMMON_METRIC_DEF);
        }

        ModelCompletenessRequirements requirements = new ModelCompletenessRequirements(NUM_WINDOWS, 1.0d, false);
        ClusterModel model = monitor.clusterModel(-1L, Long.MAX_VALUE, requirements, true, new OperationProgress());

        // Replica counts per log directory.
        Assert.assertEquals(4L, model.broker(0).disk("/tmp/kafka-logs").replicas().size());
        Assert.assertEquals(3L, model.broker(1).disk("/tmp/kafka-logs-1").replicas().size());
        Assert.assertEquals(1L, model.broker(1).disk("/tmp/kafka-logs-2").replicas().size());

        // Leader load of T0P0.
        Assert.assertEquals(6.5d, model.partition(T0P0).leader().load().expectedUtilizationFor(Resource.CPU), 0.0d);
        Resource[] resourcesExpectingThirteen = {Resource.NW_IN, Resource.NW_OUT, Resource.DISK};
        for (Resource resource : resourcesExpectingThirteen) {
            Assert.assertEquals(13.0d, model.partition(T0P0).leader().load().expectedUtilizationFor(resource), 0.0d);
        }
    }

    /**
     * With samples for every partition except T1P1, requests a cluster model under four
     * different completeness requirements. Three of the requests must fail with
     * {@link NotEnoughValidWindowsException}; the remaining one must produce a model that omits
     * the TOPIC1 partitions and reports the asserted load for T0P0.
     *
     * @throws NotEnoughValidWindowsException only if the request that is expected to succeed fails
     */
    @Test
    public void testClusterModelWithInvalidPartitionAndInsufficientSnapshotWindows() throws NotEnoughValidWindowsException {
        TestContext context = prepareContext();
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator sampleAggregator = context.aggregator();

        // Names mirror the first two constructor arguments (presumably minimum window count and
        // minimum monitored-partition ratio — confirm against ModelCompletenessRequirements).
        ModelCompletenessRequirements oneWindowFull = new ModelCompletenessRequirements(1, 1.0d, false);
        ModelCompletenessRequirements oneWindowHalf = new ModelCompletenessRequirements(1, 0.5d, false);
        ModelCompletenessRequirements twoWindowsFull = new ModelCompletenessRequirements(NUM_WINDOWS, 1.0d, false);
        ModelCompletenessRequirements twoWindowsHalf = new ModelCompletenessRequirements(NUM_WINDOWS, 0.5d, false);

        // Fifth argument (0) is presumably the starting window index — TODO confirm.
        PartitionEntity[] sampledEntities = {PE_T0P0, PE_T0P1, PE_T1P0};
        for (PartitionEntity entity : sampledEntities) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                NUM_WINDOWS, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, 0, WINDOW_MS, COMMON_METRIC_DEF);
        }

        try {
            monitor.clusterModel(-1L, Long.MAX_VALUE, oneWindowFull, new OperationProgress());
            Assert.fail("Should have thrown NotEnoughValidWindowsException.");
        } catch (NotEnoughValidWindowsException expected) {
            // Expected outcome: this requirement cannot be met; nothing further to verify.
        }

        ClusterModel model = monitor.clusterModel(-1L, Long.MAX_VALUE, oneWindowHalf, new OperationProgress());
        Assert.assertNull(model.partition(T1P0));
        Assert.assertNull(model.partition(T1P1));
        Assert.assertEquals(1L, model.partition(T0P0).leader().load().numWindows());
        Assert.assertEquals(3.0d, model.partition(T0P0).leader().load().expectedUtilizationFor(Resource.DISK), 0.0d);
        Assert.assertEquals(1.5d, model.partition(T0P0).leader().load().expectedUtilizationFor(Resource.CPU), 0.0d);
        Assert.assertEquals(3.0d, model.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_IN), 0.0d);
        Assert.assertEquals(3.0d, model.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_OUT), 0.0d);

        try {
            monitor.clusterModel(-1L, Long.MAX_VALUE, twoWindowsFull, new OperationProgress());
            Assert.fail("Should have thrown NotEnoughValidWindowsException.");
        } catch (NotEnoughValidWindowsException expected) {
            // Expected outcome: this requirement cannot be met; nothing further to verify.
        }

        try {
            monitor.clusterModel(-1L, Long.MAX_VALUE, twoWindowsHalf, new OperationProgress());
            Assert.fail("Should have thrown NotEnoughValidWindowsException.");
        } catch (NotEnoughValidWindowsException expected) {
            // Expected outcome: this requirement cannot be met; nothing further to verify.
        }
    }

    /**
     * With three rounds of samples for every partition except T1P1, requests a cluster model
     * under four completeness requirements. The two requirements built with ratio 1.0 must fail
     * with {@link NotEnoughValidWindowsException}; the two built with ratio 0.5 must yield a
     * model that omits the TOPIC1 partitions and reports the asserted load for T0P0.
     *
     * @throws NotEnoughValidWindowsException only if a request that is expected to succeed fails
     */
    @Test
    public void testClusterWithInvalidPartitions() throws NotEnoughValidWindowsException {
        TestContext context = prepareContext();
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator sampleAggregator = context.aggregator();

        // Names mirror the first two constructor arguments (presumably minimum window count and
        // minimum monitored-partition ratio — confirm against ModelCompletenessRequirements).
        ModelCompletenessRequirements oneWindowFull = new ModelCompletenessRequirements(1, 1.0d, false);
        ModelCompletenessRequirements oneWindowHalf = new ModelCompletenessRequirements(1, 0.5d, false);
        ModelCompletenessRequirements twoWindowsFull = new ModelCompletenessRequirements(NUM_WINDOWS, 1.0d, false);
        ModelCompletenessRequirements twoWindowsHalf = new ModelCompletenessRequirements(NUM_WINDOWS, 0.5d, false);

        // Fifth argument (0) is presumably the starting window index — TODO confirm.
        PartitionEntity[] sampledEntities = {PE_T0P0, PE_T0P1, PE_T1P0};
        for (PartitionEntity entity : sampledEntities) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                3, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, 0, WINDOW_MS, COMMON_METRIC_DEF);
        }

        try {
            monitor.clusterModel(-1L, Long.MAX_VALUE, oneWindowFull, new OperationProgress());
            Assert.fail("Should have thrown NotEnoughValidWindowsException.");
        } catch (NotEnoughValidWindowsException expected) {
            // Expected outcome: this requirement cannot be met; nothing further to verify.
        }

        ClusterModel oneWindowModel = monitor.clusterModel(-1L, Long.MAX_VALUE, oneWindowHalf, new OperationProgress());
        Assert.assertNull(oneWindowModel.partition(T1P0));
        Assert.assertNull(oneWindowModel.partition(T1P1));
        Assert.assertEquals(2L, oneWindowModel.partition(T0P0).leader().load().numWindows());
        Assert.assertEquals(13.0d, oneWindowModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.DISK), 0.0d);
        Assert.assertEquals(6.5d, oneWindowModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.CPU), 0.0d);
        Assert.assertEquals(13.0d, oneWindowModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_IN), 0.0d);
        Assert.assertEquals(13.0d, oneWindowModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_OUT), 0.0d);

        try {
            monitor.clusterModel(-1L, Long.MAX_VALUE, twoWindowsFull, new OperationProgress());
            Assert.fail("Should have thrown NotEnoughValidWindowsException.");
        } catch (NotEnoughValidWindowsException expected) {
            // Expected outcome: this requirement cannot be met; nothing further to verify.
        }

        ClusterModel twoWindowModel = monitor.clusterModel(-1L, Long.MAX_VALUE, twoWindowsHalf, new OperationProgress());
        Assert.assertNull(twoWindowModel.partition(T1P0));
        Assert.assertNull(twoWindowModel.partition(T1P1));
        Assert.assertEquals(2L, twoWindowModel.partition(T0P0).leader().load().numWindows());
        Assert.assertEquals(13.0d, twoWindowModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.DISK), 0.0d);
        Assert.assertEquals(6.5d, twoWindowModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.CPU), 0.0d);
        Assert.assertEquals(13.0d, twoWindowModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_IN), 0.0d);
        Assert.assertEquals(13.0d, twoWindowModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_OUT), 0.0d);
    }

    /**
     * With three rounds of samples for T0P0, T0P1 and T1P0 and a single sample for T1P1,
     * requests a cluster model under four completeness requirements and checks, for each, either
     * the resulting partitions and T0P0/T1P1 leader loads or that
     * {@link NotEnoughValidWindowsException} is thrown.
     *
     * @throws NotEnoughValidWindowsException only if a request that is expected to succeed fails
     */
    @Test
    public void testClusterModelWithPartlyInvalidPartitions() throws NotEnoughValidWindowsException {
        TestContext context = prepareContext();
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator sampleAggregator = context.aggregator();

        // Names mirror the first two constructor arguments (presumably minimum window count and
        // minimum monitored-partition ratio — confirm against ModelCompletenessRequirements).
        ModelCompletenessRequirements oneWindowFull = new ModelCompletenessRequirements(1, 1.0d, false);
        ModelCompletenessRequirements oneWindowHalf = new ModelCompletenessRequirements(1, 0.5d, false);
        ModelCompletenessRequirements twoWindowsFull = new ModelCompletenessRequirements(NUM_WINDOWS, 1.0d, false);
        ModelCompletenessRequirements twoWindowsHalf = new ModelCompletenessRequirements(NUM_WINDOWS, 0.5d, false);

        // Fifth argument is presumably the starting window index — TODO confirm.
        PartitionEntity[] fullySampled = {PE_T0P0, PE_T0P1, PE_T1P0};
        for (PartitionEntity entity : fullySampled) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                3, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, 0, WINDOW_MS, COMMON_METRIC_DEF);
        }
        CruiseControlUnitTestUtils.populateSampleAggregator(
            1, 1, sampleAggregator, PE_T1P1, 1, WINDOW_MS, COMMON_METRIC_DEF);

        // Requirement (1, 1.0): every partition is present in the model.
        ClusterModel fullModel = monitor.clusterModel(-1L, Long.MAX_VALUE, oneWindowFull, new OperationProgress());
        TopicPartition[] allPartitions = {T0P0, T0P1, T1P0, T1P1};
        for (TopicPartition topicPartition : allPartitions) {
            Assert.assertNotNull(fullModel.partition(topicPartition));
        }
        Assert.assertEquals(1L, fullModel.partition(T0P0).leader().load().numWindows());
        Assert.assertEquals(13.0d, fullModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.DISK), 0.0d);
        Assert.assertEquals(11.5d, fullModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.CPU), 0.0d);
        Assert.assertEquals(23.0d, fullModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_IN), 0.0d);
        Assert.assertEquals(23.0d, fullModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_OUT), 0.0d);
        Assert.assertEquals(10.0d, fullModel.partition(T1P1).leader().load().expectedUtilizationFor(Resource.DISK), 0.0d);
        Assert.assertEquals(10.0d, fullModel.partition(T1P1).leader().load().expectedUtilizationFor(Resource.CPU), 0.0d);
        Assert.assertEquals(20.0d, fullModel.partition(T1P1).leader().load().expectedUtilizationFor(Resource.NW_IN), 0.0d);
        Assert.assertEquals(20.0d, fullModel.partition(T1P1).leader().load().expectedUtilizationFor(Resource.NW_OUT), 0.0d);

        // Requirement (1, 0.5): the TOPIC1 partitions are absent from the model.
        ClusterModel halfModel = monitor.clusterModel(-1L, Long.MAX_VALUE, oneWindowHalf, new OperationProgress());
        Assert.assertNull(halfModel.partition(T1P0));
        Assert.assertNull(halfModel.partition(T1P1));
        Assert.assertEquals(2L, halfModel.partition(T0P0).leader().load().numWindows());
        Assert.assertEquals(13.0d, halfModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.DISK), 0.0d);
        Assert.assertEquals(6.5d, halfModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.CPU), 0.0d);
        Assert.assertEquals(13.0d, halfModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_IN), 0.0d);
        Assert.assertEquals(13.0d, halfModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_OUT), 0.0d);

        // Requirement (NUM_WINDOWS, 1.0): must be rejected.
        try {
            monitor.clusterModel(-1L, Long.MAX_VALUE, twoWindowsFull, new OperationProgress());
            Assert.fail("Should have thrown NotEnoughValidWindowsException.");
        } catch (NotEnoughValidWindowsException expected) {
            // Expected outcome: this requirement cannot be met; nothing further to verify.
        }

        // Requirement (NUM_WINDOWS, 0.5): same expectations as for (1, 0.5).
        ClusterModel twoWindowHalfModel = monitor.clusterModel(-1L, Long.MAX_VALUE, twoWindowsHalf, new OperationProgress());
        Assert.assertNull(twoWindowHalfModel.partition(T1P0));
        Assert.assertNull(twoWindowHalfModel.partition(T1P1));
        Assert.assertEquals(2L, twoWindowHalfModel.partition(T0P0).leader().load().numWindows());
        Assert.assertEquals(13.0d, twoWindowHalfModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.DISK), 0.0d);
        Assert.assertEquals(6.5d, twoWindowHalfModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.CPU), 0.0d);
        Assert.assertEquals(13.0d, twoWindowHalfModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_IN), 0.0d);
        Assert.assertEquals(13.0d, twoWindowHalfModel.partition(T0P0).leader().load().expectedUtilizationFor(Resource.NW_OUT), 0.0d);
    }

    /**
     * Populates all four partitions at indices 0, 3 and 4 (leaving 1 and 2 untouched), builds a
     * cluster model and checks that every partition leader reports two windows together with
     * the asserted utilization.
     *
     * @throws NotEnoughValidWindowsException if the monitor cannot satisfy the requested requirements
     */
    @Test
    public void testClusterModelWithInvalidSnapshotWindows() throws NotEnoughValidWindowsException {
        // The literal 4 equals MIN_SAMPLES_PER_WINDOW numerically, but this argument looks like a
        // window count (testJBODClusterModel passes NUM_WINDOWS in the same position) — confirm
        // against prepareContext.
        TestContext context = prepareContext(4, false);
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator sampleAggregator = context.aggregator();

        // Fifth argument is presumably the starting window index — TODO confirm.
        int[] startingWindows = {0, 3, 4};
        PartitionEntity[] allEntities = {PE_T0P0, PE_T0P1, PE_T1P0, PE_T1P1};
        for (int startingWindow : startingWindows) {
            for (PartitionEntity entity : allEntities) {
                CruiseControlUnitTestUtils.populateSampleAggregator(
                    1, MIN_SAMPLES_PER_WINDOW, sampleAggregator, entity, startingWindow, WINDOW_MS, COMMON_METRIC_DEF);
            }
        }

        ModelCompletenessRequirements requirements = new ModelCompletenessRequirements(NUM_WINDOWS, 0.0d, false);
        ClusterModel model = monitor.clusterModel(-1L, Long.MAX_VALUE, requirements, new OperationProgress());

        Assert.assertEquals(2L, model.partition(T0P0).leader().load().numWindows());
        Assert.assertEquals(16.5d, model.partition(T0P0).leader().load().expectedUtilizationFor(Resource.CPU), 0.0d);

        Assert.assertEquals(2L, model.partition(T0P1).leader().load().numWindows());
        Assert.assertEquals(33.0d, model.partition(T0P1).leader().load().expectedUtilizationFor(Resource.DISK), 0.0d);

        Assert.assertEquals(2L, model.partition(T1P0).leader().load().numWindows());
        Assert.assertEquals(33.0d, model.partition(T1P0).leader().load().expectedUtilizationFor(Resource.NW_IN), 0.0d);

        Assert.assertEquals(2L, model.partition(T1P1).leader().load().numWindows());
        Assert.assertEquals(33.0d, model.partition(T1P1).leader().load().expectedUtilizationFor(Resource.NW_OUT), 0.0d);
    }

    /**
     * Feeds samples for all four partition entities and both broker entities into their
     * aggregators, builds a cluster model, and then checks the value returned by
     * {@code LoadMonitor#computeThrottle()}.
     *
     * @throws Exception if building the cluster model or computing the throttle fails
     */
    @Test
    public void testComputeThrottle() throws Exception {
        TestContext context = prepareContext(MIN_SAMPLES_PER_WINDOW, false);
        LoadMonitor monitor = context.loadmonitor();
        KafkaPartitionMetricSampleAggregator partitionAggregator = context.aggregator();
        KafkaBrokerMetricSampleAggregator brokerAggregator = monitor.brokerSampleAggregator();

        // Partition samples first, then broker samples — same order as the individual calls.
        PartitionEntity[] partitionEntities = {PE_T0P0, PE_T0P1, PE_T1P0, PE_T1P1};
        for (PartitionEntity partitionEntity : partitionEntities) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                3, MIN_SAMPLES_PER_WINDOW, partitionAggregator, partitionEntity, P0, WINDOW_MS, COMMON_METRIC_DEF);
        }
        BrokerEntity[] brokerEntities = {NODE_0, NODE_1};
        for (BrokerEntity brokerEntity : brokerEntities) {
            CruiseControlUnitTestUtils.populateSampleAggregator(
                3, MIN_SAMPLES_PER_WINDOW, brokerAggregator, brokerEntity, P0, WINDOW_MS, BROKER_METRIC_DEF);
        }

        // The returned model is not inspected; the call is made before computeThrottle() is checked.
        ModelCompletenessRequirements requirements = new ModelCompletenessRequirements(NUM_WINDOWS, 1.0d, false);
        monitor.clusterModel(Long.MAX_VALUE, requirements, new OperationProgress());

        Assert.assertEquals(65388544L, monitor.computeThrottle());
    }

    /**
     * Builds the default test context: {@code NUM_WINDOWS} partition metric windows and the
     * non-file-based broker capacity resolver.
     *
     * @return the context produced by {@link #prepareContext(int, boolean)}
     */
    private TestContext prepareContext() {
        final boolean useCapacityConfigFile = false;
        return prepareContext(NUM_WINDOWS, useCapacityConfigFile);
    }

    /**
     * Builds a started {@link LoadMonitor} backed by mocked metadata and admin clients and waits
     * until its task runner reports {@code RUNNING}.
     *
     * @param i value written to the {@code num.partition.metrics.windows} property
     * @param z when {@code true}, the broker capacity resolver is
     *          {@link BrokerCapacityConfigFileResolver} reading
     *          {@link TestConstants#JBOD_BROKER_CAPACITY_CONFIG_FILE}; otherwise the
     *          {@code TestBrokerCapacityConfigResolver} class is configured by name
     * @return a context bundling the load monitor, its partition sample aggregator, the config
     *         and the four-partition cluster used by the mocks
     * @throws IllegalStateException if the thread is interrupted while waiting for start-up
     */
    private TestContext prepareContext(int i, boolean z) {
        Cluster cluster = getCluster(Arrays.asList(T0P0, T0P1, T1P0, T1P1));
        MetadataClient metadataClient = newMockMetadataClient(cluster);
        ConfluentAdmin confluentAdmin = newMockConfluentAdmin();

        KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(buildTestProperties(i, z));
        LoadMonitor loadMonitor = new LoadMonitor(kafkaCruiseControlConfig, metadataClient, confluentAdmin, this._time,
            KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry), COMMON_METRIC_DEF);
        KafkaPartitionMetricSampleAggregator partitionSampleAggregator = loadMonitor.partitionSampleAggregator();

        // The same sample is rejected first and accepted second. The mocked metadata client hands
        // out an empty cluster on its first cluster() call and the populated one afterwards.
        // NOTE(review): presumably isValidLeader consults metadataClient.cluster() — confirm.
        Assert.assertFalse(partitionSampleAggregator.isValidLeader(new PartitionMetricSample(P0, T0P0)));
        Assert.assertTrue(partitionSampleAggregator.isValidLeader(new PartitionMetricSample(P0, T0P0)));

        ModelParameters.init(kafkaCruiseControlConfig);
        loadMonitor.startUp();
        awaitLoadMonitorRunning(loadMonitor);
        return new TestContext(loadMonitor, partitionSampleAggregator, kafkaCruiseControlConfig, cluster);
    }

    /**
     * Creates a replayed {@link MetadataClient} mock whose first {@code cluster()} call returns an
     * empty cluster and whose later calls, and all generation/refresh calls, return {@code cluster}.
     */
    private MetadataClient newMockMetadataClient(Cluster cluster) {
        MetadataClient.ClusterAndPlacements clusterAndPlacements =
            new MetadataClient.ClusterAndPlacements(cluster, Collections.emptyMap());
        MetadataClient metadataClient = EasyMock.mock(MetadataClient.class);
        EasyMock.expect(metadataClient.cluster()).andReturn(getCluster(Collections.emptyList())).once();
        EasyMock.expect(metadataClient.cluster()).andReturn(cluster).anyTimes();
        EasyMock.expect(metadataClient.clusterAndGeneration())
            .andReturn(new MetadataClient.ClusterAndGeneration(clusterAndPlacements, P0)).anyTimes();
        EasyMock.expect(metadataClient.refreshMetadata())
            .andReturn(new MetadataClient.ClusterAndGeneration(clusterAndPlacements, P0)).anyTimes();
        EasyMock.expect(metadataClient.refreshMetadata(EasyMock.anyInt()))
            .andReturn(new MetadataClient.ClusterAndGeneration(clusterAndPlacements, P0)).anyTimes();
        EasyMock.replay(metadataClient);
        return metadataClient;
    }

    /**
     * Creates a replayed {@link ConfluentAdmin} mock that answers {@code describeLogDirs} for the
     * two test broker ids, in either order, with the canned log-dir result.
     */
    private ConfluentAdmin newMockConfluentAdmin() {
        ConfluentAdmin confluentAdmin = EasyMock.mock(ConfluentAdmin.class);
        EasyMock.expect(confluentAdmin.describeLogDirs(Arrays.asList(Integer.valueOf(P0), 1)))
            .andReturn(getDescribeLogDirsResult()).anyTimes();
        EasyMock.expect(confluentAdmin.describeLogDirs(Arrays.asList(1, Integer.valueOf(P0))))
            .andReturn(getDescribeLogDirsResult()).anyTimes();
        EasyMock.replay(confluentAdmin);
        return confluentAdmin;
    }

    /**
     * Assembles the Cruise Control properties for the test on top of the shared unit-test defaults.
     *
     * @param numWindows value for {@code num.partition.metrics.windows}
     * @param useCapacityConfigFile selects the file-based capacity resolver when {@code true}
     */
    private Properties buildTestProperties(int numWindows, boolean useCapacityConfigFile) {
        Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
        props.put("num.partition.metrics.windows", Integer.toString(numWindows));
        props.put("min.samples.per.partition.metrics.window", Integer.toString(MIN_SAMPLES_PER_WINDOW));
        props.put("partition.metrics.window.ms", Long.toString(WINDOW_MS));
        props.put(LogConfig.CleanupPolicyProp(), DEFAULT_CLEANUP_POLICY);
        props.put("sample.store.class", NoopSampleStore.class.getName());
        props.put("zookeeper.security.enabled", "false");
        props.put("max.volume.throughput.mb", "156");
        props.put("write.throughput.multiplier", "1.0");
        props.put("read.throughput.multiplier", "1.0");
        props.put("calculated.throttle.ratio", "0.4");
        props.put("test.disk.capacity", Double.toString(4194304.0d));
        props.put("test.cpu.capacity", "100.0");
        props.put("test.nwin.capacity", Double.toString(159744.0d));
        props.put("test.nwout.capacity", Double.toString(159744.0d));
        props.put("estimation.info", "testInfo");
        if (useCapacityConfigFile) {
            props.put("broker.capacity.config.resolver.class", BrokerCapacityConfigFileResolver.class);
            props.setProperty("capacity.config.file",
                KafkaCruiseControlUnitTestUtils.class.getClassLoader().getResource(TestConstants.JBOD_BROKER_CAPACITY_CONFIG_FILE).getFile());
        } else {
            props.put("broker.capacity.config.resolver.class", "com.linkedin.kafka.cruisecontrol.common.TestBrokerCapacityConfigResolver");
        }
        return props;
    }

    /**
     * Polls the load monitor state once per millisecond until its task runner is {@code RUNNING}.
     *
     * @throws IllegalStateException if the waiting thread is interrupted
     */
    private void awaitLoadMonitorRunning(LoadMonitor loadMonitor) {
        MetadataClient.ClusterAndGeneration clusterAndGeneration = loadMonitor.refreshClusterAndGeneration();
        while (loadMonitor.state(new OperationProgress(), clusterAndGeneration).state()
               != LoadMonitorTaskRunner.LoadMonitorTaskRunnerState.RUNNING) {
            try {
                Thread.sleep(1L);
            } catch (InterruptedException e) {
                // Do not swallow the interrupt: restore the flag and stop waiting, otherwise a
                // cancelled test run would keep spinning in this unbounded loop.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting for the load monitor to start running", e);
            }
        }
    }

    /**
     * Builds a two-broker {@link Cluster} ({@code localhost:100} on {@code rack0} and
     * {@code rack1}) containing one {@link PartitionInfo} per given topic partition. Every
     * partition is led by the first broker and lists both brokers as replicas and as in-sync
     * replicas.
     *
     * @param collection the topic partitions to include; may be empty
     * @return a cluster with id {@code "cluster-id"} and no unauthorized or internal topics
     */
    private Cluster getCluster(Collection<TopicPartition> collection) {
        Node leader = new Node(P0, "localhost", 100, "rack0");
        Node follower = new Node(1, "localhost", 100, "rack1");
        Node[] replicas = {leader, follower};

        // NUM_WINDOWS only serves as the initial-capacity hint of the set here.
        HashSet<Node> nodes = new HashSet<>(NUM_WINDOWS);
        nodes.add(leader);
        nodes.add(follower);

        HashSet<PartitionInfo> partitionInfos = new HashSet<>(collection.size());
        for (TopicPartition topicPartition : collection) {
            partitionInfos.add(
                new PartitionInfo(topicPartition.topic(), topicPartition.partition(), leader, replicas, replicas));
        }
        return new Cluster("cluster-id", nodes, partitionInfos, Collections.emptySet(), Collections.emptySet());
    }

    /**
     * Builds a canned {@link DescribeLogDirsResult} for the two test brokers.
     * <ul>
     *   <li>Broker {@code P0}: one log dir {@code /tmp/kafka-logs} holding all four partitions.</li>
     *   <li>Broker {@code 1}: {@code /tmp/kafka-logs-1} holding T0P0, T0P1 and T1P0, and
     *       {@code /tmp/kafka-logs-2} holding T1P1.</li>
     * </ul>
     * Every replica is reported with size 0, offset lag 0 and {@code isFuture == false}.
     *
     * @return the reflectively constructed result; never {@code null}
     * @throws IllegalStateException if the {@code Map}-taking constructor cannot be found or invoked
     */
    private DescribeLogDirsResult getDescribeLogDirsResult() {
        try {
            // The constructor is looked up via getDeclaredConstructor and forced accessible,
            // presumably because it is not public.
            Constructor<DescribeLogDirsResult> constructor = DescribeLogDirsResult.class.getDeclaredConstructor(Map.class);
            constructor.setAccessible(true);

            Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> logDirsByBroker = new HashMap<>();

            // Broker P0: a single log dir with every partition.
            Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> broker0Replicas = new HashMap<>();
            for (TopicPartition topicPartition : Arrays.asList(T0P0, T0P1, T1P0, T1P1)) {
                broker0Replicas.put(topicPartition, new DescribeLogDirsResponse.ReplicaInfo(0L, 0L, false));
            }
            Map<String, DescribeLogDirsResponse.LogDirInfo> broker0LogDirs = new HashMap<>();
            broker0LogDirs.put("/tmp/kafka-logs", new DescribeLogDirsResponse.LogDirInfo(Errors.NONE, broker0Replicas));
            logDirsByBroker.put(Integer.valueOf(P0), KafkaFuture.completedFuture(broker0LogDirs));

            // Broker 1: partitions split across two log dirs.
            Map<TopicPartition, DescribeLogDirsResponse.ReplicaInfo> broker1FirstDirReplicas = new HashMap<>();
            for (TopicPartition topicPartition : Arrays.asList(T0P0, T0P1, T1P0)) {
                broker1FirstDirReplicas.put(topicPartition, new DescribeLogDirsResponse.ReplicaInfo(0L, 0L, false));
            }
            Map<String, DescribeLogDirsResponse.LogDirInfo> broker1LogDirs = new HashMap<>();
            broker1LogDirs.put("/tmp/kafka-logs-1", new DescribeLogDirsResponse.LogDirInfo(Errors.NONE, broker1FirstDirReplicas));
            broker1LogDirs.put("/tmp/kafka-logs-2", new DescribeLogDirsResponse.LogDirInfo(Errors.NONE,
                Collections.singletonMap(T1P1, new DescribeLogDirsResponse.ReplicaInfo(0L, 0L, false))));
            logDirsByBroker.put(1, KafkaFuture.completedFuture(broker1LogDirs));

            return constructor.newInstance(logDirsByBroker);
        } catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            // Fail here with the real cause instead of handing null to the mock, which would only
            // surface later as an unrelated failure in whatever consumes the result.
            throw new IllegalStateException("Unable to construct DescribeLogDirsResult reflectively", e);
        }
    }
}
