/*
 * Decompiled with CFR 0.152.
 */
package org.nustaq.kontraktor.remoting.http.undertow;

import io.undertow.server.HttpHandler;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderMap;
import io.undertow.util.Headers;
import io.undertow.util.Methods;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.function.Consumer;
import org.nustaq.kontraktor.Actor;
import org.nustaq.kontraktor.Actors;
import org.nustaq.kontraktor.remoting.http.AbstractHttpServerConnector;
import org.nustaq.kontraktor.remoting.http.HttpObjectSocket;
import org.nustaq.kontraktor.remoting.http.KHttpExchange;
import org.nustaq.kontraktor.remoting.http.undertow.UndertowKHttpExchangeImpl;
import org.nustaq.kontraktor.util.Log;
import org.nustaq.kontraktor.util.Pair;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.channels.StreamSourceChannel;

public class UndertowHttpServerConnector
extends AbstractHttpServerConnector
implements HttpHandler {
    Consumer<HttpServerExchange> prepareResponse;

    public UndertowHttpServerConnector(Actor facade, Consumer<HttpServerExchange> prepareResponse) {
        super(facade);
        this.prepareResponse = prepareResponse;
    }

    public void handleRequest(HttpServerExchange exchange) throws Exception {
        if (exchange.getRequestMethod() != Methods.POST) {
            if (exchange.getRequestMethod() == Methods.OPTIONS) {
                if (this.prepareResponse != null) {
                    this.prepareResponse.accept(exchange);
                }
                exchange.setResponseCode(200);
                exchange.endExchange();
                return;
            }
            exchange.setResponseCode(404);
            exchange.endExchange();
            return;
        }
        String rpath = exchange.getRelativePath();
        StreamSourceChannel requestChannel = exchange.getRequestChannel();
        String first = exchange.getRequestHeaders().getFirst(Headers.CONTENT_LENGTH);
        int len = Integer.parseInt(first);
        ByteBuffer buf = ByteBuffer.allocate(len);
        requestChannel.getReadSetter().set(streamSourceChannel -> {
            try {
                streamSourceChannel.read(buf);
            }
            catch (IOException e) {
                Log.Warn((Object)this, (Throwable)e);
            }
            if (buf.remaining() == 0) {
                try {
                    requestChannel.shutdownReads();
                }
                catch (IOException e) {
                    Log.Warn((Object)this, (Throwable)e);
                }
                this.facade.execute(() -> this.requestReceived(exchange, buf.array(), rpath));
            }
        });
        requestChannel.resumeReads();
    }

    protected void requestReceived(HttpServerExchange exchange, byte[] postData, String path) {
        HeaderMap requestHeaders = exchange.getRequestHeaders();
        String sid = requestHeaders.getFirst("sid");
        String[] split = new String[]{};
        if (sid == null) {
            while (path.startsWith("/")) {
                path = path.substring(1);
            }
            if (path.trim().length() > 0) {
                split = path.split("/");
                sid = split[0];
            }
        }
        if (sid != null && sid.length() > 0) {
            HttpObjectSocket httpObjectSocket = (HttpObjectSocket)this.sessions.get(sid);
            if (httpObjectSocket != null) {
                this.handleClientRequest(exchange, httpObjectSocket, postData, split.length > 1 ? split[1] : null);
            } else {
                httpObjectSocket = this.restoreSessionFromId(sid);
                if (httpObjectSocket != null) {
                    this.handleClientRequest(exchange, httpObjectSocket, postData, split.length > 1 ? split[1] : null);
                } else {
                    exchange.setResponseCode(401);
                    exchange.endExchange();
                }
            }
        } else {
            Object auth = null;
            if (this.prepareResponse != null) {
                this.prepareResponse.accept(exchange);
            }
            this.handleNewSession(new UndertowKHttpExchangeImpl(exchange));
        }
    }

    public void handleClientRequest(HttpServerExchange exchange, HttpObjectSocket httpObjectSocket, byte[] postData, String lastSeenSequence) {
        byte[] msg;
        boolean isEmptyLP;
        StreamSinkChannel sinkchannel;
        if (this.prepareResponse != null) {
            this.prepareResponse.accept(exchange);
        }
        if ((sinkchannel = exchange.getResponseChannel()) == null) {
            Log.Error((Object)this, (String)"could not aquire response channel. rejecting request.");
            exchange.endExchange();
            return;
        }
        httpObjectSocket.updateTimeStamp();
        Object[] received = (Object[])httpObjectSocket.getConf().asObject(postData);
        boolean bl = isEmptyLP = received.length == 1 && received[0] instanceof Number;
        if (!isEmptyLP) {
            httpObjectSocket.updateLastRemoteCallTimeStamp();
            this.handleRegularRequest(exchange, httpObjectSocket, received, sinkchannel);
            return;
        }
        int lastClientSeq = -1;
        if (lastSeenSequence != null) {
            try {
                lastClientSeq = Integer.parseInt(lastSeenSequence);
            }
            catch (Throwable t) {
                Log.Warn((Object)this, (Throwable)t);
            }
        }
        if (lastClientSeq > 0 && (msg = (byte[])httpObjectSocket.takeStoredLPMessage(lastClientSeq + 1)) != null) {
            this.replyFromHistory(exchange, sinkchannel, msg);
            return;
        }
        sinkchannel.resumeWrites();
        Pair<Runnable, KHttpExchange> lpTask = this.createLongPollTask(new UndertowKHttpExchangeImpl(exchange), httpObjectSocket, sinkchannel);
        httpObjectSocket.cancelLongPoll();
        httpObjectSocket.setLongPollTask(lpTask);
    }

    protected Pair<Runnable, KHttpExchange> createLongPollTask(KHttpExchange exchange, HttpObjectSocket httpObjectSocket, StreamSinkChannel sinkchannel) {
        return new Pair(() -> {
            if (!sinkchannel.isOpen()) {
                return;
            }
            Pair<byte[], Integer> nextQueuedMessage = httpObjectSocket.getNextQueuedMessage();
            byte[] response = (byte[])nextQueuedMessage.getFirst();
            exchange.setResponseContentLength(response.length);
            if (response.length == 0) {
                exchange.endExchange();
            } else {
                httpObjectSocket.storeLPMessage((Integer)nextQueuedMessage.getSecond(), response);
                ByteBuffer responseBuf = ByteBuffer.wrap(response);
                try {
                    while (responseBuf.remaining() > 0) {
                        sinkchannel.write(responseBuf);
                    }
                }
                catch (Throwable e) {
                    System.out.println("buffer size:" + response.length);
                    try {
                        sinkchannel.close();
                    }
                    catch (IOException e1) {
                        e1.printStackTrace();
                    }
                    e.printStackTrace();
                }
                exchange.endExchange();
            }
        }, (Object)exchange);
    }

    protected void replyFromHistory(HttpServerExchange exchange, StreamSinkChannel sinkchannel, byte[] msg) {
        ByteBuffer responseBuf = ByteBuffer.wrap(msg);
        exchange.setResponseContentLength((long)msg.length);
        sinkchannel.getWriteSetter().set(channel -> {
            block5: {
                if (responseBuf.remaining() > 0) {
                    try {
                        sinkchannel.write(responseBuf);
                        if (responseBuf.remaining() == 0) {
                            exchange.endExchange();
                            break block5;
                        }
                        sinkchannel.resumeWrites();
                    }
                    catch (Exception e) {
                        e.printStackTrace();
                        exchange.endExchange();
                    }
                } else {
                    exchange.endExchange();
                }
            }
        });
        sinkchannel.resumeWrites();
    }

    protected void handleRegularRequest(HttpServerExchange exchange, HttpObjectSocket httpObjectSocket, Object[] received, StreamSinkChannel sinkchannel) {
        ArrayList futures = new ArrayList();
        httpObjectSocket.getSink().receiveObject((Object)received, futures, (Object)exchange.getRequestHeaders().getFirst("JWT"));
        Runnable reply = () -> {
            Pair<byte[], Integer> nextQueuedMessage = httpObjectSocket.getNextQueuedMessage();
            byte[] response = (byte[])nextQueuedMessage.getFirst();
            exchange.setResponseContentLength((long)response.length);
            if (response.length == 0) {
                exchange.endExchange();
            } else {
                httpObjectSocket.storeLPMessage((Integer)nextQueuedMessage.cdr(), response);
                long tim = System.nanoTime();
                ByteBuffer responseBuf = ByteBuffer.wrap(response);
                try {
                    while (responseBuf.remaining() > 0) {
                        sinkchannel.write(responseBuf);
                    }
                }
                catch (IOException e) {
                    Log.Warn((Object)this, (Throwable)e);
                }
                exchange.endExchange();
            }
        };
        if (futures == null || futures.size() == 0) {
            reply.run();
        } else {
            Actors.all(futures).timeoutIn((long)REQUEST_RESULTING_FUTURE_TIMEOUT).then(() -> reply.run()).onTimeout(() -> reply.run());
            sinkchannel.resumeWrites();
        }
    }
}

