package org.apache.tajo.storage.text;

import io.netty.buffer.ByteBuf;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.tajo.exception.NotImplementedException;
import org.apache.tajo.exception.TajoRuntimeException;
import org.apache.tajo.exception.UnsupportedException;
import org.apache.tajo.storage.BufferPool;
import org.apache.tajo.storage.ByteBufInputChannel;
import org.apache.tajo.storage.FSDataInputChannel;
import org.apache.tajo.storage.FileScanner;
import org.apache.tajo.storage.LocalFileInputChannel;
import org.apache.tajo.storage.compress.CodecPool;
import org.apache.tajo.storage.fragment.FileFragment;

/* loaded from: input_file:org/apache/tajo/storage/text/DelimitedLineReader.class */
/**
 * Reads newline-delimited records from the byte range described by a {@link FileFragment}.
 *
 * <p>Three input paths are supported, chosen in {@link #init()}:
 * <ul>
 *   <li>a compressed file (a codec was resolved from the file name) read through a pooled
 *       {@link Decompressor};</li>
 *   <li>a file on a {@link LocalFileSystem}, read through a {@link FileInputStream};</li>
 *   <li>any other {@link FileSystem}, read through an {@link FSDataInputStream}.</li>
 * </ul>
 *
 * <p>Typical life cycle: construct, {@link #init()}, call {@link #readLine()} until it returns
 * {@code null}, then {@link #close()}.
 */
public class DelimitedLineReader implements Closeable {
    private static final Log LOG = LogFactory.getLog(DelimitedLineReader.class);

    /** Buffer size, in bytes, used when the caller does not supply one (128 KiB). */
    private static final int DEFAULT_BUFFER_SIZE = 128 * 1024;

    private final Configuration conf;
    private final FileFragment fragment;
    private final CompressionCodecFactory factory;
    /** Codec resolved from the fragment's path; {@code null} means the file is read uncompressed. */
    private final CompressionCodec codec;
    private final int bufferSize;
    /** Receives the number of bytes consumed by the most recent {@code readLineBuf} call. */
    private final AtomicInteger lineReadBytes = new AtomicInteger();

    private FileSystem fs;
    private FSDataInputStream fis;
    private InputStream is;
    private Decompressor decompressor;
    private ByteBufLineReader lineReader;

    private long startOffset;
    private long end;
    private long pos;
    /** {@code true} until {@link #init()} succeeds, and again once the input is exhausted or closed. */
    private boolean eof = true;

    /**
     * Creates a reader with the default buffer size.
     *
     * @param configuration configuration used to resolve the codec and the file system
     * @param fileFragment  the file range to read
     * @throws IOException declared for API compatibility
     */
    public DelimitedLineReader(Configuration configuration, FileFragment fileFragment) throws IOException {
        this(configuration, fileFragment, DEFAULT_BUFFER_SIZE);
    }

    /**
     * Creates a reader with an explicit buffer size. No file is opened until {@link #init()}.
     *
     * @param configuration configuration used to resolve the codec and the file system
     * @param fileFragment  the file range to read
     * @param bufferSize    requested read-buffer size in bytes
     * @throws IOException          declared for API compatibility
     * @throws TajoRuntimeException wrapping {@link NotImplementedException} when the resolved codec
     *                              is a {@link SplittableCompressionCodec}
     */
    public DelimitedLineReader(Configuration configuration, FileFragment fileFragment, int bufferSize)
            throws IOException {
        this.fragment = fileFragment;
        this.conf = configuration;
        this.factory = new CompressionCodecFactory(configuration);
        this.codec = this.factory.getCodec(fileFragment.getPath());
        this.bufferSize = bufferSize;
        if (this.codec instanceof SplittableCompressionCodec) {
            throw new TajoRuntimeException(new NotImplementedException(
                    getClass() + " does not support " + this.codec.getDefaultExtension()));
        }
    }

    /**
     * Opens the underlying input and positions it at the fragment's start offset.
     *
     * <p>If opening fails part-way, everything that was already opened is closed and the pooled
     * decompressor is returned before the exception propagates, so nothing leaks and
     * {@code init()} may be attempted again.
     *
     * @throws IOException if the reader is already initialized or the input cannot be opened
     */
    public void init() throws IOException {
        if (this.is != null) {
            throw new IOException(getClass() + " was already initialized.");
        }
        if (this.fs == null) {
            this.fs = FileScanner.getFileSystem(this.conf, this.fragment.getPath());
        }
        this.startOffset = this.fragment.getStartKey().longValue();
        this.pos = this.startOffset;
        this.end = this.startOffset + this.fragment.getLength();

        boolean opened = false;
        try {
            if (this.codec != null) {
                openCompressed();
            } else if (this.fs instanceof LocalFileSystem) {
                openLocal();
            } else {
                openRemote();
            }
            opened = true;
        } finally {
            if (!opened) {
                // Do not leave half-opened streams or a borrowed decompressor behind.
                releaseResources();
            }
        }
        this.eof = false;
    }

    /** Opens the file through the codec, borrowing a decompressor from the pool. */
    private void openCompressed() throws IOException {
        this.fis = this.fs.open(this.fragment.getPath());
        this.decompressor = CodecPool.getDecompressor(this.codec);
        this.is = new DataInputStream(this.codec.createInputStream(this.fis, this.decompressor));
        this.lineReader = new ByteBufLineReader(
                new ByteBufInputChannel(this.is), BufferPool.directBuffer(this.bufferSize));
    }

    /** Opens a local file directly and moves its channel to the start offset. */
    private void openLocal() throws IOException {
        try {
            File file = this.fragment.getPath().toUri().getScheme() != null
                    ? new File(this.fragment.getPath().toUri())
                    : new File(this.fragment.getPath().toString());
            FileInputStream input = new FileInputStream(file);
            // Publish the stream before any further call that may throw, so that the
            // failure path in init() is able to close it.
            this.is = input;
            input.getChannel().position(this.startOffset);
            this.lineReader = new ByteBufLineReader(
                    new LocalFileInputChannel(input), BufferPool.directBuffer(uncompressedBufferCapacity()));
        } catch (IllegalArgumentException e) {
            throw new IOException(e);
        }
    }

    /** Opens the file through the generic file-system API and seeks to the start offset. */
    private void openRemote() throws IOException {
        this.fis = this.fs.open(this.fragment.getPath());
        this.is = this.fis;
        this.fis.seek(this.startOffset);
        this.lineReader = new ByteBufLineReader(
                new FSDataInputChannel(this.fis), BufferPool.directBuffer(uncompressedBufferCapacity()));
    }

    /**
     * Buffer capacity for uncompressed input: the requested size, capped by the fragment's end
     * offset.
     */
    private int uncompressedBufferCapacity() {
        // NOTE(review): the cap is the absolute end offset, not the fragment length - kept as is.
        return (int) Math.min(this.bufferSize, this.end);
    }

    /**
     * Repositions the reader and clears the end-of-input flag.
     *
     * @param offset position handed to the underlying line reader and recorded as the current one
     * @throws TajoRuntimeException wrapping {@link UnsupportedException} for compressed input
     * @throws IOException          if the reader is not initialized or the seek fails
     */
    public void seek(long offset) throws IOException {
        if (isCompressed()) {
            throw new TajoRuntimeException(new UnsupportedException());
        }
        if (this.lineReader == null) {
            throw new IOException(getClass() + " is not initialized.");
        }
        this.lineReader.seek(offset);
        this.pos = offset;
        this.eof = false;
    }

    /**
     * Returns the position of the underlying file stream for compressed input, otherwise the
     * same value as {@link #getUnCompressedPosition()}. For compressed input the reader must be
     * initialized.
     *
     * @throws IOException if the stream position cannot be obtained
     */
    public long getCompressedPosition() throws IOException {
        return isCompressed() ? this.fis.getPos() : this.pos;
    }

    /**
     * Returns the start offset plus the number of bytes consumed by {@link #readLine()} so far,
     * or the last {@link #seek(long)} target adjusted the same way.
     *
     * @throws IOException declared for API compatibility
     */
    public long getUnCompressedPosition() throws IOException {
        return this.pos;
    }

    /** Returns the distance between the current position and the fragment's start offset. */
    public long getReadBytes() {
        return this.pos - this.startOffset;
    }

    /** Returns {@code true} while {@link #readLine()} may still produce a line. */
    public boolean isReadable() {
        return !this.eof;
    }

    /**
     * Reads the next line.
     *
     * <p>For uncompressed input the reader is marked exhausted as soon as the position moves
     * past the fragment's end offset; the line that crossed the boundary is still returned.
     *
     * @return the line buffer, or {@code null} when the input is exhausted or the reader has
     *         not been initialized or has been closed
     * @throws IOException if reading fails
     */
    public ByteBuf readLine() throws IOException {
        if (this.eof) {
            return null;
        }
        ByteBuf line = this.lineReader.readLineBuf(this.lineReadBytes);
        this.pos += this.lineReadBytes.get();
        if (line == null || (!isCompressed() && this.pos > this.end)) {
            this.eof = true;
        }
        return line;
    }

    /** Returns {@code true} when a compression codec was resolved for the fragment's file. */
    public boolean isCompressed() {
        return this.codec != null;
    }

    /**
     * Closes the line reader and the underlying streams and returns the decompressor to the
     * pool. Afterwards {@link #isReadable()} is {@code false} and {@link #readLine()} returns
     * {@code null}. Calling it more than once is harmless.
     *
     * @throws IOException declared by {@link Closeable}; close failures are logged, not thrown
     */
    @Override
    public void close() throws IOException {
        // Mark exhausted first so readLine() cannot dereference the discarded line reader.
        this.eof = true;
        try {
            releaseResources();
        } finally {
            this.fs = null;
        }
    }

    /**
     * Closes whatever has been opened so far and hands the decompressor back to the pool.
     * Shared by {@link #close()} and the failure path of {@link #init()}.
     */
    private void releaseResources() {
        try {
            // cleanup() skips null entries and logs close failures. The raw streams are closed
            // explicitly as well, because the line reader may not exist yet when init() fails;
            // closing an already-closed stream has no effect per the Closeable contract.
            IOUtils.cleanup(LOG, this.lineReader, this.is, this.fis);
        } finally {
            this.lineReader = null;
            this.is = null;
            this.fis = null;
            if (this.decompressor != null) {
                CodecPool.returnDecompressor(this.decompressor);
                this.decompressor = null;
            }
        }
    }
}
