package io.confluent.cruisecontrol.metricsreporter;

import com.linkedin.kafka.cruisecontrol.config.BrokerCapacityResolver;
import com.linkedin.kafka.cruisecontrol.config.KafkaCruiseControlConfig;
import com.linkedin.kafka.cruisecontrol.monitor.metricdefinition.KafkaMetricDef;
import com.linkedin.kafka.cruisecontrol.monitor.sampling.MetricSampler;
import io.confluent.databalancer.TestConstants;
import io.confluent.metrics.reporter.integration.MetricReporterClusterTestHarness;
import io.confluent.telemetry.ConfluentTelemetryConfig;
import io.confluent.telemetry.exporter.ExporterConfig;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.server.ZkMetadataCache;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

@Tag(TestConstants.INTEGRATION_TEST)
/* loaded from: input_file:io/confluent/cruisecontrol/metricsreporter/ConfluentTelemetryReporterSamplerIntegrationTest.class */
public class ConfluentTelemetryReporterSamplerIntegrationTest extends MetricReporterClusterTestHarness {
    private ConfluentTelemetryReporterSampler sampler = new ConfluentTelemetryReporterSampler();

    @BeforeEach
    public void setup() throws Exception {
        super.setUp();
    }

    @AfterEach
    public void tearDown() throws Exception {
        super.tearDown();
    }

    @Test
    public void testSampler() throws InterruptedException {
        HashMap hashMap = new HashMap();
        hashMap.put("network.in.max.bytes.per.second", 1000000L);
        hashMap.put("network.out.max.bytes.per.second", 1000000L);
        hashMap.put(BrokerCapacityResolver.LOG_DIRS_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
        BrokerCapacityResolver brokerCapacityResolver = new BrokerCapacityResolver();
        brokerCapacityResolver.configure(hashMap);
        ZkMetadataCache metadataCache = ((KafkaServer) this.servers.get(0)).metadataCache();
        TestUtils.waitForCondition(() -> {
            return metadataCache.getClusterMetadata((String) this.zkClient.getClusterId().get(), kafkaConfig(0).interBrokerListenerName()).topics().contains("_confluent-telemetry-metrics");
        }, 600000L, "Metrics topic _confluent-telemetry-metrics does not exist");
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        properties.put("zookeeper.connect", this.zkConnect);
        properties.put("broker.capacity.config.resolver.object", brokerCapacityResolver);
        this.sampler.configure(new KafkaCruiseControlConfig(properties).mergedConfigValues());
        Cluster clusterMetadata = metadataCache.getClusterMetadata((String) this.zkClient.getClusterId().get(), kafkaConfig(0).interBrokerListenerName());
        HashSet hashSet = new HashSet();
        Iterator it = clusterMetadata.topics().iterator();
        while (it.hasNext()) {
            for (PartitionInfo partitionInfo : clusterMetadata.partitionsForTopic((String) it.next())) {
                hashSet.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
            }
        }
        TestUtils.retryOnExceptionWithTimeout(5000L, 30000L, () -> {
            MetricSampler.Samples samples = this.sampler.getSamples(clusterMetadata, hashSet, 0L, System.currentTimeMillis(), MetricSampler.SamplingMode.ALL, KafkaMetricDef.commonMetricDef(), 300000L);
            Assertions.assertFalse(samples.partitionMetricSamples().isEmpty());
            Assertions.assertTrue(samples.brokerMetricSamples().size() >= 1 && samples.brokerMetricSamples().stream().anyMatch(brokerMetricSample -> {
                return brokerMetricSample.entity().brokerId() == 1;
            }));
        });
    }

    protected void injectMetricReporterProperties(Properties properties, String str) {
        properties.setProperty(KafkaConfig.MetricReporterClassesProp(), "io.confluent.telemetry.reporter.KafkaServerMetricsReporter");
        properties.setProperty(ConfluentTelemetryConfig.exporterPrefixForName("_confluent") + "enabled", "false");
        properties.setProperty(ConfluentTelemetryConfig.exporterPrefixForName("_local") + "type", ExporterConfig.ExporterType.kafka.name());
        properties.setProperty(ConfluentTelemetryConfig.exporterPrefixForName("_local") + "enabled", "true");
        properties.setProperty(ConfluentTelemetryConfig.exporterPrefixForName("_local") + "producer.bootstrap.servers", str);
        properties.setProperty(ConfluentTelemetryConfig.exporterPrefixForName("_local") + "topic.replicas", "2");
        properties.setProperty("confluent.telemetry.metrics.collector.interval.ms", "500");
        properties.setProperty("confluent.telemetry.labels.region", "test");
        properties.setProperty("confluent.telemetry.labels.pkc", "pkc-bar");
        properties.setProperty("confluent.telemetry.debug.enabled", "true");
        properties.setProperty(KafkaConfig.LogFlushIntervalMessagesProp(), "1");
    }
}
