package org.apache.flink.runtime.webmonitor;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.io.network.netty.SSLHandlerFactory;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.net.SSLUtils;
import org.apache.flink.runtime.rest.handler.WebHandler;
import org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointStatsCache;
import org.apache.flink.runtime.rest.handler.legacy.ClusterConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ClusterOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.ConstantTextHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobIdsHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JobAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobCancellationWithSavepointHandlers;
import org.apache.flink.runtime.rest.handler.legacy.JobConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobExceptionsHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobPlanHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobStoppingHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobVertexAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobVertexBackPressureHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobVertexDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobVertexTaskManagersHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.RequestHandler;
import org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.legacy.SubtaskExecutionAttemptDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.SubtasksAllAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.legacy.SubtasksTimesHandler;
import org.apache.flink.runtime.rest.handler.legacy.TaskManagerLogHandler;
import org.apache.flink.runtime.rest.handler.legacy.TaskManagersHandler;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTrackerImpl;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsHandler;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsDetailsSubtasksHandler;
import org.apache.flink.runtime.rest.handler.legacy.checkpoints.CheckpointStatsHandler;
import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.AggregatingJobsMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.AggregatingSubtasksMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.AggregatingTaskManagersMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.JobManagerMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.JobMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.JobVertexMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.SubtaskMetricsHandler;
import org.apache.flink.runtime.rest.handler.legacy.metrics.TaskManagerMetricsHandler;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarAccessDeniedHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarDeleteHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarListHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarPlanHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarRunHandler;
import org.apache.flink.runtime.webmonitor.handlers.legacy.JarUploadHandler;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.class */
public class WebRuntimeMonitor implements WebMonitor {
    /** Request timeout passed to the overview-style handlers that do not take the configured timeout. */
    public static final Time DEFAULT_REQUEST_TIMEOUT = Time.seconds(10);

    private static final Logger LOG = LoggerFactory.getLogger(WebRuntimeMonitor.class);

    /** Service started in {@link #start()} with {@link #retriever} as its listener and stopped in {@link #stop()}. */
    private final LeaderRetrievalService leaderRetrievalService;

    /** Gateway retriever handed to the request handlers and the metric fetcher. */
    private final LeaderGatewayRetriever<JobManagerGateway> retriever;

    /** Timeout handed to every {@code RuntimeMonitorHandler} created by this monitor. */
    private final Time timeout;

    /** The netty-based web frontend server. */
    private final WebFrontendBootstrap netty;

    /** Per-instance directory for the web interface files; deleted on cleanup. */
    private final File webRootDir;

    /** Directory for uploaded jars; {@code null} when program submission is disabled. */
    private final File uploadDir;

    private final StackTraceSampleCoordinator stackTraceSamples;

    private final BackPressureStatsTrackerImpl backPressureStatsTrackerImpl;

    private final WebMonitorConfig cfg;

    private final ExecutionGraphCache executionGraphCache;

    /** Periodic task that invokes {@code ExecutionGraphCache#cleanup}; cancelled in {@link #stop()}. */
    private final ScheduledFuture<?> executionGraphCleanupTask;

    /** Assigned exactly once in the constructor, hence final. */
    private final MetricFetcher metricFetcher;

    /** Guards {@link #start()} and {@link #stop()} against running concurrently. */
    private final Object startupShutdownLock = new Object();

    /** Completed with the netty REST address at the end of the constructor. */
    private final CompletableFuture<String> localRestAddress = new CompletableFuture<>();

    /**
     * Ensures the directory cleanup runs at most once, whether it is triggered by {@link #stop()}
     * or by the shutdown hook. Never reassigned, hence final.
     */
    private final AtomicBoolean cleanedUp = new AtomicBoolean();

    /**
     * Builds the web monitor: reads the web frontend settings, prepares the web root and
     * (optionally) jar upload directories, creates the execution graph cache and back pressure
     * tracker, registers all request handlers on a {@link Router} and finally creates the netty
     * frontend.
     *
     * @param configuration configuration the frontend settings, directories and handler options are read from
     * @param leaderRetrievalService service that is started/stopped together with this monitor; must not be null
     * @param leaderGatewayRetriever retriever handed to the request handlers; must not be null
     * @param metricQueryServiceRetriever retriever handed to the {@link MetricFetcher}
     * @param time timeout used for handlers and the execution graph cache; must not be null
     * @param scheduledExecutor executor used by the handlers and for the periodic cache cleanup
     * @throws IOException if the upload directory cannot be created or the SSL factory cannot be initialized
     * @throws InterruptedException declared by this constructor; presumably propagated from the netty
     *     bootstrap startup — TODO confirm
     * @throws IllegalArgumentException if the configured web frontend port is negative
     */
    public WebRuntimeMonitor(Configuration configuration, LeaderRetrievalService leaderRetrievalService, LeaderGatewayRetriever<JobManagerGateway> leaderGatewayRetriever, MetricQueryServiceRetriever metricQueryServiceRetriever, Time time, ScheduledExecutor scheduledExecutor) throws IOException, InterruptedException {
        SSLHandlerFactory sSLHandlerFactory;
        this.leaderRetrievalService = (LeaderRetrievalService) Preconditions.checkNotNull(leaderRetrievalService);
        this.retriever = (LeaderGatewayRetriever) Preconditions.checkNotNull(leaderGatewayRetriever);
        this.timeout = (Time) Preconditions.checkNotNull(time);
        this.cfg = new WebMonitorConfig(configuration);
        String webFrontendAddress = this.cfg.getWebFrontendAddress();
        int webFrontendPort = this.cfg.getWebFrontendPort();
        // Only negative ports are rejected here; no upper bound is checked.
        if (webFrontendPort < 0) {
            throw new IllegalArgumentException("Web frontend port is invalid: " + webFrontendPort);
        }
        WebMonitorUtils.LogFileLocation find = WebMonitorUtils.LogFileLocation.find(configuration);
        // A fresh, randomly named directory under the configured base dir for every monitor instance.
        this.webRootDir = new File(getBaseDir(configuration), "flink-web-" + UUID.randomUUID());
        LOG.info("Using directory {} for the web interface files", this.webRootDir);
        boolean isProgramSubmitEnabled = this.cfg.isProgramSubmitEnabled();
        if (isProgramSubmitEnabled) {
            this.uploadDir = getUploadDir(configuration);
            checkAndCreateUploadDir(this.uploadDir);
        } else {
            // No jar upload directory when program submission is disabled.
            this.uploadDir = null;
        }
        // The cache is created with ten times the configured refresh interval and its cleanup is
        // scheduled at twice that value.
        long refreshInterval = this.cfg.getRefreshInterval() * 10;
        this.executionGraphCache = new ExecutionGraphCache(time, Time.milliseconds(refreshInterval));
        long j = refreshInterval * 2;
        ExecutionGraphCache executionGraphCache = this.executionGraphCache;
        // Result is discarded; looks like the implicit null check a compiler emits for the
        // bound method reference on the next line.
        executionGraphCache.getClass();
        // NOTE(review): this task is scheduled before later steps that can throw (e.g. SSL setup);
        // it is only cancelled in stop(), so a failed construction appears to leave it running.
        this.executionGraphCleanupTask = scheduledExecutor.scheduleWithFixedDelay(executionGraphCache::cleanup, j, j, TimeUnit.MILLISECONDS);
        // NOTE(review): 60000L is presumably a sample timeout in milliseconds — confirm.
        this.stackTraceSamples = new StackTraceSampleCoordinator(scheduledExecutor, 60000L);
        int integer = configuration.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
        // integer2 (the back pressure refresh interval) is passed to JobVertexBackPressureHandler below.
        int integer2 = configuration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL);
        this.backPressureStatsTrackerImpl = new BackPressureStatsTrackerImpl(this.stackTraceSamples, integer, configuration.getInteger(WebOptions.BACKPRESSURE_NUM_SAMPLES), configuration.getInteger(WebOptions.BACKPRESSURE_REFRESH_INTERVAL), Time.milliseconds(configuration.getInteger(WebOptions.BACKPRESSURE_DELAY)));
        // SSL is used only when both the REST SSL check and the web-specific SSL flag are enabled.
        if (SSLUtils.isRestSSLEnabled(configuration) && configuration.getBoolean(WebOptions.SSL_ENABLED)) {
            LOG.info("Enabling ssl for the web frontend");
            try {
                sSLHandlerFactory = SSLUtils.createRestServerSSLEngineFactory(configuration);
            } catch (Exception e) {
                throw new IOException("Failed to initialize SSLContext for the web frontend", e);
            }
        } else {
            sSLHandlerFactory = null;
        }
        this.metricFetcher = new MetricFetcher(this.retriever, metricQueryServiceRetriever, scheduledExecutor, time);
        // The two cancel-with-savepoint handlers come from one shared object and are registered further below.
        JobCancellationWithSavepointHandlers jobCancellationWithSavepointHandlers = new JobCancellationWithSavepointHandlers(this.executionGraphCache, scheduledExecutor, configuration.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY));
        RuntimeMonitorHandler handler = handler(jobCancellationWithSavepointHandlers.getTriggerHandler());
        RuntimeMonitorHandler handler2 = handler(jobCancellationWithSavepointHandlers.getInProgressHandler());
        Router router = new Router();
        // --- cluster, job manager and task manager level handlers ---
        get(router, (RequestHandler) new DashboardConfigHandler(scheduledExecutor, this.cfg.getRefreshInterval()));
        get(router, (RequestHandler) new ClusterOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT));
        get(router, (RequestHandler) new ClusterConfigHandler(scheduledExecutor, configuration));
        get(router, (RequestHandler) new JobManagerMetricsHandler(scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new AggregatingTaskManagersMetricsHandler(scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new TaskManagerMetricsHandler(scheduledExecutor, this.metricFetcher));
        // --- job, vertex and subtask level handlers ---
        get(router, (RequestHandler) new JobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT));
        get(router, (RequestHandler) new CurrentJobIdsHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT));
        get(router, (RequestHandler) new JobDetailsHandler(this.executionGraphCache, scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new AggregatingJobsMetricsHandler(scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new JobMetricsHandler(scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new JobVertexMetricsHandler(scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new AggregatingSubtasksMetricsHandler(scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new SubtaskMetricsHandler(scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new JobVertexDetailsHandler(this.executionGraphCache, scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new SubtasksTimesHandler(this.executionGraphCache, scheduledExecutor));
        get(router, (RequestHandler) new JobVertexTaskManagersHandler(this.executionGraphCache, scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new JobVertexAccumulatorsHandler(this.executionGraphCache, scheduledExecutor));
        get(router, (RequestHandler) new JobVertexBackPressureHandler(this.executionGraphCache, scheduledExecutor, this.backPressureStatsTrackerImpl, integer2));
        get(router, (RequestHandler) new SubtasksAllAccumulatorsHandler(this.executionGraphCache, scheduledExecutor));
        get(router, (RequestHandler) new SubtaskCurrentAttemptDetailsHandler(this.executionGraphCache, scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new SubtaskExecutionAttemptDetailsHandler(this.executionGraphCache, scheduledExecutor, this.metricFetcher));
        get(router, (RequestHandler) new SubtaskExecutionAttemptAccumulatorsHandler(this.executionGraphCache, scheduledExecutor));
        get(router, (RequestHandler) new JobPlanHandler(this.executionGraphCache, scheduledExecutor));
        get(router, (RequestHandler) new JobConfigHandler(this.executionGraphCache, scheduledExecutor));
        get(router, (RequestHandler) new JobExceptionsHandler(this.executionGraphCache, scheduledExecutor));
        get(router, (RequestHandler) new JobAccumulatorsHandler(this.executionGraphCache, scheduledExecutor));
        get(router, (RequestHandler) new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, this.metricFetcher));
        // --- log and stdout access ---
        // These two are passed without the RequestHandler cast, i.e. they are registered directly
        // through the generic get(Router, T) overload rather than being wrapped by handler(...).
        get(router, new TaskManagerLogHandler(this.retriever, scheduledExecutor, this.localRestAddress, time, TaskManagerLogHandler.FileMode.LOG, configuration));
        get(router, new TaskManagerLogHandler(this.retriever, scheduledExecutor, this.localRestAddress, time, TaskManagerLogHandler.FileMode.STDOUT, configuration));
        // When a job manager log/stdout file is not known, a constant placeholder text is served instead.
        router.addGet("/jobmanager/log", find.logFile == null ? new ConstantTextHandler("(log file unavailable)") : new StaticFileServerHandler(this.retriever, this.localRestAddress, time, find.logFile)).addGet("/jobmanager/stdout", find.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") : new StaticFileServerHandler(this.retriever, this.localRestAddress, time, find.stdOutFile));
        // --- job cancellation / stopping: each handler is registered for both GET and DELETE ---
        get(router, (RequestHandler) new JobCancellationHandler(scheduledExecutor, time));
        delete(router, (RequestHandler) new JobCancellationHandler(scheduledExecutor, time));
        get(router, handler);
        get(router, handler2);
        get(router, (RequestHandler) new JobStoppingHandler(scheduledExecutor, time));
        delete(router, (RequestHandler) new JobStoppingHandler(scheduledExecutor, time));
        // --- checkpoint statistics; the two details handlers share one cache ---
        CheckpointStatsCache checkpointStatsCache = new CheckpointStatsCache(configuration.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE));
        get(router, (RequestHandler) new CheckpointStatsHandler(this.executionGraphCache, scheduledExecutor));
        get(router, (RequestHandler) new CheckpointConfigHandler(this.executionGraphCache, scheduledExecutor));
        get(router, (RequestHandler) new CheckpointStatsDetailsHandler(this.executionGraphCache, scheduledExecutor, checkpointStatsCache));
        get(router, (RequestHandler) new CheckpointStatsDetailsSubtasksHandler(this.executionGraphCache, scheduledExecutor, checkpointStatsCache));
        // --- jar handling: real handlers when submission is enabled, otherwise a single
        // access-denied handler registered for GET, POST and DELETE ---
        if (isProgramSubmitEnabled) {
            get(router, (RequestHandler) new JarListHandler(scheduledExecutor, this.uploadDir));
            get(router, (RequestHandler) new JarPlanHandler(scheduledExecutor, this.uploadDir));
            post(router, (RequestHandler) new JarRunHandler(scheduledExecutor, this.uploadDir, time, configuration));
            post(router, (RequestHandler) new JarUploadHandler(scheduledExecutor, this.uploadDir));
            delete(router, (RequestHandler) new JarDeleteHandler(scheduledExecutor, this.uploadDir));
        } else {
            JarAccessDeniedHandler jarAccessDeniedHandler = new JarAccessDeniedHandler(scheduledExecutor);
            get(router, (RequestHandler) jarAccessDeniedHandler);
            post(router, (RequestHandler) jarAccessDeniedHandler);
            delete(router, (RequestHandler) jarAccessDeniedHandler);
        }
        // Catch-all GET route serving files from the web root directory.
        // NOTE(review): presumably has to stay the last registered route so the specific
        // routes above take precedence — confirm against Router's matching rules.
        router.addGet("/:*", new StaticFileServerHandler(this.retriever, this.localRestAddress, time, this.webRootDir));
        // Directory cleanup also runs on JVM shutdown. The hook's return value is discarded, so
        // stop() cannot unregister it; cleanup() is guarded by cleanedUp and runs at most once.
        ShutdownHookUtil.addShutdownHook(this::cleanup, getClass().getSimpleName(), LOG);
        this.netty = new WebFrontendBootstrap(router, LOG, this.uploadDir, sSLHandlerFactory, webFrontendAddress, webFrontendPort, configuration);
        // The handlers above were given this future; it is completed only now that the address is known.
        this.localRestAddress.complete(this.netty.getRestAddress());
    }

    /**
     * Creates a fresh array holding one new instance of every JSON archivist provided by the
     * job, checkpoint, vertex and subtask handlers.
     *
     * @return newly allocated array of archivists, in a fixed order
     */
    public static JsonArchivist[] getJsonArchivists() {
        final JsonArchivist[] archivists = {
            new JobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
            new JobPlanHandler.JobPlanJsonArchivist(),
            new JobConfigHandler.JobConfigJsonArchivist(),
            new JobExceptionsHandler.JobExceptionsJsonArchivist(),
            new JobDetailsHandler.JobDetailsJsonArchivist(),
            new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(),
            new CheckpointStatsHandler.CheckpointStatsJsonArchivist(),
            new CheckpointConfigHandler.CheckpointConfigJsonArchivist(),
            new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(),
            new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(),
            new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(),
            new SubtasksTimesHandler.SubtasksTimesJsonArchivist(),
            new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(),
            new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(),
            new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(),
            new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(),
            new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist()
        };
        return archivists;
    }

    /**
     * Starts leader retrieval and schedules the periodic cleanup of the back pressure stats cache
     * on the netty child event loop group.
     *
     * @throws Exception if starting the leader retrieval service fails
     */
    public void start() throws Exception {
        synchronized (this.startupShutdownLock) {
            this.leaderRetrievalService.start(this.retriever);

            final long interval = this.backPressureStatsTrackerImpl.getCleanUpInterval();

            // Failures are logged and swallowed so they never escape into the scheduler.
            final Runnable statsCacheCleanup = () -> {
                try {
                    this.backPressureStatsTrackerImpl.cleanUpOperatorStatsCache();
                } catch (Throwable failure) {
                    LOG.error("Error during back pressure stats cache cleanup.", failure);
                }
            };

            this.netty.getBootstrap().childGroup().scheduleWithFixedDelay(statsCacheCleanup, interval, interval, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Stops the monitor and releases everything it owns: the execution graph cache and its
     * cleanup task, the leader retrieval service, the netty frontend, the stack trace sampler,
     * the back pressure tracker and finally the temporary directories.
     *
     * <p>Every shutdown step is attempted even if an earlier one fails, so that a failure in
     * e.g. the leader retrieval service does not leave the netty server running or the
     * temporary directories behind. The first failure is rethrown once all steps have run;
     * later failures are attached to it as suppressed exceptions.
     *
     * @throws Exception the first failure encountered while shutting down, if any
     */
    public void stop() throws Exception {
        synchronized (this.startupShutdownLock) {
            Exception failure = null;
            try {
                try {
                    this.executionGraphCleanupTask.cancel(false);
                    this.executionGraphCache.close();
                } catch (Exception e) {
                    failure = firstOrSuppressed(failure, e);
                }
                try {
                    this.leaderRetrievalService.stop();
                } catch (Exception e) {
                    failure = firstOrSuppressed(failure, e);
                }
                try {
                    this.netty.shutdown();
                } catch (Exception e) {
                    failure = firstOrSuppressed(failure, e);
                }
                try {
                    this.stackTraceSamples.shutDown();
                } catch (Exception e) {
                    failure = firstOrSuppressed(failure, e);
                }
                try {
                    this.backPressureStatsTrackerImpl.shutDown();
                } catch (Exception e) {
                    failure = firstOrSuppressed(failure, e);
                }
            } finally {
                // Runs even when a step above throws an Error; cleanup() itself never throws.
                cleanup();
            }
            if (failure != null) {
                throw failure;
            }
        }
    }

    /** Returns {@code previous} with {@code current} added as suppressed, or {@code current} if there is no previous failure. */
    private static Exception firstOrSuppressed(Exception previous, Exception current) {
        if (previous == null) {
            return current;
        }
        previous.addSuppressed(current);
        return previous;
    }

    /**
     * Returns the port reported by the netty web frontend.
     *
     * @return the server port of the underlying frontend bootstrap
     */
    public int getServerPort() {
        final int port = this.netty.getServerPort();
        return port;
    }

    /**
     * Returns the REST address reported by the netty web frontend.
     *
     * @return the REST address of the underlying frontend bootstrap
     */
    public String getRestAddress() {
        final String address = this.netty.getRestAddress();
        return address;
    }

    /**
     * Deletes the web root directory and, if present, the jar upload directory.
     *
     * <p>Only the first invocation does any work; later calls return immediately. Deletion
     * failures are logged and never propagated.
     */
    private void cleanup() {
        // Someone else already ran (or is running) the cleanup.
        if (!this.cleanedUp.compareAndSet(false, true)) {
            return;
        }

        try {
            LOG.info("Removing web dashboard root cache directory {}", this.webRootDir);
            FileUtils.deleteDirectory(this.webRootDir);
        } catch (Throwable rootDirFailure) {
            LOG.warn("Error while deleting web root directory {}", this.webRootDir, rootDirFailure);
        }

        // No upload directory exists when program submission is disabled.
        if (this.uploadDir == null) {
            return;
        }

        try {
            LOG.info("Removing web dashboard jar upload directory {}", this.uploadDir);
            FileUtils.deleteDirectory(this.uploadDir);
        } catch (Throwable uploadDirFailure) {
            LOG.warn("Error while deleting web storage dir {}", this.uploadDir, uploadDirFailure);
        }
    }

    /** Wraps the request handler via {@link #handler(RequestHandler)} and registers it for GET on all of its paths. */
    private void get(Router router, RequestHandler requestHandler) {
        final RuntimeMonitorHandler wrapped = handler(requestHandler);
        get(router, wrapped);
    }

    /**
     * Registers the given handler as GET target for every path it reports.
     *
     * @param router router to add the routes to
     * @param t handler that is both the channel handler and the source of the paths
     */
    private static <T extends ChannelInboundHandler & WebHandler> void get(Router router, T t) {
        for (String path : t.getPaths()) {
            router.addGet(path, t);
        }
    }

    /** Wraps the request handler via {@link #handler(RequestHandler)} and registers it for DELETE on all of its paths. */
    private void delete(Router router, RequestHandler requestHandler) {
        final RuntimeMonitorHandler wrapped = handler(requestHandler);
        delete(router, wrapped);
    }

    /**
     * Registers the given handler as DELETE target for every path it reports.
     *
     * @param router router to add the routes to
     * @param t handler that is both the channel handler and the source of the paths
     */
    private static <T extends ChannelInboundHandler & WebHandler> void delete(Router router, T t) {
        for (String path : t.getPaths()) {
            router.addDelete(path, t);
        }
    }

    /** Wraps the request handler via {@link #handler(RequestHandler)} and registers it for POST on all of its paths. */
    private void post(Router router, RequestHandler requestHandler) {
        final RuntimeMonitorHandler wrapped = handler(requestHandler);
        post(router, wrapped);
    }

    /**
     * Registers the given handler as POST target for every path it reports.
     *
     * @param router router to add the routes to
     * @param t handler that is both the channel handler and the source of the paths
     */
    private static <T extends ChannelInboundHandler & WebHandler> void post(Router router, T t) {
        for (String path : t.getPaths()) {
            router.addPost(path, t);
        }
    }

    /**
     * Wraps a request handler into a {@code RuntimeMonitorHandler} that shares this monitor's
     * config, gateway retriever, local REST address future and timeout.
     *
     * @param requestHandler handler to wrap
     * @return a new wrapping handler
     */
    private RuntimeMonitorHandler handler(RequestHandler requestHandler) {
        final RuntimeMonitorHandler wrapped =
            new RuntimeMonitorHandler(this.cfg, requestHandler, this.retriever, this.localRestAddress, this.timeout);
        return wrapped;
    }

    /**
     * Resolves the base directory under which the web root directory is created.
     *
     * @param configuration configuration to read the temp directory option from
     * @return the base directory as a {@link File}
     */
    File getBaseDir(Configuration configuration) {
        final String baseDirPath = getBaseDirStr(configuration);
        return new File(baseDirPath);
    }

    /**
     * Reads the value of {@code WebOptions.TMP_DIR} from the configuration.
     *
     * @param configuration configuration to read from
     * @return the configured temp directory path
     */
    private String getBaseDirStr(Configuration configuration) {
        final String tmpDir = configuration.getString(WebOptions.TMP_DIR);
        return tmpDir;
    }

    /**
     * Determines the jar upload directory.
     *
     * <p>If {@code WebOptions.UPLOAD_DIR} is present in the configuration, that directory is used
     * as is. Otherwise a randomly named {@code flink-web-<uuid>} subdirectory of the base temp
     * directory is used.
     *
     * @param configuration configuration to read the directory options from
     * @return the upload directory (not created by this method)
     */
    private File getUploadDir(Configuration configuration) {
        final String fallbackPath = getBaseDirStr(configuration);
        final File configuredOrBase = new File(configuration.getString(WebOptions.UPLOAD_DIR, fallbackPath));

        if (configuration.contains(WebOptions.UPLOAD_DIR)) {
            return configuredOrBase;
        }
        return new File(configuredOrBase, "flink-web-" + UUID.randomUUID());
    }

    /**
     * Logs a warning that the jar storage directory was deleted externally.
     *
     * @param file the jar storage directory that went missing
     */
    public static void logExternalUploadDirDeletion(File file) {
        final String missingDir = file.getAbsolutePath();
        LOG.warn("Jar storage directory {} has been deleted externally. Previously uploaded jars are no longer available.", missingDir);
    }

    /**
     * Ensures that the jar upload directory exists and is writable, creating it if necessary.
     *
     * <p>An existing path is only accepted when it is a directory. A regular file at that
     * location falls through to {@link File#mkdirs()}, which fails, and is therefore reported
     * as an error instead of being silently treated as the upload directory.
     *
     * @param file the directory to check or create
     * @throws IOException if the directory does not exist and cannot be created, is not a
     *     directory, or is not writable
     */
    public static synchronized void checkAndCreateUploadDir(File file) throws IOException {
        // isDirectory() implies exists(), and additionally rejects regular files.
        if (file.isDirectory() && file.canWrite()) {
            LOG.info("Using directory {} for web frontend JAR file uploads.", file);
        } else if (file.mkdirs() && file.canWrite()) {
            LOG.info("Created directory {} for web frontend JAR file uploads.", file);
        } else {
            LOG.warn("Jar upload directory {} cannot be created or is not writable.", file.getAbsolutePath());
            throw new IOException(String.format("Jar upload directory %s cannot be created or is not writable.", file.getAbsolutePath()));
        }
    }
}
