package ru.tinkoff.kora.http.server.undertow;

import io.undertow.io.IoCallback;
import io.undertow.io.Sender;
import io.undertow.server.DirectByteBufferDeallocator;
import io.undertow.server.HttpServerExchange;
import io.undertow.util.HeaderMap;
import io.undertow.util.Headers;
import io.undertow.util.HttpString;
import jakarta.annotation.Nullable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xnio.IoUtils;
import org.xnio.XnioIoThread;
import ru.tinkoff.kora.common.Context;
import ru.tinkoff.kora.common.util.flow.LazySingleSubscription;
import ru.tinkoff.kora.common.util.flow.SingleSubscription;
import ru.tinkoff.kora.http.common.MutableHttpHeaders;
import ru.tinkoff.kora.http.common.body.HttpBodyOutput;
import ru.tinkoff.kora.http.common.header.HttpHeaders;
import ru.tinkoff.kora.http.server.common.HttpServer;
import ru.tinkoff.kora.http.server.common.HttpServerResponse;
import ru.tinkoff.kora.http.server.common.router.PublicApiHandler;
import ru.tinkoff.kora.http.server.common.router.PublicApiResponse;
import ru.tinkoff.kora.http.server.common.telemetry.HttpServerTracer;
import ru.tinkoff.kora.http.server.undertow.request.UndertowPublicApiRequest;

/* loaded from: input_file:ru/tinkoff/kora/http/server/undertow/UndertowExchangeProcessor.class */
public class UndertowExchangeProcessor implements Runnable {
    /** Logger; note it is created under {@code HttpServer}'s name rather than this class's own. */
    private static final Logger log = LoggerFactory.getLogger(HttpServer.class);
    /** {@code reactor.core.scheduler.NonBlocking} when the static initializer could load it, otherwise {@code null}. */
    private static final Class<?> REACTOR_NON_BLOCKING;
    /** {@code io.netty.util.concurrent.FastThreadLocalThread} when the static initializer could load it, otherwise {@code null}. */
    private static final Class<?> FAST_THREAD_LOCAL;
    /** Exchange being processed; every response write in this class goes through it. */
    private final HttpServerExchange exchange;
    /** Handler invoked by {@link #run()} to produce the response. */
    private final PublicApiHandler publicApiHandler;
    /** Context that {@link #run()} injects before calling the handler and passes on to it. */
    private final Context context;

    /** Optional tracer; when non-null it is given the response headers to inject into before a response is sent. */
    @Nullable
    private final HttpServerTracer tracer;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:ru/tinkoff/kora/http/server/undertow/UndertowExchangeProcessor$HttpResponseBodySubscriber.class */
    public static class HttpResponseBodySubscriber implements Flow.Subscriber<ByteBuffer> {
        private final HttpServerExchange exchange;
        private final PublicApiResponse response;
        private final Throwable error;
        private volatile Flow.Subscription subscription;
        private final AtomicInteger state = new AtomicInteger(0);

        private HttpResponseBodySubscriber(HttpServerExchange httpServerExchange, PublicApiResponse publicApiResponse, @Nullable Throwable th) {
            this.exchange = httpServerExchange;
            this.response = publicApiResponse;
            this.error = th;
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1L);
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onNext(final ByteBuffer byteBuffer) {
            if ((this.state.incrementAndGet() & 16777216) != 0) {
                DirectByteBufferDeallocator.free(byteBuffer);
            } else if (!(this.subscription instanceof SingleSubscription) && !(this.subscription instanceof LazySingleSubscription)) {
                this.exchange.getResponseSender().send(byteBuffer, new IoCallback() { // from class: ru.tinkoff.kora.http.server.undertow.UndertowExchangeProcessor.HttpResponseBodySubscriber.2
                    public void onComplete(HttpServerExchange httpServerExchange, Sender sender) {
                        int decrementAndGet = HttpResponseBodySubscriber.this.state.decrementAndGet();
                        DirectByteBufferDeallocator.free(byteBuffer);
                        if ((decrementAndGet & 16777216) == 0) {
                            HttpResponseBodySubscriber.this.subscription.request(1L);
                        } else {
                            httpServerExchange.addExchangeCompleteListener((httpServerExchange2, nextListener) -> {
                                HttpResponseBodySubscriber.this.response.closeSendResponseSuccess(httpServerExchange2.getStatusCode(), (HttpHeaders) null, (Throwable) null);
                                nextListener.proceed();
                            });
                            httpServerExchange.endExchange();
                        }
                    }

                    public void onException(HttpServerExchange httpServerExchange, Sender sender, IOException iOException) {
                        DirectByteBufferDeallocator.free(byteBuffer);
                        HttpResponseBodySubscriber.this.subscription.cancel();
                        httpServerExchange.getResponseSender().close();
                        HttpResponseBodySubscriber.this.response.closeConnectionError(httpServerExchange.getStatusCode(), HttpResponseBodySubscriber.this.error == null ? iOException : HttpResponseBodySubscriber.this.error);
                    }
                });
            } else {
                this.exchange.setResponseContentLength(byteBuffer.remaining());
                this.exchange.getResponseSender().send(byteBuffer, new IoCallback() { // from class: ru.tinkoff.kora.http.server.undertow.UndertowExchangeProcessor.HttpResponseBodySubscriber.1
                    public void onComplete(HttpServerExchange httpServerExchange, Sender sender) {
                        httpServerExchange.addExchangeCompleteListener((httpServerExchange2, nextListener) -> {
                            HttpResponseBodySubscriber.this.response.closeSendResponseSuccess(httpServerExchange2.getStatusCode(), (HttpHeaders) null, HttpResponseBodySubscriber.this.error);
                            nextListener.proceed();
                        });
                        httpServerExchange.endExchange();
                    }

                    public void onException(HttpServerExchange httpServerExchange, Sender sender, IOException iOException) {
                        HttpResponseBodySubscriber.this.response.closeBodyError(httpServerExchange.getStatusCode(), iOException);
                    }
                });
            }
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onError(Throwable th) {
            HttpServerExchange httpServerExchange = this.exchange;
            if (httpServerExchange.isResponseStarted()) {
                httpServerExchange.getResponseSender().close();
                this.response.closeBodyError(httpServerExchange.getStatusCode(), this.error == null ? th : this.error);
                httpServerExchange.endExchange();
            } else {
                httpServerExchange.setStatusCode(500);
                httpServerExchange.getResponseHeaders().remove(Headers.CONTENT_LENGTH);
                httpServerExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
                httpServerExchange.getResponseSender().send(th.getMessage());
                httpServerExchange.endExchange();
                this.response.closeBodyError(httpServerExchange.getStatusCode(), this.error == null ? th : this.error);
            }
        }

        @Override // java.util.concurrent.Flow.Subscriber
        public void onComplete() {
            if (!(this.subscription instanceof SingleSubscription) && this.state.updateAndGet(i -> {
                return i | 16777216;
            }) == 16777216) {
                this.exchange.addExchangeCompleteListener((httpServerExchange, nextListener) -> {
                    this.response.closeSendResponseSuccess(httpServerExchange.getStatusCode(), (HttpHeaders) null, this.error);
                    nextListener.proceed();
                });
                this.exchange.endExchange();
            }
        }
    }

    /**
     * Creates a processor bound to a single exchange.
     *
     * @param httpServerExchange exchange to process
     * @param publicApiHandler   handler that produces the response
     * @param context            context to inject while processing
     * @param httpServerTracer   optional tracer, may be {@code null}
     */
    public UndertowExchangeProcessor(HttpServerExchange httpServerExchange, PublicApiHandler publicApiHandler, Context context, @Nullable HttpServerTracer httpServerTracer) {
        this.tracer = httpServerTracer;
        this.context = context;
        this.publicApiHandler = publicApiHandler;
        this.exchange = httpServerExchange;
    }

    /**
     * Processes the exchange: installs the context, invokes the public API handler and
     * sends its result.
     *
     * <p>If the response future is already done, the result is sent synchronously;
     * otherwise sending is deferred to the future's completion callback. Any throwable
     * escaping this flow is logged and answered with a 500 carrying its message. The
     * Undertow context is cleared on every exit path.
     */
    @Override // java.lang.Runnable
    public void run() {
        final HttpServerExchange currentExchange = this.exchange;
        final Context ctx = this.context;
        UndertowContext.set(ctx, currentExchange);
        ctx.inject();
        try {
            final PublicApiResponse apiResponse = this.publicApiHandler.process(ctx, new UndertowPublicApiRequest(currentExchange, ctx));
            if (apiResponse.response().isDone()) {
                try {
                    HttpServerResponse ready = (HttpServerResponse) apiResponse.response().join();
                    if (ready != null) {
                        sendResponse(currentExchange, apiResponse, ready, null);
                    } else {
                        sendResponse(currentExchange, apiResponse, HttpServerResponse.of(500), new RuntimeException("Illegal state: response future is empty"));
                    }
                } catch (CompletionException e) {
                    // Unwrap so the real failure (possibly an HttpServerResponse) is handled.
                    sendException(apiResponse, (Throwable) Objects.requireNonNullElse(e.getCause(), e));
                } catch (Throwable failure) {
                    sendException(apiResponse, failure);
                }
            } else {
                apiResponse.response().whenComplete((result, failure) -> {
                    if (result != null) {
                        sendResponse(currentExchange, apiResponse, result, null);
                    } else if (failure instanceof CompletionException && failure.getCause() != null) {
                        sendException(apiResponse, failure.getCause());
                    } else if (failure != null) {
                        sendException(apiResponse, failure);
                    } else {
                        sendResponse(currentExchange, apiResponse, HttpServerResponse.of(500), new RuntimeException("Illegal state: response future is empty"));
                    }
                });
            }
        } catch (Throwable dropped) {
            log.warn("Error dropped", dropped);
            currentExchange.setStatusCode(500);
            currentExchange.getResponseSender().send(StandardCharsets.UTF_8.encode((String) Objects.requireNonNullElse(dropped.getMessage(), "no message")));
        } finally {
            // Single cleanup point instead of a copy on every exit path.
            UndertowContext.clear(ctx);
        }
    }

    /**
     * Writes a handler response to the exchange.
     *
     * <p>Sets status and headers (tracer-injected headers, a fixed {@code Server} header,
     * then the response's own headers), and then picks a body strategy:
     * <ul>
     *   <li>no body: the exchange is ended immediately;</li>
     *   <li>full content available: delegated to {@code sendFullBody};</li>
     *   <li>otherwise, on an IO/non-blocking thread: streamed via {@code sendStreamingBody};</li>
     *   <li>otherwise: written through the blocking output stream.</li>
     * </ul>
     *
     * @param httpServerExchange exchange to write to
     * @param publicApiResponse  callback object notified about the send outcome
     * @param httpServerResponse response to send
     * @param th                 error that produced this response, or {@code null}
     */
    private void sendResponse(HttpServerExchange httpServerExchange, PublicApiResponse publicApiResponse, HttpServerResponse httpServerResponse, @Nullable Throwable th) {
        MutableHttpHeaders headers = httpServerResponse.headers();
        httpServerExchange.setStatusCode(httpServerResponse.code());
        HttpServerTracer activeTracer = this.tracer;
        if (activeTracer != null) {
            activeTracer.inject(this.context, httpServerExchange.getResponseHeaders(), (headerMap, name, value) -> {
                headerMap.add(HttpString.tryFromString(name), value);
            });
        }
        httpServerExchange.getResponseHeaders().put(Headers.SERVER, "kora/undertow");
        setHeaders(httpServerExchange.getResponseHeaders(), headers);

        HttpBodyOutput body = httpServerResponse.body();
        if (body == null) {
            httpServerExchange.addExchangeCompleteListener((completedExchange, nextListener) -> {
                publicApiResponse.closeSendResponseSuccess(completedExchange.getStatusCode(), httpServerResponse.headers(), th);
                nextListener.proceed();
            });
            httpServerExchange.endExchange();
            return;
        }

        String contentType = body.contentType();
        if (contentType != null) {
            httpServerExchange.getResponseHeaders().put(Headers.CONTENT_TYPE, contentType);
        }
        ByteBuffer fullContent = body.getFullContentIfAvailable();
        if (fullContent != null) {
            sendFullBody(publicApiResponse, httpServerResponse, fullContent, th);
            return;
        }

        long contentLength = body.contentLength();
        if (contentLength >= 0) {
            httpServerExchange.setResponseContentLength(contentLength);
        }
        if (!isInBlockingThread()) {
            sendStreamingBody(publicApiResponse, body, th);
            return;
        }

        if (!httpServerExchange.isBlocking()) {
            httpServerExchange.startBlocking();
        }
        try {
            // try-with-resources so the stream is closed even when body.write fails.
            try (OutputStream outputStream = httpServerExchange.getOutputStream()) {
                body.write(outputStream);
            }
            if (httpServerExchange.isComplete()) {
                publicApiResponse.closeSendResponseSuccess(httpServerExchange.getStatusCode(), httpServerResponse.headers(), th);
            }
        } catch (IOException e) {
            publicApiResponse.closeConnectionError(httpServerExchange.getStatusCode(), e);
            if (!httpServerExchange.isResponseStarted()) {
                // Nothing was sent yet, so the status can still be turned into a 500.
                httpServerExchange.setStatusCode(500);
                httpServerExchange.endExchange();
            }
        } catch (Exception e) {
            publicApiResponse.closeBodyError(httpServerExchange.getStatusCode(), e);
            if (!httpServerExchange.isResponseStarted()) {
                httpServerExchange.setStatusCode(500);
                httpServerExchange.endExchange();
            }
        }
    }

    /**
     * Copies headers into the given Undertow header map, skipping the names
     * {@code server}, {@code content-type}, {@code content-length} and
     * {@code transfer-encoding} (compared case-sensitively).
     *
     * @param headerMap   destination header map
     * @param httpHeaders headers to copy
     */
    @SuppressWarnings("unchecked") // entry values are passed on as a header value collection, exactly as before
    private void setHeaders(HeaderMap headerMap, HttpHeaders httpHeaders) {
        for (Iterator<?> cursor = httpHeaders.iterator(); cursor.hasNext(); ) {
            Map.Entry<?, ?> header = (Map.Entry<?, ?>) cursor.next();
            String name = (String) header.getKey();
            switch (name) {
                case "server":
                case "content-type":
                case "content-length":
                case "transfer-encoding":
                    // Not copied by this method.
                    break;
                default:
                    headerMap.addAll(HttpString.tryFromString(name), (Collection<String>) header.getValue());
                    break;
            }
        }
    }

    /**
     * Sends a body that is fully available as a single buffer.
     *
     * <p>A {@code null} or empty buffer results in an empty response with content length 0.
     * Otherwise the buffer is sent with its remaining size as content length, the sender is
     * closed, and success is reported once the exchange is complete. On an I/O failure the
     * exchange is ended and a connection error is reported, preferring the originating
     * error {@code th} and falling back to the I/O exception.
     *
     * @param publicApiResponse  callback object notified about the send outcome
     * @param httpServerResponse response whose headers are reported on success
     * @param byteBuffer         body content, may be {@code null}
     * @param th                 error that produced this response, or {@code null}
     */
    private void sendFullBody(final PublicApiResponse publicApiResponse, HttpServerResponse httpServerResponse, @Nullable ByteBuffer byteBuffer, @Nullable final Throwable th) {
        final HttpServerExchange currentExchange = this.exchange;
        final MutableHttpHeaders headers = httpServerResponse.headers();

        if (byteBuffer == null || byteBuffer.remaining() == 0) {
            currentExchange.setResponseContentLength(0L);
            currentExchange.addExchangeCompleteListener((completedExchange, nextListener) -> {
                publicApiResponse.closeSendResponseSuccess(completedExchange.getStatusCode(), headers, th);
                nextListener.proceed();
            });
            currentExchange.endExchange();
            return;
        }

        currentExchange.setResponseContentLength(byteBuffer.remaining());
        currentExchange.getResponseSender().send(byteBuffer, new IoCallback() {
            @Override
            public void onComplete(HttpServerExchange sentExchange, Sender sender) {
                sender.close(new IoCallback() {
                    @Override
                    public void onComplete(HttpServerExchange closedExchange, Sender closedSender) {
                        if (closedExchange.isComplete()) {
                            publicApiResponse.closeSendResponseSuccess(closedExchange.getStatusCode(), headers, th);
                            return;
                        }
                        // Not complete yet: report success from the completion listener instead.
                        closedExchange.addExchangeCompleteListener((completedExchange, nextListener) -> {
                            publicApiResponse.closeSendResponseSuccess(completedExchange.getStatusCode(), headers, th);
                            nextListener.proceed();
                        });
                        closedExchange.endExchange();
                    }

                    @Override
                    public void onException(HttpServerExchange failedExchange, Sender failedSender, IOException iOException) {
                        try {
                            failedExchange.endExchange();
                        } finally {
                            // Always report a non-null cause: the original error if any, else the I/O failure.
                            publicApiResponse.closeConnectionError(failedExchange.getStatusCode(), th != null ? th : iOException);
                        }
                    }
                });
            }

            @Override
            public void onException(HttpServerExchange failedExchange, Sender sender, IOException iOException) {
                try {
                    failedExchange.endExchange();
                } finally {
                    IoUtils.safeClose(failedExchange.getConnection());
                    publicApiResponse.closeConnectionError(failedExchange.getStatusCode(), th != null ? th : iOException);
                }
            }
        });
    }

    /**
     * Sends an error outcome to the client.
     *
     * <p>If {@code th} is not an {@code HttpServerResponse}, a 500 with the exception
     * message (or {@code "Unknown error"}) is sent. Otherwise the throwable itself is used
     * as the response: its status code, headers and body are written, with the body sent
     * as a full buffer, as a stream, or through the blocking output stream depending on
     * availability and the current thread.
     *
     * @param publicApiResponse callback object notified about the send outcome
     * @param th                failure to report to the client
     */
    private void sendException(PublicApiResponse publicApiResponse, Throwable th) {
        if (!(th instanceof HttpServerResponse)) {
            this.exchange.setStatusCode(500);
            this.exchange.getResponseSender().send((String) Objects.requireNonNullElse(th.getMessage(), "Unknown error"));
            publicApiResponse.closeSendResponseSuccess(500, (HttpHeaders) null, th);
            return;
        }

        HttpServerResponse errorResponse = (HttpServerResponse) th;
        this.exchange.setStatusCode(errorResponse.code());
        // The error response's headers belong on the response, not on the incoming request.
        setHeaders(this.exchange.getResponseHeaders(), errorResponse.headers());

        HttpBodyOutput body = errorResponse.body();
        if (body == null) {
            this.exchange.addExchangeCompleteListener((completedExchange, nextListener) -> {
                publicApiResponse.closeSendResponseSuccess(completedExchange.getStatusCode(), errorResponse.headers(), th);
                nextListener.proceed();
            });
            this.exchange.setResponseContentLength(0L);
            this.exchange.endExchange();
            return;
        }

        this.exchange.getResponseHeaders().put(Headers.CONTENT_TYPE, "text/plain");
        ByteBuffer fullContent = body.getFullContentIfAvailable();
        if (fullContent != null) {
            sendFullBody(publicApiResponse, errorResponse, fullContent, th);
            return;
        }
        if (!isInBlockingThread()) {
            sendStreamingBody(publicApiResponse, body, th);
            return;
        }

        // try-with-resources so the stream is closed even when body.write fails.
        try (OutputStream outputStream = this.exchange.startBlocking().getOutputStream()) {
            body.write(outputStream);
        } catch (IOException e) {
            if (this.exchange.isResponseStarted()) {
                // Part of the response is already on the wire; the connection cannot be reused.
                try {
                    this.exchange.getConnection().close();
                } catch (IOException closeFailure) {
                    e.addSuppressed(closeFailure);
                }
            } else {
                this.exchange.setStatusCode(500);
            }
            publicApiResponse.closeBodyError(this.exchange.getStatusCode(), e);
        }
    }

    /**
     * Tells whether the current thread may be used for blocking writes.
     *
     * @return {@code true} exactly when {@link #isInIoThread()} is {@code false}
     */
    private boolean isInBlockingThread() {
        final boolean onIoThread = isInIoThread();
        return !onIoThread;
    }

    /**
     * Tells whether the current thread must not be blocked.
     *
     * @return {@code true} if the current thread is the exchange's IO thread, or an instance
     *         of the optional {@code REACTOR_NON_BLOCKING} / {@code FAST_THREAD_LOCAL} classes
     *         when those were found on the classpath
     */
    private boolean isInIoThread() {
        // Declared as Thread: Thread.currentThread() is not assignable to the XnioIoThread subclass.
        final Thread current = Thread.currentThread();
        if (this.exchange.getIoThread() == current) {
            return true;
        }
        if (REACTOR_NON_BLOCKING != null && REACTOR_NON_BLOCKING.isInstance(current)) {
            return true;
        }
        return FAST_THREAD_LOCAL != null && FAST_THREAD_LOCAL.isInstance(current);
    }

    /**
     * Streams the body by subscribing a new {@code HttpResponseBodySubscriber} to it.
     *
     * @param publicApiResponse callback object handed to the subscriber
     * @param httpBodyOutput    body to subscribe to
     * @param th                error that produced this response, or {@code null}
     */
    private void sendStreamingBody(PublicApiResponse publicApiResponse, HttpBodyOutput httpBodyOutput, @Nullable Throwable th) {
        final HttpResponseBodySubscriber bodySubscriber = new HttpResponseBodySubscriber(this.exchange, publicApiResponse, th);
        httpBodyOutput.subscribe(bodySubscriber);
    }

    // Probe for optional marker classes used to recognise threads that must not be blocked.
    static {
        REACTOR_NON_BLOCKING = loadOptionalClass("reactor.core.scheduler.NonBlocking");
        FAST_THREAD_LOCAL = loadOptionalClass("io.netty.util.concurrent.FastThreadLocalThread");
    }

    /**
     * Loads a class that may legitimately be missing from the classpath.
     *
     * @param className fully qualified class name
     * @return the class, or {@code null} if it cannot be loaded
     */
    @Nullable
    private static Class<?> loadOptionalClass(String className) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        if (loader == null) {
            // The context class loader may be unset; fall back instead of failing class initialization.
            loader = UndertowExchangeProcessor.class.getClassLoader();
        }
        try {
            return Class.forName(className, false, loader);
        } catch (ClassNotFoundException | LinkageError ignored) {
            // The dependency is optional: absence simply disables the corresponding thread check.
            return null;
        }
    }
}
