package co.cask.cdap.gateway.router.handlers;

import co.cask.cdap.common.HandlerException;
import co.cask.cdap.common.discovery.EndpointStrategy;
import co.cask.cdap.gateway.router.ProxyRule;
import co.cask.cdap.gateway.router.RouterServiceLookup;
import com.google.common.collect.Queues;
import com.google.common.io.Closeables;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.twill.discovery.Discoverable;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/gateway/router/handlers/HttpRequestHandler.class */
public class HttpRequestHandler extends SimpleChannelUpstreamHandler {
    private static final Logger LOG = LoggerFactory.getLogger(HttpRequestHandler.class);
    private final ClientBootstrap clientBootstrap;
    private final RouterServiceLookup serviceLookup;
    private final List<ProxyRule> proxyRules;
    private MessageSender chunkSender;
    private volatile boolean channelClosed;
    private final AtomicInteger exceptionsHandled = new AtomicInteger(0);
    private final Map<WrappedDiscoverable, MessageSender> discoveryLookup = new HashMap();

    /* loaded from: input_file:co/cask/cdap/gateway/router/handlers/HttpRequestHandler$MessageSender.class */
    private static final class MessageSender implements Closeable {
        private final Channel inBoundChannel;
        private final ChannelFuture channelFuture;
        private final Queue<OutboundMessage> messages;
        private final AtomicBoolean writer;

        private MessageSender(Channel channel, ChannelFuture channelFuture) {
            this.inBoundChannel = channel;
            this.channelFuture = channelFuture;
            this.messages = Queues.newConcurrentLinkedQueue();
            this.writer = new AtomicBoolean(false);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isConnected() {
            return this.channelFuture.getChannel().isConnected();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void send(Object obj) {
            this.inBoundChannel.setAttachment(this.channelFuture.getChannel());
            final OutboundMessage outboundMessage = new OutboundMessage(obj);
            this.messages.add(outboundMessage);
            if (this.channelFuture.isSuccess()) {
                flushUntilCompleted(this.channelFuture.getChannel(), outboundMessage);
            } else {
                this.channelFuture.addListener(new ChannelFutureListener() { // from class: co.cask.cdap.gateway.router.handlers.HttpRequestHandler.MessageSender.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            MessageSender.this.flushUntilCompleted(channelFuture.getChannel(), outboundMessage);
                        } else {
                            HttpRequestHandler.closeOnFlush(MessageSender.this.inBoundChannel);
                        }
                    }
                });
            }
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void flushUntilCompleted(Channel channel, OutboundMessage outboundMessage) {
            while (!outboundMessage.isCompleted()) {
                if (this.writer.compareAndSet(false, true)) {
                    try {
                        OutboundMessage poll = this.messages.poll();
                        while (poll != null) {
                            poll.write(channel);
                            poll.completed();
                            poll = this.messages.poll();
                        }
                    } finally {
                        this.writer.set(false);
                    }
                } else {
                    Thread.yield();
                }
            }
        }

        @Override // java.io.Closeable, java.lang.AutoCloseable
        public void close() throws IOException {
            HttpRequestHandler.closeOnFlush(this.channelFuture.getChannel());
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/gateway/router/handlers/HttpRequestHandler$OutboundMessage.class */
    public static final class OutboundMessage {
        private final Object message;
        private boolean completed;

        private OutboundMessage(Object obj) {
            this.message = obj;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public boolean isCompleted() {
            return this.completed;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void completed() {
            this.completed = true;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void write(Channel channel) {
            channel.write(this.message);
        }
    }

    public HttpRequestHandler(ClientBootstrap clientBootstrap, RouterServiceLookup routerServiceLookup, List<ProxyRule> list) {
        this.clientBootstrap = clientBootstrap;
        this.serviceLookup = routerServiceLookup;
        this.proxyRules = list;
    }

    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        if (this.channelClosed) {
            return;
        }
        final Channel channel = messageEvent.getChannel();
        Object message = messageEvent.getMessage();
        if (message instanceof HttpChunk) {
            if (this.chunkSender == null) {
                throw new HandlerException(HttpResponseStatus.INTERNAL_SERVER_ERROR, "Chunk received and event sender is null");
            }
            this.chunkSender.send(message);
            return;
        }
        if (!(message instanceof HttpRequest)) {
            super.messageReceived(channelHandlerContext, messageEvent);
            return;
        }
        HttpRequest applyProxyRules = applyProxyRules((HttpRequest) message);
        channel.setReadable(false);
        WrappedDiscoverable discoverable = getDiscoverable(applyProxyRules, (InetSocketAddress) channel.getLocalAddress());
        MessageSender messageSender = this.discoveryLookup.get(discoverable);
        if (messageSender == null || !messageSender.isConnected()) {
            ChannelFuture connect = this.clientBootstrap.connect(discoverable.getSocketAddress());
            final Channel channel2 = connect.getChannel();
            channel2.getPipeline().addAfter("request-encoder", "outbound-handler", new OutboundHandler(channel));
            messageSender = new MessageSender(channel, connect);
            this.discoveryLookup.put(discoverable, messageSender);
            channel.setAttachment(channel2);
            channel2.getCloseFuture().addListener(new ChannelFutureListener() { // from class: co.cask.cdap.gateway.router.handlers.HttpRequestHandler.1
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    channel.getPipeline().execute(new Runnable() { // from class: co.cask.cdap.gateway.router.handlers.HttpRequestHandler.1.1
                        @Override // java.lang.Runnable
                        public void run() {
                            if (channel2.equals(channel.getAttachment())) {
                                HttpRequestHandler.closeOnFlush(channel);
                            }
                        }
                    });
                }
            });
        }
        messageSender.send(applyProxyRules);
        channel.setReadable(true);
        if (applyProxyRules.isChunked()) {
            this.chunkSender = messageSender;
        }
    }

    private HttpRequest applyProxyRules(HttpRequest httpRequest) {
        Iterator<ProxyRule> it = this.proxyRules.iterator();
        while (it.hasNext()) {
            httpRequest = it.next().apply(httpRequest);
        }
        return httpRequest;
    }

    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
        HandlerException cause = exceptionEvent.getCause();
        switch (this.exceptionsHandled.incrementAndGet()) {
            case 1:
                LOG.error("Exception raised in Request Handler {}", channelHandlerContext.getChannel(), cause);
                if (!channelHandlerContext.getChannel().isConnected() || this.channelClosed) {
                    return;
                }
                Channels.write(channelHandlerContext, exceptionEvent.getFuture(), cause instanceof HandlerException ? cause.createFailureResponse() : new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR));
                exceptionEvent.getFuture().addListener(ChannelFutureListener.CLOSE);
                return;
            case 2:
                LOG.error("Not handling exception due to already having handled an exception in Request Handler {}", channelHandlerContext.getChannel(), cause);
                return;
            default:
                return;
        }
    }

    public void channelClosed(ChannelHandlerContext channelHandlerContext, ChannelStateEvent channelStateEvent) throws Exception {
        LOG.trace("Channel closed {}", channelHandlerContext.getChannel());
        Iterator<MessageSender> it = this.discoveryLookup.values().iterator();
        while (it.hasNext()) {
            Closeables.closeQuietly(it.next());
        }
        this.channelClosed = true;
        super.channelClosed(channelHandlerContext, channelStateEvent);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static void closeOnFlush(Channel channel) {
        if (!channel.isConnected() || channel.getCloseFuture().isDone()) {
            return;
        }
        channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    private WrappedDiscoverable getDiscoverable(HttpRequest httpRequest, InetSocketAddress inetSocketAddress) {
        EndpointStrategy discoverable = this.serviceLookup.getDiscoverable(inetSocketAddress.getPort(), httpRequest);
        if (discoverable == null) {
            throw new HandlerException(HttpResponseStatus.SERVICE_UNAVAILABLE, String.format("No endpoint strategy found for request : %s", httpRequest.getUri()));
        }
        Discoverable pick = discoverable.pick();
        if (pick == null) {
            throw new HandlerException(HttpResponseStatus.SERVICE_UNAVAILABLE, String.format("No discoverable found for request : %s", httpRequest.getUri()));
        }
        return new WrappedDiscoverable(pick);
    }
}
