package com.twitter.elephantbird.mapreduce.io;

import com.twitter.data.proto.BlockStorage;
import com.twitter.elephantbird.util.Protobufs;
import com.twitter.elephantbird.util.StreamSearcher;
import java.io.IOException;
import java.io.InputStream;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/twitter/elephantbird/mapreduce/io/BinaryBlockReader.class */
/**
 * Reads individual records out of a stream made of serialized blocks.
 *
 * <p>As implemented here, each block is located by searching the stream for
 * {@link Protobufs#KNOWN_GOOD_POSITION_MARKER}, then reading a 4-byte little-endian length,
 * then reading that many bytes and parsing them as a {@link BlockStorage.SerializedBlock}.
 * The records are the block's "proto blobs", handed out one at a time in index order.
 *
 * <p>NOTE(review): the class keeps mutable cursor state and uses no synchronization, so it
 * presumably must be confined to one thread — confirm against callers.
 *
 * @param <M> the record type produced by the {@link BinaryConverter}
 */
public abstract class BinaryBlockReader<M> {
    private static final Logger LOG = LoggerFactory.getLogger(BinaryBlockReader.class);

    /** Stream the blocks are read from; replaceable through {@link #setInputStream}. */
    private InputStream in_;
    /** Searches the stream for the block sync marker. */
    private final StreamSearcher searcher_;
    /** Turns raw record bytes into {@code M} instances. */
    private final BinaryConverter<M> protoConverter_;
    /** Block currently being iterated; {@code null} until the first block is parsed. */
    private BlockStorage.SerializedBlock curBlock_;
    /** Number of records of {@link #curBlock_} that have not been handed out yet. */
    private int numLeftToReadThisBlock_;
    /** When {@code false}, no further blocks are parsed once the current one is exhausted. */
    private boolean readNewBlocks_;
    /** When {@code true}, zero-length records are skipped instead of being returned. */
    private boolean skipEmptyRecords;

    /**
     * Creates a reader that skips zero-length records.
     *
     * @param inputStream stream to read blocks from
     * @param binaryConverter converter used to deserialize each record
     */
    public BinaryBlockReader(InputStream inputStream, BinaryConverter<M> binaryConverter) {
        this(inputStream, binaryConverter, true);
    }

    /**
     * Creates a reader.
     *
     * @param inputStream stream to read blocks from
     * @param binaryConverter converter used to deserialize each record
     * @param z whether zero-length records should be skipped ({@code true}) or returned
     */
    public BinaryBlockReader(InputStream inputStream, BinaryConverter<M> binaryConverter, boolean z) {
        this.numLeftToReadThisBlock_ = 0;
        this.readNewBlocks_ = true;
        this.in_ = inputStream;
        this.protoConverter_ = binaryConverter;
        this.searcher_ = new StreamSearcher(Protobufs.KNOWN_GOOD_POSITION_MARKER);
        this.skipEmptyRecords = z;
    }

    /**
     * Closes the underlying stream, if one is set.
     *
     * @throws IOException if closing the stream fails
     */
    public void close() throws IOException {
        if (this.in_ != null) {
            this.in_.close();
        }
    }

    /**
     * Replaces the stream that subsequent reads use. The current block and the count of
     * records remaining in it are left untouched.
     *
     * @param inputStream the new stream
     */
    public void setInputStream(InputStream inputStream) {
        this.in_ = inputStream;
    }

    /**
     * Reads the next record and converts it with the configured converter.
     *
     * @return the converted record, or {@code null} when no more records are available
     * @throws IOException if reading from the stream fails
     */
    public M readNext() throws IOException {
        byte[] recordBytes = readNextProtoBytes();
        if (recordBytes == null) {
            return null;
        }
        return this.protoConverter_.fromBytes(recordBytes);
    }

    /**
     * Reads the next record, converts it and stores it in the given writable.
     *
     * @param binaryWritable destination for the converted record
     * @return {@code true} if a record was stored, {@code false} when no more records are
     *     available (the writable is then left unmodified)
     * @throws IOException if reading from the stream fails
     */
    public boolean readNext(BinaryWritable<M> binaryWritable) throws IOException {
        byte[] recordBytes = readNextProtoBytes();
        if (recordBytes == null) {
            return false;
        }
        binaryWritable.set(this.protoConverter_.fromBytes(recordBytes));
        return true;
    }

    /**
     * Returns the raw bytes of the next record, parsing a new block when the current one is
     * exhausted. Zero-length records are skipped when the reader was configured to do so.
     *
     * @return the record bytes, or {@code null} when no more records are available
     * @throws IOException if reading from the stream fails
     */
    public byte[] readNextProtoBytes() throws IOException {
        while (setupNewBlockIfNeeded()) {
            // Records are consumed front to back: index = total - remaining.
            int blobIndex = this.curBlock_.getProtoBlobsCount() - this.numLeftToReadThisBlock_;
            this.numLeftToReadThisBlock_--;
            byte[] recordBytes = this.curBlock_.getProtoBlobs(blobIndex).toByteArray();
            if (recordBytes.length != 0 || !this.skipEmptyRecords) {
                return recordBytes;
            }
        }
        return null;
    }

    /**
     * Copies the raw bytes of the next record into the given writable.
     *
     * @param bytesWritable destination for the record bytes
     * @return {@code true} if a record was stored, {@code false} when no more records are
     *     available (the writable is then left unmodified)
     * @throws IOException if reading from the stream fails
     */
    public boolean readNextProtoBytes(BytesWritable bytesWritable) throws IOException {
        byte[] recordBytes = readNextProtoBytes();
        if (recordBytes == null) {
            return false;
        }
        bytesWritable.set(recordBytes, 0, recordBytes.length);
        return true;
    }

    /**
     * Stops the reader from parsing further blocks. Records remaining in the current block
     * are still returned.
     */
    public void markNoMoreNewBlocks() {
        this.readNewBlocks_ = false;
    }

    /**
     * Searches the stream for the next sync marker.
     *
     * @return the result of {@link StreamSearcher#search}; {@link #parseNextBlock()} treats
     *     {@code false} as end of stream
     * @throws IOException if reading from the stream fails
     */
    public boolean skipToNextSyncPoint() throws IOException {
        return this.searcher_.search(this.in_);
    }

    /**
     * Advances to the next sync marker and parses the block that follows it. On success the
     * count of remaining records is reset to the size of the parsed block; the current block
     * reference itself is only updated by the record-reading methods.
     *
     * @return the parsed block, or {@code null} if no sync marker was found or the stream
     *     ended before a complete, non-negative block length could be read
     * @throws IOException if reading fails, including when the stream ends before the
     *     announced number of block bytes has been read
     */
    public BlockStorage.SerializedBlock parseNextBlock() throws IOException {
        LOG.debug("BlockReader: none left to read, skipping to sync point");
        if (!skipToNextSyncPoint()) {
            LOG.debug("BlockReader: SYNC point eof");
            return null;
        }
        int blockSize = readInt();
        LOG.debug("BlockReader: found sync point, next block has size {}", blockSize);
        if (blockSize < 0) {
            LOG.debug("ProtobufReader: reading size after sync point eof");
            return null;
        }
        byte[] blockBytes = new byte[blockSize];
        IOUtils.readFully(this.in_, blockBytes, 0, blockSize);
        BlockStorage.SerializedBlock block = BlockStorage.SerializedBlock.parseFrom(blockBytes);
        this.numLeftToReadThisBlock_ = block.getProtoBlobsCount();
        LOG.debug("ProtobufReader: number in next block is {}", this.numLeftToReadThisBlock_);
        return block;
    }

    /**
     * Makes sure the current block has at least one unread record, parsing new blocks as
     * needed. Blocks that contain no records are skipped; without this, an empty block would
     * make the caller index into an empty record list.
     *
     * @return {@code true} if a record can be read from the current block, {@code false} if
     *     new blocks are disabled or no further block could be parsed
     * @throws IOException if reading from the stream fails
     */
    private boolean setupNewBlockIfNeeded() throws IOException {
        while (this.numLeftToReadThisBlock_ == 0) {
            if (!this.readNewBlocks_) {
                return false;
            }
            this.curBlock_ = parseNextBlock();
            if (this.curBlock_ == null) {
                return false;
            }
            // Keep the counter in step with the block actually being iterated, even if an
            // overriding parseNextBlock() did not update it.
            this.numLeftToReadThisBlock_ = this.curBlock_.getProtoBlobsCount();
        }
        return true;
    }

    /**
     * Reads a 4-byte little-endian integer from the stream.
     *
     * @return the decoded value, or {@code -1} if the stream ends before all four bytes were
     *     read (a decoded value may itself be negative when the top bit is set)
     * @throws IOException if reading from the stream fails
     */
    private int readInt() throws IOException {
        int value = 0;
        for (int shift = 0; shift < Integer.SIZE; shift += Byte.SIZE) {
            int nextByte = this.in_.read();
            if (nextByte == -1) {
                // Explicit EOF check on every byte rather than relying on (-1 << shift)
                // happening to produce a negative result.
                return -1;
            }
            value |= nextByte << shift;
        }
        return value;
    }
}
