/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.asyncio;

import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.locks.LockSupport;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.asyncio.AsyncSocketConnection;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.offheap.BinaryQueue;
import org.nustaq.offheap.bytez.ByteSink;
import org.nustaq.offheap.bytez.ByteSource;
import org.nustaq.offheap.bytez.niobuffers.ByteBufferBasicBytez;
import org.nustaq.offheap.bytez.onheap.HeapBytez;

/**
 * Socket connection that buffers traffic in {@link BinaryQueue}s.
 *
 * <p>Outgoing data handed to the {@code write(...)} methods is appended to
 * {@link #writeQueue} and pushed to the channel in chunks by {@link #tryFlush()}.
 * Incoming buffers are appended to {@link #readQueue}, which is then handed to
 * {@link #dataReceived(BinaryQueue)} for the subclass to consume.
 *
 * <p>All write-side methods call {@code checkThread()} first; presumably this
 * enforces single-threaded access by the owning thread (defined in the
 * superclass, not visible here). The scratch wrappers below are shared between
 * calls and rely on that.
 */
public abstract class QueuingAsyncSocketConnection
extends AsyncSocketConnection {
    /**
     * Threshold compared against {@code writeQueue.available()} in
     * {@link #checkQSize()}. Mutable on purpose so applications can tune it.
     */
    public static long MAX_Q_SIZE_BYTES = 10000000L;
    /** Accumulates everything received from the socket. */
    protected BinaryQueue readQueue = new BinaryQueue();
    /** Accumulates everything waiting to be written to the socket. */
    protected BinaryQueue writeQueue = new BinaryQueue();
    /** Reusable view over incoming buffers, used by {@link #dataReceived(ByteBuffer)}. */
    protected ByteBufferBasicBytez wrapper = new ByteBufferBasicBytez(null);
    /** Reusable view over outgoing buffers, shared by {@link #write(ByteBuffer)} and {@link #tryFlush()}. */
    ByteBufferBasicBytez tmp = new ByteBufferBasicBytez(null);
    /** Reusable view over outgoing byte arrays, used by {@link #write(byte[], int, int)}. */
    HeapBytez tmpBA = new HeapBytez(new byte[0]);
    /** Direct staging buffer; caps how much one {@link #tryFlush()} round can send. */
    ByteBuffer qWriteTmp = ByteBuffer.allocateDirect(128000);

    /**
     * Creates a connection for the given selection key and channel.
     *
     * @param key  selection key passed through to the superclass
     * @param chan socket channel passed through to the superclass
     */
    public QueuingAsyncSocketConnection(SelectionKey key, SocketChannel chan) {
        super(key, chan);
    }

    /**
     * Applies a minimal brake when the write queue exceeds
     * {@link #MAX_Q_SIZE_BYTES}: the calling thread is parked once for a
     * nominal nanosecond. This does not wait for the queue to drain.
     */
    protected void checkQSize() {
        if (this.writeQueue.available() > MAX_Q_SIZE_BYTES) {
            LockSupport.parkNanos(1L);
        }
    }

    /**
     * Appends the content of {@code buf} to the write queue.
     *
     * <p>NOTE(review): the buffer is wrapped and added without an explicit
     * offset, so this presumably queues from index 0 up to the wrapper's
     * length regardless of {@code buf.position()} - confirm against
     * {@code ByteBufferBasicBytez} / {@code BinaryQueue.add(ByteSource)}.
     *
     * @param buf source buffer
     */
    public void write(ByteBuffer buf) {
        this.checkThread();
        this.checkQSize();
        this.tmp.setBuffer(buf);
        this.writeQueue.add((ByteSource)this.tmp);
    }

    /**
     * Appends the whole array to the write queue.
     *
     * @param b bytes to queue
     */
    public void write(byte[] b) {
        // The delegate repeats both checks; they are kept here as well so the
        // behaviour is unchanged for subclasses overriding the 3-arg variant.
        this.checkThread();
        this.checkQSize();
        this.write(b, 0, b.length);
    }

    /**
     * Appends {@code len} bytes of {@code b}, starting at {@code off}, to the
     * write queue.
     *
     * @param b   source array
     * @param off index of the first byte to queue
     * @param len number of bytes to queue
     */
    public void write(byte[] b, int off, int len) {
        this.checkThread();
        this.checkQSize();
        this.tmpBA.setBase(b, (long)off, (long)len);
        this.writeQueue.add((ByteSource)this.tmpBA);
    }

    /**
     * Appends a single int to the write queue via {@code BinaryQueue.addInt}.
     *
     * @param val value to queue
     */
    public void write(int val) {
        this.checkThread();
        this.checkQSize();
        this.writeQueue.addInt(val);
    }

    /**
     * Moves up to one staging buffer's worth of queued bytes to the channel.
     *
     * <p>Does nothing unless {@code canWrite()} reports that a write may be
     * started. Once the write promise completes without error the method
     * re-invokes itself, so the queue keeps draining chunk by chunk. On error
     * the failure is logged and flushing stops; the polled bytes are not
     * re-queued.
     */
    public void tryFlush() {
        this.checkThread();
        if (!this.canWrite()) {
            return;
        }
        // Reset the staging buffer to its full capacity before filling it.
        this.qWriteTmp.position(0);
        this.qWriteTmp.limit(this.qWriteTmp.capacity());
        this.tmp.setBuffer(this.qWriteTmp);
        long poll = this.writeQueue.poll((ByteSink)this.tmp, 0L, this.tmp.length());
        if (poll <= 0L) {
            return; // nothing was taken from the queue
        }
        // Expose only the bytes actually taken from the queue.
        this.qWriteTmp.limit((int)poll);
        IPromise queueDataAvailablePromise = this.directWrite(this.qWriteTmp);
        queueDataAvailablePromise.then((res, err) -> {
            if (err != null) {
                Log.Lg.error(this, (Throwable)err, "write failure");
            } else {
                this.tryFlush();
            }
        });
    }

    /**
     * Appends the readable part of {@code buf} to {@link #readQueue} and hands
     * the queue to {@link #dataReceived(BinaryQueue)}.
     *
     * @param buf buffer holding the received bytes between its position and limit
     */
    @Override
    public void dataReceived(ByteBuffer buf) {
        this.wrapper.setBuffer(buf);
        // The third argument is a byte count, not an end index: use remaining()
        // so a buffer with a non-zero position is not read past its limit.
        this.readQueue.add((ByteSource)this.wrapper, (long)buf.position(), (long)buf.remaining());
        this.dataReceived(this.readQueue);
    }

    /**
     * Called after newly received bytes were appended to the read queue.
     *
     * @param var1 the read queue of this connection
     */
    protected abstract void dataReceived(BinaryQueue var1);

    /**
     * Close notification; intentionally a no-op here.
     *
     * @param ioe exception supplied by the caller on close
     */
    @Override
    public void closed(Exception ioe) {
    }
}

