package org.apache.gobblin.converter;

import com.google.common.collect.Lists;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
import org.apache.gobblin.metadata.GlobalMetadata;
import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
import org.apache.gobblin.source.extractor.extract.kafka.KafkaStreamTestUtils;
import org.apache.gobblin.stream.RecordEnvelope;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Unit test for {@link KafkaSchemaChangeInjector}.
 *
 * <p>The test feeds mocked Kafka records carrying different Avro schemas through
 * {@code injectControlMessagesBefore} and checks when control messages are (or are not)
 * returned, and how the injector's schema cache grows.
 */
public class KafkaSchemaChangeInjectorTest {
    /**
     * Avro record schema used as a template: the optional field named {@code DUMMY} is renamed
     * via {@link String#replace} to derive schemas that differ from one another.
     */
    private static final String SCHEMA1 = "{\"namespace\": \"example.avro\",\n"
            + " \"type\": \"record\",\n"
            + " \"name\": \"user\",\n"
            + " \"fields\": [\n"
            + "     {\"name\": \"name\", \"type\": \"string\"},\n"
            + "     {\"name\": \"DUMMY\", \"type\": [\"null\",\"string\"]}\n"
            + " ]\n"
            + "}";

    /** Topic name shared by every mocked record and every schema registration in this test. */
    private static final String TOPIC = "topic1";

    /**
     * Walks the injector through a sequence of records and asserts, step by step, whether
     * control messages are returned and how many schemas have been cached.
     *
     * @throws SchemaRegistryException declared by the schema registry calls
     * @throws IOException declared by the injector / registry calls
     */
    @Test
    public void testInjection() throws SchemaRegistryException, IOException {
        KafkaSchemaChangeInjector<Schema> injector = new KafkaSchemaChangeInjector<Schema>() {
            // The method must be named exactly getSchemaIdentifier so that it takes the place of
            // the superclass hook; the record's own Avro schema serves as its identifier.
            protected Schema getSchemaIdentifier(DecodeableKafkaRecord consumerRecord) {
                GenericRecord value = (GenericRecord) consumerRecord.getValue();
                return value.getSchema();
            }
        };

        WorkUnitState state = new WorkUnitState();
        state.setProp("kafka.schema.registry.class", KafkaStreamTestUtils.MockSchemaRegistry.class.getName());
        injector.init(state);

        Schema schema1 = new Schema.Parser().parse(SCHEMA1);
        Schema schema2 = new Schema.Parser().parse(SCHEMA1.replace("DUMMY", "DUMMY2"));
        Schema schema3 = new Schema.Parser().parse(SCHEMA1.replace("DUMMY", "DUMMY3"));
        Schema schema4 = new Schema.Parser().parse(SCHEMA1.replace("DUMMY", "DUMMY4"));

        // Explicit <Schema> witness: without it the chained builder call infers Object and the
        // result does not match the Schema-typed injector.
        injector.setInputGlobalMetadata(GlobalMetadata.<Schema>builder().schema(schema1).build(), null);
        injector.getSchemaRegistry().register(schema1, TOPIC);

        DecodeableKafkaRecord record1 = getMock(TOPIC, getRecord(schema1, "name1"));
        DecodeableKafkaRecord record2 = getMock(TOPIC, getRecord(schema2, "name1"));
        DecodeableKafkaRecord record3 = getMock(TOPIC, getRecord(schema3, "name1"));
        DecodeableKafkaRecord record4 = getMock(TOPIC, getRecord(schema4, "name1"));

        // Nothing has been seen yet, so the cache starts empty.
        Assert.assertEquals(injector.getSchemaCache().size(), 0L);

        // The first record with schema1 yields a non-null result; repeats of the same record do not.
        Assert.assertNotNull(injector.injectControlMessagesBefore(new RecordEnvelope<>(record1), state));
        Assert.assertNull(injector.injectControlMessagesBefore(new RecordEnvelope<>(record1), state));
        Assert.assertNull(injector.injectControlMessagesBefore(new RecordEnvelope<>(record1), state));
        Assert.assertEquals(injector.getSchemaCache().size(), 1L);

        // schema2 was never registered: the record is cached but no control message is expected.
        Assert.assertNull(injector.injectControlMessagesBefore(new RecordEnvelope<>(record2), state));
        Assert.assertEquals(injector.getSchemaCache().size(), 2L);

        // After schema4 is registered, an unseen schema (schema3) yields exactly one control message.
        injector.getSchemaRegistry().register(schema4, TOPIC);
        Iterable<?> injected = injector.injectControlMessagesBefore(new RecordEnvelope<>(record3), state);
        Assert.assertNotNull(injected);
        Assert.assertEquals(Lists.newArrayList(injected).size(), 1);

        // A following record that carries schema4 itself yields nothing further.
        Assert.assertNull(injector.injectControlMessagesBefore(new RecordEnvelope<>(record4), state));
    }

    /**
     * Builds a Mockito mock of {@link DecodeableKafkaRecord}.
     *
     * @param str topic name returned by {@code getTopic()}
     * @param genericRecord value returned by {@code getValue()}
     * @return a mock whose partition is stubbed to {@code 1}
     */
    private DecodeableKafkaRecord getMock(String str, GenericRecord genericRecord) {
        DecodeableKafkaRecord kafkaRecord = Mockito.mock(DecodeableKafkaRecord.class);
        Mockito.when(kafkaRecord.getValue()).thenReturn(genericRecord);
        Mockito.when(kafkaRecord.getTopic()).thenReturn(str);
        Mockito.when(kafkaRecord.getPartition()).thenReturn(1);
        return kafkaRecord;
    }

    /**
     * Creates an Avro record of the given schema with only its {@code name} field populated.
     *
     * @param schema schema of the record to create
     * @param str value stored in the {@code name} field
     * @return the populated record
     */
    private GenericRecord getRecord(Schema schema, String str) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("name", str);
        return avroRecord;
    }
}
