/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.router.KeepAliveWrite;
import io.netty.handler.codec.http.router.Routed;
import java.nio.charset.Charset;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
import org.apache.flink.runtime.webmonitor.NotFoundException;
import org.apache.flink.runtime.webmonitor.handlers.HandlerRedirectUtils;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.util.ExceptionUtils;
import scala.Option;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

@ChannelHandler.Sharable
public class RuntimeMonitorHandler
extends SimpleChannelInboundHandler<Routed> {
    private static final Charset ENCODING = Charset.forName("UTF-8");
    private final RequestHandler handler;
    private final JobManagerRetriever retriever;
    private final Future<String> localJobManagerAddressFuture;
    private final FiniteDuration timeout;
    private final String contentType;
    private String localJobManagerAddress;

    public RuntimeMonitorHandler(RequestHandler handler, JobManagerRetriever retriever, Future<String> localJobManagerAddressFuture, FiniteDuration timeout) {
        this.handler = Preconditions.checkNotNull(handler);
        this.retriever = Preconditions.checkNotNull(retriever);
        this.localJobManagerAddressFuture = Preconditions.checkNotNull(localJobManagerAddressFuture);
        this.timeout = Preconditions.checkNotNull(timeout);
        this.contentType = handler instanceof RequestHandler.JsonResponse ? "application/json" : "text/plain";
    }

    protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
        if (this.localJobManagerAddressFuture.isCompleted()) {
            Option<Tuple2<ActorGateway, Integer>> jobManager;
            if (this.localJobManagerAddress == null) {
                this.localJobManagerAddress = (String)Await.result(this.localJobManagerAddressFuture, (Duration)this.timeout);
            }
            if ((jobManager = this.retriever.getJobManagerGatewayAndWebPort()).isDefined()) {
                Tuple2 gatewayPort = (Tuple2)jobManager.get();
                String redirectAddress = HandlerRedirectUtils.getRedirectAddress(this.localJobManagerAddress, (Tuple2<ActorGateway, Integer>)gatewayPort);
                if (redirectAddress != null) {
                    HttpResponse redirect = HandlerRedirectUtils.getRedirectResponse(redirectAddress, routed.path());
                    KeepAliveWrite.flush((ChannelHandlerContext)ctx, (HttpRequest)routed.request(), (HttpResponse)redirect);
                } else {
                    this.respondAsLeader(ctx, routed, (ActorGateway)gatewayPort._1());
                }
            } else {
                KeepAliveWrite.flush((ChannelHandlerContext)ctx, (HttpRequest)routed.request(), (HttpResponse)HandlerRedirectUtils.getUnavailableResponse());
            }
        } else {
            KeepAliveWrite.flush((ChannelHandlerContext)ctx, (HttpRequest)routed.request(), (HttpResponse)HandlerRedirectUtils.getUnavailableResponse());
        }
    }

    private void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) {
        DefaultFullHttpResponse response;
        try {
            String result = this.handler.handleRequest(routed.pathParams(), jobManager);
            byte[] bytes = result.getBytes(ENCODING);
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer((byte[])bytes));
            response.headers().set("Content-Type", (Object)this.contentType);
        }
        catch (NotFoundException e) {
            ByteBuf message = e.getMessage() == null ? Unpooled.buffer((int)0) : Unpooled.wrappedBuffer((byte[])e.getMessage().getBytes(ENCODING));
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message);
            response.headers().set("Content-Type", (Object)"text/plain");
        }
        catch (Exception e) {
            byte[] bytes = ExceptionUtils.stringifyException((Throwable)e).getBytes(ENCODING);
            response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer((byte[])bytes));
            response.headers().set("Content-Type", (Object)"text/plain");
        }
        response.headers().set("Content-Encoding", (Object)"utf-8");
        response.headers().set("Content-Length", (Object)response.content().readableBytes());
        KeepAliveWrite.flush((ChannelHandlerContext)ctx, (HttpRequest)routed.request(), (HttpResponse)response);
    }
}

