package gobblin.source.extractor.extract.kafka;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.linkedin.data.schema.DataSchemaConstants;
import gobblin.configuration.WorkUnitState;
import gobblin.kafka.client.ByteArrayBasedKafkaRecord;
import gobblin.metrics.kafka.KafkaSchemaRegistry;
import gobblin.metrics.kafka.SchemaRegistryException;
import gobblin.source.extractor.DataRecordException;
import gobblin.util.AvroUtils;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/gobblin-kafka-common-0.11.0.jar:gobblin/source/extractor/extract/kafka/KafkaAvroExtractor.class */
public abstract class KafkaAvroExtractor<K> extends KafkaExtractor<Schema, GenericRecord> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) KafkaAvroExtractor.class);
    protected static final Schema DEFAULT_SCHEMA = SchemaBuilder.record("DefaultSchema").fields().name("header").type(SchemaBuilder.record("header").fields().name("time").type(DataSchemaConstants.LONG_TYPE).withDefault(0).endRecord()).noDefault().endRecord();
    protected final Optional<KafkaSchemaRegistry<K, Schema>> schemaRegistry;
    protected final Optional<Schema> schema;
    protected final Optional<GenericDatumReader<GenericData.Record>> reader;

    public KafkaAvroExtractor(WorkUnitState workUnitState) {
        super(workUnitState);
        this.schemaRegistry = workUnitState.contains(KafkaSchemaRegistry.KAFKA_SCHEMA_REGISTRY_CLASS) ? Optional.of(KafkaSchemaRegistry.get(workUnitState.getProperties())) : Optional.absent();
        this.schema = getExtractorSchema();
        if (this.schema.isPresent()) {
            this.reader = Optional.of(new GenericDatumReader(this.schema.get()));
        } else {
            log.error(String.format("Cannot find latest schema for topic %s. This topic will be skipped", this.topicName));
            this.reader = Optional.absent();
        }
    }

    protected Optional<Schema> getExtractorSchema() {
        return Optional.fromNullable(getLatestSchemaByTopic(this.topicName));
    }

    protected Schema getLatestSchemaByTopic(String str) {
        Preconditions.checkState(this.schemaRegistry.isPresent());
        try {
            return this.schemaRegistry.get().getLatestSchemaByTopic(str);
        } catch (SchemaRegistryException e) {
            log.error(String.format("Cannot find latest schema for topic %s. This topic will be skipped", str), (Throwable) e);
            return null;
        }
    }

    @Override // gobblin.source.extractor.extract.kafka.KafkaExtractor, gobblin.instrumented.extractor.InstrumentedExtractorBase
    public GenericRecord readRecordImpl(GenericRecord genericRecord) throws DataRecordException, IOException {
        if (this.schema.isPresent()) {
            return (GenericRecord) super.readRecordImpl((KafkaAvroExtractor<K>) genericRecord);
        }
        return null;
    }

    @Override // gobblin.source.extractor.Extractor
    public Schema getSchema() {
        return this.schema.or((Optional<Schema>) DEFAULT_SCHEMA);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Can't rename method to resolve collision */
    @Override // gobblin.source.extractor.extract.kafka.KafkaExtractor
    public GenericRecord decodeRecord(ByteArrayBasedKafkaRecord byteArrayBasedKafkaRecord) throws IOException {
        byte[] messageBytes = byteArrayBasedKafkaRecord.getMessageBytes();
        Schema recordSchema = getRecordSchema(messageBytes);
        Decoder decoder = getDecoder(messageBytes);
        this.reader.get().setSchema(recordSchema);
        try {
            return convertRecord((GenericRecord) this.reader.get().read(null, decoder));
        } catch (IOException e) {
            log.error(String.format("Error during decoding record for partition %s: ", getCurrentPartition()));
            throw e;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // gobblin.source.extractor.extract.kafka.KafkaExtractor
    public GenericRecord convertRecord(GenericRecord genericRecord) throws IOException {
        return AvroUtils.convertRecordSchema(genericRecord, this.schema.get());
    }

    protected abstract Schema getRecordSchema(byte[] bArr);

    protected abstract Decoder getDecoder(byte[] bArr);
}
