/*
 * Decompiled with CFR 0.152.
 */
package net.morimekta.providence.thrift.server;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import net.morimekta.providence.PProcessor;
import net.morimekta.providence.PServiceCall;
import net.morimekta.providence.mio.IOMessageReader;
import net.morimekta.providence.mio.IOMessageWriter;
import net.morimekta.providence.mio.MessageReader;
import net.morimekta.providence.mio.MessageWriter;
import net.morimekta.providence.serializer.BinarySerializer;
import net.morimekta.providence.serializer.Serializer;
import net.morimekta.providence.server.DefaultProcessorHandler;
import net.morimekta.providence.server.WrappedProcessor;
import net.morimekta.providence.util.ServiceCallInstrumentation;
import net.morimekta.util.concurrent.NamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SocketServer
implements AutoCloseable {
    private static final Logger LOGGER = LoggerFactory.getLogger(SocketServer.class);
    private final int clientTimeout;
    private final PProcessor processor;
    private final ServiceCallInstrumentation instrumentation;
    private final ServerSocket serverSocket;
    private final ExecutorService workerExecutor;
    private final Serializer serializer;

    public static Builder builder(@Nonnull PProcessor processor) {
        return new Builder(processor);
    }

    public int getPort() {
        if (this.workerExecutor.isShutdown()) {
            return -1;
        }
        return this.serverSocket.getLocalPort();
    }

    @Override
    public void close() {
        this.workerExecutor.shutdown();
        try {
            this.serverSocket.close();
        }
        catch (IOException e) {
            e.printStackTrace();
        }
        finally {
            try {
                this.workerExecutor.awaitTermination(10L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                e.printStackTrace();
            }
            this.workerExecutor.shutdownNow();
        }
    }

    private SocketServer(Builder builder) {
        try {
            this.clientTimeout = builder.clientTimeout;
            this.processor = builder.processor;
            this.instrumentation = builder.instrumentation != null ? builder.instrumentation : (duration, call, response) -> {};
            this.serializer = builder.serializer;
            this.serverSocket = new ServerSocket();
            this.serverSocket.setReuseAddress(true);
            this.serverSocket.bind(builder.bindAddress, builder.backlog);
            this.serverSocket.setSoTimeout(0);
            this.workerExecutor = Executors.newFixedThreadPool(builder.workerThreads, builder.workerThreadFactory);
            this.workerExecutor.submit(this::accept);
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private void accept() {
        try {
            Socket socket = this.serverSocket.accept();
            socket.setSoTimeout(this.clientTimeout);
            long startTime = System.nanoTime();
            this.workerExecutor.submit(() -> this.process(startTime, socket));
        }
        catch (SocketTimeoutException socket) {
        }
        catch (IOException e) {
            if (this.workerExecutor.isShutdown()) {
                return;
            }
            throw new UncheckedIOException(e);
        }
        catch (Exception e) {
            e.printStackTrace();
            throw new IllegalStateException(e);
        }
        this.workerExecutor.submit(this::accept);
    }

    private void process(long startTime, @Nonnull Socket socket) {
        try (Socket ignore = socket;
             BufferedInputStream in = new BufferedInputStream(socket.getInputStream());
             BufferedOutputStream out = new BufferedOutputStream(socket.getOutputStream());
             IOMessageReader reader = new IOMessageReader((InputStream)in, this.serializer);
             IOMessageWriter writer = new IOMessageWriter((OutputStream)out, this.serializer);){
            while (socket.isConnected()) {
                double duration;
                long endTime;
                AtomicReference callRef = new AtomicReference();
                AtomicReference responseRef = new AtomicReference();
                try {
                    DefaultProcessorHandler handler = new DefaultProcessorHandler((PProcessor)new WrappedProcessor(this.processor, (c, p) -> {
                        callRef.set(c);
                        responseRef.set(p.handleCall(c));
                        return (PServiceCall)responseRef.get();
                    }));
                    handler.process((MessageReader)reader, (MessageWriter)writer);
                    out.flush();
                    endTime = System.nanoTime();
                    duration = (double)(endTime - startTime) / (double)ServiceCallInstrumentation.NS_IN_MILLIS;
                    try {
                        this.instrumentation.onComplete(duration, (PServiceCall)callRef.get(), (PServiceCall)responseRef.get());
                    }
                    catch (Throwable th) {
                        LOGGER.error("Exception in service instrumentation", th);
                    }
                }
                catch (IOException e) {
                    endTime = System.nanoTime();
                    duration = (double)(endTime - startTime) / (double)ServiceCallInstrumentation.NS_IN_MILLIS;
                    try {
                        this.instrumentation.onTransportException((Exception)e, duration, (PServiceCall)callRef.get(), (PServiceCall)responseRef.get());
                    }
                    catch (Throwable th) {
                        LOGGER.error("Exception in service instrumentation", th);
                    }
                    throw new UncheckedIOException(e.getMessage(), e);
                }
                in.mark(1);
                if (in.read() < 0) {
                    return;
                }
                in.reset();
                startTime = System.nanoTime();
            }
        }
        catch (IOException e) {
            throw new UncheckedIOException(e.getMessage(), e);
        }
    }

    public static class Builder {
        private final PProcessor processor;
        private ServiceCallInstrumentation instrumentation;
        private InetSocketAddress bindAddress;
        private int clientTimeout = 60000;
        private int backlog = 50;
        private int workerThreads = 10;
        private ThreadFactory workerThreadFactory;
        private Serializer serializer;

        public Builder(@Nonnull PProcessor processor) {
            this.processor = processor;
            this.bindAddress = new InetSocketAddress(0);
            this.serializer = new BinarySerializer();
        }

        public Builder withPort(int port) {
            if (port < 0) {
                throw new IllegalArgumentException();
            }
            return this.withBindAddress(new InetSocketAddress(port));
        }

        public Builder withBindAddress(@Nonnull InetSocketAddress bindAddress) {
            this.bindAddress = bindAddress;
            return this;
        }

        public Builder withMaxBacklog(int maxBacklog) {
            if (maxBacklog < 0) {
                throw new IllegalArgumentException();
            }
            this.backlog = maxBacklog;
            return this;
        }

        public Builder withInstrumentation(@Nonnull ServiceCallInstrumentation instrumentation) {
            this.instrumentation = instrumentation;
            return this;
        }

        public Builder withClientTimeout(int timeoutInMs) {
            if (timeoutInMs < 1) {
                throw new IllegalArgumentException();
            }
            this.clientTimeout = timeoutInMs;
            return this;
        }

        public Builder withWorkerThreads(int numThreads) {
            if (numThreads < 1) {
                throw new IllegalArgumentException();
            }
            this.workerThreads = numThreads;
            return this;
        }

        public Builder withThreadFactory(ThreadFactory factory) {
            this.workerThreadFactory = factory;
            return this;
        }

        public Builder withSerializer(Serializer serializer) {
            this.serializer = serializer;
            return this;
        }

        public SocketServer start() {
            if (this.workerThreadFactory == null) {
                this.workerThreadFactory = NamedThreadFactory.builder().setNameFormat("providence-server-%d").setDaemon(false).build();
            }
            return new SocketServer(this);
        }
    }
}

