package org.apache.gobblin.metrics.reporter;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.gobblin.metrics.Measurements;
import org.apache.gobblin.metrics.Metric;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.MetricReport;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.kafka.KafkaReporter;
import org.apache.gobblin.metrics.kafka.Pusher;
import org.apache.gobblin.metrics.reporter.util.MetricReportUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"gobblin.metrics"})
/* loaded from: input_file:org/apache/gobblin/metrics/reporter/KafkaReporterTest.class */
public class KafkaReporterTest {
    public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilder(Pusher pusher) {
        return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
    }

    public KafkaReporter.Builder<? extends KafkaReporter.Builder> getBuilderFromContext(Pusher pusher) {
        return KafkaReporter.BuilderFactory.newBuilder().withKafkaPusher(pusher);
    }

    @Test
    public void testKafkaReporter() throws IOException {
        MetricContext build = MetricContext.builder(getClass().getCanonicalName() + ".testKafkaReporter").build();
        Counter counter = build.counter("com.linkedin.example.counter");
        Meter meter = build.meter("com.linkedin.example.meter");
        Histogram histogram = build.histogram("com.linkedin.example.histogram");
        MockKafkaPusher mockKafkaPusher = new MockKafkaPusher();
        KafkaReporter build2 = getBuilder(mockKafkaPusher).build("localhost:0000", "topic", new Properties());
        counter.inc();
        meter.mark(2L);
        histogram.update(1);
        histogram.update(1);
        histogram.update(2);
        build2.report(build);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        Map<String, Double> hashMap = new HashMap<>();
        hashMap.put("com.linkedin.example.counter." + Measurements.COUNT, Double.valueOf(1.0d));
        hashMap.put("com.linkedin.example.meter." + Measurements.COUNT, Double.valueOf(2.0d));
        hashMap.put("com.linkedin.example.histogram." + Measurements.COUNT, Double.valueOf(3.0d));
        expectMetricsWithValues(nextReport(mockKafkaPusher.messageIterator()), hashMap);
        build2.report(build);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
        }
        Set<String> hashSet = new HashSet<>();
        hashSet.add("com.linkedin.example.counter." + Measurements.COUNT);
        hashSet.add("com.linkedin.example.meter." + Measurements.COUNT);
        hashSet.add("com.linkedin.example.meter." + Measurements.MEAN_RATE);
        hashSet.add("com.linkedin.example.meter." + Measurements.RATE_1MIN);
        hashSet.add("com.linkedin.example.meter." + Measurements.RATE_5MIN);
        hashSet.add("com.linkedin.example.meter." + Measurements.RATE_15MIN);
        hashSet.add("com.linkedin.example.histogram." + Measurements.MEAN);
        hashSet.add("com.linkedin.example.histogram." + Measurements.MIN);
        hashSet.add("com.linkedin.example.histogram." + Measurements.MAX);
        hashSet.add("com.linkedin.example.histogram." + Measurements.MEDIAN);
        hashSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_75TH);
        hashSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_95TH);
        hashSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_99TH);
        hashSet.add("com.linkedin.example.histogram." + Measurements.PERCENTILE_999TH);
        hashSet.add("com.linkedin.example.histogram." + Measurements.COUNT);
        expectMetrics(nextReport(mockKafkaPusher.messageIterator()), hashSet, true);
        build2.close();
    }

    @Test
    public void kafkaReporterTagsTest() throws IOException {
        MetricContext build = MetricContext.builder(getClass().getCanonicalName() + ".kafkaReporterTagsTest").build();
        Counter counter = build.counter("com.linkedin.example.counter");
        Tag tag = new Tag("tag1", "value1");
        Tag tag2 = new Tag("tag2", 2);
        MockKafkaPusher mockKafkaPusher = new MockKafkaPusher();
        KafkaReporter build2 = getBuilder(mockKafkaPusher).withTags(Lists.newArrayList(new Tag[]{tag, tag2})).build("localhost:0000", "topic", new Properties());
        counter.inc();
        build2.report(build);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        MetricReport nextReport = nextReport(mockKafkaPusher.messageIterator());
        Assert.assertEquals(4, nextReport.getTags().size());
        Assert.assertTrue(nextReport.getTags().containsKey(tag.getKey()));
        Assert.assertEquals((String) nextReport.getTags().get(tag.getKey()), tag.getValue().toString());
        Assert.assertTrue(nextReport.getTags().containsKey(tag2.getKey()));
        Assert.assertEquals((String) nextReport.getTags().get(tag2.getKey()), tag2.getValue().toString());
    }

    @Test
    public void kafkaReporterContextTest() throws IOException {
        Tag tag = new Tag("tag1", "value1");
        MetricContext build = MetricContext.builder("context").addTag(tag).build();
        Counter counter = build.counter("com.linkedin.example.counter");
        MockKafkaPusher mockKafkaPusher = new MockKafkaPusher();
        KafkaReporter build2 = getBuilderFromContext(mockKafkaPusher).build("localhost:0000", "topic", new Properties());
        counter.inc();
        build2.report(build);
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        MetricReport nextReport = nextReport(mockKafkaPusher.messageIterator());
        Assert.assertEquals(3, nextReport.getTags().size());
        Assert.assertTrue(nextReport.getTags().containsKey(tag.getKey()));
        Assert.assertEquals((String) nextReport.getTags().get(tag.getKey()), tag.getValue().toString());
    }

    private void expectMetricsWithValues(MetricReport metricReport, Map<String, Double> map) throws IOException {
        for (Metric metric : metricReport.getMetrics()) {
            if (map.containsKey(metric.getName())) {
                Assert.assertEquals(map.get(metric.getName()), metric.getValue());
                map.remove(metric.getName());
            }
        }
        Assert.assertTrue(map.isEmpty());
    }

    private void expectMetrics(MetricReport metricReport, Set<String> set, boolean z) throws IOException {
        for (Metric metric : metricReport.getMetrics()) {
            if (set.contains(metric.getName())) {
                set.remove(metric.getName());
            } else if (z && !metric.getName().contains("gobblin.metrics.notifications.timer")) {
                Assert.assertTrue(false, "Metric present in report not expected: " + metric.toString());
            }
        }
        Assert.assertTrue(set.isEmpty());
    }

    protected MetricReport nextReport(Iterator<byte[]> it) throws IOException {
        Assert.assertTrue(it.hasNext());
        return MetricReportUtils.deserializeReportFromJson(new MetricReport(), it.next());
    }
}
