/*
 * Decompiled with CFR 0.152.
 */
package net.trajano.ms.engine.internal;

import io.vertx.core.buffer.Buffer;
import io.vertx.core.streams.ReadStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InterruptedIOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import net.trajano.ms.engine.internal.Symbol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link InputStream} whose content is supplied as {@link Buffer}s placed
 * on an internal blocking queue.
 * <p>
 * A producer pushes data with {@link #populate(Buffer)} and terminates the
 * stream with either {@link #end()} or {@link #error(Throwable)}. A consumer
 * calls {@link #read()}, which blocks until a buffer or a terminator is
 * available. The producer and the consumer are expected to run on different
 * threads; a consumer that blocks on the producer's own thread would never be
 * woken up.
 * <p>
 * Note that the {@link ReadStream} constructor only registers data and end
 * handlers. Stream failures must be reported by the owner of the stream by
 * calling {@link #error(Throwable)}.
 */
public class VertxBlockingInputStream extends InputStream {

    /** Sentinel placed on the queue to mark a normal end of stream. */
    private static final Buffer END_BUFFER = Symbol.newSymbol(Buffer.class);

    /** Sentinel placed on the queue to mark an end of stream caused by an error. */
    private static final Buffer END_BUFFER_WITH_ERROR = Symbol.newSymbol(Buffer.class);

    private static final Logger LOG = LoggerFactory.getLogger(VertxBlockingInputStream.class);

    /**
     * Number of bytes that were populated but not yet read. Updated by both the
     * producer ({@link #populate(Buffer)}) and the consumer ({@link #read()}),
     * hence atomic.
     */
    private final AtomicInteger availableBytes = new AtomicInteger();

    /** Total number of data bytes handed out by {@link #read()}. */
    private long bytesRead = 0L;

    /** Set by {@link #close()}; volatile so a reader on another thread sees it. */
    private volatile boolean closed = false;

    /** Buffer currently being consumed, or {@code null} when the next one must be taken. */
    private Buffer currentBuffer;

    /** Exception rethrown by {@link #read()} once the error sentinel is reached. */
    private volatile IOException exceptionToThrow = null;

    /** Read position inside {@link #currentBuffer}. */
    private int pos;

    private final BlockingQueue<Buffer> queue = new LinkedBlockingQueue<>();

    /**
     * Creates a stream that must be fed manually through
     * {@link #populate(Buffer)}, {@link #end()} and {@link #error(Throwable)}.
     */
    public VertxBlockingInputStream() {
    }

    /**
     * Creates a stream fed by the given read stream: every buffer it emits is
     * passed to {@link #populate(Buffer)} and its end event to {@link #end()}.
     *
     * @param readStream source of the buffers
     */
    public VertxBlockingInputStream(ReadStream<Buffer> readStream) {
        readStream.handler(buffer -> this.populate((Buffer) buffer)).endHandler(aVoid -> this.end());
    }

    /**
     * Returns the number of populated bytes that have not been read yet.
     *
     * @return the number of unread bytes, never negative
     */
    @Override
    public int available() throws IOException {
        // populate() enqueues before it increments, so the reader may briefly
        // decrement first; never expose that transient negative value.
        return Math.max(0, this.availableBytes.get());
    }

    /**
     * Marks the stream as closed. Subsequent calls to {@link #read()} fail with
     * an {@link IOException}.
     */
    @Override
    public void close() throws IOException {
        this.closed = true;
    }

    /**
     * Signals that no more data will be populated. Once the reader reaches this
     * point {@link #read()} returns {@code -1}.
     */
    public void end() {
        this.queue.add(END_BUFFER);
    }

    /**
     * Signals that the stream ended because of a failure. Once the reader
     * reaches this point {@link #read()} throws an {@link IOException} wrapping
     * the given cause.
     *
     * @param e the failure
     */
    public void error(Throwable e) {
        // Must be assigned before the sentinel is enqueued so the reader that
        // takes the sentinel always finds the exception.
        this.exceptionToThrow = new IOException(e);
        this.queue.add(END_BUFFER_WITH_ERROR);
    }

    /**
     * Appends a buffer of data to be consumed by {@link #read()}.
     *
     * @param buffer the data to append
     */
    public void populate(Buffer buffer) {
        this.queue.add(buffer);
        this.availableBytes.addAndGet(buffer.length());
    }

    /**
     * Reads the next byte, blocking until data or an end marker is available.
     *
     * @return the next byte as a value in the range {@code 0..255}, or
     *         {@code -1} once the end of the stream has been reached
     * @throws InterruptedIOException if the thread is interrupted while waiting
     * @throws IOException if the stream is closed or ended with an error
     */
    @Override
    public int read() throws IOException {
        if (this.closed) {
            throw new IOException("Stream is closed");
        }
        while (this.currentBuffer == null) {
            final Buffer next = this.takeNextBuffer();
            // Check for the sentinels by identity first; only real data
            // buffers are asked for their length. Empty data buffers carry no
            // bytes and are skipped instead of being indexed out of range.
            if (next == END_BUFFER || next == END_BUFFER_WITH_ERROR || next.length() > 0) {
                this.currentBuffer = next;
                this.pos = 0;
            }
        }
        if (this.currentBuffer == END_BUFFER_WITH_ERROR) {
            throw this.exceptionToThrow;
        }
        if (this.currentBuffer == END_BUFFER) {
            return -1;
        }
        // Mask to an unsigned value: returning the raw (signed) byte would turn
        // 0xFF into -1, which callers interpret as end of stream.
        final int value = this.currentBuffer.getByte(this.pos++) & 0xFF;
        this.availableBytes.decrementAndGet();
        ++this.bytesRead;
        if (this.pos == this.currentBuffer.length()) {
            this.currentBuffer = null;
        }
        return value;
    }

    /**
     * Returns the number of data bytes returned by {@link #read()} so far.
     *
     * @return total bytes read
     */
    public long totalBytesRead() {
        return this.bytesRead;
    }

    /**
     * Blocks until the next queue entry is available.
     *
     * @return the next data buffer or end sentinel, never {@code null}
     * @throws InterruptedIOException if the thread is interrupted while waiting;
     *         the interrupt flag is restored before throwing
     */
    private Buffer takeNextBuffer() throws IOException {
        try {
            return this.queue.take();
        } catch (InterruptedException e) {
            LOG.error("Interrupted while waiting for next buffer", e);
            Thread.currentThread().interrupt();
            final InterruptedIOException interrupted =
                new InterruptedIOException("Interrupted while waiting for next buffer");
            interrupted.initCause(e);
            throw interrupted;
        }
    }
}

