package com.linkedin.kafka.cruisecontrol.monitor.task;

import com.linkedin.cruisecontrol.metricdef.MetricDef;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUnitTestUtils;
import com.linkedin.kafka.cruisecontrol.KafkaCruiseControlUtils;
import com.linkedin.kafka.cruisecontrol.common.MetadataClient;
import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityConfigResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness;
import com.linkedin.kafka.cruisecontrol.monitor.MockSampler;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricFetcherManager;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.NoopSampleStore;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaBrokerMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.aggregator.KafkaPartitionMetricSampleAggregator;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.holder.PartitionMetricSample;
import com.yammer.metrics.core.MetricsRegistry;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import kafka.admin.RackAwareMode$Safe$;
import kafka.zk.AdminZkClient;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import scala.Option$;

/* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/task/LoadMonitorTaskRunnerTest.class */
public class LoadMonitorTaskRunnerTest extends CCKafkaIntegrationTestHarness {
    private static final long WINDOW_MS = 10000;
    private static final int NUM_WINDOWS = 5;
    private static final int NUM_TOPICS = 100;
    private static final int NUM_PARTITIONS = 4;
    private static final String TEST_TOPIC_NAME = "LoadMonitorTaskRunnerTest";
    private static final long SAMPLING_INTERVAL = KafkaCruiseControlConfig.DEFAULT_METRIC_SAMPLING_INTERVAL_MS.longValue();
    private static final MetricDef METRIC_DEF = KafkaMetricDef.commonMetricDef();
    private static final Time TIME = new MockTime(1);

    @Rule
    public final Timeout globalTimeout = Timeout.millis(120000);
    private MetricsRegistry metricsRegistry;

    /* loaded from: input_file:com/linkedin/kafka/cruisecontrol/monitor/task/LoadMonitorTaskRunnerTest$MockPartitionMetricSampleAggregator.class */
    private static class MockPartitionMetricSampleAggregator extends KafkaPartitionMetricSampleAggregator {
        private final BlockingQueue<PartitionMetricSample> _partitionMetricSamples;

        MockPartitionMetricSampleAggregator(KafkaCruiseControlConfig kafkaCruiseControlConfig, MetadataClient metadataClient) {
            super(kafkaCruiseControlConfig, metadataClient);
            this._partitionMetricSamples = new ArrayBlockingQueue(10000);
        }

        public synchronized boolean addSample(PartitionMetricSample partitionMetricSample) {
            this._partitionMetricSamples.add(partitionMetricSample);
            return true;
        }

        public synchronized boolean addSample(PartitionMetricSample partitionMetricSample, boolean z) {
            this._partitionMetricSamples.add(partitionMetricSample);
            return true;
        }

        public BlockingQueue<PartitionMetricSample> metricSampleQueue() {
            return this._partitionMetricSamples;
        }
    }

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness, com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCAbstractZookeeperTestHarness
    @Before
    public void setUp() {
        super.setUp();
        KafkaZkClient createKafkaZkClient = KafkaCruiseControlUtils.createKafkaZkClient(zookeeper().connectionString(), "LoadMonitorTaskRunnerGroup", "LoadMonitorTaskRunnerSetup", false);
        AdminZkClient adminZkClient = new AdminZkClient(createKafkaZkClient);
        for (int i = 0; i < NUM_TOPICS; i++) {
            adminZkClient.createTopic("LoadMonitorTaskRunnerTest-" + i, NUM_PARTITIONS, 1, new Properties(), RackAwareMode$Safe$.MODULE$, false, Option$.MODULE$.empty());
        }
        KafkaCruiseControlUtils.closeKafkaZkClientWithTimeout(createKafkaZkClient);
        this.metricsRegistry = new MetricsRegistry();
    }

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness, com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCAbstractZookeeperTestHarness
    @After
    public void tearDown() {
        super.tearDown();
        this.metricsRegistry.shutdown();
    }

    @Override // com.linkedin.kafka.cruisecontrol.metricsreporter.utils.CCKafkaIntegrationTestHarness
    public int clusterSize() {
        return 1;
    }

    @Test
    public void testSimpleFetch() throws InterruptedException {
        KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        MetadataClient metadataClient = new MetadataClient(kafkaCruiseControlConfig, -1L, TIME);
        MockPartitionMetricSampleAggregator mockPartitionMetricSampleAggregator = new MockPartitionMetricSampleAggregator(kafkaCruiseControlConfig, metadataClient);
        KafkaBrokerMetricSampleAggregator kafkaBrokerMetricSampleAggregator = (KafkaBrokerMetricSampleAggregator) EasyMock.mock(KafkaBrokerMetricSampleAggregator.class);
        MetricFetcherManager metricFetcherManager = new MetricFetcherManager(kafkaCruiseControlConfig, mockPartitionMetricSampleAggregator, kafkaBrokerMetricSampleAggregator, metadataClient, METRIC_DEF, TIME, KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry), (BrokerCapacityConfigResolver) null, new MockSampler(0, TIME));
        LoadMonitorTaskRunner loadMonitorTaskRunner = new LoadMonitorTaskRunner(kafkaCruiseControlConfig, metricFetcherManager, mockPartitionMetricSampleAggregator, kafkaBrokerMetricSampleAggregator, metadataClient, TIME);
        awaitTopicCreation(metadataClient);
        loadMonitorTaskRunner.start(true);
        HashSet hashSet = new HashSet(400);
        for (int i = 0; i < NUM_TOPICS; i++) {
            for (int i2 = 0; i2 < NUM_PARTITIONS; i2++) {
                hashSet.add(new TopicPartition("LoadMonitorTaskRunnerTest-" + i, i2));
            }
        }
        long currentTimeMillis = System.currentTimeMillis();
        BlockingQueue<PartitionMetricSample> metricSampleQueue = mockPartitionMetricSampleAggregator.metricSampleQueue();
        while (!hashSet.isEmpty() && System.currentTimeMillis() < currentTimeMillis + WINDOW_MS) {
            PartitionMetricSample poll = metricSampleQueue.poll();
            if (poll != null) {
                TopicPartition tp = poll.entity().tp();
                if (tp.topic().contains(TEST_TOPIC_NAME)) {
                    Assert.assertTrue(String.format("The topic partition %s should have been sampled and sampled only once.", tp), hashSet.contains(tp));
                    hashSet.remove(tp);
                }
            }
        }
        Assert.assertTrue("Did not see sample for partitions " + Arrays.toString(hashSet.toArray()), hashSet.isEmpty());
        metricFetcherManager.shutdown();
        Assert.assertTrue(metricSampleQueue.isEmpty());
        loadMonitorTaskRunner.shutdown();
        metadataClient.close();
    }

    @Test
    public void testSamplingError() throws InterruptedException {
        KafkaCruiseControlConfig kafkaCruiseControlConfig = new KafkaCruiseControlConfig(getLoadMonitorProperties());
        MetadataClient metadataClient = new MetadataClient(kafkaCruiseControlConfig, -1L, TIME);
        MockPartitionMetricSampleAggregator mockPartitionMetricSampleAggregator = new MockPartitionMetricSampleAggregator(kafkaCruiseControlConfig, metadataClient);
        KafkaBrokerMetricSampleAggregator kafkaBrokerMetricSampleAggregator = (KafkaBrokerMetricSampleAggregator) EasyMock.mock(KafkaBrokerMetricSampleAggregator.class);
        MetricFetcherManager metricFetcherManager = new MetricFetcherManager(kafkaCruiseControlConfig, mockPartitionMetricSampleAggregator, kafkaBrokerMetricSampleAggregator, metadataClient, METRIC_DEF, TIME, KafkaCruiseControlUnitTestUtils.getMetricsRegistry(this.metricsRegistry), (BrokerCapacityConfigResolver) null, new MockSampler(0, TIME));
        LoadMonitorTaskRunner loadMonitorTaskRunner = new LoadMonitorTaskRunner(kafkaCruiseControlConfig, metricFetcherManager, mockPartitionMetricSampleAggregator, kafkaBrokerMetricSampleAggregator, metadataClient, TIME);
        awaitTopicCreation(metadataClient);
        loadMonitorTaskRunner.start(true);
        int i = 0;
        long currentTimeMillis = System.currentTimeMillis();
        BlockingQueue<PartitionMetricSample> metricSampleQueue = mockPartitionMetricSampleAggregator.metricSampleQueue();
        while (i < 4000 && System.currentTimeMillis() < currentTimeMillis + WINDOW_MS) {
            PartitionMetricSample poll = metricSampleQueue.poll();
            if (poll != null && poll.entity().tp().topic().contains(TEST_TOPIC_NAME)) {
                i++;
            }
        }
        Assert.assertEquals("Only see " + i + " samples. Expecting 400 samples", 400, i);
        metricFetcherManager.shutdown();
        loadMonitorTaskRunner.shutdown();
        metadataClient.close();
    }

    private void awaitTopicCreation(MetadataClient metadataClient) throws InterruptedException {
        while (100 != metadataClient.cluster().topics().stream().filter(str -> {
            return str.contains(TEST_TOPIC_NAME);
        }).count()) {
            Thread.sleep(10L);
            metadataClient.refreshMetadata();
        }
    }

    private Properties getLoadMonitorProperties() {
        Properties kafkaCruiseControlProperties = KafkaCruiseControlUnitTestUtils.getKafkaCruiseControlProperties();
        kafkaCruiseControlProperties.setProperty("bootstrap.servers", bootstrapServers());
        kafkaCruiseControlProperties.setProperty("partition.metrics.window.ms", Long.toString(WINDOW_MS));
        kafkaCruiseControlProperties.setProperty("num.partition.metrics.windows", Integer.toString(NUM_WINDOWS));
        kafkaCruiseControlProperties.setProperty("metric.sampler.class", MockSampler.class.getName());
        kafkaCruiseControlProperties.setProperty("min.samples.per.partition.metrics.window", "2");
        kafkaCruiseControlProperties.setProperty("metric.sampling.interval.ms", Long.toString(SAMPLING_INTERVAL));
        kafkaCruiseControlProperties.setProperty("sample.store.class", NoopSampleStore.class.getName());
        return kafkaCruiseControlProperties;
    }
}
