/*
 * Decompiled with CFR 0.152.
 */
package io.snappydata.thrift.server;

import com.gemstone.gemfire.CancelException;
import com.gemstone.gemfire.SystemFailure;
import com.gemstone.gemfire.internal.shared.InputStreamChannel;
import com.gemstone.gnu.trove.THashSet;
import com.pivotal.gemfirexd.NetworkInterface;
import com.pivotal.gemfirexd.internal.engine.Misc;
import io.snappydata.thrift.common.SnappyTSocket;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolException;
import org.apache.thrift.server.ServerContext;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TServerEventHandler;
import org.apache.thrift.transport.TServerTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class SnappyThriftServerSelector
extends TServer {
    // Logger named after this class; used for all warnings/errors raised by the server and its selectors.
    private final Logger LOGGER = LoggerFactory.getLogger((String)SnappyThriftServerSelector.class.getName());
    // Executor that runs the SelectorProcess loops and the SelectorWorker tasks; shut down at the end of serve().
    private final ExecutorService executorService;
    // Executor that serve() first offers every accepted connection to (as a ThreadWorker). The constructor
    // installs a rejection handler on it that falls back to registering the connection with a selector.
    private final ThreadPoolExecutor threadPerConnExecutor;
    // Optional listener notified when a connection is opened or closed; null when none was configured.
    private final NetworkInterface.ConnectionListener connListener;
    // Set by stop(); polled by the accept loop in serve() and the processing loop in handleRead().
    private volatile boolean stopped;
    // Number of slots in selectorProcesses (taken from Args.numSelectors).
    final int numSelectors;
    // Selector slots; entries are created lazily by registerClientDataInNextSelector_ and may be null.
    private final SelectorProcess[] selectorProcesses;
    // Round-robin cursor into selectorProcesses; only advanced by registerClientDataInNextSelector_,
    // whose callers in this class are synchronized on the server instance.
    private int currentSelectorIndex;
    // Unit and value of the time serve() waits for executorService to terminate after shutdown.
    private final TimeUnit stopTimeoutUnit;
    private final long stopTimeoutVal;
    // Source of connection numbers handed to ClientProcessData and the connection listener.
    private final AtomicInteger connectionCounter;
    // Count of SelectorProcess instances currently executing a request inline on their own thread.
    final AtomicInteger numSelectorsInExecution;

    /**
     * Creates the server from the given arguments.
     *
     * <p>The arguments are validated, the two executors are taken from {@code args} (or created
     * with the default configuration when absent), and a rejection handler is installed on the
     * thread-per-connection executor: a connection whose {@link ThreadWorker} is rejected is
     * switched to non-blocking mode and handed to the next selector instead.
     *
     * @param args server configuration; {@link Args#validate()} is invoked on it
     * @throws IllegalArgumentException if {@code args} fails validation
     */
    public SnappyThriftServerSelector(Args args) {
        super((TServer.AbstractServerArgs)args);
        args.validate();
        this.numSelectors = args.numSelectors;
        this.selectorProcesses = new SelectorProcess[this.numSelectors];
        this.stopTimeoutUnit = args.stopTimeoutUnit;
        this.stopTimeoutVal = args.stopTimeoutVal;
        this.executorService = args.executorService != null ? args.executorService : SnappyThriftServerSelector.createDefaultExecutorService(args);
        this.threadPerConnExecutor = args.threadPerConnExecutor != null ? args.threadPerConnExecutor : SnappyThriftServerSelector.createDefaultExecutorService(args);
        this.threadPerConnExecutor.setRejectedExecutionHandler(new RejectedExecutionHandler(){

            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // Only ThreadWorker instances are submitted to this executor by serve().
                ClientProcessData data = ((ThreadWorker)r).data;
                try {
                    data.clientSocket.getSocketChannel().configureBlocking(false);
                    SnappyThriftServerSelector.this.registerClientDataInNextSelector(data);
                }
                catch (IOException ioe) {
                    if (!SnappyThriftServerSelector.this.stopped) {
                        SnappyThriftServerSelector.this.LOGGER.warn("Transport error occurred during acceptance of connection.", (Throwable)ioe);
                    }
                    // The connection was never handed to a worker or a selector, so nothing else
                    // will ever close it: release the socket and notify the handler/listener here
                    // instead of leaking it.
                    SnappyThriftServerSelector.this.closeConnection(data);
                }
            }
        });
        this.connListener = args.connListener;
        this.connectionCounter = new AtomicInteger(0);
        this.numSelectorsInExecution = new AtomicInteger(0);
    }

    /**
     * Builds the default worker pool: core/max sizes come from {@code args}, idle threads are
     * kept for 60 seconds, and tasks are handed off through a {@link SynchronousQueue}.
     *
     * @param args supplies {@code minWorkerThreads} and {@code maxWorkerThreads}
     * @return a new, independent thread pool
     */
    private static ThreadPoolExecutor createDefaultExecutorService(Args args) {
        return new ThreadPoolExecutor(
                args.minWorkerThreads,
                args.maxWorkerThreads,
                60L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    /**
     * Runs the accept loop until {@link #stop()} is called, then shuts the server down.
     *
     * <p>Each accepted connection is offered to the thread-per-connection executor as a
     * {@link ThreadWorker}. Once the loop ends, every selector process is stopped, the selector
     * executor is shut down and awaited for at most the configured stop timeout, and the server
     * transport is closed. If listening on the server transport fails the error is logged and
     * the method returns immediately.
     */
    @Override
    public void serve() {
        try {
            this.serverTransport_.listen();
        }
        catch (TTransportException tte) {
            this.LOGGER.error("Failed to start listening on server socket!", (Throwable)tte);
            return;
        }
        if (this.eventHandler_ != null) {
            this.eventHandler_.preServe();
        }
        this.stopped = false;
        this.setServing(true);
        while (!this.stopped) {
            try {
                SnappyTSocket client = (SnappyTSocket)this.serverTransport_.accept();
                this.threadPerConnExecutor.execute(new ThreadWorker(this.newClientProcessData(client)));
            }
            catch (TTransportException tte) {
                // accept() failing after stop() closed the transport is expected.
                if (this.stopped) continue;
                this.LOGGER.warn("Transport error occurred during accept of connection.", (Throwable)tte);
            }
        }
        for (SelectorProcess proc : this.selectorProcesses) {
            if (proc == null) continue;
            proc.stop();
        }
        this.executorService.shutdown();

        // Wait for the selector executor, charging the time already spent against the budget
        // whenever the wait is interrupted so the total never exceeds the configured timeout.
        long remainingMs = this.stopTimeoutUnit.toMillis(this.stopTimeoutVal);
        long lastCheck = System.currentTimeMillis();
        boolean interrupted = false;
        while (remainingMs >= 0L) {
            try {
                this.executorService.awaitTermination(remainingMs, TimeUnit.MILLISECONDS);
                break;
            }
            catch (InterruptedException ix) {
                interrupted = true;
                long current = System.currentTimeMillis();
                // Subtract the elapsed time BEFORE moving the reference point forward.
                remainingMs -= current - lastCheck;
                lastCheck = current;
            }
        }
        this.setServing(false);
        this.serverTransport_.close();
        if (interrupted) {
            // Restore the interrupt status that awaitTermination() cleared.
            Thread.currentThread().interrupt();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     * Enabled aggressive block sorting
     * Enabled unnecessary exception pruning
     * Enabled aggressive exception aggregation
     */
    /**
     * Hands {@code clientData} to the next usable selector, cycling round-robin over the slots.
     *
     * <p>A slot that is empty or whose process has stopped gets a fresh {@link SelectorProcess}
     * (started on the selector executor) with the client queued on it. An existing process is
     * used only if it is not currently executing a request inline. The method loops until the
     * client has been accepted by some selector. Callers in this class hold the server monitor.
     *
     * @param clientData the connection to register
     * @param skipProc   a selector process that must not receive the client, or {@code null}
     * @throws IOException if a new selector cannot be opened
     */
    private final void registerClientDataInNextSelector_(ClientProcessData clientData, SelectorProcess skipProc) throws IOException {
        while (true) {
            this.currentSelectorIndex = (this.currentSelectorIndex + 1) % this.numSelectors;
            SelectorProcess proc = this.selectorProcesses[this.currentSelectorIndex];
            if (skipProc != null && proc == skipProc) {
                continue;
            }
            if (proc == null || proc.stopped) {
                // Empty or dead slot: replace it with a new selector process.
                SelectorProcess fresh = new SelectorProcess(SelectorProvider.provider().openSelector());
                fresh.registerClient(clientData);
                this.selectorProcesses[this.currentSelectorIndex] = fresh;
                this.executorService.execute(fresh);
                return;
            }
            synchronized (proc) {
                // registerClient() refuses the client when the process stopped after the check
                // above; in that case keep looking instead of silently dropping the connection.
                if (!proc.inExecution && proc.registerClient(clientData)) {
                    return;
                }
            }
        }
    }

    /**
     * Registers a single connection with the next available selector while holding the
     * server monitor; no selector is excluded.
     *
     * @param clientData the connection to register
     * @throws IOException if a new selector has to be opened and that fails
     */
    final synchronized void registerClientDataInNextSelector(ClientProcessData clientData) throws IOException {
        final SelectorProcess noneSkipped = null;
        registerClientDataInNextSelector_(clientData, noneSkipped);
    }

    /**
     * Registers every connection in {@code clientsData} with the next available selectors,
     * never choosing {@code skipProc}. The server monitor is held for the whole batch.
     *
     * @param clientsData connections to redistribute
     * @param skipProc    selector process that must not receive any of them
     * @throws IOException if a new selector has to be opened and that fails
     */
    final synchronized void registerClientDataInNextSelector(ArrayList<ClientProcessData> clientsData, SelectorProcess skipProc) throws IOException {
        for (ClientProcessData each : clientsData) {
            registerClientDataInNextSelector_(each, skipProc);
        }
    }

    /**
     * Requests shutdown: raises the {@code stopped} flag first and then closes the server
     * transport.
     */
    @Override
    public void stop() {
        stopped = true;
        serverTransport_.close();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Processes the messages currently available on a connection.
     *
     * <p>Messages are processed one after another until no more input is available, the
     * processor reports failure, the server is stopped or an exception occurs. In every case
     * except the first, the connection's socket is closed and its selection key cancelled; if
     * the socket is then no longer open, the event handler and connection listener are notified.
     *
     * @param data          per-connection state
     * @param readFrameSize whether the 4-byte frame header still has to be consumed before the
     *                      first message (it is always consumed for subsequent messages)
     * @param nonBlocking   not used by this method
     * @return {@code true} if the last message was processed successfully and the connection
     *         was left open and marked idle; {@code false} otherwise
     */
    protected final boolean handleRead(ClientProcessData data, boolean readFrameSize, boolean nonBlocking) {
        boolean processed = false;
        try {
            if (data.connectionContext != null) {
                data.eventHandler.processContext(data.connectionContext, data.inputTransport, data.outputTransport);
            }
            final InputStreamChannel in = data.clientSocket.getInputStream();
            boolean consumeFrameHeader = readFrameSize;
            while (!this.stopped) {
                if (consumeFrameHeader) {
                    in.readInt();
                }
                processed = data.processor.process(data.inputProtocol, data.outputProtocol);
                if (!processed) {
                    break;
                }
                if (in.available() <= 0) {
                    // Nothing more buffered: hand the connection back to the selector.
                    data.idle = true;
                    return true;
                }
                // More input is pending; the next message starts with its frame header.
                consumeFrameHeader = true;
                processed = false;
            }
            SnappyThriftServerSelector.cleanupSelectionKey(data);
        }
        catch (TTransportException tte) {
            // Transport failures are not logged; the connection is simply closed.
            SnappyThriftServerSelector.cleanupSelectionKey(data);
        }
        catch (TProtocolException tpe) {
            this.LOGGER.warn("Thrift protocol error occurred during processing of message. Closing this connection.", (Throwable)tpe);
            SnappyThriftServerSelector.cleanupSelectionKey(data);
        }
        catch (TException te) {
            this.LOGGER.error("Thrift error occurred during processing of message.", (Throwable)te);
            SnappyThriftServerSelector.cleanupSelectionKey(data);
        }
        catch (Exception e) {
            this.LOGGER.error("Error occurred during processing of message.", (Throwable)e);
            SnappyThriftServerSelector.cleanupSelectionKey(data);
        }
        finally {
            if (!processed) {
                data.idle = true;
                if (!data.clientSocket.isOpen()) {
                    this.cleanupConnection(data);
                }
            }
        }
        return false;
    }

    /**
     * Performs the bookkeeping for a closed connection: clears the idle flag, asks the event
     * handler (if any) to delete the connection context, and informs the connection listener
     * (if any) that the connection was closed.
     *
     * @param data per-connection state of the closed connection
     */
    protected void cleanupConnection(ClientProcessData data) {
        data.idle = false;
        if (data.eventHandler != null) {
            data.eventHandler.deleteContext(data.connectionContext, data.inputProtocol, data.outputProtocol);
        }
        final NetworkInterface.ConnectionListener closeListener = this.connListener;
        if (closeListener == null) {
            return;
        }
        closeListener.connectionClosed((TTransport)data.clientSocket, data.processor, data.connectionNumber);
    }

    /**
     * Closes the connection's socket and, if the connection is currently registered with a
     * selector, clears the stored key and cancels it.
     *
     * @param data per-connection state
     */
    protected static void cleanupSelectionKey(ClientProcessData data) {
        data.close();
        final SelectionKey registered = data.key;
        if (registered == null) {
            return;
        }
        data.key = null;
        registered.cancel();
    }

    /**
     * Fully closes a connection: socket and selection key first, then handler/listener
     * notification. A {@code null} argument is ignored.
     *
     * @param data per-connection state, may be {@code null}
     */
    protected final void closeConnection(ClientProcessData data) {
        if (data == null) {
            return;
        }
        cleanupSelectionKey(data);
        cleanupConnection(data);
    }

    /**
     * Builds the per-connection state for a freshly accepted socket.
     *
     * <p>Obtains the processor and the input/output protocols from the configured factories,
     * assigns the next connection number, notifies the connection listener (if any) that the
     * connection was opened, and wraps everything in a {@link ClientProcessData}. The socket
     * itself serves as both input and output transport.
     *
     * @param client the accepted client socket
     * @return the new per-connection state
     */
    protected ClientProcessData newClientProcessData(SnappyTSocket client) {
        final TTransport transport = (TTransport)client;
        final TProcessor proc = this.processorFactory_.getProcessor(transport);
        final TProtocol in = this.inputProtocolFactory_.getProtocol(transport);
        final TProtocol out = this.outputProtocolFactory_.getProtocol(transport);
        final int number = this.connectionCounter.incrementAndGet();
        final NetworkInterface.ConnectionListener openListener = this.connListener;
        if (openListener != null) {
            openListener.connectionOpened(transport, proc, number);
        }
        return new ClientProcessData(client, number, proc, transport, transport, in, out, this.getEventHandler());
    }

    /**
     * Task that serves one connection on a dedicated thread: it keeps calling
     * {@code handleRead} (always consuming the frame header) for as long as that call
     * returns {@code true}.
     */
    protected final class ThreadWorker
    implements Runnable {
        /** State of the connection served by this worker. */
        protected final ClientProcessData data;

        protected ThreadWorker(ClientProcessData data) {
            this.data = data;
        }

        @Override
        public void run() {
            boolean connectionAlive;
            do {
                connectionAlive = SnappyThriftServerSelector.this.handleRead(this.data, true, false);
            } while (connectionAlive);
        }
    }

    /**
     * Task submitted by a selector once a frame has been read for a connection: it makes a
     * single {@code handleRead} call without consuming a frame header and ignores the result.
     */
    protected final class SelectorWorker
    implements Runnable {
        /** State of the connection whose request is processed. */
        protected final ClientProcessData data;

        protected SelectorWorker(ClientProcessData data) {
            this.data = data;
        }

        @Override
        public void run() {
            final boolean readFrameSize = false;
            final boolean nonBlocking = true;
            SnappyThriftServerSelector.this.handleRead(this.data, readFrameSize, nonBlocking);
        }
    }

    protected final class SelectorProcess
    implements Runnable {
        // Selector polled by run(); clients are registered on it by addNewClient().
        private final Selector selector;
        // Clients queued by registerClient() until handlePendingConnections() registers them.
        private final ArrayList<ClientProcessData> pendingConnections;
        // Keys copied out of selector.selectedKeys() that run() still has to handle.
        private final THashSet selectedKeys;
        // Set by stop() and by the exception handlers in run(); ends the run() loop.
        private volatile boolean stopped;
        // True while run() processes a request inline; registerClientDataInNextSelector_ does not
        // hand new clients to a process in this state.
        private volatile boolean inExecution;

        protected SelectorProcess(Selector selector) {
            this.selector = selector;
            this.pendingConnections = new ArrayList(4);
            this.selectedKeys = new THashSet(4);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        /**
         * Selector loop of this process; runs until {@code stopped} is set.
         *
         * <p>Each iteration registers pending clients, selects (waiting up to one second) and
         * handles the ready keys:
         * <ul>
         *   <li>invalid keys have their connection closed;</li>
         *   <li>readable idle connections have the next frame (or frame fragment) read; once a
         *       frame is complete the request is either kept for this thread (the first one) or
         *       submitted to the executor as a {@link SelectorWorker};</li>
         *   <li>for connections that are being processed, a thread parked on the input or output
         *       stream is unparked.</li>
         * </ul>
         * The request kept for this thread is executed inline only while fewer than
         * {@code numSelectors - 2} selectors are doing so; in that case all other clients of this
         * selector are first moved to other selectors. Otherwise it is submitted to the executor.
         *
         * <p>Failures raised by the inline execution propagate to the handlers at the bottom of
         * this method (they are no longer discarded), which stop this process. When the process
         * ends in the stopped state, every connection still registered is closed and the
         * selector itself is closed.
         */
        @Override
        public void run() {
            ExecutorService executorService = SnappyThriftServerSelector.this.executorService;
            THashSet selectedKeys = this.selectedKeys;
            try {
                while (!this.stopped) {
                    this.handlePendingConnections();
                    if (selectedKeys.isEmpty()) {
                        this.selector.select(1000L);
                        Set<SelectionKey> newSelectedKeys = this.selector.selectedKeys();
                        if (newSelectedKeys.size() <= 0) {
                            continue;
                        }
                        selectedKeys.addAll(newSelectedKeys);
                        newSelectedKeys.clear();
                    }
                    if (selectedKeys.isEmpty()) {
                        continue;
                    }

                    // First connection with a complete frame; candidate for inline execution.
                    ClientProcessData myData = null;
                    Iterator keysIter = selectedKeys.iterator();
                    while (keysIter.hasNext()) {
                        SelectionKey key = (SelectionKey)keysIter.next();
                        keysIter.remove();
                        ClientProcessData data = (ClientProcessData)key.attachment();
                        if (!key.isValid()) {
                            SnappyThriftServerSelector.this.closeConnection(data);
                            continue;
                        }
                        int readyOps = key.readyOps();
                        if (this.stopped) {
                            return;
                        }
                        boolean readable = (readyOps & SelectionKey.OP_READ) != 0;
                        boolean writable = (readyOps & SelectionKey.OP_WRITE) != 0;
                        if (readable) {
                            if (data.idle) {
                                // Idle connection: pull in the next frame, possibly in pieces.
                                SnappyTSocket client = data.clientSocket;
                                int remainingFrameSize = data.remainingFrameSize;
                                try {
                                    remainingFrameSize = remainingFrameSize == 0 ? client.getInputStream().readFrame() : client.getInputStream().readFrameFragment(remainingFrameSize);
                                }
                                catch (IOException ioe) {
                                    if (client.isOpen()) {
                                        SnappyThriftServerSelector.this.LOGGER.trace("Got an IOException while reading frame", (Throwable)ioe);
                                    }
                                    SnappyThriftServerSelector.this.closeConnection(data);
                                    continue;
                                }
                                if (remainingFrameSize != 0) {
                                    // Frame still incomplete; remember how much is missing.
                                    data.remainingFrameSize = remainingFrameSize;
                                    continue;
                                }
                                if (myData == null) {
                                    myData = data;
                                    continue;
                                }
                                data.idle = false;
                                data.remainingFrameSize = 0;
                                executorService.execute(new SelectorWorker(data));
                                continue;
                            }
                            // Connection is being processed: wake a thread waiting for input,
                            // or failing that one waiting to write.
                            Thread parkedThread = data.clientSocket.getInputStream().getParkedThread();
                            if (parkedThread != null) {
                                LockSupport.unpark(parkedThread);
                                continue;
                            }
                            if (!writable || data.idle || (parkedThread = data.clientSocket.getOutputStream().getParkedThread()) == null) {
                                continue;
                            }
                            LockSupport.unpark(parkedThread);
                            continue;
                        }
                        if (writable) {
                            Thread parkedThread;
                            if (data.idle || (parkedThread = data.clientSocket.getOutputStream().getParkedThread()) == null) {
                                continue;
                            }
                            LockSupport.unpark(parkedThread);
                            continue;
                        }
                        SnappyThriftServerSelector.this.LOGGER.warn("Unexpected state in select! " + key.interestOps());
                    }
                    if (myData == null) {
                        continue;
                    }
                    myData.idle = false;
                    myData.remainingFrameSize = 0;

                    // Claim one of the limited inline-execution slots, if any is free.
                    AtomicInteger numSelectorsInExecute = SnappyThriftServerSelector.this.numSelectorsInExecution;
                    int maxExecuteSelectors = SnappyThriftServerSelector.this.numSelectors - 2;
                    boolean inlineExecute = false;
                    int numInExecution;
                    while ((numInExecution = numSelectorsInExecute.get()) < maxExecuteSelectors) {
                        if (numSelectorsInExecute.compareAndSet(numInExecution, numInExecution + 1)) {
                            inlineExecute = true;
                            break;
                        }
                    }
                    if (!inlineExecute) {
                        executorService.execute(new SelectorWorker(myData));
                        continue;
                    }

                    ArrayList<ClientProcessData> myClients = null;
                    try {
                        synchronized (this) {
                            this.inExecution = true;
                            // This thread is about to be busy: detach all clients (registered
                            // and pending) so that other selectors can serve them meanwhile.
                            Set<SelectionKey> keys = this.selector.keys();
                            int ksize = keys.size();
                            if (ksize > 1) {
                                myClients = new ArrayList<ClientProcessData>(ksize);
                                for (SelectionKey key : keys) {
                                    if (!key.isValid()) {
                                        continue;
                                    }
                                    ClientProcessData data = (ClientProcessData)key.attachment();
                                    myClients.add(data);
                                    data.key = null;
                                    key.cancel();
                                }
                            }
                            int csize = this.pendingConnections.size();
                            if (csize > 0) {
                                if (myClients == null) {
                                    myClients = new ArrayList<ClientProcessData>(csize);
                                }
                                myClients.addAll(this.pendingConnections);
                                this.pendingConnections.clear();
                            }
                        }
                        if (myClients != null) {
                            SnappyThriftServerSelector.this.registerClientDataInNextSelector(myClients, this);
                        }
                        SnappyThriftServerSelector.this.handleRead(myData, false, true);
                    }
                    finally {
                        // Release the slot. No jump out of this block: an exception or Error
                        // raised above must reach the handlers below rather than be discarded.
                        this.inExecution = false;
                        numSelectorsInExecute.decrementAndGet();
                    }
                }
            }
            catch (ClosedChannelException cce) {
                Misc.checkIfCacheClosing(cce);
                this.stopped = true;
            }
            catch (ClosedSelectorException cse) {
                Misc.checkIfCacheClosing(cse);
                this.stopped = true;
            }
            catch (IOException ioe) {
                Misc.checkIfCacheClosing(ioe);
                this.stopped = true;
                SnappyThriftServerSelector.this.LOGGER.warn("Got an IOException while selecting!", (Throwable)ioe);
            }
            catch (Throwable t) {
                if (t instanceof Error && SystemFailure.isJVMFailureError((Error)t)) {
                    Error err = (Error)t;
                    SystemFailure.initiateFailure(err);
                    throw err;
                }
                SystemFailure.checkFailure();
                Misc.checkIfCacheClosing(t);
                this.stopped = true;
                if (!(t instanceof CancelException)) {
                    SnappyThriftServerSelector.this.LOGGER.error("SelectorProcess.run() exiting due to uncaught error", t);
                }
            }
            finally {
                if (this.stopped) {
                    try {
                        for (SelectionKey selectionKey : this.selector.keys()) {
                            SnappyThriftServerSelector.this.closeConnection((ClientProcessData)selectionKey.attachment());
                        }
                        this.selector.close();
                    }
                    catch (ClosedSelectorException ignored) {
                        // Selector already closed; nothing left to release.
                    }
                    catch (IOException ioe) {
                        SnappyThriftServerSelector.this.LOGGER.error("SelectorProcess.run() error in selector close", (Throwable)ioe);
                    }
                }
            }
        }

        /**
         * Registers all queued clients with this selector. Does nothing when the queue is empty.
         *
         * <p>Before registering, a non-blocking select is performed and any keys it reports are
         * moved into {@code selectedKeys}. Runs while holding this process's monitor.
         *
         * @throws IOException if the non-blocking select fails
         */
        synchronized void handlePendingConnections() throws IOException {
            if (this.pendingConnections.isEmpty()) {
                return;
            }
            this.selector.selectNow();
            final Set<SelectionKey> ready = this.selector.selectedKeys();
            if (!ready.isEmpty()) {
                this.selectedKeys.addAll(ready);
                ready.clear();
            }
            for (ClientProcessData queued : this.pendingConnections) {
                this.addNewClient(queued);
            }
            this.pendingConnections.clear();
        }

        /**
         * Registers the client's socket with this selector for read and write readiness and
         * attaches the client state to the resulting key. If registration fails the socket is
         * closed (and the failure logged unless the channel was already closed).
         *
         * @param clientData the connection to register
         */
        protected void addNewClient(ClientProcessData clientData) {
            try {
                final SelectionKey registered = clientData.clientSocket.registerSelector(this.selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);
                clientData.key = registered;
                registered.attach(clientData);
            }
            catch (ClosedChannelException cce) {
                SnappyThriftServerSelector.cleanupSelectionKey(clientData);
            }
            catch (IOException ioe) {
                SnappyThriftServerSelector.this.LOGGER.warn("Failed to register accepted connection to selector!", (Throwable)ioe);
                SnappyThriftServerSelector.cleanupSelectionKey(clientData);
            }
        }

        /**
         * Queues a client for registration and wakes the selector so that the run loop picks
         * it up.
         *
         * @param clientData the connection to queue
         * @return {@code false} without queuing if this process has been stopped,
         *         {@code true} otherwise
         */
        private boolean registerClient(ClientProcessData clientData) {
            if (this.stopped) {
                return false;
            }
            this.pendingConnections.add(clientData);
            this.selector.wakeup();
            return true;
        }

        /**
         * Marks this process as stopped and then wakes the selector so that a select call in
         * progress returns.
         */
        protected void stop() {
            stopped = true;
            selector.wakeup();
        }
    }

    /**
     * Per-connection state shared between the accept loop, the selectors and the workers.
     */
    protected static final class ClientProcessData {
        /** Socket of the client connection. */
        protected final SnappyTSocket clientSocket;
        /** Number assigned to this connection when it was accepted. */
        protected final int connectionNumber;
        /** Processor that handles the messages of this connection. */
        protected final TProcessor processor;
        protected final TTransport inputTransport;
        protected final TTransport outputTransport;
        protected final TProtocol inputProtocol;
        protected final TProtocol outputProtocol;
        /** Server event handler, or {@code null} if none is configured. */
        protected final TServerEventHandler eventHandler;
        /** Context created by the event handler, or {@code null} without a handler. */
        protected final ServerContext connectionContext;
        /** Selection key of the current selector registration, or {@code null}. */
        protected volatile SelectionKey key;
        /** Bytes still missing from a partially read frame; 0 when no frame is in progress. */
        protected volatile int remainingFrameSize;
        /** Whether no request of this connection is currently being processed; starts as {@code true}. */
        protected volatile boolean idle;

        /**
         * Stores the given collaborators and, when an event handler is supplied, asks it to
         * create the connection context from the two protocols.
         */
        protected ClientProcessData(SnappyTSocket socket, int connectionNumber, TProcessor proc, TTransport in, TTransport out, TProtocol inp, TProtocol outp, TServerEventHandler eventHandler) {
            this.clientSocket = socket;
            this.connectionNumber = connectionNumber;
            this.processor = proc;
            this.inputTransport = in;
            this.outputTransport = out;
            this.inputProtocol = inp;
            this.outputProtocol = outp;
            this.eventHandler = eventHandler;
            if (eventHandler == null) {
                this.connectionContext = null;
            } else {
                this.connectionContext = eventHandler.createContext(inp, outp);
            }
            this.idle = true;
        }

        /** Closes the client socket. */
        protected void close() {
            clientSocket.close();
        }
    }

    /**
     * Configuration for {@link SnappyThriftServerSelector}. All setters return {@code this}
     * so that calls can be chained.
     */
    public static final class Args
    extends TServer.AbstractServerArgs<Args> {
        /** Optional connection listener; {@code null} by default. */
        private NetworkInterface.ConnectionListener connListener;
        /** Number of selector slots; defaults to 8. */
        private int numSelectors = 8;
        /** Core size of default-created worker pools; defaults to 8. */
        private int minWorkerThreads = 8;
        /** Maximum size of default-created worker pools; defaults to {@code Short.MAX_VALUE}. */
        private int maxWorkerThreads = Short.MAX_VALUE;
        /** Executor for selector work; a default pool is created by the server when {@code null}. */
        private ExecutorService executorService;
        /** Executor for thread-per-connection work; a default pool is created by the server when {@code null}. */
        private ThreadPoolExecutor threadPerConnExecutor;
        /** Shutdown wait, expressed in {@link #stopTimeoutUnit}; defaults to 60. */
        private int stopTimeoutVal = 60;
        /** Unit of {@link #stopTimeoutVal}; defaults to seconds. */
        private TimeUnit stopTimeoutUnit = TimeUnit.SECONDS;

        public Args(TServerTransport transport) {
            super(transport);
        }

        public NetworkInterface.ConnectionListener getConnectionListener() {
            return connListener;
        }

        public Args setConnectionListener(NetworkInterface.ConnectionListener connListener) {
            this.connListener = connListener;
            return this;
        }

        public int getNumSelectors() {
            return numSelectors;
        }

        public Args setNumSelectors(int n) {
            numSelectors = n;
            return this;
        }

        public int getMinWorkerThreads() {
            return minWorkerThreads;
        }

        public Args setMinWorkerThreads(int n) {
            minWorkerThreads = n;
            return this;
        }

        public int getMaxWorkerThreads() {
            return maxWorkerThreads;
        }

        public Args setMaxWorkerThreads(int n) {
            maxWorkerThreads = n;
            return this;
        }

        public ExecutorService getExecutorService() {
            return executorService;
        }

        public Args setExecutorService(ExecutorService executorService) {
            this.executorService = executorService;
            return this;
        }

        public ExecutorService getThreadPerConnExecutor() {
            return threadPerConnExecutor;
        }

        public Args setThreadPerConnExecutor(ThreadPoolExecutor executorService) {
            threadPerConnExecutor = executorService;
            return this;
        }

        public int getStopTimeoutVal() {
            return stopTimeoutVal;
        }

        public TimeUnit getStopTimeoutUnit() {
            return stopTimeoutUnit;
        }

        public Args setStopTimeout(int timeout, TimeUnit timeoutUnit) {
            stopTimeoutVal = timeout;
            stopTimeoutUnit = timeoutUnit;
            return this;
        }

        /**
         * Checks the numeric settings.
         *
         * @throws IllegalArgumentException if {@code numSelectors} or {@code maxWorkerThreads}
         *         is not positive, or {@code minWorkerThreads} is negative
         */
        public void validate() {
            if (numSelectors < 1) {
                throw new IllegalArgumentException("numSelectors must be positive.");
            }
            if (minWorkerThreads < 0) {
                throw new IllegalArgumentException("minWorkerThreads must be non-negative.");
            }
            if (maxWorkerThreads < 1) {
                throw new IllegalArgumentException("maxWorkerThreads must be positive.");
            }
        }
    }
}

