package co.cask.cdap.etl.batch.source;

import co.cask.cdap.api.annotation.Description;
import co.cask.cdap.api.annotation.Name;
import co.cask.cdap.api.annotation.Plugin;
import co.cask.cdap.api.data.format.StructuredRecord;
import co.cask.cdap.api.dataset.lib.FileSetProperties;
import co.cask.cdap.api.dataset.lib.KeyValue;
import co.cask.cdap.etl.api.Emitter;
import co.cask.cdap.etl.common.AvroToStructuredTransformer;
import co.cask.cdap.etl.common.SnapshotFileSetConfig;
import javax.annotation.Nullable;
import org.apache.avro.Schema;
import org.apache.avro.SchemaParseException;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.io.NullWritable;

/**
 * Batch source plugin, registered under the name {@code SnapshotAvro}, that reads Avro records
 * from a snapshot file set and emits them as {@link StructuredRecord}s.
 *
 * <p>Each input key wraps an Avro {@link GenericRecord}; the value is a {@link NullWritable} and
 * is ignored. Conversion is delegated to an {@link AvroToStructuredTransformer}.
 */
@Name("SnapshotAvro")
@Description("Reads the most recent snapshot that was written to a SnapshotAvro sink.")
@Plugin(type = "batchsource")
public class SnapshotFileBatchAvroSource extends SnapshotFileBatchSource<AvroKey<GenericRecord>, NullWritable> {
    private final AvroToStructuredTransformer recordTransformer;
    private final SnapshotAvroConfig config;

    /**
     * Configuration for {@link SnapshotFileBatchAvroSource}: the inherited snapshot file set
     * settings plus the Avro schema of the records to read.
     */
    public static class SnapshotAvroConfig extends SnapshotFileSetConfig {

        // Intentionally not final: NOTE(review) plugin config fields are presumably populated
        // reflectively by the framework -- confirm before tightening.
        @Description("The Avro schema of the records to read.")
        private String schema;

        /**
         * Creates a config with an explicit schema.
         *
         * @param name     first argument forwarded to the {@link SnapshotFileSetConfig}
         *                 constructor (presumably the dataset name -- TODO confirm)
         * @param basePath second argument forwarded to the {@link SnapshotFileSetConfig}
         *                 constructor (presumably the file set base path -- TODO confirm);
         *                 may be {@code null}
         * @param schema   the Avro schema, as a JSON string, of the records to read
         */
        public SnapshotAvroConfig(String name, @Nullable String basePath, String schema) {
            // The third super-constructor argument is always passed as null by this subclass.
            super(name, basePath, null);
            this.schema = schema;
        }
    }

    /**
     * Creates the source from its configuration.
     *
     * @param snapshotAvroConfig the plugin configuration; also handed to the superclass
     */
    public SnapshotFileBatchAvroSource(SnapshotAvroConfig snapshotAvroConfig) {
        super(snapshotAvroConfig);
        this.recordTransformer = new AvroToStructuredTransformer();
        this.config = snapshotAvroConfig;
    }

    /**
     * Converts the Avro record wrapped by the input key into a {@link StructuredRecord} and
     * emits it. The value half of the pair is not used.
     *
     * @param keyValue the input pair whose key holds the Avro record
     * @param emitter  receives the converted record
     * @throws Exception if the record conversion fails
     */
    @Override
    public void transform(KeyValue<AvroKey<GenericRecord>, NullWritable> keyValue, Emitter<StructuredRecord> emitter) throws Exception {
        GenericRecord avroRecord = keyValue.getKey().datum();
        emitter.emit(this.recordTransformer.transform(avroRecord));
    }

    /**
     * Validates the configured schema and then registers the Avro input/output formats, the
     * output schema, and the Hive explore settings (SerDe, formats, schema literal) on the
     * given file set properties builder.
     *
     * @param builder the file set properties builder to configure
     * @throws IllegalArgumentException if the configured schema is {@code null} or is not a
     *                                  parseable Avro schema
     */
    @Override // co.cask.cdap.etl.batch.source.SnapshotFileBatchSource
    protected void addFileProperties(FileSetProperties.Builder builder) {
        String schema = this.config.schema;
        // Fail before touching the builder so an invalid schema never yields partial properties.
        validateSchema(schema);
        builder.setInputFormat(AvroKeyInputFormat.class)
            .setOutputFormat(AvroKeyOutputFormat.class)
            .setOutputProperty("avro.schema.output.key", schema)
            .setEnableExploreOnCreate(true)
            .setSerDe("org.apache.hadoop.hive.serde2.avro.AvroSerDe")
            .setExploreInputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat")
            .setExploreOutputFormat("org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat")
            .setTableProperty("avro.schema.literal", schema);
    }

    /**
     * Checks that the given string is a parseable Avro schema; the parsed result is discarded.
     *
     * @param schema the Avro schema JSON to check
     * @throws IllegalArgumentException if {@code schema} is {@code null} or cannot be parsed
     */
    private static void validateSchema(String schema) {
        if (schema == null) {
            // Report a missing schema the same way as an unparseable one, instead of letting
            // the null reach the Avro parser.
            throw new IllegalArgumentException("Could not parse schema: schema is null");
        }
        try {
            new Schema.Parser().parse(schema);
        } catch (SchemaParseException e) {
            throw new IllegalArgumentException("Could not parse schema: " + e.getMessage(), e);
        }
    }
}
