package gobblin.source.extractor.extract.kafka;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import gobblin.configuration.WorkUnitState;
import gobblin.metrics.kafka.KafkaSchemaRegistry;
import gobblin.metrics.kafka.SchemaRegistryException;
import gobblin.source.extractor.DataRecordException;
import gobblin.util.AvroUtils;
import java.io.IOException;
import kafka.message.MessageAndOffset;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.class */
/**
 * Base class for Kafka extractors that emit Avro {@link GenericRecord}s.
 *
 * <p>The extractor schema is resolved once, in the constructor, through
 * {@link #getExtractorSchema()}. When no schema can be resolved the extractor logs an error,
 * {@link #readRecordImpl(GenericRecord)} returns {@code null} on every call, and
 * {@link #m64getSchema()} falls back to {@link #DEFAULT_SCHEMA}.
 *
 * <p>Subclasses supply the wire-format specifics through {@link #getRecordSchema(byte[])} and
 * {@link #getDecoder(byte[])}.
 *
 * @param <K> first type parameter of the {@link KafkaSchemaRegistry} in use
 *            (presumably the schema key/id type -- confirm against the registry implementation)
 */
public abstract class KafkaAvroExtractor<K> extends KafkaExtractor<Schema, GenericRecord> {
    private static final Logger log = LoggerFactory.getLogger(KafkaAvroExtractor.class);

    /**
     * Fallback schema returned by {@link #m64getSchema()} when no extractor schema is available:
     * a record {@code DefaultSchema} with a single {@code header} record field, which in turn has
     * one {@code long} field named {@code time} defaulting to 0.
     */
    protected static final Schema DEFAULT_SCHEMA = (Schema) SchemaBuilder.record("DefaultSchema").fields().name("header").type((Schema) SchemaBuilder.record("header").fields().name("time").type("long").withDefault(0).endRecord()).noDefault().endRecord();

    /** Schema registry; present only when the work unit state contains {@code kafka.schema.registry.class}. */
    protected final Optional<KafkaSchemaRegistry<K, Schema>> schemaRegistry;

    /** Schema every decoded record is converted to; absent when it could not be resolved. */
    protected final Optional<Schema> schema;

    /** Datum reader used by {@link #decodeRecord(MessageAndOffset)}; present exactly when {@link #schema} is. */
    protected final Optional<GenericDatumReader<GenericData.Record>> reader;

    /**
     * Creates the extractor, looking up the schema registry (if configured) and the extractor
     * schema. A missing schema is logged and leaves {@link #reader} absent rather than failing.
     *
     * @param workUnitState state of the work unit being extracted
     */
    public KafkaAvroExtractor(WorkUnitState workUnitState) {
        super(workUnitState);
        this.schemaRegistry = workUnitState.contains("kafka.schema.registry.class")
                ? Optional.of(KafkaSchemaRegistry.get(workUnitState.getProperties()))
                : Optional.absent();
        // NOTE(review): overridable method invoked from the constructor; subclass overrides run
        // before the subclass's own fields are initialised.
        this.schema = getExtractorSchema();
        if (this.schema.isPresent()) {
            this.reader = Optional.of(new GenericDatumReader<GenericData.Record>(this.schema.get()));
        } else {
            log.error(String.format("Cannot find latest schema for topic %s. This topic will be skipped", this.topicName));
            this.reader = Optional.absent();
        }
    }

    /**
     * Resolves the schema used for the extractor's output.
     *
     * @return the latest registry schema for this extractor's topic, or absent if the lookup
     *         returned {@code null}
     */
    protected Optional<Schema> getExtractorSchema() {
        return Optional.fromNullable(getLatestSchemaByTopic(this.topicName));
    }

    /**
     * Fetches the latest schema registered for a topic.
     *
     * @param str topic name
     * @return the latest schema, or {@code null} if the registry lookup failed
     *         (the failure is logged, not propagated)
     * @throws IllegalStateException if no schema registry is configured
     */
    protected Schema getLatestSchemaByTopic(String str) {
        Preconditions.checkState(this.schemaRegistry.isPresent(),
                "No schema registry configured (kafka.schema.registry.class); cannot look up schema for topic %s", str);
        try {
            return this.schemaRegistry.get().getLatestSchemaByTopic(str);
        } catch (SchemaRegistryException e) {
            log.error(String.format("Cannot find latest schema for topic %s. This topic will be skipped", str), e);
            return null;
        }
    }

    /**
     * Reads the next record, delegating to the superclass only when an extractor schema exists.
     *
     * @param genericRecord record instance handed through to the superclass implementation
     * @return the next record, or {@code null} when no extractor schema is available
     * @throws DataRecordException propagated from the superclass
     * @throws IOException propagated from the superclass
     */
    @Override // gobblin.source.extractor.extract.kafka.KafkaExtractor, gobblin.instrumented.extractor.InstrumentedExtractorBase
    public GenericRecord readRecordImpl(GenericRecord genericRecord) throws DataRecordException, IOException {
        if (!this.schema.isPresent()) {
            return null;
        }
        // Pass the record through unchanged; the decompiled cast to KafkaAvroExtractor<K> was
        // bogus and would have thrown ClassCastException for any non-null record.
        return super.readRecordImpl(genericRecord);
    }

    /**
     * Returns the extractor schema, or {@link #DEFAULT_SCHEMA} when none could be resolved.
     *
     * <p>The decompiler renamed this method from {@code getSchema} (bridge-method collision);
     * the generated name is kept so existing references to it continue to resolve.
     *
     * @return the schema of the records this extractor emits
     */
    /* renamed from: getSchema, reason: merged with bridge method [inline-methods] */
    public Schema m64getSchema() {
        return this.schema.or(DEFAULT_SCHEMA);
    }

    /**
     * Decodes one Kafka message into a {@link GenericRecord} conforming to the extractor schema.
     *
     * <p>The payload is read with the schema reported by {@link #getRecordSchema(byte[])} and the
     * result is then converted to {@link #schema} via {@code AvroUtils.convertRecordSchema}.
     * Requires {@link #reader} and {@link #schema} to be present; calling {@code get()} on an
     * absent value throws {@link IllegalStateException}.
     *
     * @param messageAndOffset Kafka message to decode
     * @return the decoded record, converted to the extractor schema
     * @throws IOException if decoding or schema conversion fails (logged with its cause, then rethrown)
     */
    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // gobblin.source.extractor.extract.kafka.KafkaExtractor
    public GenericRecord decodeRecord(MessageAndOffset messageAndOffset) throws IOException {
        byte[] bytes = getBytes(messageAndOffset.message().payload());
        Schema recordSchema = getRecordSchema(bytes);
        Decoder decoder = getDecoder(bytes);
        GenericDatumReader<GenericData.Record> datumReader = this.reader.get();
        // The reader is shared across calls, so it is re-pointed at this record's schema each time.
        datumReader.setSchema(recordSchema);
        try {
            GenericRecord decoded = datumReader.read(null, decoder);
            return AvroUtils.convertRecordSchema(decoded, this.schema.get());
        } catch (IOException e) {
            // Include the exception so the log entry carries the stack trace.
            log.error(String.format("Error during decoding record for partition %s: ", getCurrentPartition()), e);
            throw e;
        }
    }

    /**
     * Determines the schema the given message payload was written with.
     *
     * @param bArr raw message payload
     * @return schema to decode the payload with
     */
    protected abstract Schema getRecordSchema(byte[] bArr);

    /**
     * Creates a decoder over the given message payload.
     *
     * @param bArr raw message payload
     * @return decoder that {@link #decodeRecord(MessageAndOffset)} reads the record from
     */
    protected abstract Decoder getDecoder(byte[] bArr);
}
