package com.datatorrent.contrib.avro;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceStability.Evolving
/* loaded from: input_file:com/datatorrent/contrib/avro/AvroFileInputOperator.class */
public class AvroFileInputOperator extends AbstractFileInputOperator<GenericRecord> {
    private transient DataFileStream<GenericRecord> avroDataStream;
    private static final Logger LOG = LoggerFactory.getLogger(AvroFileInputOperator.class);
    private transient long offset = 0;

    @VisibleForTesting
    @AutoMetric
    int recordCount = 0;

    @VisibleForTesting
    @AutoMetric
    int errorCount = 0;
    public final transient DefaultOutputPort<GenericRecord> output = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<String> completedFilesPort = new DefaultOutputPort<>();
    public final transient DefaultOutputPort<String> errorRecordsPort = new DefaultOutputPort<>();

    protected InputStream openFile(Path path) throws IOException {
        InputStream openFile = super.openFile(path);
        if (openFile != null) {
            GenericDatumReader genericDatumReader = new GenericDatumReader();
            this.avroDataStream = new DataFileStream<>(openFile, genericDatumReader);
            genericDatumReader.setSchema(this.avroDataStream.getSchema());
        }
        return openFile;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* renamed from: readEntity, reason: merged with bridge method [inline-methods] */
    public GenericRecord m12readEntity() throws IOException {
        try {
            if (this.avroDataStream == null || !this.avroDataStream.hasNext()) {
                return null;
            }
            this.offset++;
            GenericRecord genericRecord = (GenericRecord) this.avroDataStream.next();
            this.recordCount++;
            return genericRecord;
        } catch (AvroRuntimeException e) {
            LOG.error("Exception in parsing record for file - " + ((AbstractFileInputOperator) this).currentFile + " at offset - " + this.offset, e);
            if (this.errorRecordsPort.isConnected()) {
                this.errorRecordsPort.emit("FileName:" + ((AbstractFileInputOperator) this).currentFile + ", Offset:" + this.offset);
            }
            this.errorCount++;
            throw new AvroRuntimeException(e);
        }
    }

    protected void closeFile(InputStream inputStream) throws IOException {
        String str = ((AbstractFileInputOperator) this).currentFile;
        if (this.avroDataStream != null) {
            this.avroDataStream.close();
        }
        super.closeFile(inputStream);
        if (this.completedFilesPort.isConnected()) {
            this.completedFilesPort.emit(str);
        }
        this.offset = 0L;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void emit(GenericRecord genericRecord) {
        if (genericRecord != null) {
            this.output.emit(genericRecord);
        }
    }

    public void beginWindow(long j) {
        this.errorCount = 0;
        this.recordCount = 0;
    }
}
