/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.metrics.reporter.integration;

import io.confluent.metrics.record.ConfluentMetric;
import io.confluent.metrics.reporter.integration.MetricsReporterTest;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import kafka.utils.TestUtils;
import kafka.utils.ZkUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.collection.JavaConversions;
import scala.collection.Seq;

/**
 * Integration test checking that Kafka clients (one producer and one consumer) configured
 * through {@code injectMetricReporterProperties} publish metrics messages.
 *
 * <p>The test cluster, {@code zkUtils}, {@code servers}, {@code brokerList}, {@code serdes} and
 * the metrics-reading {@code consumer} are all inherited from {@link MetricsReporterTest}.
 * NOTE(review): their exact types and setup are not visible from this file; the code below only
 * relies on them the same way the original did.
 */
public class ClientMetricReporterTest
extends MetricsReporterTest {
    private Producer<String, String> testProducer;
    private static final String TOPIC = "testtopic";
    private KafkaConsumer<byte[], byte[]> testConsumer;

    private static final String PRODUCER_CLIENT_ID = "test-producer";
    private static final String CONSUMER_CLIENT_ID = "test-consumer";
    private static final String CONSUMER_GROUP_ID = "test-group";
    private static final int NUM_TEST_RECORDS = 50;
    /** Timeout, in milliseconds, passed to every {@code poll} call in this test. */
    private static final long POLL_TIMEOUT_MS = 200L;
    /** How long, in milliseconds, {@link #testMetricsReporter()} keeps reading metrics messages. */
    private static final long METRICS_COLLECTION_WINDOW_MS = 20000L;

    /**
     * Starts the inherited test harness, creates the test topic, and then creates a producer and
     * a consumer that send and poll some test data.
     *
     * @throws Exception if the inherited setup or topic creation fails
     */
    @Override
    @Before
    public void setUp() throws Exception {
        // Must be assigned before super.setUp(), as in the original ordering.
        // Presumably this keeps brokers from emitting metrics of their own, so that only the
        // clients created below report -- TODO confirm against MetricsReporterTest.
        this.enableMetricsReporterOnBroker = false;
        super.setUp();
        TestUtils.createTopic((ZkUtils)this.zkUtils, TOPIC, 2, 1, (Seq)JavaConversions.asScalaBuffer((List)this.servers).seq(), new Properties());
        this.testProducer = this.createProducer();
        this.produceTestData(this.testProducer, TOPIC, NUM_TEST_RECORDS);
        this.testConsumer = this.createConsumer();
        this.consumeTestData(this.testConsumer, TOPIC);
    }

    /**
     * Closes the test clients and then tears down the inherited harness.
     *
     * <p>JUnit runs {@code @After} methods even when {@code @Before} failed, so either client may
     * still be {@code null} here. The nested {@code finally} blocks make sure a failure while
     * closing one client neither leaks the other nor skips {@code super.tearDown()}.
     *
     * @throws Exception if closing a client or the inherited teardown fails
     */
    @Override
    @After
    public void tearDown() throws Exception {
        try {
            if (this.testProducer != null) {
                this.testProducer.close();
            }
        } finally {
            try {
                if (this.testConsumer != null) {
                    this.testConsumer.close();
                }
            } finally {
                super.tearDown();
            }
        }
    }

    /**
     * Polls the inherited {@code consumer} for {@value #METRICS_COLLECTION_WINDOW_MS} ms and
     * validates every metrics message received.
     *
     * <p>Each message must be of type CONSUMER or PRODUCER, carry the expected client id (and
     * group id for the consumer), have broker id -1 and a non-empty cluster id. After the window
     * the test requires at least one message of each client type, at least one Kafka measurable,
     * and no Yammer metrics at all.
     */
    @Override
    @Test
    public void testMetricsReporter() {
        long startMs = System.currentTimeMillis();
        int producerMetricCount = 0;
        int consumerMetricCount = 0;
        int kafkaMeasurables = 0;
        int yammerMetrics = 0;
        // The full window is always consumed (no early exit) so that an unexpected message
        // arriving late still fails the per-message assertions.
        while (System.currentTimeMillis() - startMs < METRICS_COLLECTION_WINDOW_MS) {
            // Wildcards keep this independent of the key/value types the superclass declares.
            ConsumerRecords<?, ?> records = this.consumer.poll(POLL_TIMEOUT_MS);
            for (ConsumerRecord<?, ?> record : records) {
                ConfluentMetric.MetricsMessage metricsMessage = (ConfluentMetric.MetricsMessage)this.serdes.deserialize((byte[])record.value());
                if (metricsMessage.getMetricType() == ConfluentMetric.MetricType.CONSUMER) {
                    Assert.assertEquals("groupId should be set", CONSUMER_GROUP_ID, metricsMessage.getGroupId());
                    Assert.assertEquals("clientId should be set", CONSUMER_CLIENT_ID, metricsMessage.getClientId());
                    ++consumerMetricCount;
                } else if (metricsMessage.getMetricType() == ConfluentMetric.MetricType.PRODUCER) {
                    Assert.assertEquals("clientId should be set", PRODUCER_CLIENT_ID, metricsMessage.getClientId());
                    ++producerMetricCount;
                } else {
                    Assert.fail("only clients should produce metrics");
                }
                Assert.assertEquals("brokerId should be -1", -1L, (long)metricsMessage.getBrokerId());
                Assert.assertTrue("clusterId should be set", !metricsMessage.getClusterId().isEmpty());
                kafkaMeasurables += metricsMessage.getKafkaMeasurableCount();
                yammerMetrics += metricsMessage.getYammerGaugeCount();
                yammerMetrics += metricsMessage.getYammerMeterCount();
                yammerMetrics += metricsMessage.getYammerHistogramCount();
                yammerMetrics += metricsMessage.getYammerTimerCount();
            }
        }
        Assert.assertTrue("Clients should have a KafkaMeasurable metric", kafkaMeasurables > 0);
        Assert.assertEquals("Clients should not have a Yammer metric", 0L, (long)yammerMetrics);
        Assert.assertTrue("should receive metrics from consumer", consumerMetricCount > 0);
        Assert.assertTrue("should receive metrics from producer", producerMetricCount > 0);
    }

    /**
     * Builds a String/String producer with client id {@value #PRODUCER_CLIENT_ID}, pointed at the
     * test brokers, with the inherited metric-reporter properties applied.
     *
     * @return a new producer; the caller is responsible for closing it
     */
    private Producer<String, String> createProducer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        properties.put("acks", "all");
        properties.put("batch.size", 10);
        properties.put("linger.ms", 1);
        properties.put("client.id", PRODUCER_CLIENT_ID);
        this.injectMetricReporterProperties(properties, this.brokerList);
        return new KafkaProducer<String, String>(properties, new StringSerializer(), new StringSerializer());
    }

    /**
     * Builds a byte[]/byte[] consumer with client id {@value #CONSUMER_CLIENT_ID} in group
     * {@value #CONSUMER_GROUP_ID}, pointed at the test brokers, with the inherited
     * metric-reporter properties applied.
     *
     * @return a new consumer; the caller is responsible for closing it
     */
    private KafkaConsumer<byte[], byte[]> createConsumer() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        properties.put("group.id", CONSUMER_GROUP_ID);
        properties.put("client.id", CONSUMER_CLIENT_ID);
        this.injectMetricReporterProperties(properties, this.brokerList);
        return new KafkaConsumer<byte[], byte[]>(properties, new ByteArrayDeserializer(), new ByteArrayDeserializer());
    }

    /**
     * Sends {@code numRecords} records to {@code topic}; record {@code i} uses the decimal string
     * of {@code i} as both key and value. The returned send futures are not awaited.
     *
     * @param producer   producer used to send the records
     * @param topic      destination topic
     * @param numRecords number of records to send
     */
    private void produceTestData(Producer<String, String> producer, String topic, int numRecords) {
        for (int i = 0; i < numRecords; ++i) {
            String payload = Integer.toString(i);
            producer.send(new ProducerRecord<String, String>(topic, payload, payload));
        }
    }

    /**
     * Subscribes {@code consumer} to {@code topic} and issues a single poll.
     *
     * @param consumer consumer to subscribe and poll
     * @param topic    topic to subscribe to
     */
    private void consumeTestData(KafkaConsumer<byte[], byte[]> consumer, String topic) {
        consumer.subscribe(Collections.singleton(topic));
        // Only the act of polling matters here; whatever is returned is intentionally discarded.
        consumer.poll(POLL_TIMEOUT_MS);
    }
}

