package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;
import io.netty.handler.codec.http.router.Router;
import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLContext;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
import org.apache.flink.runtime.webmonitor.handlers.ClusterOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.ConstantTextHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobIdsHandler;
import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarAccessDeniedHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarDeleteHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarListHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCancellationHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobCancellationWithSavepointHandlers;
import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobManagerConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobStoppingHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexBackPressureHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.JobVertexTaskManagersHandler;
import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtaskExecutionAttemptDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksAllAccumulatorsHandler;
import org.apache.flink.runtime.webmonitor.handlers.SubtasksTimesHandler;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagerLogHandler;
import org.apache.flink.runtime.webmonitor.handlers.TaskManagersHandler;
import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsHandler;
import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsSubtasksHandler;
import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
import org.apache.flink.runtime.webmonitor.metrics.TaskManagerMetricsHandler;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.ExecutionContextExecutor;
import scala.concurrent.Promise;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.class */
public class WebRuntimeMonitor implements WebMonitor {
    public static final FiniteDuration DEFAULT_REQUEST_TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
    private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);
    private final LeaderRetrievalService leaderRetrievalService;
    private final JobManagerRetriever retriever;
    private final SSLContext serverSSLContext;
    private final FiniteDuration timeout;
    private final WebFrontendBootstrap netty;
    private final File webRootDir;
    private final File uploadDir;
    private final StackTraceSampleCoordinator stackTraceSamples;
    private final BackPressureStatsTracker backPressureStatsTracker;
    private final WebMonitorConfig cfg;
    private ExecutorService executorService;
    private MetricFetcher metricFetcher;
    private final Object startupShutdownLock = new Object();
    private final Promise<String> jobManagerAddressPromise = new Promise.DefaultPromise();
    private AtomicBoolean cleanedUp = new AtomicBoolean();

    public WebRuntimeMonitor(Configuration configuration, LeaderRetrievalService leaderRetrievalService, BlobView blobView, ActorSystem actorSystem) throws IOException, InterruptedException {
        this.leaderRetrievalService = (LeaderRetrievalService) Preconditions.checkNotNull(leaderRetrievalService);
        this.timeout = AkkaUtils.getTimeout(configuration);
        this.retriever = new JobManagerRetriever(this, actorSystem, AkkaUtils.getTimeout(configuration), this.timeout);
        this.cfg = new WebMonitorConfig(configuration);
        String webFrontendAddress = this.cfg.getWebFrontendAddress();
        int webFrontendPort = this.cfg.getWebFrontendPort();
        if (webFrontendPort < 0) {
            throw new IllegalArgumentException("Web frontend port is invalid: " + webFrontendPort);
        }
        WebMonitorUtils.LogFileLocation find = WebMonitorUtils.LogFileLocation.find(configuration);
        this.webRootDir = new File(getBaseDir(configuration), "flink-web-" + UUID.randomUUID());
        LOG.info("Using directory {} for the web interface files", this.webRootDir);
        boolean isProgramSubmitEnabled = this.cfg.isProgramSubmitEnabled();
        if (isProgramSubmitEnabled) {
            this.uploadDir = getUploadDir(configuration);
            if ((!this.uploadDir.exists() || !this.uploadDir.canWrite()) && (!this.uploadDir.mkdirs() || !this.uploadDir.canWrite())) {
                throw new IOException(String.format("Jar upload directory %s cannot be created or is not writable.", this.uploadDir.getAbsolutePath()));
            }
            LOG.info("Using directory {} for web frontend JAR file uploads", this.uploadDir);
        } else {
            this.uploadDir = null;
        }
        ExecutionGraphHolder executionGraphHolder = new ExecutionGraphHolder();
        this.stackTraceSamples = new StackTraceSampleCoordinator(actorSystem.dispatcher(), 60000L);
        int integer = configuration.getInteger("jobmanager.web.backpressure.cleanup-interval", 600000);
        int integer2 = configuration.getInteger("jobmanager.web.backpressure.refresh-interval", 60000);
        this.backPressureStatsTracker = new BackPressureStatsTracker(this.stackTraceSamples, integer, configuration.getInteger("jobmanager.web.backpressure.num-samples", 100), Time.milliseconds(configuration.getInteger("jobmanager.web.backpressure.delay-between-samples", 50)));
        this.executorService = new ForkJoinPool();
        ExecutionContextExecutor fromExecutor = ExecutionContext$.MODULE$.fromExecutor(this.executorService);
        boolean z = configuration.getBoolean("jobmanager.web.ssl.enabled", true) && SSLUtils.getSSLEnabled(configuration);
        if (z) {
            LOG.info("Enabling ssl for the web frontend");
            try {
                this.serverSSLContext = SSLUtils.createSSLServerContext(configuration);
            } catch (Exception e) {
                throw new IOException("Failed to initialize SSLContext for the web frontend", e);
            }
        } else {
            this.serverSSLContext = null;
        }
        this.metricFetcher = new MetricFetcher(actorSystem, this.retriever, fromExecutor);
        JobCancellationWithSavepointHandlers jobCancellationWithSavepointHandlers = new JobCancellationWithSavepointHandlers(executionGraphHolder, fromExecutor, configuration.getString("state.savepoints.dir", (String) null));
        RuntimeMonitorHandler handler = handler(jobCancellationWithSavepointHandlers.getTriggerHandler());
        RuntimeMonitorHandler handler2 = handler(jobCancellationWithSavepointHandlers.getInProgressHandler());
        Router router = new Router();
        GET(router, new DashboardConfigHandler(this.cfg.getRefreshInterval()));
        GET(router, new ClusterOverviewHandler(DEFAULT_REQUEST_TIMEOUT));
        GET(router, new JobManagerConfigHandler(configuration));
        GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, true));
        GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, true, false));
        GET(router, new CurrentJobsOverviewHandler(DEFAULT_REQUEST_TIMEOUT, false, true));
        GET(router, new CurrentJobIdsHandler(DEFAULT_REQUEST_TIMEOUT));
        GET(router, new JobDetailsHandler(executionGraphHolder, this.metricFetcher));
        GET(router, new JobVertexDetailsHandler(executionGraphHolder, this.metricFetcher));
        GET(router, new SubtasksTimesHandler(executionGraphHolder));
        GET(router, new JobVertexTaskManagersHandler(executionGraphHolder, this.metricFetcher));
        GET(router, new JobVertexAccumulatorsHandler(executionGraphHolder));
        GET(router, new JobVertexBackPressureHandler(executionGraphHolder, this.backPressureStatsTracker, integer2));
        GET(router, new JobVertexMetricsHandler(this.metricFetcher));
        GET(router, new SubtasksAllAccumulatorsHandler(executionGraphHolder));
        GET(router, new SubtaskCurrentAttemptDetailsHandler(executionGraphHolder, this.metricFetcher));
        GET(router, new SubtaskExecutionAttemptDetailsHandler(executionGraphHolder, this.metricFetcher));
        GET(router, new SubtaskExecutionAttemptAccumulatorsHandler(executionGraphHolder));
        GET(router, new JobPlanHandler(executionGraphHolder));
        GET(router, new JobConfigHandler(executionGraphHolder));
        GET(router, new JobExceptionsHandler(executionGraphHolder));
        GET(router, new JobAccumulatorsHandler(executionGraphHolder));
        GET(router, new JobMetricsHandler(this.metricFetcher));
        GET(router, new TaskManagersHandler(DEFAULT_REQUEST_TIMEOUT, this.metricFetcher));
        GET(router, new TaskManagerLogHandler(this.retriever, fromExecutor, this.jobManagerAddressPromise.future(), this.timeout, TaskManagerLogHandler.FileMode.LOG, configuration, z, blobView));
        GET(router, new TaskManagerLogHandler(this.retriever, fromExecutor, this.jobManagerAddressPromise.future(), this.timeout, TaskManagerLogHandler.FileMode.STDOUT, configuration, z, blobView));
        GET(router, new TaskManagerMetricsHandler(this.metricFetcher));
        router.GET("/jobmanager/log", find.logFile == null ? new ConstantTextHandler("(log file unavailable)") : new StaticFileServerHandler(this.retriever, this.jobManagerAddressPromise.future(), this.timeout, find.logFile, z)).GET("/jobmanager/stdout", find.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") : new StaticFileServerHandler(this.retriever, this.jobManagerAddressPromise.future(), this.timeout, find.stdOutFile, z));
        GET(router, new JobManagerMetricsHandler(this.metricFetcher));
        GET(router, new JobCancellationHandler());
        DELETE(router, new JobCancellationHandler());
        GET(router, handler);
        GET(router, handler2);
        GET(router, new JobStoppingHandler());
        DELETE(router, new JobStoppingHandler());
        CheckpointStatsCache checkpointStatsCache = new CheckpointStatsCache(configuration.getInteger("jobmanager.web.checkpoints.history", 10));
        GET(router, new CheckpointStatsHandler(executionGraphHolder));
        GET(router, new CheckpointConfigHandler(executionGraphHolder));
        GET(router, new CheckpointStatsDetailsHandler(executionGraphHolder, checkpointStatsCache));
        GET(router, new CheckpointStatsDetailsSubtasksHandler(executionGraphHolder, checkpointStatsCache));
        if (isProgramSubmitEnabled) {
            GET(router, new JarListHandler(this.uploadDir));
            GET(router, new JarPlanHandler(this.uploadDir));
            POST(router, new JarRunHandler(this.uploadDir, this.timeout, configuration));
            POST(router, new JarUploadHandler(this.uploadDir));
            DELETE(router, new JarDeleteHandler(this.uploadDir));
        } else {
            JarAccessDeniedHandler jarAccessDeniedHandler = new JarAccessDeniedHandler();
            GET(router, jarAccessDeniedHandler);
            POST(router, jarAccessDeniedHandler);
            DELETE(router, jarAccessDeniedHandler);
        }
        router.GET("/:*", new StaticFileServerHandler(this.retriever, this.jobManagerAddressPromise.future(), this.timeout, this.webRootDir, z));
        try {
            Runtime.getRuntime().addShutdownHook(new Thread() { // from class: org.apache.flink.runtime.webmonitor.WebRuntimeMonitor.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    WebRuntimeMonitor.this.cleanup();
                }
            });
        } catch (IllegalStateException e2) {
            LOG.debug("Unable to add shutdown hook, shutdown already in progress", e2);
        } catch (Throwable th) {
            LOG.warn("Error while adding shutdown hook", th);
        }
        this.netty = new WebFrontendBootstrap(router, LOG, this.uploadDir, this.serverSSLContext, webFrontendAddress, webFrontendPort, configuration);
    }

    public static JsonArchivist[] getJsonArchivists() {
        return new JsonArchivist[]{new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(), new JobPlanHandler.JobPlanJsonArchivist(), new JobConfigHandler.JobConfigJsonArchivist(), new JobExceptionsHandler.JobExceptionsJsonArchivist(), new JobDetailsHandler.JobDetailsJsonArchivist(), new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(), new CheckpointStatsHandler.CheckpointStatsJsonArchivist(), new CheckpointConfigHandler.CheckpointConfigJsonArchivist(), new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(), new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(), new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(), new SubtasksTimesHandler.SubtasksTimesJsonArchivist(), new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(), new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(), new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(), new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(), new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist()};
    }

    public void start(String str) throws Exception {
        LOG.info("Starting with JobManager {} on port {}", str, Integer.valueOf(getServerPort()));
        synchronized (this.startupShutdownLock) {
            this.jobManagerAddressPromise.success(str);
            this.leaderRetrievalService.start(this.retriever);
            long cleanUpInterval = this.backPressureStatsTracker.getCleanUpInterval();
            this.netty.getBootstrap().childGroup().scheduleWithFixedDelay(new Runnable() { // from class: org.apache.flink.runtime.webmonitor.WebRuntimeMonitor.2
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        WebRuntimeMonitor.this.backPressureStatsTracker.cleanUpOperatorStatsCache();
                    } catch (Throwable th) {
                        WebRuntimeMonitor.LOG.error("Error during back pressure stats cache cleanup.", th);
                    }
                }
            }, cleanUpInterval, cleanUpInterval, TimeUnit.MILLISECONDS);
        }
    }

    public void stop() throws Exception {
        synchronized (this.startupShutdownLock) {
            this.leaderRetrievalService.stop();
            this.netty.shutdown();
            this.stackTraceSamples.shutDown();
            this.backPressureStatsTracker.shutDown();
            this.executorService.shutdownNow();
            cleanup();
        }
    }

    public int getServerPort() {
        return this.netty.getServerPort();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void cleanup() {
        if (this.cleanedUp.compareAndSet(false, true)) {
            try {
                LOG.info("Removing web dashboard root cache directory {}", this.webRootDir);
                FileUtils.deleteDirectory(this.webRootDir);
            } catch (Throwable th) {
                LOG.warn("Error while deleting web root directory {}", this.webRootDir, th);
            }
            if (this.uploadDir != null) {
                try {
                    LOG.info("Removing web dashboard jar upload directory {}", this.uploadDir);
                    FileUtils.deleteDirectory(this.uploadDir);
                } catch (Throwable th2) {
                    LOG.warn("Error while deleting web storage dir {}", this.uploadDir, th2);
                }
            }
        }
    }

    private void GET(Router router, RequestHandler requestHandler) {
        GET(router, handler(requestHandler));
    }

    private void GET(Router router, RuntimeMonitorHandlerBase runtimeMonitorHandlerBase) {
        for (String str : runtimeMonitorHandlerBase.getPaths()) {
            router.GET(str, runtimeMonitorHandlerBase);
        }
    }

    private void DELETE(Router router, RequestHandler requestHandler) {
        DELETE(router, handler(requestHandler));
    }

    private void DELETE(Router router, RuntimeMonitorHandlerBase runtimeMonitorHandlerBase) {
        for (String str : runtimeMonitorHandlerBase.getPaths()) {
            router.DELETE(str, runtimeMonitorHandlerBase);
        }
    }

    private void POST(Router router, RequestHandler requestHandler) {
        POST(router, handler(requestHandler));
    }

    private void POST(Router router, RuntimeMonitorHandlerBase runtimeMonitorHandlerBase) {
        for (String str : runtimeMonitorHandlerBase.getPaths()) {
            router.POST(str, runtimeMonitorHandlerBase);
        }
    }

    private RuntimeMonitorHandler handler(RequestHandler requestHandler) {
        return new RuntimeMonitorHandler(this.cfg, requestHandler, this.retriever, this.jobManagerAddressPromise.future(), this.timeout, this.serverSSLContext != null);
    }

    File getBaseDir(Configuration configuration) {
        return new File(getBaseDirStr(configuration));
    }

    private String getBaseDirStr(Configuration configuration) {
        return configuration.getString("jobmanager.web.tmpdir", System.getProperty("java.io.tmpdir"));
    }

    private File getUploadDir(Configuration configuration) {
        File file = new File(configuration.getString("jobmanager.web.upload.dir", getBaseDirStr(configuration)));
        return configuration.containsKey("jobmanager.web.upload.dir") ? file : new File(file, "flink-web-" + UUID.randomUUID());
    }
}
