package gobblin.source.extractor.filebased;

import com.google.common.collect.Lists;
import com.google.common.io.Closer;
import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.WorkUnitState;
import gobblin.instrumented.extractor.InstrumentedExtractor;
import gobblin.metrics.Counters;
import gobblin.source.extractor.DataRecordException;
import gobblin.source.workunit.WorkUnit;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.List;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/source/extractor/filebased/FileBasedExtractor.class */
/**
 * Base extractor that reads records out of a list of files, one file at a time.
 *
 * <p>The files to read come from the work unit state; each file is opened through a
 * {@link SizeAwareFileBasedHelper} and, by default, exposed line by line (see
 * {@link #downloadFile(String)}). When a file is exhausted its stream is closed, its size is added
 * to the {@link CounterNames#FileBytesRead} counter, and the next file in the list is opened.
 *
 * <p>No synchronization is performed on the mutable reading state held by this class.
 *
 * @param <S> schema type returned by {@link #getSchema()}
 * @param <D> record type returned by {@link #readRecordImpl(Object)}
 */
public abstract class FileBasedExtractor<S, D> extends InstrumentedExtractor<S, D> {
    private static final Logger LOG = LoggerFactory.getLogger(FileBasedExtractor.class);

    /** Property holding the list of files this extractor should read. */
    private static final String FILES_TO_PULL_KEY = "source.filebased.files.to.pull";
    /** Property holding how many records to read between two progress log lines. */
    private static final String REPORT_STATUS_ON_COUNT_KEY = "filebased.report.status.on.count";
    /** Progress-log interval used when {@link #REPORT_STATUS_ON_COUNT_KEY} is not set. */
    private static final int DEFAULT_REPORT_STATUS_ON_COUNT = 10000;
    /** Property telling whether the first record of every file should be dropped. */
    private static final String SKIP_FIRST_RECORD_KEY = "source.skip.first.record";
    /** Property holding the schema returned by {@link #getSchema()}. */
    private static final String SOURCE_SCHEMA_KEY = "source.schema";

    protected final WorkUnit workUnit;
    protected final WorkUnitState workUnitState;
    protected final SizeAwareFileBasedHelper fsHelper;
    /** Files still waiting to be read; entries are removed from the head as they are opened. */
    protected final List<String> filesToPull;
    /** Holds the stream(s) opened for the file currently being read. */
    protected final Closer closer;
    private final int statusCount;
    /** Number of records actually returned by {@link #readRecordImpl(Object)} so far. */
    private long totalRecordCount;
    /** Iterator over the current file; {@code null} when no file is open. */
    private Iterator<D> currentFileItr;
    private String currentFile;
    private boolean hasNext;
    private final boolean shouldSkipFirstRecord;
    protected Counters<CounterNames> counters;

    /** Names of the counters maintained by this extractor. */
    public enum CounterNames {
        /** Incremented by the size of each file once that file has been fully read. */
        FileBytesRead
    }

    /**
     * Creates the extractor, reads its configuration from the work unit state and connects the
     * file helper.
     *
     * @param workUnitState state carrying the work unit and the extractor configuration
     * @param fileBasedHelper helper used to open files; wrapped in a
     *     {@link SizeAwareFileBasedHelperDecorator} when it is not already a
     *     {@link SizeAwareFileBasedHelper}
     * @throws RuntimeException wrapping the {@link FileBasedHelperException} raised while connecting
     */
    public FileBasedExtractor(WorkUnitState workUnitState, FileBasedHelper fileBasedHelper) {
        super(workUnitState);
        this.closer = Closer.create();
        this.totalRecordCount = 0L;
        this.hasNext = false;
        this.counters = new Counters<>();
        this.workUnitState = workUnitState;
        this.workUnit = workUnitState.getWorkunit();
        this.filesToPull = Lists.newArrayList(workUnitState.getPropAsList(FILES_TO_PULL_KEY, ""));
        this.statusCount = this.workUnit.getPropAsInt(REPORT_STATUS_ON_COUNT_KEY, DEFAULT_REPORT_STATUS_ON_COUNT);
        this.shouldSkipFirstRecord = this.workUnitState.getPropAsBoolean(SKIP_FIRST_RECORD_KEY, false);
        if (fileBasedHelper instanceof SizeAwareFileBasedHelper) {
            this.fsHelper = (SizeAwareFileBasedHelper) fileBasedHelper;
        } else {
            this.fsHelper = new SizeAwareFileBasedHelperDecorator(fileBasedHelper);
        }
        try {
            this.fsHelper.connect();
            this.counters.initialize(getMetricContext(), CounterNames.class, getClass());
        } catch (FileBasedHelperException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns the next record, moving on to the next file when the current one is exhausted.
     *
     * @param d unused
     * @return the next record, or {@code null} once every file has been read
     * @throws DataRecordException declared by the overridden method; not thrown by this implementation
     * @throws IOException if the next file cannot be opened
     */
    @Override // gobblin.instrumented.extractor.InstrumentedExtractorBase
    public D readRecordImpl(@Deprecated D d) throws DataRecordException, IOException {
        if (this.currentFile == null || this.currentFileItr == null) {
            // Nothing is open: first call, or the previous file has already been finished.
            getNextFileToRead();
        } else {
            this.hasNext = this.currentFileItr.hasNext();
            if (!this.hasNext) {
                getNextFileToRead();
            }
        }
        if (!this.hasNext) {
            LOG.info("Finished reading records from all files");
            return null;
        }
        D record = this.currentFileItr.next();
        // Count only records that are really handed out, so the progress log is not inflated by
        // end-of-data calls.
        this.totalRecordCount++;
        if (this.statusCount > 0 && this.totalRecordCount % this.statusCount == 0) {
            LOG.info("Total number of records processed so far: {}", this.totalRecordCount);
        }
        return record;
    }

    /**
     * Finishes the current file, if one is open, and opens files from {@link #filesToPull} until
     * one yields at least one record or the list is empty.
     *
     * @throws IOException if a file cannot be opened
     */
    private void getNextFileToRead() throws IOException {
        if (this.currentFile != null && this.currentFileItr != null) {
            closeCurrentFile();
            incrementBytesReadCounter();
            // Drop the iterator so this file is not closed and counted again on later calls
            // (readRecordImpl keeps being routed here once all data has been consumed).
            this.currentFileItr = null;
        }
        while (!this.hasNext && !this.filesToPull.isEmpty()) {
            this.currentFile = this.filesToPull.remove(0);
            LOG.info("Will start downloading file: {}", this.currentFile);
            this.currentFileItr = downloadFile(this.currentFile);
            this.hasNext = this.currentFileItr != null && this.currentFileItr.hasNext();
        }
    }

    /**
     * Returns the schema stored in the work unit.
     *
     * @return the value of the {@code source.schema} work unit property, cast to {@code S}
     */
    @SuppressWarnings("unchecked") // S is chosen by the subclass; the property value is returned as is
    public S getSchema() {
        return (S) this.workUnit.getProp(SOURCE_SCHEMA_KEY);
    }

    /**
     * @return always {@code -1}
     */
    public long getExpectedRecordCount() {
        return -1L;
    }

    /**
     * @return always {@code -1}
     */
    public long getHighWatermark() {
        LOG.info("High Watermark is -1 for file based extractors");
        return -1L;
    }

    /**
     * Opens the given file and returns an iterator over its lines. The stream is registered with
     * {@link #closer}, so it is released by {@link #closeCurrentFile()} or {@link #close()}.
     *
     * <p>The lines are {@code String}s; the iterator is handed out as {@code Iterator<D>} through
     * an unchecked cast, so this implementation is only type-safe when {@code D} is compatible
     * with {@code String}.
     *
     * @param str path of the file to open
     * @return iterator over the lines of the file, positioned after the first line when skipping
     *     the first record is enabled
     * @throws IOException if the helper fails to open the file
     */
    @SuppressWarnings("unchecked")
    public Iterator<D> downloadFile(String str) throws IOException {
        LOG.info("Beginning to download file: {}", str);
        try {
            InputStream fileStream = (InputStream) this.closer.register(this.fsHelper.getFileStream(str));
            LineIterator lineIterator = IOUtils.lineIterator(fileStream, ConfigurationKeys.DEFAULT_CHARSET_ENCODING);
            if (this.shouldSkipFirstRecord && lineIterator.hasNext()) {
                lineIterator.next();
            }
            return (Iterator<D>) lineIterator;
        } catch (FileBasedHelperException e) {
            throw new IOException("Exception while downloading file " + str + " with message " + e.getMessage(), e);
        }
    }

    /**
     * Closes everything registered with {@link #closer}. A failure is logged, not propagated.
     */
    public void closeCurrentFile() {
        try {
            this.closer.close();
        } catch (IOException e) {
            if (this.currentFile != null) {
                LOG.error("Failed to close file: " + this.currentFile, e);
            }
        }
    }

    /**
     * Releases any file stream that is still open and then closes the file helper. Failures are
     * logged, not propagated.
     */
    @Override // gobblin.instrumented.extractor.InstrumentedExtractorBase, java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        // NOTE(review): the overridden close() is not invoked here (same as before this change);
        // confirm whether the parent class holds resources that need releasing.
        try {
            // The extractor may be closed before the current file was fully read; without this the
            // stream opened by downloadFile would never be closed.
            this.closer.close();
        } catch (IOException e) {
            LOG.error("Failed to close file: " + this.currentFile, e);
        }
        try {
            this.fsHelper.close();
        } catch (FileBasedHelperException e) {
            LOG.error("Could not successfully close file system helper due to error: " + e.getMessage(), e);
        }
    }

    /**
     * Adds the size of the current file to the {@link CounterNames#FileBytesRead} counter. When the
     * helper cannot report the size the counter is left untouched.
     */
    private void incrementBytesReadCounter() {
        try {
            this.counters.inc(CounterNames.FileBytesRead, this.fsHelper.getFileSize(this.currentFile));
        } catch (FileBasedHelperException e) {
            LOG.info("Unable to get file size. Will skip increment to bytes counter {}", e.getMessage());
            LOG.debug(e.getMessage(), e);
        } catch (UnsupportedOperationException e2) {
            LOG.info("Unable to get file size. Will skip increment to bytes counter {}", e2.getMessage());
            LOG.debug(e2.getMessage(), e2);
        }
    }
}
