/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.Callback;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.base.ActorClientConnector;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.net.TCPObjectSocket;
import org.nustaq.serialization.util.FSTUtil;

public class TCPClientConnector
implements ActorClientConnector {
    protected static AtomicReference<RemotingHelper> singleton = new AtomicReference();
    protected int port;
    protected String host;
    protected MyTCPSocket socket;
    protected Callback<ActorClientConnector> disconnectCallback;

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected static RemotingHelper get() {
        AtomicReference<RemotingHelper> atomicReference = singleton;
        synchronized (atomicReference) {
            if (singleton.get() == null) {
                singleton.set(Actors.AsActor(RemotingHelper.class));
            }
            return singleton.get();
        }
    }

    public TCPClientConnector(int port, String host, Callback<ActorClientConnector> disconnectCallback) {
        this.port = port;
        this.host = host;
        this.disconnectCallback = disconnectCallback;
    }

    @Override
    public IPromise connect(Function<ObjectSocket, ObjectSink> factory) throws Exception {
        Promise res = new Promise();
        this.socket = new MyTCPSocket(this.host, this.port);
        ObjectSink sink = factory.apply(this.socket);
        new Thread(() -> {
            res.resolve();
            while (!this.socket.isClosed()) {
                try {
                    Object o = this.socket.readObject();
                    sink.receiveObject(o, null);
                }
                catch (Exception e) {
                    if (!(e instanceof EOFException) && !(e instanceof SocketException)) {
                        Log.Warn((Object)this, e);
                    } else {
                        Log.Warn((Object)this, e.getMessage());
                    }
                    try {
                        this.socket.close();
                    }
                    catch (IOException e1) {
                        Log.Warn((Object)this, e.getMessage());
                    }
                }
            }
            if (this.disconnectCallback != null) {
                this.disconnectCallback.complete(this, null);
            }
            sink.sinkClosed();
        }, "tcp client receiver").start();
        return res;
    }

    @Override
    public IPromise closeClient() {
        try {
            this.socket.close();
        }
        catch (IOException e) {
            e.printStackTrace();
            return new Promise<IOException>(e);
        }
        return new Promise();
    }

    static class MyTCPSocket
    extends TCPObjectSocket
    implements ObjectSocket {
        static AtomicInteger idCount = new AtomicInteger(0);
        int id = idCount.incrementAndGet();
        ArrayList objects = new ArrayList();

        public MyTCPSocket(String host, int port) throws IOException {
            super(host, port);
            this.getSocket().setKeepAlive(true);
        }

        @Override
        public void writeObject(Object toWrite) throws Exception {
            this.objects.add(toWrite);
            if (this.objects.size() > 100) {
                this.flush();
            }
        }

        @Override
        public void flush() throws IOException {
            if (this.objects.size() == 0) {
                return;
            }
            this.objects.add(0);
            Object[] objArr = this.objects.toArray();
            this.objects.clear();
            try {
                super.writeObject((Object)objArr);
            }
            catch (Exception e) {
                FSTUtil.rethrow((Throwable)e);
            }
            super.flush();
        }

        @Override
        public int getId() {
            return this.id;
        }
    }

    public static class RemotingHelper
    extends Actor<RemotingHelper> {
    }
}

