package io.confluent.connect.hdfs.avro;

import io.confluent.connect.avro.AvroData;
import io.confluent.connect.hdfs.RecordWriter;
import io.confluent.connect.hdfs.RecordWriterProvider;
import io.confluent.kafka.serializers.NonRecordContainer;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/connect/hdfs/avro/AvroRecordWriterProvider.class */
public class AvroRecordWriterProvider implements RecordWriterProvider {
    private static final Logger log = LoggerFactory.getLogger(AvroRecordWriterProvider.class);
    private static final String EXTENSION = ".avro";

    @Override // io.confluent.connect.hdfs.RecordWriterProvider
    public String getExtension() {
        return EXTENSION;
    }

    @Override // io.confluent.connect.hdfs.RecordWriterProvider
    public RecordWriter<SinkRecord> getRecordWriter(Configuration configuration, String str, SinkRecord sinkRecord, final AvroData avroData) throws IOException {
        final DataFileWriter dataFileWriter = new DataFileWriter(new GenericDatumWriter());
        Path path = new Path(str);
        final Schema valueSchema = sinkRecord.valueSchema();
        dataFileWriter.create(avroData.fromConnectSchema(valueSchema), path.getFileSystem(configuration).create(path));
        return new RecordWriter<SinkRecord>() { // from class: io.confluent.connect.hdfs.avro.AvroRecordWriterProvider.1
            @Override // io.confluent.connect.hdfs.RecordWriter
            public void write(SinkRecord sinkRecord2) throws IOException {
                AvroRecordWriterProvider.log.trace("Sink record: {}", sinkRecord2.toString());
                Object fromConnectData = avroData.fromConnectData(valueSchema, sinkRecord2.value());
                if (fromConnectData instanceof NonRecordContainer) {
                    dataFileWriter.append(((NonRecordContainer) fromConnectData).getValue());
                } else {
                    dataFileWriter.append(fromConnectData);
                }
            }

            @Override // io.confluent.connect.hdfs.RecordWriter
            public void close() throws IOException {
                dataFileWriter.close();
            }
        };
    }
}
