package streams.hdfs;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.net.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import stream.Data;
import stream.data.DataFactory;
import streams.codec.Codec;
import streams.codec.DefaultCodec;
import streams.codec.Kryo;
import streams.io.BobCodec;

/* loaded from: input_file:streams/hdfs/BobStreamRecordReader.class */
/**
 * {@link RecordReader} that hands out exactly one record per split: a
 * {@link StreamRef} (a {@link BobStreamReader}) positioned at the start of the
 * split, from which the consumer pulls the encoded blocks itself.
 *
 * <p>The constructor opens the split's file, seeks to the split start and wires
 * the stream into a {@link BobStreamReader}. {@link #next(Key, StreamRef)}
 * returns {@code true} on its first call only.
 */
public class BobStreamRecordReader implements RecordReader<Key, StreamRef> {
    static Logger log = LoggerFactory.getLogger(BobStreamRecordReader.class);

    /** The split this reader was created for. */
    final InputSplit split;
    /** Job configuration used to resolve the file system. */
    final JobConf jobConf;
    /** Start offset of the split; this class never advances it after construction. */
    long position = 0L;
    /** Length of the split in bytes (as a float); {@code -1} until the constructor sets it. */
    float totalSize = -1.0f;
    /** Path of the file backing the split. */
    final Path path;
    /** Raw stream opened on {@link #path}; closed by {@link #close()}. */
    final FSDataInputStream inputStream;
    final DefaultCodec<Data> codec = new DefaultCodec<>();
    /** Number of times {@link #next(Key, StreamRef)} has reported a record (0 or 1). */
    int accesses = 0;
    /** Number of block locations reported by the file system for the split range. */
    final int numBlocks;
    /** The single value handed out by this record reader. */
    final BobStreamReader reader;

    /**
     * {@link StreamRef} backed by a metered input stream; each {@link #read()}
     * pulls one block via {@link BobCodec#readBlock} and wraps it in a
     * {@link Data} item.
     */
    public static class BobStreamReader extends StreamRef {
        private static final long serialVersionUID = 1117728123120112585L;
        final String path;
        final MeteredInputStream min;
        final DataInputStream din;
        final Codec<Data> codec = new Kryo();
        /** Count of blocks local to this host; overwritten by the enclosing record reader. */
        Integer localBlocks = 0;
        String id;
        /** Total number of bytes to process; overwritten by the enclosing record reader. */
        Long totalBytes = 0L;

        /**
         * @param str                path of the underlying file; also used as the initial id
         * @param i                  forwarded to the {@link StreamRef} constructor
         * @param map                forwarded to the {@link StreamRef} constructor
         * @param meteredInputStream stream the blocks are read from
         */
        public BobStreamReader(String str, int i, Map<String, List<String>> map, MeteredInputStream meteredInputStream) {
            super(i, map);
            this.path = str;
            this.min = meteredInputStream;
            this.din = new DataInputStream(meteredInputStream);
            this.id = str;
        }

        /** @return the total byte count assigned to this reader */
        @Override // streams.hdfs.StreamRef
        public Long totalBytes() {
            return this.totalBytes;
        }

        /**
         * Reads the next block from the given stream.
         *
         * @param dataInputStream stream to read the block from
         * @return the block bytes, or {@code null} when {@link BobCodec#readBlock} returns {@code null}
         * @throws Exception if the block cannot be read
         */
        public byte[] read(DataInputStream dataInputStream) throws Exception {
            byte[] block = BobCodec.readBlock(dataInputStream);
            if (block != null) {
                log.trace("Read {} bytes from input", Integer.valueOf(block.length));
            }
            return block;
        }

        /** No initialisation is required; the stream is supplied ready to use. */
        @Override // streams.hdfs.StreamRef
        public void init() throws Exception {
        }

        @Override // streams.hdfs.StreamRef
        public String getId() {
            return this.id;
        }

        @Override // streams.hdfs.StreamRef
        public void setId(String str) {
            this.id = str;
        }

        /** @return the value of {@link #localBlocks} */
        @Override // streams.hdfs.StreamRef
        public int blocks() {
            return this.localBlocks.intValue();
        }

        /** @return the byte count reported by the metered stream */
        @Override // streams.hdfs.StreamRef
        public Long bytesRead() {
            return Long.valueOf(this.min.bytesRead());
        }

        /**
         * Reads the next block and wraps it in a {@link Data} item carrying the
         * raw bytes ({@code data:encoded}) plus the processed and total byte
         * counts as doubles.
         *
         * @return the item, or {@code null} when no further block is available
         * @throws Exception if the block cannot be read
         */
        @Override // streams.hdfs.StreamRef
        public Data read() throws Exception {
            log.debug("Reading block {} from {}", Integer.valueOf(this.blocks), this.path);
            byte[] encoded = read(this.din);
            if (encoded == null) {
                return null;
            }
            Data item = DataFactory.create();
            item.put("data:encoded", encoded);
            item.put("bytes:processed", Double.valueOf(bytesRead().doubleValue()));
            item.put("bytes:total", Double.valueOf(totalBytes().doubleValue()));
            return item;
        }

        /** Closes the data stream wrapped around the metered input stream. */
        @Override // streams.hdfs.StreamRef
        public void close() throws Exception {
            this.din.close();
        }
    }

    /**
     * Opens the file behind the split, seeks to the split start and prepares
     * the {@link BobStreamReader} that {@link #createValue()} returns.
     *
     * @param inputSplit the split to read; must be a {@link FileSplit}
     * @param jobConf    job configuration used to resolve the file system
     * @throws IOException if the file cannot be opened, positioned or inspected;
     *                     the opened stream is closed before the exception propagates
     */
    public BobStreamRecordReader(InputSplit inputSplit, JobConf jobConf) throws IOException {
        log.debug("Creating BobStreamRecordReader for split {}", inputSplit);
        this.split = inputSplit;
        this.jobConf = jobConf;
        FileSplit fileSplit = (FileSplit) inputSplit;
        this.path = fileSplit.getPath();
        FileSystem fileSystem = this.path.getFileSystem(this.jobConf);
        FSDataInputStream in = fileSystem.open(this.path);
        this.inputStream = in;
        this.totalSize = (float) fileSplit.getLength();
        this.position = fileSplit.getStart();
        log.debug("Starting to read BobRecords at {}", Long.valueOf(this.position));

        int blockCount;
        BobStreamReader streamReader;
        try {
            in.seek(this.position);
            MeteredInputStream metered = new MeteredInputStream(in);
            blockCount = fileSystem.getFileBlockLocations(fileSplit.getPath(), fileSplit.getStart(), fileSplit.getLength()).length;
            Map<String, List<String>> hostMap = BlockDistribution.getHostMap(fileSystem, this.path, fileSplit.getStart(), fileSplit.getLength());
            List<String> localEntries = hostMap.get(NetUtils.getHostname());
            int localCount = localEntries != null ? localEntries.size() : 0;
            log.debug("Setting up BobStreamReader...");
            streamReader = new BobStreamReader(this.path.toString(), blockCount, hostMap, metered);
            streamReader.localBlocks = Integer.valueOf(localCount);
            streamReader.setId(this.path.toString());
            // FileSplit.getLength() is already the number of bytes in the split,
            // so the start offset must not be subtracted from it.
            streamReader.totalBytes = Long.valueOf(fileSplit.getLength());
        } catch (IOException | RuntimeException e) {
            // The caller never receives this object on failure, so nobody else
            // could close the stream that was just opened.
            try {
                in.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        this.numBlocks = blockCount;
        this.reader = streamReader;
    }

    /** Closes the underlying file stream. */
    @Override
    public void close() throws IOException {
        this.inputStream.close();
    }

    /** @return a fresh, empty {@link Key} */
    @Override
    public Key createKey() {
        return new Key();
    }

    /** @return the stream reference prepared by the constructor (the same instance on every call) */
    @Override
    public StreamRef createValue() {
        return this.reader;
    }

    /**
     * Alias of {@link #createKey()} kept under the name this file previously
     * exposed, so existing callers continue to compile.
     *
     * @deprecated use {@link #createKey()}
     */
    @Deprecated
    public Key m4createKey() {
        return createKey();
    }

    /**
     * Alias of {@link #createValue()} kept under the name this file previously
     * exposed, so existing callers continue to compile.
     *
     * @deprecated use {@link #createValue()}
     */
    @Deprecated
    public StreamRef m3createValue() {
        return createValue();
    }

    /** @return the value of {@link #position} (the split start offset set in the constructor) */
    @Override
    public long getPos() throws IOException {
        return this.position;
    }

    /**
     * @return {@link #position} divided by {@link #totalSize}, clamped to
     *         {@code [0, 1]}; {@code 0} when the split length is not positive
     */
    @Override
    public float getProgress() throws IOException {
        if (this.totalSize <= 0.0f) {
            // Avoids NaN / Infinity for an empty split.
            return 0.0f;
        }
        float fraction = ((float) this.position) / this.totalSize;
        return Math.max(0.0f, Math.min(1.0f, fraction));
    }

    /**
     * Reports a record on the first call only; the record is the stream
     * reference itself, so neither argument is modified.
     *
     * @param key       ignored
     * @param streamRef ignored
     * @return {@code true} on the first call, {@code false} afterwards
     */
    @Override
    public boolean next(Key key, StreamRef streamRef) throws IOException {
        if (this.accesses > 0) {
            return false;
        }
        this.accesses++;
        return true;
    }

    /**
     * Decompresses a GZIP-compressed byte array fully into memory.
     *
     * @param bArr the GZIP-compressed bytes
     * @return the decompressed bytes
     * @throws IOException if {@code bArr} is not valid GZIP data or decompression fails
     */
    public static byte[] gunzip(byte[] bArr) throws IOException {
        ByteArrayOutputStream inflated = new ByteArrayOutputStream();
        // try-with-resources releases the inflater even when read() throws.
        try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(bArr))) {
            byte[] buffer = new byte[4096];
            int count;
            while ((count = gzip.read(buffer)) > 0) {
                inflated.write(buffer, 0, count);
            }
        }
        return inflated.toByteArray();
    }
}
