package io.confluent.support.metrics;

import io.confluent.support.metrics.common.Collector;
import io.confluent.support.metrics.common.kafka.EmbeddedKafkaCluster;
import io.confluent.support.metrics.common.kafka.KafkaUtilities;
import io.confluent.support.metrics.common.time.TimeUtils;
import io.confluent.support.metrics.serde.AvroDeserializer;
import io.confluent.support.metrics.tools.KafkaMetricsToFile;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import kafka.server.KafkaConfig$;
import kafka.server.KafkaServer;
import kafka.utils.TestUtils;
import kafka.zk.KafkaZkClient;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.AppInfoParser;
import org.junit.Assert;
import org.junit.Test;
import scala.collection.JavaConverters;

/* loaded from: input_file:io/confluent/support/metrics/MetricsToKafkaTest.class */
/**
 * Integration tests that submit metrics through a {@link MetricsReporter} bound to a broker of an
 * {@link EmbeddedKafkaCluster} and then read the submissions back from the configured metrics
 * topic.
 *
 * <p>Every test stops its cluster in a {@code finally} block so that a failing assertion does not
 * leave brokers running for subsequent tests.
 */
public class MetricsToKafkaTest {
    private static final String DEFAULT_SERVER_PROPERTIES = "/default-server.properties";
    private static final String INSECURE_ENDPOINT_ENABLE_PROP = "confluent.support.metrics.endpoint.insecure.enable";
    private static final String SECURE_ENDPOINT_ENABLE_PROP = "confluent.support.metrics.endpoint.secure.enable";
    private static final String METRICS_TOPIC_PROP = "confluent.support.metrics.topic";
    private static final String METRICS_TOPIC = "test_metrics";
    private static final String REPORTER_THREAD_NAME = "testThread";
    private static final String OUTPUT_FILE = "testFile.zip";
    /** Number of times each test calls {@code submitMetrics()} and expects to read back. */
    private static final int NUM_SUBMISSIONS = 10;
    /** Time budget, in milliseconds, passed to the calls that read the submissions back. */
    private static final int TIMEOUT_MS = 10000;

    /**
     * Submits {@link #NUM_SUBMISSIONS} metrics on a one-broker cluster and asserts that
     * {@link KafkaMetricsToFile#saveMetricsToFile} reports the same number when dumping the topic.
     *
     * @throws IOException if the broker configuration cannot be loaded or file handling fails
     */
    @Test
    public void savesAsManyMetricsToFileAsHaveBeenSubmittedBySingleNodeCluster() throws IOException {
        Runtime runtime = Runtime.getRuntime();
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startCluster(1);
        try {
            KafkaServer broker = cluster.getBroker(0);
            Properties brokerConfiguration = topicOnlyBrokerConfiguration(broker, cluster.zookeeperConnectString());
            String topic = brokerConfiguration.getProperty(METRICS_TOPIC_PROP);
            KafkaMetricsToFile kafkaMetricsToFile = new KafkaMetricsToFile(bootstrapServer(broker.zkClient()));
            MetricsReporter reporter = new MetricsReporter(
                    REPORTER_THREAD_NAME, false, broker, new KafkaSupportConfig(brokerConfiguration), runtime);
            reporter.init();
            submitMetrics(reporter, NUM_SUBMISSIONS);
            try {
                Assert.assertEquals(
                        NUM_SUBMISSIONS, kafkaMetricsToFile.saveMetricsToFile(topic, OUTPUT_FILE, TIMEOUT_MS));
            } finally {
                // Remove the output even when the assertion fails so reruns start clean.
                Files.deleteIfExists(Paths.get(OUTPUT_FILE));
            }
        } finally {
            cluster.stopCluster();
        }
    }

    /**
     * Loads {@code /default-server.properties} from the classpath and overrides the broker id and
     * ZooKeeper connect string with the values of the given broker.
     *
     * @param kafkaServer broker whose configured id is written into the properties
     * @param str ZooKeeper connect string to write into the properties
     * @return the resulting broker properties
     * @throws IOException if the resource is missing or cannot be read
     */
    private Properties defaultBrokerConfiguration(KafkaServer kafkaServer, String str) throws IOException {
        Properties properties = new Properties();
        try (InputStream in = MetricsToKafkaTest.class.getResourceAsStream(DEFAULT_SERVER_PROPERTIES)) {
            if (in == null) {
                // Properties.load(null) would otherwise fail with an uninformative NullPointerException.
                throw new IOException("Classpath resource not found: " + DEFAULT_SERVER_PROPERTIES);
            }
            properties.load(in);
        }
        properties.setProperty(KafkaConfig$.MODULE$.BrokerIdProp(), Integer.toString(kafkaServer.config().brokerId()));
        properties.setProperty(KafkaConfig$.MODULE$.ZkConnectProp(), str);
        return properties;
    }

    /**
     * Builds the default broker configuration, sets both endpoint-enable properties to
     * {@code "false"} and points the metrics topic property at {@link #METRICS_TOPIC}.
     */
    private Properties topicOnlyBrokerConfiguration(KafkaServer broker, String zkConnect) throws IOException {
        Properties properties = defaultBrokerConfiguration(broker, zkConnect);
        properties.setProperty(INSECURE_ENDPOINT_ENABLE_PROP, "false");
        properties.setProperty(SECURE_ENDPOINT_ENABLE_PROP, "false");
        properties.setProperty(METRICS_TOPIC_PROP, METRICS_TOPIC);
        return properties;
    }

    /** Calls {@code submitMetrics()} on the reporter {@code count} times. */
    private static void submitMetrics(MetricsReporter reporter, int count) {
        for (int i = 0; i < count; i++) {
            reporter.submitMetrics();
        }
    }

    /**
     * Submits {@link #NUM_SUBMISSIONS} metrics via the first broker of a three-broker cluster and
     * verifies each record consumed from the metrics topic.
     *
     * @throws IOException if the broker configuration cannot be loaded or a record cannot be
     *     deserialized
     */
    @Test
    public void retrievesBasicMetricsSubmittedByMultiNodeCluster() throws IOException {
        EmbeddedKafkaCluster cluster = new EmbeddedKafkaCluster();
        cluster.startCluster(3);
        try {
            KafkaServer broker = cluster.getBroker(0);
            Properties brokerConfiguration = topicOnlyBrokerConfiguration(broker, cluster.zookeeperConnectString());
            MetricsReporter reporter = new MetricsReporter(
                    REPORTER_THREAD_NAME, false, broker, new KafkaSupportConfig(brokerConfiguration), Runtime.getRuntime());
            String topic = brokerConfiguration.getProperty(METRICS_TOPIC_PROP);
            reporter.init();
            submitMetrics(reporter, NUM_SUBMISSIONS);
            verifyMetricsSubmittedToTopic(bootstrapServer(broker.zkClient()), topic, NUM_SUBMISSIONS);
        } finally {
            cluster.stopCluster();
        }
    }

    /**
     * Consumes {@code i} records from topic {@code str2} and checks that each one deserializes to
     * exactly one {@link SupportKafkaMetricsBasic} entry that passes {@link #verifyBasicMetrics}.
     *
     * @param str bootstrap server handed to {@link KafkaMetricsToFile} to create the consumer
     * @param str2 topic to subscribe to
     * @param i number of records to consume
     * @throws IOException if a record value cannot be deserialized
     */
    private static void verifyMetricsSubmittedToTopic(String str, String str2, int i) throws IOException {
        // try-with-resources closes the consumer even when an assertion below fails.
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaMetricsToFile(str).createConsumer()) {
            consumer.subscribe(Collections.singleton(str2));
            Collection<ConsumerRecord<byte[], byte[]>> records = JavaConverters
                    .asJavaCollectionConverter(TestUtils.consumeRecords(consumer, i, TIMEOUT_MS))
                    .asJavaCollection();
            AvroDeserializer avroDeserializer = new AvroDeserializer();
            for (ConsumerRecord<byte[], byte[]> record : records) {
                SupportKafkaMetricsBasic[] container =
                        avroDeserializer.deserialize(SupportKafkaMetricsBasic.class, record.value());
                Assert.assertEquals(1, container.length);
                verifyBasicMetrics(container[0]);
            }
        }
    }

    /**
     * Asserts that the record's timestamp is not later than the current Unix time, that its Kafka
     * and Confluent Platform versions match those derived from {@link AppInfoParser#getVersion()},
     * and that its broker process UUID is non-empty.
     */
    private static void verifyBasicMetrics(SupportKafkaMetricsBasic supportKafkaMetricsBasic) {
        String kafkaVersion = AppInfoParser.getVersion();
        Assert.assertTrue(supportKafkaMetricsBasic.getTimestamp() <= new TimeUtils().nowInUnixTime());
        Assert.assertEquals(kafkaVersion, supportKafkaMetricsBasic.getKafkaVersion());
        Assert.assertEquals(Collector.cpVersion(kafkaVersion), supportKafkaMetricsBasic.getConfluentPlatformVersion());
        Assert.assertFalse(supportKafkaMetricsBasic.getBrokerProcessUUID().isEmpty());
    }

    /**
     * Returns the first entry of the bootstrap-server list that {@link KafkaUtilities} resolves
     * through the given ZooKeeper client when asked for at most one server.
     */
    private String bootstrapServer(KafkaZkClient kafkaZkClient) {
        return new KafkaUtilities().getBootstrapServers(kafkaZkClient, 1).get(0);
    }
}
