/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Router;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
import org.apache.flink.runtime.webmonitor.WebMonitor;
import org.apache.flink.runtime.webmonitor.WebMonitorConfig;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

public class WebRuntimeMonitor
implements WebMonitor {
    public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10L, TimeUnit.SECONDS);
    private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
    private final Object startupShutdownLock = new Object();
    private final LeaderRetrievalService leaderRetrievalService;
    private final JobManagerRetriever retriever;
    private final Router router;
    private final int configuredPort;
    private final ServerBootstrap bootstrap;
    private final Promise<String> jobManagerAddressPromise = new Promise.DefaultPromise();
    private final FiniteDuration timeout;
    private Channel serverChannel;
    private final File webRootDir;
    private AtomicBoolean isShutdown = new AtomicBoolean();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public WebRuntimeMonitor(Configuration config, LeaderRetrievalService leaderRetrievalService, ActorSystem actorSystem) throws IOException, InterruptedException {
        this.leaderRetrievalService = Preconditions.checkNotNull(leaderRetrievalService);
        WebMonitorConfig cfg = new WebMonitorConfig(config);
        String fileName = String.format("flink-web-%s", UUID.randomUUID().toString());
        this.webRootDir = new File(System.getProperty("java.io.tmpdir"), fileName);
        LOG.info("Using directory {} for the web interface files", (Object)this.webRootDir);
        WebMonitorUtils.LogFiles logFiles = WebMonitorUtils.LogFiles.find((Configuration)config);
        LOG.info("Serving job manager log from {}", (Object)logFiles.logFile.getAbsolutePath());
        LOG.info("Serving job manager stdout from {}", (Object)logFiles.stdOutFile.getAbsolutePath());
        this.configuredPort = cfg.getWebFrontendPort();
        if (this.configuredPort < 0) {
            throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
        }
        this.timeout = AkkaUtils.getTimeout((Configuration)config);
        FiniteDuration lookupTimeout = AkkaUtils.getTimeout((Configuration)config);
        this.retriever = new JobManagerRetriever(this, actorSystem, lookupTimeout, this.timeout);
        ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder();
        this.router = (Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)((Router)new Router().GET("/config", (Object)this.handler(new DashboardConfigHandler(cfg.getRefreshInterval())))).GET("/overview", (Object)this.handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT)))).GET("/jobmanager/config", (Object)this.handler(new JobManagerConfigHandler(config)))).GET("/joboverview", (Object)this.handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true)))).GET("/joboverview/running", (Object)this.handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false)))).GET("/joboverview/completed", (Object)this.handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true)))).GET("/jobs", (Object)this.handler(new CurrentJobIdsHandler(this.retriever, DEFAULT_REQUEST_TIMEOUT)))).GET("/jobs/:jobid", (Object)this.handler(new JobDetailsHandler(currentGraphs)))).GET("/jobs/:jobid/vertices", (Object)this.handler(new JobDetailsHandler(currentGraphs)))).GET("/jobs/:jobid/vertices/:vertexid", (Object)this.handler(new JobVertexDetailsHandler(currentGraphs)))).GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", (Object)this.handler(new SubtasksTimesHandler(currentGraphs)))).GET("/jobs/:jobid/vertices/:vertexid/accumulators", (Object)this.handler(new JobVertexAccumulatorsHandler(currentGraphs)))).GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", (Object)this.handler(new SubtasksAllAccumulatorsHandler(currentGraphs)))).GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", (Object)this.handler(new SubtaskCurrentAttemptDetailsHandler(currentGraphs)))).GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", (Object)this.handler(new SubtaskExecutionAttemptDetailsHandler(currentGraphs)))).GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", (Object)this.handler(new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs)))).GET("/jobs/:jobid/plan", (Object)this.handler(new JobPlanHandler(currentGraphs)))).GET("/jobs/:jobid/config", (Object)this.handler(new JobConfigHandler(currentGraphs)))).GET("/jobs/:jobid/exceptions", (Object)this.handler(new JobExceptionsHandler(currentGraphs)))).GET("/jobs/:jobid/accumulators", (Object)this.handler(new JobAccumulatorsHandler(currentGraphs)))).GET("/taskmanagers", (Object)this.handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))).GET("/taskmanagers/:taskmanagerid", (Object)this.handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT)))).GET("/jobmanager/log", (Object)new StaticFileServerHandler(this.retriever, (Future<String>)this.jobManagerAddressPromise.future(), this.timeout, logFiles.logFile))).GET("/jobmanager/stdout", (Object)new StaticFileServerHandler(this.retriever, (Future<String>)this.jobManagerAddressPromise.future(), this.timeout, logFiles.stdOutFile))).GET("/:*", (Object)new StaticFileServerHandler(this.retriever, (Future<String>)this.jobManagerAddressPromise.future(), this.timeout, this.webRootDir))).DELETE("/jobs/:jobid", (Object)this.handler(new JobCancellationHandler()));
        Object object = this.startupShutdownLock;
        synchronized (object) {
            Channel ch;
            try {
                Runtime.getRuntime().addShutdownHook(new Thread(){

                    @Override
                    public void run() {
                        WebRuntimeMonitor.this.shutdown();
                    }
                });
            }
            catch (IllegalStateException e) {
                LOG.debug("Unable to add shutdown hook, shutdown already in progress", (Throwable)e);
            }
            catch (Throwable t) {
                LOG.warn("Error while adding shutdown hook", t);
            }
            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>(){

                protected void initChannel(SocketChannel ch) {
                    Handler handler = new Handler(WebRuntimeMonitor.this.router);
                    ch.pipeline().addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(65536)}).addLast(new ChannelHandler[]{new ChunkedWriteHandler()}).addLast(handler.name(), (ChannelHandler)handler);
                }
            };
            NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup workerGroup = new NioEventLoopGroup();
            this.bootstrap = new ServerBootstrap();
            ((ServerBootstrap)this.bootstrap.group((EventLoopGroup)bossGroup, (EventLoopGroup)workerGroup).channel(NioServerSocketChannel.class)).childHandler((ChannelHandler)initializer);
            this.serverChannel = ch = this.bootstrap.bind(this.configuredPort).sync().channel();
            InetSocketAddress bindAddress = (InetSocketAddress)ch.localAddress();
            String address = bindAddress.getAddress().getHostAddress();
            int port = bindAddress.getPort();
            LOG.info("Web frontend listening at " + address + ':' + port);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void start(String jobManagerAkkaUrl) throws Exception {
        LOG.info("Starting with JobManager {} on port {}", (Object)jobManagerAkkaUrl, (Object)this.getServerPort());
        Object object = this.startupShutdownLock;
        synchronized (object) {
            this.jobManagerAddressPromise.success((Object)jobManagerAkkaUrl);
            this.leaderRetrievalService.start((LeaderRetrievalListener)this.retriever);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop() throws Exception {
        Object object = this.startupShutdownLock;
        synchronized (object) {
            this.leaderRetrievalService.stop();
            if (this.serverChannel != null) {
                this.serverChannel.close().awaitUninterruptibly();
                this.serverChannel = null;
            }
            if (this.bootstrap != null && this.bootstrap.group() != null) {
                this.bootstrap.group().shutdownGracefully();
            }
            this.shutdown();
        }
    }

    public int getServerPort() {
        Channel server = this.serverChannel;
        if (server != null) {
            try {
                return ((InetSocketAddress)server.localAddress()).getPort();
            }
            catch (Exception e) {
                LOG.error("Cannot access local server port", (Throwable)e);
            }
        }
        return -1;
    }

    private void shutdown() {
        if (!this.isShutdown.compareAndSet(false, true)) {
            return;
        }
        try {
            LOG.info("Removing web root dir {}", (Object)this.webRootDir);
            FileUtils.deleteDirectory((File)this.webRootDir);
        }
        catch (Throwable t) {
            LOG.warn("Error while deleting web root dir {}", (Object)this.webRootDir, (Object)t);
        }
    }

    private RuntimeMonitorHandler handler(RequestHandler handler) {
        return new RuntimeMonitorHandler(handler, this.retriever, (Future<String>)this.jobManagerAddressPromise.future(), this.timeout);
    }
}

