package io.reactivex.netty.protocol.http.server;

import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.reactivex.netty.channel.Connection;
import io.reactivex.netty.events.Clock;
import io.reactivex.netty.protocol.http.server.events.HttpServerEventPublisher;
import io.reactivex.netty.protocol.tcp.server.ConnectionHandler;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.Observable;
import rx.Subscriber;
import rx.functions.Func1;

/* loaded from: input_file:io/reactivex/netty/protocol/http/server/HttpConnectionHandler.class */
public class HttpConnectionHandler<I, O> implements ConnectionHandler<HttpServerRequest<I>, Object> {
    private static final Logger logger = LoggerFactory.getLogger(HttpConnectionHandler.class);
    private final RequestHandler<I, O> requestHandler;
    private final HttpServerEventPublisher eventPublisher;
    private final boolean sendHttp10ResponseFor10Request;

    public HttpConnectionHandler(RequestHandler<I, O> requestHandler, HttpServerEventPublisher httpServerEventPublisher, boolean z) {
        this.requestHandler = requestHandler;
        this.eventPublisher = httpServerEventPublisher;
        this.sendHttp10ResponseFor10Request = z;
    }

    @Override // io.reactivex.netty.protocol.tcp.server.ConnectionHandler
    public Observable<Void> handle(final Connection<HttpServerRequest<I>, Object> connection) {
        return connection.getInput().nest().concatMap(new Func1<Observable<HttpServerRequest<I>>, Observable<Void>>() { // from class: io.reactivex.netty.protocol.http.server.HttpConnectionHandler.1
            @Override // rx.functions.Func1
            public Observable<Void> call(Observable<HttpServerRequest<I>> observable) {
                return observable.take(1).flatMap(new Func1<HttpServerRequest<I>, Observable<Void>>() { // from class: io.reactivex.netty.protocol.http.server.HttpConnectionHandler.1.1
                    @Override // rx.functions.Func1
                    public Observable<Void> call(HttpServerRequest<I> httpServerRequest) {
                        long newStartTimeNanos = HttpConnectionHandler.this.eventPublisher.publishingEnabled() ? Clock.newStartTimeNanos() : -1L;
                        if (HttpConnectionHandler.this.eventPublisher.publishingEnabled()) {
                            HttpConnectionHandler.this.eventPublisher.onNewRequestReceived();
                        }
                        return HttpConnectionHandler.this.handleRequest(httpServerRequest, newStartTimeNanos, HttpConnectionHandler.this.newResponse(httpServerRequest, connection), connection);
                    }
                });
            }
        }).repeat().ambWith(connection.closeListener());
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* JADX WARN: Multi-variable type inference failed */
    public Observable<Void> handleRequest(HttpServerRequest<I> httpServerRequest, final long j, final HttpServerResponse<O> httpServerResponse, final Connection<HttpServerRequest<I>, Object> connection) {
        Observable observable = null;
        try {
            if (httpServerRequest.decoderResult().isSuccess()) {
                observable = this.requestHandler.handle(httpServerRequest, httpServerResponse);
            }
            if (null == observable) {
                if (httpServerResponse.getStatus().equals(HttpResponseStatus.OK)) {
                    httpServerResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
                }
                observable = httpServerResponse.write(Observable.empty());
            }
        } catch (Throwable th) {
            logger.error("Unexpected error while invoking HTTP user handler.", th);
            observable = httpServerResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR).write(Observable.empty());
        }
        if (this.eventPublisher.publishingEnabled()) {
            observable = observable.lift(new Observable.Operator<Void, Void>() { // from class: io.reactivex.netty.protocol.http.server.HttpConnectionHandler.2
                @Override // rx.functions.Func1
                public Subscriber<? super Void> call(final Subscriber<? super Void> subscriber) {
                    if (HttpConnectionHandler.this.eventPublisher.publishingEnabled()) {
                        HttpConnectionHandler.this.eventPublisher.onRequestHandlingStart(Clock.onEndNanos(j), TimeUnit.NANOSECONDS);
                    }
                    return new Subscriber<Void>(subscriber) { // from class: io.reactivex.netty.protocol.http.server.HttpConnectionHandler.2.1
                        @Override // rx.Observer
                        public void onCompleted() {
                            if (HttpConnectionHandler.this.eventPublisher.publishingEnabled()) {
                                HttpConnectionHandler.this.eventPublisher.onRequestHandlingSuccess(Clock.onEndNanos(j), TimeUnit.NANOSECONDS);
                            }
                            subscriber.onCompleted();
                        }

                        @Override // rx.Observer
                        public void onError(Throwable th2) {
                            if (HttpConnectionHandler.this.eventPublisher.publishingEnabled()) {
                                HttpConnectionHandler.this.eventPublisher.onRequestHandlingFailed(Clock.onEndNanos(j), TimeUnit.NANOSECONDS, th2);
                            }
                            HttpConnectionHandler.logger.error("Unexpected error processing a request.", th2);
                            subscriber.onError(th2);
                        }

                        @Override // rx.Observer
                        public void onNext(Void r2) {
                        }
                    };
                }
            });
        }
        return observable.onErrorResumeNext(new Func1<Throwable, Observable<Void>>() { // from class: io.reactivex.netty.protocol.http.server.HttpConnectionHandler.3
            @Override // rx.functions.Func1
            public Observable<Void> call(Throwable th2) {
                HttpConnectionHandler.logger.error("Unexpected error while processing request.", th2);
                return httpServerResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR).dispose().concatWith(connection.close()).onErrorResumeNext(Observable.empty());
            }
        }).concatWith(httpServerRequest.dispose()).concatWith(httpServerResponse.dispose());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public HttpServerResponse<O> newResponse(HttpServerRequest<I> httpServerRequest, Connection<HttpServerRequest<I>, Object> connection) {
        DefaultHttpResponse defaultHttpResponse;
        HttpVersion httpVersion = this.sendHttp10ResponseFor10Request ? httpServerRequest.getHttpVersion() : HttpVersion.HTTP_1_1;
        if (httpServerRequest.decoderResult().isFailure()) {
            defaultHttpResponse = new DefaultHttpResponse(httpVersion, HttpResponseStatus.REQUEST_HEADER_FIELDS_TOO_LARGE);
            defaultHttpResponse.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE).set((CharSequence) HttpHeaderNames.CONTENT_LENGTH, (Object) 0);
        } else {
            defaultHttpResponse = new DefaultHttpResponse(httpVersion, HttpResponseStatus.OK);
        }
        HttpServerResponse<O> create = HttpServerResponseImpl.create(httpServerRequest, connection, defaultHttpResponse);
        setConnectionHeader(httpServerRequest, create);
        return create;
    }

    private void setConnectionHeader(HttpServerRequest<I> httpServerRequest, HttpServerResponse<O> httpServerResponse) {
        if (!httpServerRequest.isKeepAlive()) {
            httpServerResponse.setHeader(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
        } else {
            if (httpServerRequest.getHttpVersion().isKeepAliveDefault()) {
                return;
            }
            httpServerResponse.setHeader(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
        }
    }
}
