package org.apache.gobblin.metrics.reporter;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.Tag;
import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
import org.apache.gobblin.metrics.kafka.Pusher;
import org.apache.gobblin.metrics.reporter.util.EventUtils;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"gobblin.metrics"})
/* loaded from: input_file:org/apache/gobblin/metrics/reporter/KafkaEventReporterTest.class */
public class KafkaEventReporterTest {
    public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext metricContext, Pusher pusher) {
        return KafkaEventReporter.Factory.forContext(metricContext).withKafkaPusher(pusher);
    }

    @Test
    public void testKafkaEventReporter() throws IOException {
        MetricContext build = MetricContext.builder("context").build();
        MockKafkaPusher mockKafkaPusher = new MockKafkaPusher();
        KafkaEventReporter build2 = getBuilder(build, mockKafkaPusher).build("localhost:0000", "topic");
        GobblinTrackingEvent gobblinTrackingEvent = new GobblinTrackingEvent();
        gobblinTrackingEvent.setName("testEvent");
        gobblinTrackingEvent.setNamespace("gobblin.metrics.test");
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("m1", "v1");
        newHashMap.put("m2", null);
        gobblinTrackingEvent.setMetadata(newHashMap);
        build.submitEvent(gobblinTrackingEvent);
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        build2.report();
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
        }
        GobblinTrackingEvent nextEvent = nextEvent(mockKafkaPusher.messageIterator());
        Assert.assertEquals(nextEvent.getNamespace(), "gobblin.metrics.test");
        Assert.assertEquals(nextEvent.getName(), "testEvent");
        Assert.assertEquals(nextEvent.getMetadata().size(), 4);
    }

    @Test
    public void testTagInjection() throws IOException {
        MetricContext build = MetricContext.builder("context").addTag(new Tag("tag1", "value1")).addTag(new Tag("tag2", "value2")).build();
        MockKafkaPusher mockKafkaPusher = new MockKafkaPusher();
        KafkaEventReporter build2 = getBuilder(build, mockKafkaPusher).build("localhost:0000", "topic");
        GobblinTrackingEvent gobblinTrackingEvent = new GobblinTrackingEvent();
        gobblinTrackingEvent.setName("testEvent");
        gobblinTrackingEvent.setNamespace("gobblin.metrics.test");
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("tag1", "metadata1");
        gobblinTrackingEvent.setMetadata(newHashMap);
        build.submitEvent(gobblinTrackingEvent);
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        build2.report();
        try {
            Thread.sleep(100L);
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
        }
        GobblinTrackingEvent nextEvent = nextEvent(mockKafkaPusher.messageIterator());
        Assert.assertEquals(nextEvent.getNamespace(), "gobblin.metrics.test");
        Assert.assertEquals(nextEvent.getName(), "testEvent");
        Assert.assertEquals(nextEvent.getMetadata().size(), 4);
        Assert.assertEquals((String) nextEvent.getMetadata().get("tag1"), "metadata1");
        Assert.assertEquals((String) nextEvent.getMetadata().get("tag2"), "value2");
    }

    @Test
    public void testEventReporterConfigs() throws IOException {
        MetricContext build = MetricContext.builder("context").build();
        MockKafkaPusher mockKafkaPusher = new MockKafkaPusher();
        KafkaEventReporter build2 = getBuilder(build, mockKafkaPusher).build("localhost:0000", "topic");
        Assert.assertEquals(build2.getQueueCapacity(), 100);
        Assert.assertEquals(build2.getQueueOfferTimeoutSecs(), 10);
        KafkaEventReporter build3 = getBuilder(build, mockKafkaPusher).withConfig(ConfigFactory.parseMap(ImmutableMap.builder().put("metrics.reporting.events.queue.capacity", 200).put("metrics.reporting.events.queue.offer.timeout.secs", 5).build())).build("localhost:0000", "topic");
        Assert.assertEquals(build3.getQueueCapacity(), 200);
        Assert.assertEquals(build3.getQueueOfferTimeoutSecs(), 5);
    }

    protected GobblinTrackingEvent nextEvent(Iterator<byte[]> it) throws IOException {
        Assert.assertTrue(it.hasNext());
        return EventUtils.deserializeReportFromJson(new GobblinTrackingEvent(), it.next());
    }
}
