/*
 * Decompiled with CFR 0.152.
 */
package net.morimekta.providence.thrift.server;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import net.morimekta.providence.PApplicationException;
import net.morimekta.providence.PApplicationExceptionType;
import net.morimekta.providence.PMessage;
import net.morimekta.providence.PProcessor;
import net.morimekta.providence.PServiceCall;
import net.morimekta.providence.PServiceCallType;
import net.morimekta.providence.serializer.BinarySerializer;
import net.morimekta.providence.serializer.Serializer;
import net.morimekta.providence.thrift.io.FramedBufferOutputStream;
import net.morimekta.providence.util.ServiceCallInstrumentation;
import net.morimekta.util.io.BigEndianBinaryReader;
import net.morimekta.util.io.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NonblockingSocketServer
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(NonblockingSocketServer.class);
    private static final long NS_IN_MILLIS = ServiceCallInstrumentation.NS_IN_MILLIS;
    private final Selector selector;
    private final PProcessor processor;
    private final Serializer serializer;
    private final ServiceCallInstrumentation instrumentation;
    private final ServerSocketChannel serverSocketChannel;
    private final ServerSocket serverSocket;
    private final ExecutorService receiverExecutor;
    private final ExecutorService workerExecutor;
    private final int maxFrameSizeInBytes;

    public static Builder builder(@Nonnull PProcessor processor) {
        return new Builder(processor);
    }

    public int getPort() {
        if (this.receiverExecutor.isShutdown()) {
            return -1;
        }
        return this.serverSocket.getLocalPort();
    }

    @Override
    public void close() throws IOException {
        this.receiverExecutor.shutdown();
        this.workerExecutor.shutdown();
        try {
            this.serverSocket.close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
            try {
                this.workerExecutor.awaitTermination(10L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            try {
                this.receiverExecutor.awaitTermination(10L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.receiverExecutor.shutdownNow();
            this.workerExecutor.shutdownNow();
        }
    }

    private NonblockingSocketServer(Builder builder) {
        try {
            this.maxFrameSizeInBytes = builder.maxFrameSizeInBytes;
            this.serializer = builder.serializer;
            this.processor = builder.processor;
            this.instrumentation = builder.instrumentation != null ? builder.instrumentation : (duration, call, response) -> {};
            this.selector = Selector.open();
            this.serverSocketChannel = ServerSocketChannel.open();
            this.serverSocketChannel.configureBlocking(false);
            this.serverSocket = this.serverSocketChannel.socket();
            this.serverSocketChannel.socket().setSoTimeout(builder.readTimeoutInMs);
            this.serverSocket.setReuseAddress(true);
            this.serverSocket.bind(builder.bindAddress, builder.backlog);
            this.receiverExecutor = Executors.newSingleThreadExecutor(builder.receiverThreadFactory);
            this.workerExecutor = Executors.newFixedThreadPool(builder.workerThreads, builder.workerThreadFactory);
            this.serverSocketChannel.register(this.selector, 16);
            this.receiverExecutor.submit(this::selectLoop);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private synchronized void selectLoop() {
        while (this.serverSocketChannel.isOpen()) {
            try {
                this.selector.select();
                Iterator<SelectionKey> selectedKeys = this.selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    if (!key.isValid()) {
                        selectedKeys.remove();
                        continue;
                    }
                    if (key.isAcceptable()) {
                        this.accept();
                    } else if (key.isReadable()) {
                        this.handleRead(key, (Context)key.attachment());
                    } else if (key.isWritable()) {
                        this.handleWrite(key, (Context)key.attachment());
                    }
                    selectedKeys.remove();
                }
            }
            catch (IOException e) {
                LOGGER.error("Exception in thread: " + e.getMessage(), (Throwable)e);
            }
            for (SelectionKey cleanupKey : this.selector.keys()) {
                if (cleanupKey.channel() == this.serverSocketChannel) continue;
                SocketChannel channel = (SocketChannel)cleanupKey.channel();
                if (cleanupKey.isValid() && channel.isOpen() && !channel.socket().isClosed()) continue;
                try {
                    cleanupKey.channel().close();
                }
                catch (IOException e) {
                    e.printStackTrace();
                }
                cleanupKey.cancel();
            }
        }
    }

    private void accept() {
        try {
            SocketChannel socketChannel;
            while ((socketChannel = this.serverSocketChannel.accept()) != null) {
                socketChannel.configureBlocking(false);
                socketChannel.register(this.selector, 1, new Context(socketChannel, this.maxFrameSizeInBytes));
            }
        }
        catch (IOException e) {
            LOGGER.error("Exception when accepting: {}", (Object)e.getMessage(), (Object)e);
        }
    }

    private void handleRead(SelectionKey key, Context context) throws IOException {
        long startTime = System.nanoTime();
        if (context.currentFrameSize == 0) {
            try {
                if (context.channel.read(context.sizeBuffer) < 0) {
                    context.close();
                    key.cancel();
                    return;
                }
                if (context.sizeBuffer.position() < 4) {
                    return;
                }
            }
            catch (IOException e) {
                context.close();
                key.cancel();
                return;
            }
            context.sizeBuffer.flip();
            try (ByteBufferInputStream in = new ByteBufferInputStream(context.sizeBuffer);
                 BigEndianBinaryReader reader = new BigEndianBinaryReader((InputStream)in);){
                context.currentFrameSize = reader.expectInt();
            }
            context.sizeBuffer.rewind();
            if (context.currentFrameSize > this.maxFrameSizeInBytes) {
                LOGGER.warn("Attempting message of " + context.currentFrameSize + " > " + this.maxFrameSizeInBytes);
                context.close();
                key.cancel();
                return;
            }
            if (context.currentFrameSize < 1) {
                LOGGER.warn("Attempting message of " + context.currentFrameSize);
                context.close();
                key.cancel();
                return;
            }
            context.readBuffer.rewind();
            context.readBuffer.limit(context.currentFrameSize);
        }
        try {
            if (context.channel.read(context.readBuffer) < 0) {
                LOGGER.warn("Closed connection while reading frame");
                context.close();
                key.cancel();
                return;
            }
        }
        catch (IOException e) {
            LOGGER.warn("Exception reading frame: {}", (Object)e.getMessage(), (Object)e);
            context.close();
            key.cancel();
            return;
        }
        if (context.readBuffer.position() < context.readBuffer.limit()) {
            return;
        }
        try {
            context.currentFrameSize = 0;
            context.readBuffer.flip();
            PServiceCall call = this.serializer.deserialize((InputStream)new ByteBufferInputStream(context.readBuffer), this.processor.getDescriptor());
            context.readBuffer.clear();
            this.workerExecutor.submit(() -> {
                PServiceCall reply;
                try {
                    reply = this.processor.handleCall(call);
                }
                catch (Exception e) {
                    reply = new PServiceCall(call.getMethod(), PServiceCallType.EXCEPTION, call.getSequence(), (PMessage)PApplicationException.builder().setMessage(e.getMessage()).setId(PApplicationExceptionType.INTERNAL_ERROR).initCause((Throwable)e).build());
                }
                Object object = context.mutex;
                synchronized (object) {
                    context.writeQueue.offer(new WriteEntry(startTime, call, reply));
                    key.interestOps(key.interestOps() | 4);
                    this.selector.wakeup();
                }
            });
        }
        catch (IOException e) {
            double duration = ((double)System.nanoTime() - (double)startTime) / (double)NS_IN_MILLIS;
            this.instrumentation.onTransportException((Exception)e, duration, null, null);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    private void handleWrite(SelectionKey key, Context context) {
        while (true) {
            WriteEntry entry;
            if ((entry = context.writeQueue.poll()) == null) {
                Object object = context.mutex;
                synchronized (object) {
                    if (!context.writeQueue.isEmpty()) return;
                    key.interestOps(key.interestOps() & 0xFFFFFFFB);
                    return;
                }
            }
            IOException ex = null;
            try {
                this.serializer.serialize((OutputStream)context.out, entry.reply);
                continue;
            }
            catch (IOException e) {
                ex = e;
                continue;
            }
            finally {
                try {
                    context.out.completeFrame();
                    context.out.flush();
                    continue;
                }
                catch (IOException e) {
                    LOGGER.error("Failed to write frame: {}", (Object)e.getMessage(), (Object)e);
                    context.close();
                    key.cancel();
                    continue;
                }
                finally {
                    double duration = ((double)System.nanoTime() - (double)entry.startTime) / (double)NS_IN_MILLIS;
                    if (ex == null) {
                        this.instrumentation.onComplete(duration, entry.call, entry.reply);
                        continue;
                    }
                    this.instrumentation.onTransportException((Exception)ex, duration, entry.call, entry.reply);
                    continue;
                }
            }
            break;
        }
    }

    private static class Context {
        final Object mutex = new Object();
        final SocketChannel channel;
        final Queue<WriteEntry> writeQueue;
        final FramedBufferOutputStream out;
        final ByteBuffer sizeBuffer;
        final ByteBuffer readBuffer;
        int currentFrameSize;

        private Context(SocketChannel channel, int maxFrameSizeInBytes) {
            this.channel = channel;
            this.currentFrameSize = 0;
            this.sizeBuffer = ByteBuffer.allocate(4);
            this.readBuffer = ByteBuffer.allocateDirect(maxFrameSizeInBytes);
            this.out = new FramedBufferOutputStream(channel, maxFrameSizeInBytes);
            this.writeQueue = new ConcurrentLinkedQueue<WriteEntry>();
        }

        void close() {
            try {
                this.channel.socket().close();
                this.channel.close();
            }
            catch (IOException e) {
                LOGGER.warn("Exception closing channel: {}", (Object)e.getMessage(), (Object)e);
            }
        }
    }

    private static class WriteEntry {
        long startTime;
        PServiceCall call;
        PServiceCall reply;

        WriteEntry(long startTime, PServiceCall call, PServiceCall reply) {
            this.startTime = startTime;
            this.call = call;
            this.reply = reply;
        }
    }

    public static class Builder {
        private final PProcessor processor;
        private ServiceCallInstrumentation instrumentation;
        private InetSocketAddress bindAddress;
        private int maxFrameSizeInBytes = 0xFA0000;
        private int readTimeoutInMs = 60000;
        private int backlog = 50;
        private int workerThreads = 10;
        private ThreadFactory receiverThreadFactory;
        private ThreadFactory workerThreadFactory;
        private Serializer serializer;

        public Builder(@Nonnull PProcessor processor) {
            this.processor = processor;
            this.bindAddress = new InetSocketAddress(0);
            this.receiverThreadFactory = this.workerThreadFactory = new ThreadFactoryBuilder().setNameFormat("providence-nonblocking-server-%d").setDaemon(true).build();
            this.serializer = new BinarySerializer();
        }

        public Builder withPort(int port) {
            if (port < 0) {
                throw new IllegalArgumentException();
            }
            return this.withBindAddress(new InetSocketAddress(port));
        }

        public Builder withBindAddress(@Nonnull InetSocketAddress bindAddress) {
            this.bindAddress = bindAddress;
            return this;
        }

        public Builder withMaxBacklog(int maxBacklog) {
            if (maxBacklog < 0) {
                throw new IllegalArgumentException();
            }
            this.backlog = maxBacklog;
            return this;
        }

        public Builder withMaxFrameSizeInBytes(int size) {
            if (size < 1024) {
                throw new IllegalArgumentException();
            }
            this.maxFrameSizeInBytes = size;
            return this;
        }

        public Builder withInstrumentation(@Nonnull ServiceCallInstrumentation instrumentation) {
            this.instrumentation = instrumentation;
            return this;
        }

        public Builder withReadTimeout(int timeoutInMs) {
            if (timeoutInMs < 1) {
                throw new IllegalArgumentException();
            }
            this.readTimeoutInMs = timeoutInMs;
            return this;
        }

        public Builder withWorkerThreads(int numThreads) {
            if (numThreads < 1) {
                throw new IllegalArgumentException();
            }
            this.workerThreads = numThreads;
            return this;
        }

        public Builder withWorkerThreadFactory(ThreadFactory factory) {
            this.workerThreadFactory = factory;
            return this;
        }

        public Builder withReceiverThreadFactory(ThreadFactory factory) {
            this.receiverThreadFactory = factory;
            return this;
        }

        public Builder withSerializer(Serializer serializer) {
            this.serializer = serializer;
            return this;
        }

        public NonblockingSocketServer start() {
            return new NonblockingSocketServer(this);
        }
    }
}

