package org.apache.gobblin.metrics.reporter;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.avro.Schema;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.gobblin.metrics.GobblinTrackingEvent;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.kafka.KafkaAvroEventKeyValueReporter;
import org.apache.gobblin.metrics.kafka.KafkaAvroSchemaRegistry;
import org.apache.gobblin.metrics.kafka.KafkaEventReporter;
import org.apache.gobblin.metrics.kafka.Pusher;
import org.apache.gobblin.metrics.reporter.util.EventUtils;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

/* loaded from: input_file:org/apache/gobblin/metrics/reporter/KafkaAvroEventKeyValueReporterTest.class */
public class KafkaAvroEventKeyValueReporterTest extends KafkaAvroEventReporterTest {
    private static final int SCHEMA_ID_LENGTH_BYTES = 20;
    private String schemaId;

    @BeforeClass
    public void setUp() throws IOException {
        this.schemaId = DigestUtils.sha1Hex(new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")).toString().getBytes());
    }

    @Override // org.apache.gobblin.metrics.reporter.KafkaAvroEventReporterTest, org.apache.gobblin.metrics.reporter.KafkaEventReporterTest
    public KafkaEventReporter.Builder<? extends KafkaEventReporter.Builder> getBuilder(MetricContext metricContext, Pusher pusher) {
        return KafkaAvroEventKeyValueReporter.Factory.forContext(metricContext).withKafkaPusher(pusher).withKeys(Lists.newArrayList(new String[]{"k1", "k2", "k3"}));
    }

    private Pair<String, GobblinTrackingEvent> nextKVEvent(Iterator<Pair<String, byte[]>> it, boolean z) throws IOException {
        Assert.assertTrue(it.hasNext());
        Pair<String, byte[]> next = it.next();
        return z ? Pair.of(next.getKey(), EventUtils.deserializeEventFromAvroSerialization(new GobblinTrackingEvent(), (byte[]) next.getValue(), this.schemaId)) : Pair.of(next.getKey(), EventUtils.deserializeEventFromAvroSerialization(new GobblinTrackingEvent(), (byte[]) next.getValue()));
    }

    private GobblinTrackingEvent getEvent(boolean z) {
        GobblinTrackingEvent gobblinTrackingEvent = new GobblinTrackingEvent();
        gobblinTrackingEvent.setName("testEvent");
        gobblinTrackingEvent.setNamespace("gobblin.metrics.test");
        HashMap newHashMap = Maps.newHashMap();
        newHashMap.put("m1", "v1");
        newHashMap.put("m2", null);
        if (z) {
            newHashMap.put("k1", "v1");
            newHashMap.put("k2", "v2");
            newHashMap.put("k3", "v3");
        }
        gobblinTrackingEvent.setMetadata(newHashMap);
        return gobblinTrackingEvent;
    }

    @Override // org.apache.gobblin.metrics.reporter.KafkaEventReporterTest
    @Test
    public void testKafkaEventReporter() throws IOException {
        MetricContext build = MetricContext.builder("context").build();
        MockKafkaKeyValuePusher mockKafkaKeyValuePusher = new MockKafkaKeyValuePusher();
        KafkaEventReporter build2 = getBuilder(build, mockKafkaKeyValuePusher).build("localhost:0000", "topic");
        build.submitEvent(getEvent(false));
        build2.report();
        Assert.assertNull(nextKVEvent(mockKafkaKeyValuePusher.messageIterator(), false).getKey());
        build.submitEvent(getEvent(true));
        build2.report();
        Assert.assertEquals((String) nextKVEvent(mockKafkaKeyValuePusher.messageIterator(), false).getKey(), "v1v2v3");
    }

    @Test
    public void testKafkaEventReporterWithSchemaRegistry() throws IOException {
        MetricContext build = MetricContext.builder("context").build();
        MockKafkaKeyValuePusher mockKafkaKeyValuePusher = new MockKafkaKeyValuePusher();
        KafkaAvroEventKeyValueReporter build2 = KafkaAvroEventKeyValueReporter.Factory.forContext(build).withKafkaPusher(mockKafkaKeyValuePusher).withKeys(Lists.newArrayList(new String[]{"k1", "k2", "k3"})).withSchemaRegistry((KafkaAvroSchemaRegistry) Mockito.mock(KafkaAvroSchemaRegistry.class)).withSchemaId(DigestUtils.sha1Hex(new Schema.Parser().parse(getClass().getClassLoader().getResourceAsStream("GobblinTrackingEvent.avsc")).toString().getBytes())).build("localhost:0000", "topic");
        build.submitEvent(getEvent(true));
        build2.report();
        Assert.assertEquals((String) nextKVEvent(mockKafkaKeyValuePusher.messageIterator(), true).getKey(), "v1v2v3");
    }

    @Override // org.apache.gobblin.metrics.reporter.KafkaEventReporterTest
    @Test(enabled = false)
    public void testTagInjection() throws IOException {
    }
}
