package io.confluent.metrics.reporter.integration;

import io.confluent.metrics.record.ConfluentMetric;
import java.util.Collections;
import java.util.Iterator;
import java.util.Properties;
import kafka.utils.TestUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.collection.JavaConversions;

/* loaded from: input_file:io/confluent/metrics/reporter/integration/ClientMetricReporterTest.class */
public class ClientMetricReporterTest extends MetricsReporterTest {
    private Producer<String, String> testProducer;
    private static final String TOPIC = "testtopic";
    private KafkaConsumer<byte[], byte[]> testConsumer;

    @Override // io.confluent.metrics.reporter.integration.MetricsReporterTest, io.confluent.metrics.reporter.integration.MetricReporterClusterTestHarness
    @Before
    public void setUp() throws Exception {
        this.enableMetricsReporterOnBroker = false;
        super.setUp();
        TestUtils.createTopic(this.zkClient, TOPIC, 2, 1, JavaConversions.asScalaBuffer(this.servers).seq(), new Properties());
        this.testProducer = createProducer();
        produceTestData(this.testProducer, TOPIC, 50);
        this.testConsumer = createConsumer();
        consumeTestData(this.testConsumer, TOPIC);
    }

    @Override // io.confluent.metrics.reporter.integration.MetricsReporterTest, io.confluent.metrics.reporter.integration.MetricReporterClusterTestHarness
    @After
    public void tearDown() throws Exception {
        this.testProducer.close();
        this.testConsumer.close();
        super.tearDown();
    }

    @Override // io.confluent.metrics.reporter.integration.MetricsReporterTest
    @Test
    public void testMetricsReporter() {
        long currentTimeMillis = System.currentTimeMillis();
        int i = 0;
        int i2 = 0;
        int i3 = 0;
        int i4 = 0;
        while (System.currentTimeMillis() - currentTimeMillis < 20000) {
            Iterator it = this.consumer.poll(200L).iterator();
            while (it.hasNext()) {
                ConfluentMetric.MetricsMessage deserialize = this.serdes.deserialize((byte[]) ((ConsumerRecord) it.next()).value());
                if (deserialize.getMetricType() == ConfluentMetric.MetricType.CONSUMER) {
                    Assert.assertEquals("groupId should be set", "test-group", deserialize.getGroupId());
                    Assert.assertEquals("clientId should be set", "test-consumer", deserialize.getClientId());
                    i2++;
                } else if (deserialize.getMetricType() == ConfluentMetric.MetricType.PRODUCER) {
                    Assert.assertEquals("clientId should be set", "test-producer", deserialize.getClientId());
                    i++;
                } else {
                    Assert.fail("only clients should produce metrics");
                }
                Assert.assertEquals("brokerId should be -1", -1L, deserialize.getBrokerId());
                Assert.assertTrue("clusterId should be set", !deserialize.getClusterId().isEmpty());
                i3 += deserialize.getKafkaMeasurableCount();
                i4 = i4 + deserialize.getYammerGaugeCount() + deserialize.getYammerMeterCount() + deserialize.getYammerHistogramCount() + deserialize.getYammerTimerCount();
            }
        }
        Assert.assertTrue("Clients should have a KafkaMeasurable metric", i3 > 0);
        Assert.assertEquals("Clients should not have a Yammer metric", 0L, i4);
        Assert.assertTrue("should receive metrics from consumer", i2 > 0);
        Assert.assertTrue("should receive metrics from producer", i > 0);
    }

    private Producer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        properties.put("acks", "all");
        properties.put("batch.size", 10);
        properties.put("linger.ms", 1);
        properties.put("client.id", "test-producer");
        injectMetricReporterProperties(properties, this.brokerList);
        return new KafkaProducer(properties, new StringSerializer(), new StringSerializer());
    }

    private KafkaConsumer<byte[], byte[]> createConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        properties.put("group.id", "test-group");
        properties.put("client.id", "test-consumer");
        injectMetricReporterProperties(properties, this.brokerList);
        return new KafkaConsumer<>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    private void produceTestData(Producer<String, String> producer, String str, int i) {
        for (int i2 = 0; i2 < i; i2++) {
            producer.send(new ProducerRecord(str, Integer.toString(i2), Integer.toString(i2)));
        }
    }

    private void consumeTestData(KafkaConsumer<byte[], byte[]> kafkaConsumer, String str) {
        kafkaConsumer.subscribe(Collections.singleton(str));
        Iterator it = kafkaConsumer.poll(200L).iterator();
        while (it.hasNext()) {
        }
    }
}
