/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.base.ActorServer;
import org.nustaq.kontraktor.remoting.base.ActorServerConnector;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.encoding.Coding;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.net.TCPObjectSocket;

/**
 * {@link ActorServerConnector} that listens on a plain TCP port.
 *
 * <p>One "acceptor" thread accepts incoming connections; every accepted client is wrapped
 * in a {@link MyTCPSocket}, handed to the sink factory on the facade actor, and then served
 * by its own "tcp receiver" thread that forwards each object read from the socket to the
 * sink until the socket is closed.
 */
public class TCPServerConnector
implements ActorServerConnector {
    int port;
    /** Listening socket; stays {@code null} until the acceptor thread has bound the port. */
    protected ServerSocket acceptSocket;

    /**
     * Creates an {@link ActorServer} for {@code facade} on the given TCP port and starts it
     * from within the facade's execution context.
     *
     * @param facade actor to publish
     * @param port   TCP port to listen on
     * @param coding encoding passed through to the {@link ActorServer}
     * @return promise resolved with the started server, or rejected with the exception that
     *         prevented construction or start
     */
    public static Promise<ActorServer> Publish(Actor facade, int port, Coding coding) {
        Promise<ActorServer> finished = new Promise<ActorServer>();
        try {
            ActorServer publisher = new ActorServer(new TCPServerConnector(port), facade, coding);
            facade.execute(() -> {
                try {
                    publisher.start();
                    finished.resolve(publisher);
                } catch (Exception e) {
                    finished.reject(e);
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
            // Same element type as the declared return type (was Promise<Object>).
            return new Promise<ActorServer>(null, e);
        }
        return finished;
    }

    /**
     * @param port TCP port the acceptor thread will bind when {@link #connect} is called
     */
    public TCPServerConnector(int port) {
        this.port = port;
    }

    /**
     * Starts the acceptor thread and blocks on the start-up promise, which
     * {@link #acceptLoop} resolves once the port is bound or rejects on failure.
     *
     * @param facade  actor on whose execution context sinks are created
     * @param factory creates the {@link ObjectSink} for each accepted {@link ObjectSocket}
     * @throws Exception declared by the interface; presumably raised by {@code await()} when
     *                   the start-up promise is rejected (confirm against Promise)
     */
    @Override
    public void connect(Actor facade, Function<ObjectSocket, ObjectSink> factory) throws Exception {
        Promise p = new Promise();
        new Thread(() -> this.acceptLoop(facade, this.port, factory, p), "acceptor thread " + this.port).start();
        p.await();
    }

    /**
     * Binds the listening socket, settles {@code p}, and accepts clients until the listening
     * socket is closed or accepting fails.
     *
     * @param facade  actor on whose execution context sinks are created
     * @param port    TCP port to bind
     * @param factory creates the {@link ObjectSink} for each accepted {@link ObjectSocket}
     * @param p       start-up promise; resolved after a successful bind, rejected otherwise
     * @return the same promise {@code p}
     */
    protected Promise acceptLoop(Actor facade, int port, Function<ObjectSocket, ObjectSink> factory, Promise p) {
        try {
            this.acceptSocket = new ServerSocket(port);
            p.resolve();
            while (!this.acceptSocket.isClosed()) {
                Socket clientSocket = this.acceptSocket.accept();
                this.serveClient(facade, factory, clientSocket);
            }
        } catch (Exception e) {
            Log.Info((Object)this, e.getMessage());
            if (!p.isSettled()) {
                p.reject(e);
            }
        } finally {
            if (!p.isSettled()) {
                p.reject("conneciton failed");
            }
            // The bind itself may have failed, in which case there is nothing to close.
            ServerSocket listener = this.acceptSocket;
            if (listener != null) {
                try {
                    listener.close();
                } catch (IOException e) {
                    Log.Warn((Object)this, e);
                }
            }
        }
        return p;
    }

    /**
     * Wraps one accepted client and schedules sink creation plus the receiver thread on the
     * facade. A failure while wrapping only drops this client; the listener keeps running.
     */
    private void serveClient(Actor facade, Function<ObjectSocket, ObjectSink> factory, Socket clientSocket) {
        MyTCPSocket objectSocket;
        try {
            objectSocket = new MyTCPSocket(clientSocket);
        } catch (IOException e) {
            Log.Warn((Object)this, e);
            this.closeClient(clientSocket);
            return;
        }
        facade.execute(() -> {
            ObjectSink sink = (ObjectSink)factory.apply(objectSocket);
            new Thread(() -> this.receiveLoop(clientSocket, objectSocket, sink), "tcp receiver").start();
        });
    }

    /**
     * Reads objects from the client and forwards them to {@code sink} until the socket is
     * closed; any exception closes the socket, then the sink is told it was closed.
     */
    private void receiveLoop(Socket clientSocket, MyTCPSocket objectSocket, ObjectSink sink) {
        while (!clientSocket.isClosed()) {
            try {
                Object received = objectSocket.readObject();
                sink.receiveObject(received, null);
            } catch (Exception e) {
                // EOF and socket errors are the normal way a connection ends; stay quiet.
                boolean expectedDisconnect = e instanceof EOFException || e instanceof SocketException;
                if (!expectedDisconnect) {
                    Log.Warn((Object)this, e);
                }
                this.closeClient(clientSocket);
            }
        }
        sink.sinkClosed();
    }

    /** Closes a client socket, logging (not propagating) an {@link IOException}. */
    private void closeClient(Socket clientSocket) {
        try {
            clientSocket.close();
        } catch (IOException e) {
            Log.Warn((Object)this, e);
        }
    }

    /**
     * Closes the listening socket, which makes the accept loop terminate.
     *
     * @return a promise completed with {@code null}, or carrying the {@link IOException}
     *         raised by the close; also completed with {@code null} when the listener was
     *         never bound
     */
    @Override
    public IPromise closeServer() {
        ServerSocket listener = this.acceptSocket;
        if (listener == null) {
            // Never bound (connect not called or bind failed): nothing to close.
            return new Promise<Object>(null);
        }
        try {
            listener.close();
        } catch (IOException e) {
            return new Promise<Object>(null, e);
        }
        return new Promise<Object>(null);
    }

    /** {@link TCPObjectSocket} exposed through the remoting {@link ObjectSocket} interface. */
    static class MyTCPSocket
    extends TCPObjectSocket
    implements ObjectSocket {
        /**
         * @param socket accepted client socket to wrap
         * @throws IOException propagated from the {@link TCPObjectSocket} constructor
         */
        public MyTCPSocket(Socket socket) throws IOException {
            super(socket, null);
        }
    }
}

