package org.apache.nemo.runtime.executor.bytetransfer;

import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import javax.annotation.Nullable;
import org.apache.nemo.runtime.executor.bytetransfer.ByteTransferContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nemo/runtime/executor/bytetransfer/ByteInputContext.class */
public final class ByteInputContext extends ByteTransferContext {
    private static final Logger LOG = LoggerFactory.getLogger(ByteInputContext.class.getName());
    private final CompletableFuture<Iterator<InputStream>> completedFuture;
    private final ClosableBlockingQueue<ByteBufInputStream> byteBufInputStreams;
    private volatile ByteBufInputStream currentByteBufInputStream;
    private final Iterator<InputStream> inputStreams;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/nemo/runtime/executor/bytetransfer/ByteInputContext$ByteBufInputStream.class */
    public static final class ByteBufInputStream extends InputStream {
        private final ClosableBlockingQueue<ByteBuf> byteBufQueue;

        private ByteBufInputStream() {
            this.byteBufQueue = new ClosableBlockingQueue<>();
        }

        @Override // java.io.InputStream
        public int read() throws IOException {
            try {
                ByteBuf peek = this.byteBufQueue.peek();
                if (peek == null) {
                    return -1;
                }
                short readUnsignedByte = peek.readUnsignedByte();
                if (peek.readableBytes() == 0) {
                    this.byteBufQueue.take();
                    peek.release();
                }
                return readUnsignedByte;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }

        @Override // java.io.InputStream
        public int read(byte[] bArr, int i, int i2) throws IOException {
            if (bArr == null) {
                throw new NullPointerException();
            }
            if (i < 0 || i2 < 0 || i2 > bArr.length - i) {
                throw new IndexOutOfBoundsException();
            }
            int i3 = 0;
            int i4 = i2;
            while (i4 > 0) {
                try {
                    ByteBuf peek = this.byteBufQueue.peek();
                    if (peek == null) {
                        if (i3 == 0) {
                            return -1;
                        }
                        return i3;
                    }
                    int min = Math.min(peek.readableBytes(), i4);
                    peek.readBytes(bArr, i + i3, min);
                    if (peek.readableBytes() == 0) {
                        this.byteBufQueue.take();
                        peek.release();
                    }
                    i3 += min;
                    i4 -= min;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
            }
            return i3;
        }

        @Override // java.io.InputStream
        public long skip(long j) throws IOException {
            if (j <= 0) {
                return 0L;
            }
            long j2 = 0;
            long j3 = j;
            while (j3 > 0) {
                try {
                    ByteBuf peek = this.byteBufQueue.peek();
                    if (peek == null) {
                        return j2;
                    }
                    if (peek.readableBytes() > j3) {
                        peek.skipBytes((int) j3);
                        return j2 + j3;
                    }
                    j2 += peek.readableBytes();
                    j3 -= peek.readableBytes();
                    this.byteBufQueue.take();
                    peek.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException(e);
                }
            }
            return j2;
        }

        @Override // java.io.InputStream
        public int available() throws IOException {
            try {
                ByteBuf peek = this.byteBufQueue.peek();
                if (peek == null) {
                    return 0;
                }
                return peek.readableBytes();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ByteInputContext(String str, ByteTransferContext.ContextId contextId, byte[] bArr, ContextManager contextManager) {
        super(str, contextId, bArr, contextManager);
        this.completedFuture = new CompletableFuture<>();
        this.byteBufInputStreams = new ClosableBlockingQueue<>();
        this.currentByteBufInputStream = null;
        this.inputStreams = new Iterator<InputStream>() { // from class: org.apache.nemo.runtime.executor.bytetransfer.ByteInputContext.1
            @Override // java.util.Iterator
            public boolean hasNext() {
                try {
                    return ByteInputContext.this.byteBufInputStreams.peek() != null;
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }

            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.Iterator
            public InputStream next() {
                try {
                    return (InputStream) ByteInputContext.this.byteBufInputStreams.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    ByteInputContext.LOG.error("Interrupted while taking byte buf.", e);
                    throw new NoSuchElementException();
                }
            }
        };
    }

    public Iterator<InputStream> getInputStreams() {
        return this.inputStreams;
    }

    public CompletableFuture<Iterator<InputStream>> getCompletedFuture() {
        return this.completedFuture;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onNewStream() {
        if (this.currentByteBufInputStream != null) {
            this.currentByteBufInputStream.byteBufQueue.close();
        }
        this.currentByteBufInputStream = new ByteBufInputStream();
        this.byteBufInputStreams.put(this.currentByteBufInputStream);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onByteBuf(ByteBuf byteBuf) {
        if (this.currentByteBufInputStream == null) {
            throw new RuntimeException("Cannot accept ByteBuf: No sub-stream is opened.");
        }
        if (byteBuf.readableBytes() > 0) {
            this.currentByteBufInputStream.byteBufQueue.put(byteBuf);
        } else {
            byteBuf.release();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void onContextClose() {
        if (this.currentByteBufInputStream != null) {
            this.currentByteBufInputStream.byteBufQueue.close();
        }
        this.byteBufInputStreams.close();
        this.completedFuture.complete(this.inputStreams);
        deregister();
    }

    @Override // org.apache.nemo.runtime.executor.bytetransfer.ByteTransferContext
    public void onChannelError(@Nullable Throwable th) {
        setChannelError(th);
        if (this.currentByteBufInputStream != null) {
            this.currentByteBufInputStream.byteBufQueue.closeExceptionally(th);
        }
        this.byteBufInputStreams.closeExceptionally(th);
        this.completedFuture.completeExceptionally(th);
        deregister();
    }
}
