package com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator;

import com.linkedin.cruisecontrol.CruiseControlUnitTestUtils;
import com.linkedin.cruisecontrol.exception.NotEnoughValidWindowsException;
import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.Extrapolation;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.MetricSampleAggregationResult;
import com.linkedin.cruisecontrol.monitor.sampling.aggregator.ValuesAndExtrapolations;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.async.progress.OperationProgress;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.common.Resource;
import com.linkedin.kafka.cruisecontrol.common.TestConstants;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.model.LinearRegressionModelParameters;
import com.linkedin.kafka.cruisecontrol.monitor.ModelCompletenessRequirements;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionEntity;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.SortedMap;
import java.util.SortedSet;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.easymock.EasyMock;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/sampling/aggregator/KafkaPartitionMetricSampleAggregatorTest.class */
public class KafkaPartitionMetricSampleAggregatorTest {
    // Value written to the "num.partition.metrics.windows" property by getLoadMonitorProperties().
    private static final int NUM_WINDOWS = 20;
    // Value written to the "partition.metrics.window.ms" property by getLoadMonitorProperties().
    private static final long WINDOW_MS = 1000;
    // Value written to the "min.samples.per.partition.metrics.window" property by getLoadMonitorProperties().
    private static final int MIN_SAMPLES_PER_WINDOW = 4;
    // Partition number of the default topic partition TP.
    private static final int PARTITION = 0;
    // Default topic partition that most tests populate with samples.
    private static final TopicPartition TP = new TopicPartition(TestConstants.TOPIC0, PARTITION);
    // Partition entity wrapping TP; used as the lookup key into aggregation results.
    private static final PartitionEntity PE = new PartitionEntity(TP);

    /**
     * Holder returned by the {@code setupScenarioN()} helpers: the cluster a scenario was built
     * for together with the aggregator that was populated for that scenario.
     */
    public static class TestContext {
        private final Cluster cluster;
        private final KafkaPartitionMetricSampleAggregator aggregator;

        TestContext(Cluster cluster, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator) {
            this.aggregator = kafkaPartitionMetricSampleAggregator;
            this.cluster = cluster;
        }

        /**
         * Wraps the stored cluster, with two empty maps as additional metadata, in a
         * {@code ClusterAndGeneration}.
         *
         * @param i the generation value passed to the {@code ClusterAndGeneration} constructor
         * @return a freshly created cluster-and-generation pair
         */
        public MetadataClient.ClusterAndGeneration clusterAndGeneration(int i) {
            MetadataClient.ClusterMetadata metadata =
                new MetadataClient.ClusterMetadata(this.cluster, Collections.emptyMap(), Collections.emptyMap());
            return new MetadataClient.ClusterAndGeneration(metadata, i);
        }

        /**
         * @return the aggregator stored in this context
         */
        public KafkaPartitionMetricSampleAggregator aggregator() {
            return this.aggregator;
        }
    }

    /**
     * Populates the default partition, aggregates once, and checks the window timestamps, the
     * per-resource values of every window, and the window-validity accessors of the aggregator.
     */
    @Test
    public void testAggregate() throws NotEnoughValidWindowsException {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        Cluster cluster = getCluster(Collections.singleton(TP));
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(config, cluster);
        populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW, aggregator);

        MetricSampleAggregationResult result =
            aggregator.aggregate(clusterAndGeneration(cluster), Long.MAX_VALUE, new OperationProgress());
        Map<?, ?> valuesByEntity = result.valuesAndExtrapolations();
        Assertions.assertEquals(1, valuesByEntity.size(), "The windows should only have one partition");
        ValuesAndExtrapolations partitionValues = (ValuesAndExtrapolations) valuesByEntity.get(PE);
        Assertions.assertNotNull(partitionValues);
        Assertions.assertEquals(NUM_WINDOWS, partitionValues.metricValues().length());

        for (int windowIdx = 0; windowIdx < NUM_WINDOWS; windowIdx++) {
            // Index 0 is expected to carry the largest window timestamp.
            ValuesAndExtrapolations current = (ValuesAndExtrapolations) result.valuesAndExtrapolations().get(PE);
            Assertions.assertEquals((NUM_WINDOWS - windowIdx) * WINDOW_MS, current.window(windowIdx));
            for (Resource resource : Resource.cachedValues()) {
                double perMetric;
                if (resource == Resource.DISK) {
                    perMetric = ((NUM_WINDOWS - 1 - windowIdx) * 10) + MIN_SAMPLES_PER_WINDOW - 1;
                } else {
                    perMetric = ((NUM_WINDOWS - 1 - windowIdx) * 10) + 1.5d;
                }
                // CPU expectations are scaled down by 100; every other resource is used as-is.
                double divisor = resource == Resource.CPU ? 100.0d : 1.0d;
                double size = (perMetric / divisor) * KafkaMetricDef.resourceToMetricIds(resource).size();
                Assertions.assertEquals(size, partitionValues.metricValues().valuesForGroup(resource.name(), KafkaMetricDef.commonMetricDef(), true).get(windowIdx), 0.01d, "The utilization for " + resource + " should be " + size);
            }
        }

        // The validity accessors are queried with generation 1 rather than 0.
        MetadataClient.ClusterMetadata metadata =
            new MetadataClient.ClusterMetadata(cluster, Collections.emptyMap(), Collections.emptyMap());
        MetadataClient.ClusterAndGeneration clusterAndGeneration = new MetadataClient.ClusterAndGeneration(metadata, 1);
        Assertions.assertEquals(NUM_WINDOWS, aggregator.validWindows(clusterAndGeneration, 1.0d).size());
        for (Object ratio : aggregator.validPartitionRatioByWindows(clusterAndGeneration).values()) {
            Assertions.assertEquals(1.0d, ((Float) ratio).floatValue(), 0.0d);
        }
        Assertions.assertEquals(NUM_WINDOWS, aggregator.availableWindows().size());
    }

    /**
     * Aggregates against a cluster that contains one more partition than the one samples were
     * collected for. The default aggregation must return only the sampled partition; the
     * aggregation whose requirements pass {@code true} as the third argument must also return
     * the unsampled partition, with every window flagged {@code NO_VALID_EXTRAPOLATION}.
     */
    @Test
    public void testAggregateWithUpdatedCluster() throws NotEnoughValidWindowsException {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        Cluster initialCluster = getCluster(Arrays.asList(TP));
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(config, initialCluster);
        populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW, aggregator);

        TopicPartition addedPartition = new TopicPartition("topic01", PARTITION);
        Cluster updatedCluster = getCluster(Arrays.asList(TP, addedPartition));

        Map<?, ?> defaultResult = aggregator
            .aggregate(clusterAndGeneration(updatedCluster), Long.MAX_VALUE, new OperationProgress())
            .valuesAndExtrapolations();
        Assertions.assertEquals(1, defaultResult.size());
        ValuesAndExtrapolations sampledValues = (ValuesAndExtrapolations) defaultResult.get(PE);
        Assertions.assertEquals(NUM_WINDOWS, sampledValues.windows().size());

        Map<?, ?> inclusiveResult = aggregator
            .aggregate(clusterAndGeneration(updatedCluster), Long.MAX_VALUE, new ModelCompletenessRequirements(1, 0.0d, true), new OperationProgress())
            .valuesAndExtrapolations();
        PartitionEntity addedEntity = new PartitionEntity(addedPartition);
        Object addedValues = inclusiveResult.get(addedEntity);
        Assertions.assertNotNull(addedValues, "tp1 should be included because includeAllTopics is set to true");
        Map<?, ?> extrapolations = ((ValuesAndExtrapolations) addedValues).extrapolations();
        Assertions.assertEquals(NUM_WINDOWS, extrapolations.size());
        for (int windowIdx = 0; windowIdx < NUM_WINDOWS; windowIdx++) {
            Assertions.assertEquals(Extrapolation.NO_VALID_EXTRAPOLATION, extrapolations.get(windowIdx));
        }
    }

    /**
     * Populates a second partition in two runs (18 windows from start index 0, then 2 windows
     * from start index 19) and expects exactly one extrapolation for it: {@code AVG_ADJACENT}
     * at result index 1, whose window timestamp is 19000. The default partition must have none.
     */
    @Test
    public void testAggregateWithPartitionExtrapolations() throws NotEnoughValidWindowsException {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(config, getCluster(Collections.singleton(TP)));
        TopicPartition secondPartition = new TopicPartition(TestConstants.TOPIC0, 1);
        Cluster cluster = getCluster(Arrays.asList(TP, secondPartition));
        PartitionEntity secondEntity = new PartitionEntity(secondPartition);

        populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW, aggregator);
        // NOTE(review): presumably start index 18 is the one left unpopulated for the second partition - confirm against the helper.
        CruiseControlUnitTestUtils.populateSampleAggregator(NUM_WINDOWS - 2, MIN_SAMPLES_PER_WINDOW, aggregator, secondEntity, 0, WINDOW_MS, KafkaMetricDef.commonMetricDef());
        CruiseControlUnitTestUtils.populateSampleAggregator(2, MIN_SAMPLES_PER_WINDOW, aggregator, secondEntity, NUM_WINDOWS - 1, WINDOW_MS, KafkaMetricDef.commonMetricDef());

        MetricSampleAggregationResult result =
            aggregator.aggregate(clusterAndGeneration(cluster), Long.MAX_VALUE, new OperationProgress());
        Assertions.assertEquals(2, result.valuesAndExtrapolations().size());

        ValuesAndExtrapolations defaultValues = (ValuesAndExtrapolations) result.valuesAndExtrapolations().get(PE);
        Assertions.assertTrue(defaultValues.extrapolations().isEmpty());

        ValuesAndExtrapolations secondValues = (ValuesAndExtrapolations) result.valuesAndExtrapolations().get(secondEntity);
        Assertions.assertEquals(1, secondValues.extrapolations().size());
        Assertions.assertTrue(secondValues.extrapolations().containsKey(1));
        Assertions.assertEquals(19000L, secondValues.window(1));
        Assertions.assertEquals(Extrapolation.AVG_ADJACENT, secondValues.extrapolations().get(1));
    }

    /**
     * First aggregation: 19 windows populated from start index 2 must yield 18 result windows.
     * After a second population run with only 2 samples per window, the result must span
     * {@code NUM_WINDOWS} windows and contain exactly two extrapolations, both
     * {@code AVG_AVAILABLE}.
     */
    @Test
    public void testFallbackToAvgAvailable() throws NotEnoughValidWindowsException {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        Cluster cluster = getCluster(Collections.singleton(TP));
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(config, cluster);

        CruiseControlUnitTestUtils.populateSampleAggregator(NUM_WINDOWS - 1, MIN_SAMPLES_PER_WINDOW, aggregator, PE, 2, WINDOW_MS, KafkaMetricDef.commonMetricDef());
        MetricSampleAggregationResult firstResult =
            aggregator.aggregate(clusterAndGeneration(cluster), 20000L, new OperationProgress());
        ValuesAndExtrapolations firstValues = (ValuesAndExtrapolations) firstResult.valuesAndExtrapolations().get(PE);
        Assertions.assertEquals(NUM_WINDOWS - 2, firstValues.windows().size());

        // Second run passes 2 as the per-window sample count, which is below MIN_SAMPLES_PER_WINDOW.
        populateSampleAggregator(2, 2, aggregator);
        MetricSampleAggregationResult secondResult =
            aggregator.aggregate(clusterAndGeneration(cluster), 20000L, new OperationProgress());
        ValuesAndExtrapolations secondValues = (ValuesAndExtrapolations) secondResult.valuesAndExtrapolations().get(PE);
        Assertions.assertEquals(NUM_WINDOWS, secondValues.metricValues().length());

        int extrapolationCount = 0;
        ValuesAndExtrapolations extrapolated = (ValuesAndExtrapolations) secondResult.valuesAndExtrapolations().get(PE);
        for (Object extrapolation : extrapolated.extrapolations().values()) {
            Assertions.assertEquals(Extrapolation.AVG_AVAILABLE, extrapolation);
            extrapolationCount++;
        }
        Assertions.assertEquals(2, extrapolationCount);
    }

    /**
     * Populates the default partition for start indexes 0..19, 20 and 22..23 and another
     * partition only at start index 21, then aggregates at time 40000. The default partition
     * must report {@code NUM_WINDOWS} windows with exactly one extrapolation, of type
     * {@code AVG_ADJACENT}.
     */
    @Test
    public void testFallbackToAvgAdjacent() throws NotEnoughValidWindowsException {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        TopicPartition otherPartition = new TopicPartition("AnotherTopic", 1);
        PartitionEntity otherEntity = new PartitionEntity(otherPartition);
        Cluster cluster = getCluster(Arrays.asList(TP, otherPartition));
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(config, cluster);

        populateSampleAggregator(NUM_WINDOWS, MIN_SAMPLES_PER_WINDOW, aggregator);
        CruiseControlUnitTestUtils.populateSampleAggregator(1, MIN_SAMPLES_PER_WINDOW, aggregator, PE, NUM_WINDOWS, WINDOW_MS, KafkaMetricDef.commonMetricDef());
        // Only the other partition receives samples at start index 21.
        CruiseControlUnitTestUtils.populateSampleAggregator(1, MIN_SAMPLES_PER_WINDOW, aggregator, otherEntity, NUM_WINDOWS + 1, WINDOW_MS, KafkaMetricDef.commonMetricDef());
        CruiseControlUnitTestUtils.populateSampleAggregator(2, MIN_SAMPLES_PER_WINDOW, aggregator, PE, NUM_WINDOWS + 2, WINDOW_MS, KafkaMetricDef.commonMetricDef());

        MetricSampleAggregationResult result =
            aggregator.aggregate(clusterAndGeneration(cluster), 40000L, new OperationProgress());
        ValuesAndExtrapolations lengthCheck = (ValuesAndExtrapolations) result.valuesAndExtrapolations().get(PE);
        Assertions.assertEquals(NUM_WINDOWS, lengthCheck.metricValues().length());

        int extrapolationCount = 0;
        ValuesAndExtrapolations extrapolated = (ValuesAndExtrapolations) result.valuesAndExtrapolations().get(PE);
        for (Object extrapolation : extrapolated.extrapolations().values()) {
            Assertions.assertEquals(Extrapolation.AVG_ADJACENT, extrapolation);
            extrapolationCount++;
        }
        Assertions.assertEquals(1, extrapolationCount);
    }

    /**
     * Populates 18 windows of the default partition from start index 3 and expects the
     * aggregation at time 20000 to return 17 windows for it.
     */
    @Test
    public void testTooManyFlaws() throws NotEnoughValidWindowsException {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        Cluster cluster = getCluster(Collections.singleton(TP));
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(config, cluster);
        CruiseControlUnitTestUtils.populateSampleAggregator(NUM_WINDOWS - 2, MIN_SAMPLES_PER_WINDOW, aggregator, PE, 3, WINDOW_MS, KafkaMetricDef.commonMetricDef());

        MetricSampleAggregationResult result =
            aggregator.aggregate(clusterAndGeneration(cluster), 20000L, new OperationProgress());
        ValuesAndExtrapolations partitionValues = (ValuesAndExtrapolations) result.valuesAndExtrapolations().get(PE);
        Assertions.assertEquals(NUM_WINDOWS - 3, partitionValues.windows().size());
    }

    /**
     * Requires {@code NUM_WINDOWS} valid windows while aggregating at time 18999 and expects
     * the aggregator to reject the request with a {@link NotEnoughValidWindowsException}.
     *
     * <p>Uses {@code assertThrows} instead of a try / fail / empty-catch sequence so the
     * expected exception is asserted explicitly rather than silently swallowed.
     */
    @Test
    public void testNotEnoughWindows() {
        KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        Cluster cluster = getCluster(Collections.singleton(TP));
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(kafkaCruiseControlConfig, cluster);
        populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW, aggregator);
        Assertions.assertThrows(
            NotEnoughValidWindowsException.class,
            () -> aggregator.aggregate(clusterAndGeneration(cluster), 18999L, new ModelCompletenessRequirements(NUM_WINDOWS, 0.0d, false), new OperationProgress()),
            "Should throw NotEnoughValidWindowsException");
    }

    /**
     * Adds two extra samples carrying the value 10000 (one recording four metrics, one recording
     * only CPU usage) after the regular population, and checks that the values at result index 19
     * still match the expectations computed without them.
     */
    @Test
    public void testExcludeInvalidMetricSample() throws NotEnoughValidWindowsException {
        KafkaCruiseControlConfig config = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        Cluster cluster = getCluster(Collections.singleton(TP));
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(config, cluster);
        MetricDef metricDef = KafkaMetricDef.commonMetricDef();
        populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW, aggregator);

        // Sample for broker id 1 recording four metrics.
        // NOTE(review): the LEADER_BYTES_OUT name is taken from ModelCoefficient, unlike its siblings - confirm this is intended.
        PartitionMetricSample fourMetricSample = new PartitionMetricSample(1, TP);
        fourMetricSample.record(metricDef.metricInfo(KafkaMetricDef.DISK_USAGE.name()), 10000.0d);
        fourMetricSample.record(metricDef.metricInfo(KafkaMetricDef.CPU_USAGE.name()), 10000.0d);
        fourMetricSample.record(metricDef.metricInfo(KafkaMetricDef.LEADER_BYTES_IN.name()), 10000.0d);
        fourMetricSample.record(metricDef.metricInfo(LinearRegressionModelParameters.ModelCoefficient.LEADER_BYTES_OUT.name()), 10000.0d);
        fourMetricSample.close(0L);

        // Sample for broker id 0 recording CPU usage only.
        PartitionMetricSample cpuOnlySample = new PartitionMetricSample(0, TP);
        cpuOnlySample.record(metricDef.metricInfo(KafkaMetricDef.CPU_USAGE.name()), 10000.0d);
        cpuOnlySample.close(0L);

        aggregator.addSample(fourMetricSample);
        aggregator.addSample(cpuOnlySample);

        MetricSampleAggregationResult result =
            aggregator.aggregate(clusterAndGeneration(cluster), 20000L, new OperationProgress());
        ValuesAndExtrapolations partitionValues = (ValuesAndExtrapolations) result.valuesAndExtrapolations().get(PE);
        for (Resource resource : Resource.cachedValues()) {
            double perMetric = resource == Resource.DISK ? 3.0d : 1.5d;
            double divisor = resource == Resource.CPU ? 100.0d : 1.0d;
            double size = (perMetric / divisor) * KafkaMetricDef.resourceToMetricIds(resource).size();
            Assertions.assertEquals(size, partitionValues.metricValues().valuesForGroup(resource.name(), KafkaMetricDef.commonMetricDef(), true).get(NUM_WINDOWS - 1), 0.01d, "The utilization for " + resource + " should be " + size);
        }
    }

    /**
     * Scenario 1 with a ratio argument of 1.0: all {@code NUM_WINDOWS} windows must be reported
     * valid, with no index skipped.
     */
    @Test
    public void testValidWindows() {
        TestContext context = setupScenario1();
        KafkaPartitionMetricSampleAggregator aggregator = context.aggregator();
        MetadataClient.ClusterAndGeneration clusterAndGeneration = context.clusterAndGeneration(0);
        SortedSet<Long> windows = aggregator.validWindows(clusterAndGeneration, 1.0d);
        Assertions.assertEquals(NUM_WINDOWS, windows.size());
        assertValidWindows(windows, NUM_WINDOWS, Collections.emptySet());
    }

    /**
     * Scenario 2: with a ratio argument of 1.0 only 17 windows are valid (checked from index 19
     * downwards, skipping 6 and 7); with 0.5 all {@code NUM_WINDOWS} windows are valid.
     */
    @Test
    public void testValidWindowsWithInvalidPartitions() {
        TestContext context = setupScenario2();
        KafkaPartitionMetricSampleAggregator aggregator = context.aggregator();
        MetadataClient.ClusterAndGeneration clusterAndGeneration = context.clusterAndGeneration(0);

        SortedSet<Long> strictWindows = aggregator.validWindows(clusterAndGeneration, 1.0d);
        Assertions.assertEquals(NUM_WINDOWS - 3, strictWindows.size(), "Should have three invalid windows.");
        assertValidWindows(strictWindows, NUM_WINDOWS - 1, Arrays.asList(6, 7));

        SortedSet<Long> relaxedWindows = aggregator.validWindows(clusterAndGeneration, 0.5d);
        Assertions.assertEquals(NUM_WINDOWS, relaxedWindows.size());
    }

    /**
     * Scenario 3 with a ratio argument of 0.5: 18 windows must be valid, checked from index
     * {@code NUM_WINDOWS} downwards while skipping 6 and 7.
     */
    @Test
    public void testValidWindowWithDifferentInvalidPartitions() {
        TestContext context = setupScenario3();
        KafkaPartitionMetricSampleAggregator aggregator = context.aggregator();
        MetadataClient.ClusterAndGeneration clusterAndGeneration = context.clusterAndGeneration(0);
        SortedSet<Long> windows = aggregator.validWindows(clusterAndGeneration, 0.5d);
        Assertions.assertEquals(NUM_WINDOWS - 2, windows.size(), "Should have two invalid windows.");
        assertValidWindows(windows, NUM_WINDOWS, Arrays.asList(6, 7));
    }

    /**
     * Scenario 4 (configured with "max.allowed.extrapolations.per.partition" set to "0") with a
     * ratio argument of 0.5: 18 windows must be valid, checked from index {@code NUM_WINDOWS}
     * downwards while skipping 6 and 7.
     */
    @Test
    public void testValidWindowsWithTooManyExtrapolations() {
        TestContext context = setupScenario4();
        KafkaPartitionMetricSampleAggregator aggregator = context.aggregator();
        MetadataClient.ClusterAndGeneration clusterAndGeneration = context.clusterAndGeneration(0);
        SortedSet<Long> windows = aggregator.validWindows(clusterAndGeneration, 0.5d);
        Assertions.assertEquals(NUM_WINDOWS - 2, windows.size(), "Should have two invalid windows.");
        assertValidWindows(windows, NUM_WINDOWS, Arrays.asList(6, 7));
    }

    /**
     * Checks the overall monitored percentage reported for each scenario: 1.0 for scenario 1,
     * 0.5 for scenario 2 and one third for scenarios 3 and 4 (tolerance 0.01).
     */
    @Test
    public void testMonitoredPercentage() {
        assertMonitoredPercentage(1.0d, setupScenario1());
        assertMonitoredPercentage(0.5d, setupScenario2());
        assertMonitoredPercentage(0.3333333333333333d, setupScenario3());
        assertMonitoredPercentage(0.3333333333333333d, setupScenario4());
    }

    /** Asserts the monitored percentage of the given scenario at generation 0, within 0.01. */
    private void assertMonitoredPercentage(double expected, TestContext context) {
        Assertions.assertEquals(expected, context.aggregator().monitoredPercentage(context.clusterAndGeneration(0)), 0.01d);
    }

    /**
     * Checks the per-window valid-partition ratio reported for each of the four scenarios.
     *
     * <p>Expected ratios (tolerance 0.01):
     * <ul>
     *   <li>scenario 1: 1.0 for every window;</li>
     *   <li>scenario 2: 0.5 for windows 6000, 7000 and 20000, otherwise 1.0;</li>
     *   <li>scenario 3: two thirds for windows 6000, 7000, 18000 and 19000, otherwise 1.0;</li>
     *   <li>scenario 4: one third for windows 6000 and 7000, otherwise two thirds.</li>
     * </ul>
     *
     * <p>The previous loops read the ratio through an undefined variable because the map entry
     * returned by the iterator was never stored; each entry is now bound once and used for both
     * its key and its value.
     */
    @Test
    public void testMonitoredPercentagesByWindows() {
        TestContext scenario1 = setupScenario1();
        assertWindowRatios(
            scenario1.aggregator().validPartitionRatioByWindows(scenario1.clusterAndGeneration(0)),
            Collections.<Long>emptySet(), 1.0d, 1.0d);

        TestContext scenario2 = setupScenario2();
        assertWindowRatios(
            scenario2.aggregator().validPartitionRatioByWindows(scenario2.clusterAndGeneration(0)),
            Arrays.asList(6000L, 7000L, 20000L), 0.5d, 1.0d);

        TestContext scenario3 = setupScenario3();
        assertWindowRatios(
            scenario3.aggregator().validPartitionRatioByWindows(scenario3.clusterAndGeneration(0)),
            Arrays.asList(6000L, 7000L, 18000L, 19000L), 0.6666666666666666d, 1.0d);

        TestContext scenario4 = setupScenario4();
        assertWindowRatios(
            scenario4.aggregator().validPartitionRatioByWindows(scenario4.clusterAndGeneration(0)),
            Arrays.asList(6000L, 7000L), 0.3333333333333333d, 0.6666666666666666d);
    }

    /**
     * Asserts that the map has {@code NUM_WINDOWS} entries and that each window's ratio equals
     * {@code reducedRatio} when its key is listed in {@code reducedWindows} and
     * {@code defaultRatio} otherwise (tolerance 0.01).
     */
    private static void assertWindowRatios(SortedMap<?, ?> ratioByWindow, Collection<Long> reducedWindows,
                                           double reducedRatio, double defaultRatio) {
        Assertions.assertEquals(NUM_WINDOWS, ratioByWindow.size());
        for (Map.Entry<?, ?> entry : ratioByWindow.entrySet()) {
            long window = ((Long) entry.getKey()).longValue();
            double expected = reducedWindows.contains(window) ? reducedRatio : defaultRatio;
            Assertions.assertEquals(expected, ((Float) entry.getValue()).floatValue(), 0.01d);
        }
    }

    /**
     * Scenario 1: a cluster of four partitions (two topics, two partitions each) in which every
     * partition is populated identically with 21 windows of {@code MIN_SAMPLES_PER_WINDOW}
     * samples.
     *
     * @return the cluster and the populated aggregator
     */
    private TestContext setupScenario1() {
        // Typed list instead of a raw List/Iterator so the call to getCluster is checked.
        List<TopicPartition> allPartitions = Arrays.asList(
            TP,
            new TopicPartition(TestConstants.TOPIC0, 1),
            new TopicPartition("TOPIC1", PARTITION),
            new TopicPartition("TOPIC1", 1));
        KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        Cluster cluster = getCluster(allPartitions);
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(kafkaCruiseControlConfig, cluster);
        for (TopicPartition partition : allPartitions) {
            populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW, aggregator, partition);
        }
        return new TestContext(cluster, aggregator);
    }

    /**
     * Scenario 2: a cluster of four partitions. Three are fully populated (21 windows); the
     * fourth ("TOPIC1", 1) is populated in two runs - 5 windows from start index 0 and 12
     * windows from start index 7 - so the start indexes in between receive no samples.
     *
     * @return the cluster and the populated aggregator
     */
    private TestContext setupScenario2() {
        TopicPartition topic0Partition1 = new TopicPartition(TestConstants.TOPIC0, 1);
        TopicPartition topic1Partition0 = new TopicPartition("TOPIC1", PARTITION);
        TopicPartition topic1Partition1 = new TopicPartition("TOPIC1", 1);
        // Typed lists instead of raw List/Iterator so the call to getCluster is checked.
        List<TopicPartition> allPartitions = Arrays.asList(TP, topic0Partition1, topic1Partition0, topic1Partition1);
        KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        Cluster cluster = getCluster(allPartitions);
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(kafkaCruiseControlConfig, cluster);
        for (TopicPartition partition : Arrays.asList(TP, topic0Partition1, topic1Partition0)) {
            populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW, aggregator, partition);
        }
        // Partially populated partition: two runs with a gap between them.
        populateSampleAggregator(5, MIN_SAMPLES_PER_WINDOW, aggregator, topic1Partition1);
        CruiseControlUnitTestUtils.populateSampleAggregator(NUM_WINDOWS - 8, MIN_SAMPLES_PER_WINDOW, aggregator, new PartitionEntity(topic1Partition1), 7, WINDOW_MS, KafkaMetricDef.commonMetricDef());
        return new TestContext(cluster, aggregator);
    }

    /**
     * Scenario 3: a cluster of six partitions. Four are fully populated (21 windows). Partition
     * (TOPIC0, 1) gets 17 windows from start index 0 plus 2 windows from start index 19;
     * partition ("TOPIC1", 1) gets 5 windows from start index 0 plus 14 windows from start
     * index 7. Each of the two therefore has a different run of start indexes without samples.
     *
     * @return the cluster and the populated aggregator
     */
    private TestContext setupScenario3() {
        TopicPartition topic0Partition1 = new TopicPartition(TestConstants.TOPIC0, 1);
        TopicPartition topic1Partition0 = new TopicPartition("TOPIC1", PARTITION);
        TopicPartition topic1Partition1 = new TopicPartition("TOPIC1", 1);
        TopicPartition topic2Partition0 = new TopicPartition("TOPIC2", PARTITION);
        TopicPartition topic2Partition1 = new TopicPartition("TOPIC2", 1);
        // Typed lists instead of raw List/Iterator so the call to getCluster is checked.
        List<TopicPartition> allPartitions = Arrays.asList(
            TP, topic0Partition1, topic1Partition0, topic1Partition1, topic2Partition0, topic2Partition1);
        KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        Cluster cluster = getCluster(allPartitions);
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(kafkaCruiseControlConfig, cluster);
        for (TopicPartition partition : Arrays.asList(TP, topic1Partition0, topic2Partition0, topic2Partition1)) {
            populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW, aggregator, partition);
        }
        // First partially populated partition: gap before start index 19.
        populateSampleAggregator(NUM_WINDOWS - 3, MIN_SAMPLES_PER_WINDOW, aggregator, topic0Partition1);
        CruiseControlUnitTestUtils.populateSampleAggregator(2, MIN_SAMPLES_PER_WINDOW, aggregator, new PartitionEntity(topic0Partition1), NUM_WINDOWS - 1, WINDOW_MS, KafkaMetricDef.commonMetricDef());
        // Second partially populated partition: gap before start index 7.
        populateSampleAggregator(5, MIN_SAMPLES_PER_WINDOW, aggregator, topic1Partition1);
        CruiseControlUnitTestUtils.populateSampleAggregator(NUM_WINDOWS - 6, MIN_SAMPLES_PER_WINDOW, aggregator, new PartitionEntity(topic1Partition1), 7, WINDOW_MS, KafkaMetricDef.commonMetricDef());
        return new TestContext(cluster, aggregator);
    }

    /**
     * Scenario 4: a cluster of six partitions with "max.allowed.extrapolations.per.partition"
     * set to "0". Four partitions are fully populated (21 windows). Partition (TOPIC0, 1) gets
     * 21 windows but only 3 samples per window (one fewer than {@code MIN_SAMPLES_PER_WINDOW});
     * partition ("TOPIC1", 1) gets 5 windows from start index 0 plus 14 windows from start
     * index 7.
     *
     * @return the cluster and the populated aggregator
     */
    private TestContext setupScenario4() {
        TopicPartition topic0Partition1 = new TopicPartition(TestConstants.TOPIC0, 1);
        TopicPartition topic1Partition0 = new TopicPartition("TOPIC1", PARTITION);
        TopicPartition topic1Partition1 = new TopicPartition("TOPIC1", 1);
        TopicPartition topic2Partition0 = new TopicPartition("TOPIC2", PARTITION);
        TopicPartition topic2Partition1 = new TopicPartition("TOPIC2", 1);
        // Typed lists instead of raw List/Iterator so the call to getCluster is checked.
        List<TopicPartition> allPartitions = Arrays.asList(
            TP, topic0Partition1, topic1Partition0, topic1Partition1, topic2Partition0, topic2Partition1);
        Properties loadMonitorProperties = getLoadMonitorProperties();
        loadMonitorProperties.setProperty("max.allowed.extrapolations.per.partition", "0");
        KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(loadMonitorProperties);
        Cluster cluster = getCluster(allPartitions);
        KafkaPartitionMetricSampleAggregator aggregator = aggregator(kafkaCruiseControlConfig, cluster);
        for (TopicPartition partition : Arrays.asList(TP, topic1Partition0, topic2Partition0, topic2Partition1)) {
            populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW, aggregator, partition);
        }
        // Every window present, but with fewer samples than the configured minimum.
        populateSampleAggregator(NUM_WINDOWS + 1, MIN_SAMPLES_PER_WINDOW - 1, aggregator, topic0Partition1);
        // Partially populated partition: gap before start index 7.
        populateSampleAggregator(5, MIN_SAMPLES_PER_WINDOW, aggregator, topic1Partition1);
        CruiseControlUnitTestUtils.populateSampleAggregator(NUM_WINDOWS - 6, MIN_SAMPLES_PER_WINDOW, aggregator, new PartitionEntity(topic1Partition1), 7, WINDOW_MS, KafkaMetricDef.commonMetricDef());
        return new TestContext(cluster, aggregator);
    }

    /**
     * Walks {@code sortedSet} in iteration order and asserts that each element equals the next
     * expected window index multiplied by {@code WINDOW_MS}. Expected indexes start at {@code i}
     * and count downwards, skipping any index contained in {@code collection}.
     *
     * @param sortedSet  the windows reported as valid
     * @param i          the index expected for the first element
     * @param collection indexes that must not appear among the valid windows
     */
    private void assertValidWindows(SortedSet<Long> sortedSet, int i, Collection<Integer> collection) {
        int expectedIndex = i;
        for (long actualWindow : sortedSet) {
            // Step over indexes the caller declared invalid.
            while (collection.contains(expectedIndex)) {
                expectedIndex--;
            }
            Assertions.assertEquals(expectedIndex * WINDOW_MS, actualWindow);
            expectedIndex--;
        }
    }

    /**
     * Wraps {@code cluster}, with two empty maps as additional metadata, in a
     * {@code ClusterAndGeneration} whose generation is 0.
     */
    private MetadataClient.ClusterAndGeneration clusterAndGeneration(Cluster cluster) {
        MetadataClient.ClusterMetadata metadata =
            new MetadataClient.ClusterMetadata(cluster, Collections.emptyMap(), Collections.emptyMap());
        return new MetadataClient.ClusterAndGeneration(metadata, 0);
    }

    /**
     * Convenience overload that populates the aggregator for the default topic partition
     * {@code TP}; the arguments are forwarded unchanged to the four-argument overload.
     */
    private void populateSampleAggregator(int i, int i2, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator) {
        this.populateSampleAggregator(i, i2, kafkaPartitionMetricSampleAggregator, TP);
    }

    /**
     * Delegates to {@code CruiseControlUnitTestUtils.populateSampleAggregator} for the entity of
     * the given topic partition, using start index 0, {@code WINDOW_MS} and the common metric
     * definition.
     *
     * @param i  first argument forwarded to the utility (presumably the number of windows - confirm against the utility)
     * @param i2 second argument forwarded to the utility (presumably the samples per window - confirm against the utility)
     */
    private void populateSampleAggregator(int i, int i2, KafkaPartitionMetricSampleAggregator kafkaPartitionMetricSampleAggregator, TopicPartition topicPartition) {
        PartitionEntity entity = new PartitionEntity(topicPartition);
        CruiseControlUnitTestUtils.populateSampleAggregator(
            i, i2, kafkaPartitionMetricSampleAggregator, entity, 0, WINDOW_MS, KafkaMetricDef.commonMetricDef());
    }

    /**
     * Returns the shared Kafka Cruise Control test properties with the window length, window
     * count and per-window minimum sample count overridden by this class's constants.
     */
    private Properties getLoadMonitorProperties() {
        Properties props = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
        props.setProperty("partition.metrics.window.ms", String.valueOf(WINDOW_MS));
        props.setProperty("num.partition.metrics.windows", String.valueOf(NUM_WINDOWS));
        props.setProperty("min.samples.per.partition.metrics.window", String.valueOf(MIN_SAMPLES_PER_WINDOW));
        return props;
    }

    /**
     * Builds a two-node cluster ("rack0" / "rack1", ids 0 and 1) that contains one
     * {@link PartitionInfo} per given topic partition. For every partition the first node is
     * passed as the third constructor argument and the same two-node array is passed for both
     * of the last two arguments.
     *
     * @param collection the topic partitions the cluster should contain
     * @return a cluster with id "cluster_id" and two empty topic sets
     */
    private Cluster getCluster(Collection<TopicPartition> collection) {
        Node node0 = new Node(0, "localhost", 100, "rack0");
        Node node1 = new Node(1, "localhost", 100, "rack1");
        Node[] bothNodes = {node0, node1};
        // Parameterized sets instead of raw HashSet so the Cluster constructor call is checked.
        HashSet<Node> nodes = new HashSet<>(2);
        nodes.add(node0);
        nodes.add(node1);
        HashSet<PartitionInfo> partitionInfos = new HashSet<>(collection.size());
        for (TopicPartition topicPartition : collection) {
            partitionInfos.add(new PartitionInfo(topicPartition.topic(), topicPartition.partition(), node0, bothNodes, bothNodes));
        }
        return new Cluster("cluster_id", nodes, partitionInfos, Collections.emptySet(), Collections.emptySet());
    }

    /**
     * Creates an aggregator backed by an EasyMock {@code MetadataClient} whose {@code cluster()}
     * call returns the given cluster any number of times.
     */
    private KafkaPartitionMetricSampleAggregator aggregator(KafkaCruiseControlConfig kafkaCruiseControlConfig, Cluster cluster) {
        MetadataClient mockMetadataClient = (MetadataClient) EasyMock.mock(MetadataClient.class);
        EasyMock.expect(mockMetadataClient.cluster()).andReturn(cluster).anyTimes();
        // Switch the mock from recording to replay mode before handing it to the aggregator.
        EasyMock.replay(mockMetadataClient);
        return new KafkaPartitionMetricSampleAggregator(kafkaCruiseControlConfig, mockMetadataClient);
    }
}
