/*
 * Decompiled with CFR 0.152.
 */
package io.reactivex.netty.protocol.http.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufHolder;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.ReferenceCountUtil;
import io.reactivex.netty.metrics.Clock;
import io.reactivex.netty.metrics.MetricEventsSubject;
import io.reactivex.netty.protocol.http.UnicastContentSubject;
import io.reactivex.netty.protocol.http.server.HttpServerMetricsEvent;
import io.reactivex.netty.protocol.http.server.HttpServerRequest;
import io.reactivex.netty.protocol.http.server.HttpServerResponse;
import io.reactivex.netty.server.ServerMetricsEvent;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

/**
 * Duplex channel handler that translates between Netty's HTTP codec objects and the
 * {@link HttpServerRequest} / {@link HttpServerResponse} types, publishing
 * {@link HttpServerMetricsEvent}s to the supplied events subject along the way.
 *
 * <p>Inbound: an {@link HttpRequest} is wrapped in an {@link HttpServerRequest} and forwarded
 * through {@code super.channelRead}; the payload of every {@link HttpContent} is pushed into that
 * request's content subject, which is completed once a {@link LastHttpContent} is seen. A message
 * that is neither of those is pushed into the content subject unchanged.
 *
 * <p>Outbound: an {@link HttpServerResponse} is replaced by its Netty response, a bare
 * {@link ByteBuf} is wrapped in a {@link DefaultHttpContent}, and anything else is written as is.
 */
public class ServerRequestResponseConverter extends ChannelDuplexHandler {
    /** Error pushed to the content subject when the channel goes inactive while a request is still being read. */
    public static final IOException CONN_CLOSE_BEFORE_REQUEST_COMPLETE = new IOException("Connection closed by peer before sending the entire request.");
    private final MetricEventsSubject<ServerMetricsEvent<?>> eventsSubject;
    private final long requestContentSubscriptionTimeoutMs;
    /** State of the request currently being read; replaced after each {@link LastHttpContent}. */
    private RequestState currentRequestState;

    /**
     * @param eventsSubject subject that receives every metric event fired by this handler
     * @param requestContentSubscriptionTimeoutMs timeout, in milliseconds, handed to
     *        {@code UnicastContentSubject.create} for each request's content subject
     */
    public ServerRequestResponseConverter(MetricEventsSubject<ServerMetricsEvent<?>> eventsSubject, long requestContentSubscriptionTimeoutMs) {
        this.eventsSubject = eventsSubject;
        this.requestContentSubscriptionTimeoutMs = requestContentSubscriptionTimeoutMs;
        this.currentRequestState = new RequestState();
    }

    /**
     * Converts an inbound codec message as described on the class.
     *
     * <p>A message may be both an {@link HttpRequest} and an {@link HttpContent}; in that case the
     * request is forwarded first and its payload is delivered to the content subject afterwards.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        final Class<?> msgType = msg.getClass();
        // Pinned up front: the field is swapped for a fresh state once the last content is seen.
        final RequestState state = this.currentRequestState;
        final boolean carriesHeaders = HttpRequest.class.isAssignableFrom(msgType);

        if (carriesHeaders) {
            this.eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_HEADERS_RECEIVED);
            state.begin(ctx, (HttpRequest) msg);
            super.channelRead(ctx, state.rxRequest);
        }

        // NOTE(review): state.contentSubject is only assigned in RequestState.begin(); a content or
        // unknown message arriving before any HttpRequest would dereference null below. Confirm the
        // upstream codec guarantees a request head always comes first.
        if (!HttpContent.class.isAssignableFrom(msgType)) {
            if (!carriesHeaders) {
                // Not an HTTP codec object at all: hand it to the content subject untouched.
                ServerRequestResponseConverter.invokeContentOnNext(msg, state.contentSubject);
            }
            return;
        }

        final ByteBuf payload = ((ByteBufHolder) msg).content();
        this.eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_CONTENT_RECEIVED);
        ServerRequestResponseConverter.invokeContentOnNext(payload, state.contentSubject);

        if (LastHttpContent.class.isAssignableFrom(msgType)) {
            state.onRequestComplete();
            this.currentRequestState = new RequestState();
        }
    }

    /**
     * Converts an outbound message as described on the class, attaching write-completion metric
     * events to {@code promise} for response heads and for raw buffers.
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        final Class<?> msgType = msg.getClass();
        final long writeStartedAt = Clock.newStartTimeMillis();

        if (HttpServerResponse.class.isAssignableFrom(msgType)) {
            final HttpServerResponse rxResponse = (HttpServerResponse) msg;
            this.eventsSubject.onEvent(HttpServerMetricsEvent.RESPONSE_HEADERS_WRITE_START);
            this.addWriteCompleteEvents(promise, writeStartedAt, HttpServerMetricsEvent.RESPONSE_HEADERS_WRITE_SUCCESS, HttpServerMetricsEvent.RESPONSE_HEADERS_WRITE_FAILED);
            super.write(ctx, rxResponse.getNettyResponse(), promise);
            return;
        }

        if (!ByteBuf.class.isAssignableFrom(msgType)) {
            // Neither a response head nor a raw buffer: pass through without metrics.
            super.write(ctx, msg, promise);
            return;
        }

        this.eventsSubject.onEvent(HttpServerMetricsEvent.RESPONSE_CONTENT_WRITE_START);
        this.addWriteCompleteEvents(promise, writeStartedAt, HttpServerMetricsEvent.RESPONSE_CONTENT_WRITE_SUCCESS, HttpServerMetricsEvent.RESPONSE_CONTENT_WRITE_FAILED);
        super.write(ctx, new DefaultHttpContent((ByteBuf) msg), promise);
    }

    /** Delegates to the superclass and then flushes the whole pipeline. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        super.channelReadComplete(ctx);
        ctx.pipeline().flush();
    }

    /** Notifies the current request state of the close before delegating to the superclass. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        this.currentRequestState.onConnectionClose();
        super.channelInactive(ctx);
    }

    /**
     * Registers a listener on {@code promise} that fires {@code successEvent} or
     * {@code failureEvent} once the write finishes.
     *
     * @param promise promise of the write being tracked
     * @param startTimeMillis start value later passed to {@code Clock.onEndMillis}
     * @param successEvent event fired when the write succeeds
     * @param failureEvent event fired, together with the failure cause, when the write fails
     */
    private void addWriteCompleteEvents(ChannelPromise promise, final long startTimeMillis, final HttpServerMetricsEvent<HttpServerMetricsEvent.EventType> successEvent, final HttpServerMetricsEvent<HttpServerMetricsEvent.EventType> failureEvent) {
        promise.addListener(new WriteOutcomeListener(startTimeMillis, successEvent, failureEvent));
    }

    /**
     * Pushes {@code nextObject} into {@code contentSubject} and releases it afterwards, whatever
     * the outcome. A {@link ClassCastException} raised by the delivery is routed to the subject's
     * {@code onError} instead of propagating.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static void invokeContentOnNext(Object nextObject, UnicastContentSubject contentSubject) {
        try {
            contentSubject.onNext(nextObject);
        } catch (ClassCastException wrongType) {
            contentSubject.onError(wrongType);
        } finally {
            // Always drop this handler's reference to the item, even when delivery failed.
            ReferenceCountUtil.release(nextObject);
        }
    }

    /** Future listener that reports the outcome of a single write as a metric event. */
    private final class WriteOutcomeListener implements ChannelFutureListener {
        private final long startTimeMillis;
        private final HttpServerMetricsEvent<HttpServerMetricsEvent.EventType> successEvent;
        private final HttpServerMetricsEvent<HttpServerMetricsEvent.EventType> failureEvent;

        private WriteOutcomeListener(long startTimeMillis, HttpServerMetricsEvent<HttpServerMetricsEvent.EventType> successEvent, HttpServerMetricsEvent<HttpServerMetricsEvent.EventType> failureEvent) {
            this.startTimeMillis = startTimeMillis;
            this.successEvent = successEvent;
            this.failureEvent = failureEvent;
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                ServerRequestResponseConverter.this.eventsSubject.onEvent(this.failureEvent, Clock.onEndMillis(this.startTimeMillis), future.cause());
                return;
            }
            ServerRequestResponseConverter.this.eventsSubject.onEvent(this.successEvent, Clock.onEndMillis(this.startTimeMillis));
        }
    }

    /** Per-request bookkeeping: the Rx request, its content subject, and whether it is still being read. */
    private final class RequestState {
        private HttpServerRequest rxRequest;
        private UnicastContentSubject contentSubject;
        private boolean isReadingRequest;

        private RequestState() {
        }

        /**
         * Creates the content subject and the Rx request for {@code httpRequest}, records the
         * processing start time on the request, and marks the request as being read.
         */
        private void begin(ChannelHandlerContext ctx, HttpRequest httpRequest) {
            this.contentSubject = UnicastContentSubject.create(ServerRequestResponseConverter.this.requestContentSubscriptionTimeoutMs, TimeUnit.MILLISECONDS);
            this.rxRequest = new HttpServerRequest(ctx.channel(), httpRequest, this.contentSubject);
            this.rxRequest.onProcessingStart(Clock.newStartTimeMillis());
            this.isReadingRequest = true;
        }

        /** Marks the request as fully read, fires the receive-complete event, and completes the content subject. */
        private void onRequestComplete() {
            this.isReadingRequest = false;
            final long elapsedMs = this.rxRequest.onProcessingEnd();
            ServerRequestResponseConverter.this.eventsSubject.onEvent(HttpServerMetricsEvent.REQUEST_RECEIVE_COMPLETE, elapsedMs);
            this.contentSubject.onCompleted();
        }

        /** Fails the content subject if the connection closed while the request was still being read. */
        private void onConnectionClose() {
            if (!this.isReadingRequest) {
                return;
            }
            this.contentSubject.onError(CONN_CLOSE_BEFORE_REQUEST_COMPLETE);
        }
    }
}

