package org.apache.sqoop.connector.hdfs;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.util.LineReader;
import org.apache.log4j.Logger;
import org.apache.sqoop.common.SqoopException;
import org.apache.sqoop.connector.common.SqoopIDFUtils;
import org.apache.sqoop.connector.hdfs.configuration.FromJobConfiguration;
import org.apache.sqoop.connector.hdfs.configuration.LinkConfiguration;
import org.apache.sqoop.error.code.HdfsConnectorError;
import org.apache.sqoop.etl.io.DataWriter;
import org.apache.sqoop.job.etl.Extractor;
import org.apache.sqoop.job.etl.ExtractorContext;
import org.apache.sqoop.schema.Schema;

/* loaded from: input_file:WEB-INF/lib/sqoop-connector-hdfs-1.99.6.jar:org/apache/sqoop/connector/hdfs/HdfsExtractor.class */
/**
 * Extracts records from the file slices that make up an {@link HdfsPartition}.
 *
 * <p>Each slice is read either as a {@link SequenceFile} (when the file can be opened as one) or
 * as a line-oriented text file, optionally decompressed with a codec chosen by
 * {@link CompressionCodecFactory} from the file path. Every record is handed to the job's
 * {@link DataWriter}:
 * <ul>
 *   <li>as an array record, when {@code HdfsUtils.hasCustomFormat} returns true;</li>
 *   <li>otherwise as the raw string.</li>
 * </ul>
 */
public class HdfsExtractor extends Extractor<LinkConfiguration, FromJobConfiguration, HdfsPartition> {
    public static final Logger LOG = Logger.getLogger(HdfsExtractor.class);
    private DataWriter dataWriter;
    private Schema schema;
    private final Configuration conf = new Configuration();
    private long rowsRead = 0;

    /**
     * Extracts every file slice of {@code hdfsPartition}, in partition order.
     *
     * @throws SqoopException with {@code GENERIC_HDFS_CONNECTOR_0001}, wrapping any
     *     {@link IOException} raised while reading
     */
    @Override // org.apache.sqoop.job.etl.Extractor
    public void extract(ExtractorContext extractorContext, LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, HdfsPartition hdfsPartition) {
        HdfsUtils.contextToConfiguration(extractorContext.getContext(), this.conf);
        this.dataWriter = extractorContext.getDataWriter();
        this.schema = extractorContext.getSchema();
        try {
            LOG.info("Working on partition: " + hdfsPartition);
            int numberOfFiles = hdfsPartition.getNumberOfFiles();
            for (int i = 0; i < numberOfFiles; i++) {
                extractFile(linkConfiguration, fromJobConfiguration, hdfsPartition.getFile(i),
                        hdfsPartition.getOffset(i), hdfsPartition.getLength(i));
            }
        } catch (IOException e) {
            throw new SqoopException(HdfsConnectorError.GENERIC_HDFS_CONNECTOR_0001, e);
        }
    }

    /**
     * Extracts the byte range {@code [start, start + length)} of one file.
     *
     * <p>The reader is chosen by probing the file format.
     */
    private void extractFile(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Path path, long start, long length) throws IOException {
        LOG.info("Extracting file " + path);
        LOG.info("\t from offset " + start);
        LOG.info("\t to offset " + (start + length));
        LOG.info("\t of length " + length);
        if (isSequenceFile(path)) {
            extractSequenceFile(linkConfiguration, fromJobConfiguration, path, start, length);
        } else {
            extractTextFile(linkConfiguration, fromJobConfiguration, path, start, length);
        }
    }

    /**
     * Reads the sequence-file records owned by the slice {@code [start, start + length)}.
     *
     * <p>Ownership follows Hadoop's {@code SequenceFileRecordReader}:
     * <ul>
     *   <li>A slice that does not begin in the header first syncs to the first sync marker at or
     *       after {@code start}.</li>
     *   <li>It then reads records until it meets one that is preceded by a sync marker beginning
     *       at or after the slice end. That record is left for the next slice, which syncs to the
     *       same marker.</li>
     * </ul>
     *
     * <p>The boundary check uses the position captured <em>before</em> each read. Checking after
     * the read, as a previous version did, drops a record that starts before the end but
     * finishes past it.
     */
    private void extractSequenceFile(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Path path, long start, long length) throws IOException {
        LOG.info("Extracting sequence file");
        long end = start + length;
        // Evaluated once per file: its inputs are the two configuration objects, which this loop
        // never modifies.
        boolean customFormat = HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration);
        SequenceFile.Reader reader = new SequenceFile.Reader(path.getFileSystem(this.conf), path, this.conf);
        try {
            if (start > reader.getPosition()) {
                reader.sync(start);
            }
            // Reusing one Text is safe: writeRecord converts it to a String before handing it off.
            Text record = new Text();
            while (true) {
                long recordStart = reader.getPosition();
                if (!reader.next(record)) {
                    break;
                }
                if (recordStart >= end && reader.syncSeen()) {
                    // This record sits behind a sync point past our slice; the next slice owns it.
                    break;
                }
                this.rowsRead++;
                writeRecord(customFormat, linkConfiguration, fromJobConfiguration, record);
            }
        } finally {
            reader.close();
        }
    }

    /**
     * Reads the text lines owned by the slice {@code [start, start + length)}.
     *
     * <p><b>Uncompressed files:</b>
     * <ul>
     *   <li>The stream is seeked to {@code start}.</li>
     *   <li>When {@code start != 0}, the first (possibly partial) line is discarded, because the
     *       previous slice reads the line that crosses its end.</li>
     *   <li>Lines are read while the byte position is {@code <= end}.</li>
     * </ul>
     *
     * <p><b>Compressed files:</b> the decompressed stream is read from the beginning, and
     * progress is measured as the position of the underlying (compressed) file stream.
     */
    private void extractTextFile(LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Path path, long start, long length) throws IOException {
        LOG.info("Extracting text file");
        long end = start + length;
        boolean customFormat = HdfsUtils.hasCustomFormat(linkConfiguration, fromJobConfiguration);
        FSDataInputStream fileStream = path.getFileSystem(this.conf).open(path);
        try {
            // Position of the raw file stream; with a codec this is in compressed bytes.
            Seekable rawPosition = fileStream;
            CompressionCodec codec = new CompressionCodecFactory(this.conf).getCodec(path);
            LineReader lineReader;
            if (codec == null) {
                fileStream.seek(start);
                lineReader = new LineReader(fileStream);
            } else {
                lineReader = new LineReader(codec.createInputStream(fileStream, codec.createDecompressor()), this.conf);
            }
            long position = start;
            if (position != 0) {
                // maxLineLength 0: consume the line without storing it; the return value is the
                // number of bytes consumed.
                position += lineReader.readLine(new Text(), 0);
            }
            LOG.info("Start position: " + position);
            Text line = new Text();
            while (position <= end) {
                int bytesConsumed = lineReader.readLine(line, Integer.MAX_VALUE);
                if (bytesConsumed == 0) {
                    break; // end of stream
                }
                position = codec == null ? position + bytesConsumed : rawPosition.getPos();
                this.rowsRead++;
                writeRecord(customFormat, linkConfiguration, fromJobConfiguration, line);
            }
            LOG.info("Extracting ended on position: " + rawPosition.getPos());
        } finally {
            fileStream.close();
        }
    }

    /**
     * Sends one extracted record to the data writer.
     *
     * <ul>
     *   <li>With a custom format, the record is parsed as CSV against the job schema and
     *       reformatted by {@code HdfsUtils.formatRecord}.</li>
     *   <li>Otherwise it is written as-is.</li>
     * </ul>
     */
    private void writeRecord(boolean customFormat, LinkConfiguration linkConfiguration, FromJobConfiguration fromJobConfiguration, Text record) {
        String text = record.toString();
        if (customFormat) {
            this.dataWriter.writeArrayRecord(HdfsUtils.formatRecord(linkConfiguration, fromJobConfiguration, SqoopIDFUtils.fromCSV(text, this.schema)));
        } else {
            this.dataWriter.writeStringRecord(text);
        }
    }

    /** Returns the number of records read so far by this extractor instance. */
    @Override // org.apache.sqoop.job.etl.Extractor
    public long getRowsRead() {
        return this.rowsRead;
    }

    /**
     * Reports whether {@code path} can be opened as a {@link SequenceFile}.
     *
     * <p>This is a format probe: any {@link IOException} raised while opening the file is taken
     * to mean "not a sequence file".
     */
    private boolean isSequenceFile(Path path) {
        try {
            new SequenceFile.Reader(path.getFileSystem(this.conf), path, this.conf).close();
            return true;
        } catch (IOException ignored) {
            // Not a sequence file (or unreadable as one); the caller falls back to the text reader.
            return false;
        }
    }
}
