/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.asyncio;

import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.asyncio.QueuingAsyncSocketConnection;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.offheap.BinaryQueue;
import org.nustaq.serialization.FSTConfiguration;
import org.nustaq.serialization.util.FSTUtil;

public abstract class ObjectAsyncSocketConnection
extends QueuingAsyncSocketConnection
implements ObjectSocket {
    FSTConfiguration conf;
    Throwable lastError;
    ArrayList objects = new ArrayList();

    public ObjectAsyncSocketConnection(SelectionKey key, SocketChannel chan) {
        super(key, chan);
    }

    public ObjectAsyncSocketConnection(FSTConfiguration conf, SelectionKey key, SocketChannel chan) {
        super(key, chan);
        this.setConf(conf);
    }

    @Override
    public void setConf(FSTConfiguration conf) {
        this.conf = conf;
    }

    @Override
    public FSTConfiguration getConf() {
        return this.conf;
    }

    @Override
    public void dataReceived(BinaryQueue q) {
        this.checkThread();
        while (q.available() > 4L) {
            int len = q.readInt();
            if (len <= 0) {
                System.out.println("object len ?? " + len);
                return;
            }
            if (q.available() >= (long)len) {
                byte[] bytes = q.readByteArray(len);
                this.receivedObject(this.conf.asObject(bytes));
                continue;
            }
            q.back(4);
            break;
        }
    }

    public abstract void receivedObject(Object var1);

    @Override
    public void writeObject(Object o) {
        if (this.myActor != null) {
            this.myActor = Actor.current();
        }
        this.checkThread();
        this.objects.add(o);
        if (this.objects.size() > 100) {
            try {
                this.flush();
            }
            catch (Exception e) {
                FSTUtil.rethrow((Throwable)e);
            }
        }
    }

    @Override
    public void flush() throws IOException, Exception {
        if (this.theExecutingThread != Thread.currentThread()) {
            if (this.myActor == null) {
                return;
            }
            this.myActor.execute(() -> {
                try {
                    this.flush();
                }
                catch (Exception e) {
                    Log.Warn((Object)this, e);
                }
            });
            return;
        }
        this.checkThread();
        if (this.objects.size() == 0) {
            return;
        }
        this.objects.add(0);
        Object[] objArr = this.objects.toArray();
        this.objects.clear();
        byte[] bytes = this.conf.asByteArray((Object)objArr);
        this.write(bytes.length);
        this.write(bytes);
        this.tryFlush();
    }

    @Override
    public Throwable getLastError() {
        return this.lastError;
    }

    @Override
    public void setLastError(Throwable ex) {
        this.lastError = ex;
    }
}

