/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.tcp;

import java.io.EOFException;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.IPromise;
import org.nustaq.kontraktor.Promise;
import org.nustaq.kontraktor.remoting.base.ActorServer;
import org.nustaq.kontraktor.remoting.base.ActorServerConnector;
import org.nustaq.kontraktor.remoting.base.ObjectSink;
import org.nustaq.kontraktor.remoting.base.ObjectSocket;
import org.nustaq.kontraktor.remoting.encoding.Coding;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.net.TCPObjectSocket;

public class TCPServerConnector
implements ActorServerConnector {
    public static int DELAY_MS_TILL_CLOSE = 2000;
    public static AtomicInteger numberOfThreads = new AtomicInteger(0);
    int port;
    protected ServerSocket acceptSocket;
    protected ConcurrentLinkedQueue<Socket> clientSockets = new ConcurrentLinkedQueue();

    public static Promise<ActorServer> Publish(Actor facade, int port, Coding coding) {
        return TCPServerConnector.Publish(facade, port, coding, null);
    }

    public static Promise<ActorServer> Publish(Actor facade, int port, Coding coding, Consumer<Actor> disconnectCB) {
        Promise<ActorServer> finished = new Promise<ActorServer>();
        try {
            ActorServer publisher = new ActorServer(new TCPServerConnector(port), facade, coding);
            facade.execute(() -> {
                try {
                    publisher.start(disconnectCB);
                    finished.resolve(publisher);
                }
                catch (Exception e) {
                    finished.reject(e);
                }
            });
        }
        catch (Exception e) {
            e.printStackTrace();
            return new Promise<Object>(null, e);
        }
        return finished;
    }

    public TCPServerConnector(int port) {
        this.port = port;
    }

    @Override
    public void connect(Actor facade, Function<ObjectSocket, ObjectSink> factory) throws Exception {
        Promise p = new Promise();
        new Thread(() -> this.acceptLoop(facade, this.port, factory, p), "acceptor thread " + this.port).start();
        p.await();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected Promise acceptLoop(Actor facade, int port, Function<ObjectSocket, ObjectSink> factory, Promise p) {
        try {
            numberOfThreads.incrementAndGet();
            this.acceptSocket = new ServerSocket(port);
            p.resolve();
            while (!this.acceptSocket.isClosed()) {
                Socket clientSocket = this.acceptSocket.accept();
                this.clientSockets.add(clientSocket);
                MyTCPSocket objectSocket = new MyTCPSocket(clientSocket);
                facade.execute(() -> {
                    ObjectSink sink = (ObjectSink)factory.apply(objectSocket);
                    new Thread(() -> {
                        try {
                            numberOfThreads.incrementAndGet();
                            while (!clientSocket.isClosed()) {
                                try {
                                    Object o = objectSocket.readObject();
                                    sink.receiveObject(o, null);
                                }
                                catch (Exception e) {
                                    if (!(e instanceof EOFException) && !(e instanceof SocketException)) {
                                        Log.Warn((Object)this, e);
                                    }
                                    try {
                                        clientSocket.close();
                                    }
                                    catch (IOException e1) {
                                        Log.Warn((Object)this, e1);
                                    }
                                }
                            }
                            sink.sinkClosed();
                        }
                        finally {
                            this.clientSockets.remove(clientSocket);
                            numberOfThreads.decrementAndGet();
                        }
                    }, "tcp receiver").start();
                });
            }
        }
        catch (Exception e) {
            Log.Info((Object)this, e.getMessage());
            if (!p.isSettled()) {
                p.reject(e);
            }
        }
        finally {
            if (!p.isSettled()) {
                p.reject("conneciton failed");
            }
            try {
                this.acceptSocket.close();
            }
            catch (IOException e) {
                Log.Warn((Object)this, e);
            }
            numberOfThreads.decrementAndGet();
        }
        return p;
    }

    @Override
    public IPromise closeServer() {
        try {
            this.clientSockets.forEach(socket -> Actors.SubmitDelayed(DELAY_MS_TILL_CLOSE, () -> {
                try {
                    socket.close();
                }
                catch (IOException e) {
                    Log.Warn((Object)this, e);
                }
            }));
            this.acceptSocket.close();
        }
        catch (IOException e) {
            return new Promise<Object>(null, e);
        }
        return new Promise<Object>(null);
    }

    static class MyTCPSocket
    extends TCPObjectSocket
    implements ObjectSocket {
        public MyTCPSocket(Socket socket) throws IOException {
            super(socket, null);
        }
    }
}

