package co.cask.cdap.gateway.router.handlers;

import co.cask.cdap.common.HandlerException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.discovery.EndpointStrategy;
import co.cask.cdap.common.http.Channels;
import co.cask.cdap.gateway.router.RouterServiceLookup;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPromise;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpClientCodec;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.ReferenceCountUtil;
import java.io.Closeable;
import java.io.Flushable;
import java.nio.channels.ClosedChannelException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import javax.annotation.Nullable;
import javax.net.ssl.SSLException;
import org.apache.twill.discovery.Discoverable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/gateway/router/handlers/HttpRequestRouter.class */
/**
 * Channel handler that forwards inbound HTTP messages to a backend endpoint.
 *
 * <p>For each {@link HttpRequest} read from the inbound channel, a {@link Discoverable} is picked
 * through the {@link RouterServiceLookup} and the request, together with the content that follows
 * it, is handed to a {@link MessageSender} connected to that endpoint. Senders are pooled per
 * {@link Discoverable} and returned to the pool once the {@link LastHttpContent} of the response
 * is written back through this handler.
 *
 * <p>HTTP pipelining is not supported: only the first in-flight request is forwarded; every
 * additional request received before the response completes is answered with
 * {@code 501 Not Implemented}.
 *
 * <p>NOTE(review): no field of this class is synchronized; presumably one instance serves one
 * inbound channel and is only touched from that channel's event loop - confirm against the
 * pipeline setup.
 */
public class HttpRequestRouter extends ChannelDuplexHandler {
    private static final Logger LOG = LoggerFactory.getLogger(HttpRequestRouter.class);
    // Explicit charset so the comparison with the discoverable payload never depends on the platform default.
    private static final byte[] HTTPS_SCHEME_BYTES = "https://".getBytes(StandardCharsets.UTF_8);
    private final CConfiguration cConf;
    private final RouterServiceLookup serviceLookup;
    /** Idle senders available for reuse, keyed by the endpoint they talk to. */
    private final Map<Discoverable, Queue<MessageSender>> messageSenders = new HashMap<>();
    /** Number of requests read since the last completed response. */
    private int inflightRequests;
    /** Sender handling the request currently being forwarded; {@code null} when none is active. */
    private MessageSender currentMessageSender;
    /** Lazily created listener that answers with an error response when a write fails. */
    private ChannelFutureListener failureResponseListener;

    /**
     * Sends messages to a single {@link Discoverable}, connecting lazily on the first send and
     * queueing messages until the connection attempt completes.
     */
    public static final class MessageSender implements Flushable, Closeable {
        private final Discoverable discoverable;
        /** Messages accepted while no outbound channel is available yet. */
        private final Queue<OutboundMessage> pendingMessages;
        private final Bootstrap clientBootstrap;
        private volatile SslContext sslContext;
        private Channel outboundChannel;
        private boolean closed;
        private boolean connecting;

        /**
         * Prepares (but does not open) a client connection to the given endpoint.
         *
         * @param cConfiguration configuration providing {@code router.connection.idle.timeout.secs}
         * @param channel the inbound channel; its event loop is reused for the outbound connection
         *     and it is handed to the {@code OutboundHandler} installed on the outbound pipeline
         * @param discoverable the endpoint to connect to
         */
        private MessageSender(final CConfiguration cConfiguration, final Channel channel, final Discoverable discoverable) {
            this.discoverable = discoverable;
            this.pendingMessages = new LinkedList<>();
            // When the outbound channel closes, forget it so that the next send() reconnects.
            final ChannelFutureListener onOutboundClosed = new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) {
                    MessageSender.this.outboundChannel = null;
                    MessageSender.this.connecting = false;
                }
            };
            this.clientBootstrap = new Bootstrap()
                .group(channel.eventLoop())
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.closeFuture().addListener(onOutboundClosed);
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        SslHandler sslHandler = MessageSender.this.getSslHandler(discoverable, socketChannel.alloc());
                        if (sslHandler != null) {
                            pipeline.addLast("ssl", sslHandler);
                        }
                        pipeline.addLast("idle-state-handler", new IdleStateHandler(0, 0, cConfiguration.getInt("router.connection.idle.timeout.secs")));
                        pipeline.addLast("codec", new HttpClientCodec());
                        pipeline.addLast("forwarder", new OutboundHandler(channel));
                    }
                });
        }

        /**
         * Writes (without flushing) a message to the endpoint, connecting first if necessary.
         *
         * <p>If no outbound channel exists, the message is queued and a connection attempt is
         * started unless one is already in progress. Queued messages are written, or failed through
         * their listener, when the attempt completes.
         *
         * @param obj the message to send
         * @param channelFutureListener notified with the outcome of the write
         */
        void send(Object obj, ChannelFutureListener channelFutureListener) {
            if (this.outboundChannel != null) {
                this.outboundChannel.write(obj).addListener(channelFutureListener);
                return;
            }
            this.pendingMessages.add(new OutboundMessage(obj, channelFutureListener));
            if (this.connecting) {
                return;
            }
            // Mark as connecting BEFORE initiating the connect: if the returned future is already
            // complete, the listener runs inline and its reset of the flag must not be overwritten.
            this.connecting = true;
            this.clientBootstrap.connect(this.discoverable.getSocketAddress()).addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    MessageSender.this.connecting = false;
                    if (channelFuture.isSuccess()) {
                        // Only remember a channel that actually connected; a failed attempt must
                        // leave outboundChannel null so that the next send() retries.
                        MessageSender.this.outboundChannel = channelFuture.channel();
                        if (MessageSender.this.closed) {
                            Channels.closeOnFlush(MessageSender.this.outboundChannel);
                        }
                    }
                    OutboundMessage pending;
                    while ((pending = MessageSender.this.pendingMessages.poll()) != null) {
                        MessageSender.this.processMessage(pending, channelFuture);
                    }
                    if (channelFuture.isSuccess()) {
                        MessageSender.this.flush();
                    }
                }
            });
        }

        /** Flushes the outbound channel, if there is one and this sender is not closed. */
        @Override
        public void flush() {
            if (this.outboundChannel == null || this.closed) {
                return;
            }
            this.outboundChannel.flush();
        }

        /**
         * Marks this sender as closed and closes the outbound channel, if any, via
         * {@code Channels.closeOnFlush}. Calling it more than once has no further effect.
         */
        @Override
        public void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            if (this.outboundChannel != null) {
                Channels.closeOnFlush(this.outboundChannel);
            }
        }

        /** Returns the endpoint this sender talks to. */
        Discoverable getDiscoverable() {
            return this.discoverable;
        }

        /**
         * Creates an {@link SslHandler} for the endpoint when its payload equals the bytes of
         * {@code "https://"}.
         *
         * <p>The client {@link SslContext} is built on first use and cached in this sender.
         * NOTE: it is configured with {@link InsecureTrustManagerFactory}, i.e. backend
         * certificates are not verified.
         *
         * @param discoverable the endpoint whose payload is inspected
         * @param byteBufAllocator allocator passed to the new handler
         * @return a new handler, or {@code null} if the payload is not the https scheme
         * @throws SSLException if the SSL context cannot be built
         */
        @Nullable
        public SslHandler getSslHandler(Discoverable discoverable, ByteBufAllocator byteBufAllocator) throws SSLException {
            if (!Arrays.equals(HttpRequestRouter.HTTPS_SCHEME_BYTES, discoverable.getPayload())) {
                return null;
            }
            SslContext context = this.sslContext;
            if (context == null) {
                synchronized (this) {
                    context = this.sslContext;
                    if (context == null) {
                        context = SslContextBuilder.forClient().trustManager(InsecureTrustManagerFactory.INSTANCE).build();
                        this.sslContext = context;
                    }
                }
            }
            return context.newHandler(byteBufAllocator);
        }

        /**
         * Handles one queued message after the connection attempt has completed.
         *
         * <p>The message is written to the new channel when the attempt succeeded and this sender
         * is still open. Otherwise its listener is notified with a failure (a
         * {@link ClosedChannelException} if this sender was closed, else the failed connect future)
         * and the message is released, because it was retained for a write that will never happen.
         *
         * @param outboundMessage the queued message
         * @param channelFuture the completed connect future
         * @throws Exception if the message's listener throws
         */
        public void processMessage(OutboundMessage outboundMessage, ChannelFuture channelFuture) throws Exception {
            if (!this.closed && channelFuture.isSuccess()) {
                outboundMessage.write(channelFuture.channel());
                return;
            }
            try {
                ChannelFuture failure = this.closed
                    ? channelFuture.channel().newFailedFuture(new ClosedChannelException())
                    : channelFuture;
                outboundMessage.writeCompletedListener.operationComplete(failure);
            } finally {
                // Nobody else will consume the message, so drop the reference held for the write.
                ReferenceCountUtil.release(outboundMessage.message);
            }
        }
    }

    /** A message waiting for the outbound connection, paired with its write-completion listener. */
    public static final class OutboundMessage {
        private final Object message;
        private final ChannelFutureListener writeCompletedListener;

        OutboundMessage(Object obj, ChannelFutureListener channelFutureListener) {
            this.message = obj;
            this.writeCompletedListener = channelFutureListener;
        }

        /** Writes (without flushing) the message to the given channel and attaches the listener. */
        void write(Channel channel) {
            channel.write(this.message).addListener(this.writeCompletedListener);
        }
    }

    /**
     * @param cConfiguration configuration passed to every {@link MessageSender} created
     * @param routerServiceLookup used to resolve the endpoint strategy for each request
     */
    public HttpRequestRouter(CConfiguration cConfiguration, RouterServiceLookup routerServiceLookup) {
        this.cConf = cConfiguration;
        this.serviceLookup = routerServiceLookup;
    }

    /**
     * Forwards an inbound message to the current {@link MessageSender}.
     *
     * <p>An {@link HttpRequest} increments the in-flight counter. Only the first in-flight request
     * is routed: auto-read is switched off until the request has been written to the backend
     * successfully, and a sender for the picked endpoint becomes the current sender. Any further
     * request (pipelining) is dropped here and answered later in
     * {@link #write(ChannelHandlerContext, Object, ChannelPromise)}.
     *
     * <p>The message is retained once for the forwarded write and released exactly once when this
     * method exits, whichever path is taken.
     *
     * @throws HandlerException (unchecked) if no endpoint can be found for a request
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        try {
            final Channel channel = channelHandlerContext.channel();
            ChannelFutureListener writeCompletedListener = getFailureResponseListener(channel);
            if (obj instanceof HttpRequest) {
                this.inflightRequests++;
                if (this.inflightRequests != 1) {
                    // A pipelined request: ignore it, it gets its response once the first one completes.
                    return;
                }
                // Stop reading until the request object has actually been sent to the backend.
                channel.config().setAutoRead(false);
                writeCompletedListener = new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            channel.config().setAutoRead(true);
                        } else {
                            HttpRequestRouter.this.getFailureResponseListener(channel).operationComplete(channelFuture);
                        }
                    }
                };
                this.currentMessageSender = getMessageSender(channel, getDiscoverable((HttpRequest) obj));
            }
            if (this.inflightRequests == 1 && this.currentMessageSender != null) {
                // The sender takes over one reference; the finally block drops ours.
                ReferenceCountUtil.retain(obj);
                this.currentMessageSender.send(obj, writeCompletedListener);
            }
        } finally {
            ReferenceCountUtil.release(obj);
        }
    }

    /** Flushes the current sender, if any, then propagates the event. */
    @Override
    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) {
        if (this.currentMessageSender != null) {
            this.currentMessageSender.flush();
        }
        channelHandlerContext.fireChannelReadComplete();
    }

    /**
     * Writes and flushes an outgoing message. When the message is a {@link LastHttpContent}, every
     * pipelined request beyond the first is answered with {@code 501 Not Implemented}, the
     * in-flight counter is reset and the current sender is returned to the pool.
     */
    @Override
    public void write(ChannelHandlerContext channelHandlerContext, Object obj, ChannelPromise channelPromise) {
        channelHandlerContext.writeAndFlush(obj, channelPromise);
        if (!(obj instanceof LastHttpContent)) {
            return;
        }
        for (int i = 0; i < this.inflightRequests - 1; i++) {
            channelHandlerContext.writeAndFlush(createPipeliningNotSupported());
        }
        this.inflightRequests = 0;
        if (this.currentMessageSender != null) {
            this.messageSenders.get(this.currentMessageSender.getDiscoverable()).add(this.currentMessageSender);
            // The sender now belongs to the pool: clear the field so it cannot be pooled twice
            // or receive content belonging to a later request that failed to route.
            this.currentMessageSender = null;
        }
    }

    /**
     * Responds with a failure response (taken from the {@link HandlerException} when applicable,
     * otherwise a 500 carrying the exception message) and closes the channel afterwards.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
        HttpResponse response = th instanceof HandlerException
            ? ((HandlerException) th).createFailureResponse()
            : createErrorResponse(th);
        HttpUtil.setKeepAlive(response, false);
        channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    /** Closes the current sender and all pooled senders, then propagates the event. */
    @Override
    public void channelInactive(ChannelHandlerContext channelHandlerContext) {
        if (this.currentMessageSender != null) {
            this.currentMessageSender.close();
        }
        for (Queue<MessageSender> pooledSenders : this.messageSenders.values()) {
            for (MessageSender sender : pooledSenders) {
                sender.close();
            }
        }
        channelHandlerContext.fireChannelInactive();
    }

    /**
     * Returns the listener that, on a failed future, writes a 500 response built from the failure
     * cause to the given channel and then closes it.
     *
     * <p>The listener is created on the first call and cached, so it stays bound to the channel
     * passed on that first call.
     */
    public ChannelFutureListener getFailureResponseListener(final Channel channel) {
        if (this.failureResponseListener == null) {
            this.failureResponseListener = new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) {
                    if (channelFuture.isSuccess()) {
                        return;
                    }
                    HttpResponse response = HttpRequestRouter.createErrorResponse(channelFuture.cause());
                    HttpUtil.setKeepAlive(response, false);
                    channel.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
                }
            };
        }
        return this.failureResponseListener;
    }

    /**
     * Picks the endpoint that should serve the request.
     *
     * @throws HandlerException with {@code 503 Service Unavailable} if the lookup yields no
     *     endpoint strategy or the strategy picks nothing
     */
    private Discoverable getDiscoverable(HttpRequest httpRequest) {
        EndpointStrategy strategy = this.serviceLookup.getDiscoverable(httpRequest);
        if (strategy == null) {
            throw new HandlerException(HttpResponseStatus.SERVICE_UNAVAILABLE, "No endpoint strategy found for request " + getRequestLine(httpRequest));
        }
        Discoverable picked = strategy.pick();
        if (picked == null) {
            throw new HandlerException(HttpResponseStatus.SERVICE_UNAVAILABLE, "No discoverable found for request " + getRequestLine(httpRequest));
        }
        return picked;
    }

    /** Takes a pooled sender for the endpoint, or creates a new one when the pool is empty. */
    private MessageSender getMessageSender(Channel channel, Discoverable discoverable) {
        Queue<MessageSender> pooledSenders = this.messageSenders.computeIfAbsent(discoverable, key -> new LinkedList<>());
        MessageSender pooled = pooledSenders.poll();
        if (pooled != null) {
            LOG.trace("Reuse message sender for {}", discoverable);
            return pooled;
        }
        MessageSender created = new MessageSender(this.cConf, channel, discoverable);
        LOG.trace("Create new message sender for {}", discoverable);
        return created;
    }

    /** Formats the request as {@code "<method> <uri> <protocol version>"} for error messages. */
    private String getRequestLine(HttpRequest httpRequest) {
        return httpRequest.method() + " " + httpRequest.uri() + " " + httpRequest.protocolVersion();
    }

    /** Builds the {@code 501 Not Implemented} response sent for pipelined requests. */
    private HttpResponse createPipeliningNotSupported() {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_IMPLEMENTED);
        response.content().writeCharSequence("HTTP pipelining is not supported", StandardCharsets.UTF_8);
        HttpUtil.setContentLength(response, response.content().readableBytes());
        return response;
    }

    /**
     * Builds a {@code 500 Internal Server Error} response whose body is the throwable's message
     * (empty when the message is {@code null}).
     */
    public static HttpResponse createErrorResponse(Throwable th) {
        DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        if (th.getMessage() != null) {
            response.content().writeCharSequence(th.getMessage(), StandardCharsets.UTF_8);
        }
        HttpUtil.setContentLength(response, response.content().readableBytes());
        return response;
    }
}
