/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.runtime.library.common.sort.impl.dflt;

import java.io.IOException;
import java.io.InputStream;
import java.nio.IntBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.BoundedByteArrayOutputStream;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.WritableUtils;
import org.apache.tez.runtime.library.common.shuffle.impl.InMemoryWriter;
import org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter;
import org.apache.tez.runtime.library.common.sort.impl.dflt.InMemoryShuffleSorter;

public class SortBufferInputStream
extends InputStream {
    private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class);
    private final InMemoryShuffleSorter sorter;
    private InMemoryWriter sortOutput;
    private int mend;
    private int recIndex;
    private final byte[] kvbuffer;
    private final IntBuffer kvmeta;
    private final int partitionBytes;
    private final int partition;
    byte[] dualBuf = new byte[8192];
    DualBufferOutputStream out;
    private int readBytes = 0;
    byte[] one = new byte[1];

    public SortBufferInputStream(InMemoryShuffleSorter sorter, int partition) {
        this.sorter = sorter;
        this.partitionBytes = (int)sorter.getShuffleHeader(partition).getCompressedLength();
        this.partition = partition;
        this.mend = sorter.getMetaEnd();
        this.recIndex = sorter.getSpillIndex(partition);
        this.kvbuffer = sorter.kvbuffer;
        this.kvmeta = sorter.kvmeta;
        this.out = new DualBufferOutputStream(null, 0, 0, this.dualBuf);
        this.sortOutput = new InMemoryWriter(this.out);
    }

    public int read() throws IOException {
        int b = this.read(this.one, 0, 1);
        return b == -1 ? b : this.one[0];
    }

    public int read(byte[] b) throws IOException {
        return this.read(b, 0, b.length);
    }

    public int read(byte[] b, int off, int len) throws IOException {
        if (this.available() == 0) {
            return -1;
        }
        int currentOffset = off;
        int currentLength = len;
        int currentReadBytes = 0;
        int residualLen = this.out.getCurrent();
        if (residualLen > 0) {
            int readable = Math.min(currentLength, residualLen);
            System.arraycopy(this.dualBuf, 0, b, currentOffset, readable);
            currentOffset += readable;
            currentReadBytes += readable;
            this.out.setCurrentPointer(-readable);
            currentLength -= readable;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("XXX read_residual: readable=" + readable + " readBytes=" + this.readBytes));
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("XXX read: out.reset b=" + b + " currentOffset=" + currentOffset + " currentLength=" + currentLength + " recIndex=" + this.recIndex));
        }
        this.out.reset(b, currentOffset, currentLength);
        DataInputBuffer key = new DataInputBuffer();
        DefaultSorter.InMemValBytes value = this.sorter.createInMemValBytes();
        int kvPartition = 0;
        int numRec = 0;
        while (currentLength > 0 && this.recIndex < this.mend && (kvPartition = this.getKVPartition(this.recIndex)) == this.partition) {
            int kvoff = this.sorter.offsetFor(this.recIndex);
            int keyLen = this.kvmeta.get(kvoff + 1) - this.kvmeta.get(kvoff + 2);
            key.reset(this.kvbuffer, this.kvmeta.get(kvoff + 2), keyLen);
            int valLen = this.sorter.getVBytesForOffset(kvoff, value);
            int recLen = keyLen + WritableUtils.getVIntSize((long)keyLen) + (valLen + WritableUtils.getVIntSize((long)valLen));
            currentReadBytes += recLen;
            currentOffset += recLen;
            currentLength -= recLen;
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("XXX read: sortOutput.append #rec=" + ++numRec + " recIndex=" + this.recIndex + " kvoff=" + kvoff + " keyLen=" + keyLen + " valLen=" + valLen + " recLen=" + recLen + " readBytes=" + this.readBytes + " currentReadBytes=" + currentReadBytes + " currentLength=" + currentLength));
            }
            this.sortOutput.append(key, value);
            ++this.recIndex;
        }
        if (currentLength > 0 && (this.recIndex == this.mend || kvPartition != this.partition)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug((Object)("XXX About to call close: currentLength=" + currentLength + " recIndex=" + this.recIndex + " mend=" + this.mend + " kvPartition=" + kvPartition + " partitino=" + this.partition));
            }
            this.sortOutput.close();
            currentReadBytes += InMemoryShuffleSorter.IFILE_EOF_LENGTH + InMemoryShuffleSorter.IFILE_CHECKSUM_LENGTH;
        } else if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("XXX Hmm... currentLength=" + currentLength + " recIndex=" + this.recIndex + " mend=" + this.mend + " kvPartition=" + kvPartition + " partitino=" + this.partition));
        }
        int retVal = Math.min(currentReadBytes, len);
        this.readBytes += retVal;
        if (LOG.isDebugEnabled()) {
            LOG.debug((Object)("XXX read: done retVal=" + retVal + " currentReadBytes=" + currentReadBytes + " len=" + len + " readBytes=" + this.readBytes + " partitionBytes=" + this.partitionBytes + " residualBytes=" + this.out.getCurrent()));
        }
        return retVal;
    }

    private int getKVPartition(int recIndex) {
        return this.kvmeta.get(this.sorter.offsetFor(recIndex) + 3);
    }

    public int available() throws IOException {
        return this.partitionBytes - this.readBytes;
    }

    public void close() throws IOException {
        super.close();
    }

    public boolean markSupported() {
        return false;
    }

    static class DualBufferOutputStream
    extends BoundedByteArrayOutputStream {
        byte[] dualBuf;
        int currentPointer = 0;
        byte[] one = new byte[1];

        public DualBufferOutputStream(byte[] buf, int offset, int length, byte[] altBuf) {
            super(buf, offset, length);
            this.dualBuf = altBuf;
        }

        public void reset(byte[] b, int off, int len) {
            super.resetBuffer(b, off, len);
        }

        public void write(int b) throws IOException {
            this.one[0] = (byte)b;
            this.write(this.one, 0, 1);
        }

        public void write(byte[] b) throws IOException {
            this.write(b, 0, b.length);
        }

        public void write(byte[] b, int off, int len) throws IOException {
            int available = super.available();
            if (available >= len) {
                super.write(b, off, len);
            } else {
                super.write(b, off, available);
                System.arraycopy(b, off + available, this.dualBuf, this.currentPointer, len - available);
                this.currentPointer += len - available;
            }
        }

        int getCurrent() {
            return this.currentPointer;
        }

        void setCurrentPointer(int delta) {
            if (this.currentPointer + delta > this.dualBuf.length) {
                throw new IndexOutOfBoundsException("Trying to set dualBuf 'current' marker to " + (this.currentPointer + delta) + " when " + " dualBuf.length is " + this.dualBuf.length);
            }
            this.currentPointer = (this.currentPointer + delta) % this.dualBuf.length;
        }
    }
}

