package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.router.Handler;
import io.netty.handler.codec.http.router.Router;
import io.netty.handler.stream.ChunkedWriteHandler;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.FileUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.class */
public class WebRuntimeMonitor implements WebMonitor {
    public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
    private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
    private final LeaderRetrievalService leaderRetrievalService;
    private final JobManagerRetriever retriever;
    private final Router router;
    private final int configuredPort;
    private final ServerBootstrap bootstrap;
    private final FiniteDuration timeout;
    private Channel serverChannel;
    private final File webRootDir;
    private final Object startupShutdownLock = new Object();
    private final Promise<String> jobManagerAddressPromise = new Promise.DefaultPromise();
    private AtomicBoolean isShutdown = new AtomicBoolean();

    public WebRuntimeMonitor(Configuration configuration, LeaderRetrievalService leaderRetrievalService, ActorSystem actorSystem) throws IOException, InterruptedException {
        this.leaderRetrievalService = (LeaderRetrievalService) Preconditions.checkNotNull(leaderRetrievalService);
        WebMonitorConfig webMonitorConfig = new WebMonitorConfig(configuration);
        this.webRootDir = new File(System.getProperty("java.io.tmpdir"), String.format("flink-web-%s", UUID.randomUUID().toString()));
        LOG.info("Using directory {} for the web interface files", this.webRootDir);
        WebMonitorUtils.LogFiles find = WebMonitorUtils.LogFiles.find(configuration);
        LOG.info("Serving job manager log from {}", find.logFile.getAbsolutePath());
        LOG.info("Serving job manager stdout from {}", find.stdOutFile.getAbsolutePath());
        this.configuredPort = webMonitorConfig.getWebFrontendPort();
        if (this.configuredPort < 0) {
            throw new IllegalArgumentException("Web frontend port is invalid: " + this.configuredPort);
        }
        this.timeout = AkkaUtils.getTimeout(configuration);
        this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(configuration), this.timeout);
        ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder();
        this.router = new Router().GET("/config", handler(new DashboardConfigHandler(webMonitorConfig.getRefreshInterval()))).GET("/overview", handler(new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT))).GET("/jobmanager/config", handler(new JobManagerConfigHandler(configuration))).GET("/joboverview", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true))).GET("/joboverview/running", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false))).GET("/joboverview/completed", handler(new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true))).GET("/jobs", handler(new CurrentJobIdsHandler(this.retriever, DEFAULT_REQUEST_TIMEOUT))).GET("/jobs/:jobid", handler(new JobDetailsHandler(executionGraphHolder))).GET("/jobs/:jobid/vertices", handler(new JobDetailsHandler(executionGraphHolder))).GET("/jobs/:jobid/vertices/:vertexid", handler(new JobVertexDetailsHandler(executionGraphHolder))).GET("/jobs/:jobid/vertices/:vertexid/subtasktimes", handler(new SubtasksTimesHandler(executionGraphHolder))).GET("/jobs/:jobid/vertices/:vertexid/accumulators", handler(new JobVertexAccumulatorsHandler(executionGraphHolder))).GET("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", handler(new SubtasksAllAccumulatorsHandler(executionGraphHolder))).GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", handler(new SubtaskCurrentAttemptDetailsHandler(executionGraphHolder))).GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", handler(new SubtaskExecutionAttemptDetailsHandler(executionGraphHolder))).GET("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", handler(new SubtaskExecutionAttemptAccumulatorsHandler(executionGraphHolder))).GET("/jobs/:jobid/plan", handler(new JobPlanHandler(executionGraphHolder))).GET("/jobs/:jobid/config", handler(new JobConfigHandler(executionGraphHolder))).GET("/jobs/:jobid/exceptions", handler(new JobExceptionsHandler(executionGraphHolder))).GET("/jobs/:jobid/accumulators", handler(new JobAccumulatorsHandler(executionGraphHolder))).GET("/taskmanagers", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))).GET("/taskmanagers/:taskmanagerid", handler(new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT))).GET("/jobmanager/log", new StaticFileServerHandler(this.retriever, this.jobManagerAddressPromise.future(), this.timeout, find.logFile)).GET("/jobmanager/stdout", new StaticFileServerHandler(this.retriever, this.jobManagerAddressPromise.future(), this.timeout, find.stdOutFile)).GET("/jobs/:jobid/yarn-cancel", handler(new JobCancellationHandler())).DELETE("/jobs/:jobid", handler(new JobCancellationHandler())).GET("/:*", new StaticFileServerHandler(this.retriever, this.jobManagerAddressPromise.future(), this.timeout, this.webRootDir));
        synchronized (this.startupShutdownLock) {
            try {
                Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.flink.runtime.webmonitor.WebRuntimeMonitor.1
                    @Override // java.lang.Thread, java.lang.Runnable
                    public void run() {
                        WebRuntimeMonitor.this.shutdown();
                    }
                });
            } catch (IllegalStateException e) {
                LOG.debug("Unable to add shutdown hook, shutdown already in progress", e);
            } catch (Throwable th) {
                LOG.warn("Error while adding shutdown hook", th);
            }
            ChannelInitializer<SocketChannel> channelInitializer = new ChannelInitializer<SocketChannel>() { // from class: org.apache.flink.runtime.webmonitor.WebRuntimeMonitor.2
                /* JADX INFO: Access modifiers changed from: protected */
                public void initChannel(SocketChannel socketChannel) {
                    Handler handler = new Handler(WebRuntimeMonitor.this.router);
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new HttpServerCodec()}).addLast(new ChannelHandler[]{new HttpObjectAggregator(65536)}).addLast(new ChannelHandler[]{new ChunkedWriteHandler()}).addLast(handler.name(), handler);
                }
            };
            NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(1);
            NioEventLoopGroup nioEventLoopGroup2 = new NioEventLoopGroup();
            this.bootstrap = new ServerBootstrap();
            this.bootstrap.group(nioEventLoopGroup, nioEventLoopGroup2).channel(NioServerSocketChannel.class).childHandler(channelInitializer);
            Channel channel = this.bootstrap.bind(this.configuredPort).sync().channel();
            this.serverChannel = channel;
            InetSocketAddress inetSocketAddress = (InetSocketAddress) channel.localAddress();
            LOG.info("Web frontend listening at " + inetSocketAddress.getAddress().getHostAddress() + ':' + inetSocketAddress.getPort());
        }
    }

    public void start(String str) throws Exception {
        LOG.info("Starting with JobManager {} on port {}", str, Integer.valueOf(getServerPort()));
        synchronized (this.startupShutdownLock) {
            this.jobManagerAddressPromise.success(str);
            this.leaderRetrievalService.start(this.retriever);
        }
    }

    public void stop() throws Exception {
        synchronized (this.startupShutdownLock) {
            this.leaderRetrievalService.stop();
            if (this.serverChannel != null) {
                this.serverChannel.close().awaitUninterruptibly();
                this.serverChannel = null;
            }
            if (this.bootstrap != null && this.bootstrap.group() != null) {
                this.bootstrap.group().shutdownGracefully();
            }
            shutdown();
        }
    }

    public int getServerPort() {
        Channel channel = this.serverChannel;
        if (channel == null) {
            return -1;
        }
        try {
            return ((InetSocketAddress) channel.localAddress()).getPort();
        } catch (Exception e) {
            LOG.error("Cannot access local server port", e);
            return -1;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void shutdown() {
        if (this.isShutdown.compareAndSet(false, true)) {
            try {
                LOG.info("Removing web root dir {}", this.webRootDir);
                FileUtils.deleteDirectory(this.webRootDir);
            } catch (Throwable th) {
                LOG.warn("Error while deleting web root dir {}", this.webRootDir, th);
            }
        }
    }

    private RuntimeMonitorHandler handler(RequestHandler requestHandler) {
        return new RuntimeMonitorHandler(requestHandler, this.retriever, this.jobManagerAddressPromise.future(), this.timeout);
    }
}
