/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor;

import java.io.File;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

public class WebFrontendITCase
extends TestLogger {
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int NUM_SLOTS = 4;
    private static final Configuration CLUSTER_CONFIGURATION = WebFrontendITCase.getClusterConfiguration();
    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(CLUSTER_CONFIGURATION).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(4).build());

    private static Configuration getClusterConfiguration() {
        Configuration config = new Configuration();
        try {
            File logDir = File.createTempFile("TestBaseUtils-logdir", null);
            Assert.assertTrue((String)"Unable to delete temp file", (boolean)logDir.delete());
            Assert.assertTrue((String)"Unable to create temp directory", (boolean)logDir.mkdir());
            File logFile = new File(logDir, "jobmanager.log");
            File outFile = new File(logDir, "jobmanager.out");
            Files.createFile(logFile.toPath(), new FileAttribute[0]);
            Files.createFile(outFile.toPath(), new FileAttribute[0]);
            config.setString(WebOptions.LOG_PATH, logFile.getAbsolutePath());
            config.setString("taskmanager.log.path", logFile.getAbsolutePath());
        }
        catch (Exception e) {
            throw new AssertionError("Could not setup test.", e);
        }
        config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, (Object)MemorySize.parse((String)"12m"));
        return config;
    }

    @After
    public void tearDown() {
        BlockingInvokable.reset();
    }

    @Test
    public void getFrontPage() throws Exception {
        String fromHTTP = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/index.html"));
        MatcherAssert.assertThat((Object)fromHTTP, (Matcher)CoreMatchers.containsString((String)"Apache Flink Web Dashboard"));
    }

    private int getRestPort() {
        return CLUSTER.getRestAddres().getPort();
    }

    @Test
    public void testResponseHeaders() throws Exception {
        URL taskManagersUrl = new URL("http://localhost:" + this.getRestPort() + "/taskmanagers");
        HttpURLConnection taskManagerConnection = (HttpURLConnection)taskManagersUrl.openConnection();
        taskManagerConnection.setConnectTimeout(100000);
        taskManagerConnection.connect();
        if (taskManagerConnection.getResponseCode() >= 400) {
            InputStream is = taskManagerConnection.getErrorStream();
            String errorMessage = IOUtils.toString((InputStream)is, (Charset)ConfigConstants.DEFAULT_CHARSET);
            Assert.fail((String)errorMessage);
        }
        Assert.assertNull((Object)taskManagerConnection.getContentEncoding());
        Assert.assertEquals((Object)"application/json; charset=UTF-8", (Object)taskManagerConnection.getContentType());
        URL notFoundJobUrl = new URL("http://localhost:" + this.getRestPort() + "/jobs/dontexist");
        HttpURLConnection notFoundJobConnection = (HttpURLConnection)notFoundJobUrl.openConnection();
        notFoundJobConnection.setConnectTimeout(100000);
        notFoundJobConnection.connect();
        if (notFoundJobConnection.getResponseCode() >= 400) {
            Assert.assertNull((Object)notFoundJobConnection.getContentEncoding());
            Assert.assertEquals((Object)"application/json; charset=UTF-8", (Object)notFoundJobConnection.getContentType());
        } else {
            Assert.fail((String)"Request for non-existing job did not return an error.");
        }
    }

    @Test
    public void getNumberOfTaskManagers() throws Exception {
        String json = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/taskmanagers/"));
        ObjectMapper mapper = new ObjectMapper();
        JsonNode response = mapper.readTree(json);
        ArrayNode taskManagers = (ArrayNode)response.get("taskmanagers");
        Assert.assertNotNull((Object)taskManagers);
        Assert.assertEquals((long)2L, (long)taskManagers.size());
    }

    @Test
    public void getTaskManagers() throws Exception {
        String json = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/taskmanagers/"));
        ObjectMapper mapper = new ObjectMapper();
        JsonNode parsed = mapper.readTree(json);
        ArrayNode taskManagers = (ArrayNode)parsed.get("taskmanagers");
        Assert.assertNotNull((Object)taskManagers);
        Assert.assertEquals((long)2L, (long)taskManagers.size());
        JsonNode taskManager = taskManagers.get(0);
        Assert.assertNotNull((Object)taskManager);
        Assert.assertEquals((long)4L, (long)taskManager.get("slotsNumber").asInt());
        Assert.assertTrue((taskManager.get("freeSlots").asInt() <= 4 ? 1 : 0) != 0);
    }

    @Test
    public void getLogAndStdoutFiles() throws Exception {
        WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find((Configuration)CLUSTER_CONFIGURATION);
        FileUtils.writeStringToFile((File)logFiles.logFile, (String)"job manager log");
        String logs = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/jobmanager/log"));
        MatcherAssert.assertThat((Object)logs, (Matcher)CoreMatchers.containsString((String)"job manager log"));
        FileUtils.writeStringToFile((File)logFiles.stdOutFile, (String)"job manager out");
        logs = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/jobmanager/stdout"));
        MatcherAssert.assertThat((Object)logs, (Matcher)CoreMatchers.containsString((String)"job manager out"));
    }

    @Test
    public void getCustomLogFiles() throws Exception {
        WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find((Configuration)CLUSTER_CONFIGURATION);
        String customFileName = "test.log";
        String logDir = logFiles.logFile.getParent();
        String expectedLogContent = "job manager custom log";
        FileUtils.writeStringToFile((File)new File(logDir, customFileName), (String)"job manager custom log");
        String logs = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/jobmanager/logs/" + customFileName));
        MatcherAssert.assertThat((Object)logs, (Matcher)CoreMatchers.containsString((String)"job manager custom log"));
    }

    @Test
    public void getTaskManagerLogAndStdoutFiles() throws Exception {
        String json = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/taskmanagers/"));
        ObjectMapper mapper = new ObjectMapper();
        JsonNode parsed = mapper.readTree(json);
        ArrayNode taskManagers = (ArrayNode)parsed.get("taskmanagers");
        JsonNode taskManager = taskManagers.get(0);
        String id = taskManager.get("id").asText();
        WebMonitorUtils.LogFileLocation logFiles = WebMonitorUtils.LogFileLocation.find((Configuration)CLUSTER_CONFIGURATION);
        FileUtils.writeStringToFile((File)logFiles.logFile, (String)"job manager log");
        String logs = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/taskmanagers/" + id + "/log"));
        MatcherAssert.assertThat((Object)logs, (Matcher)CoreMatchers.containsString((String)"job manager log"));
        FileUtils.writeStringToFile((File)logFiles.stdOutFile, (String)"job manager out");
        logs = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/taskmanagers/" + id + "/stdout"));
        MatcherAssert.assertThat((Object)logs, (Matcher)CoreMatchers.containsString((String)"job manager out"));
    }

    @Test
    public void getConfiguration() throws Exception {
        String config = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/jobmanager/config"));
        Map conf = WebMonitorUtils.fromKeyValueJsonArray((String)config);
        MemorySize expected = (MemorySize)CLUSTER_CONFIGURATION.get(TaskManagerOptions.MANAGED_MEMORY_SIZE);
        MemorySize actual = MemorySize.parse((String)((String)conf.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())));
        Assert.assertEquals((Object)expected, (Object)actual);
    }

    @Test
    public void testCancel() throws Exception {
        Assert.assertTrue((boolean)WebFrontendITCase.getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
        JobVertex sender = new JobVertex("Sender");
        sender.setParallelism(2);
        sender.setInvokableClass(BlockingInvokable.class);
        JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().setJobName("Stoppable streaming test job").addJobVertex(sender).build();
        JobID jid = jobGraph.getJobID();
        ClusterClient clusterClient = CLUSTER.getClusterClient();
        clusterClient.submitJob(jobGraph).get();
        while (WebFrontendITCase.getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
            Thread.sleep(10L);
        }
        BlockingInvokable.latch.await();
        Duration testTimeout = Duration.ofMinutes(2L);
        Deadline deadline = Deadline.fromNow((Duration)testTimeout);
        try (HttpTestClient client = new HttpTestClient("localhost", this.getRestPort());){
            client.sendPatchRequest("/jobs/" + jid + "/", deadline.timeLeft());
            HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
            Assert.assertEquals((Object)HttpResponseStatus.ACCEPTED, (Object)response.getStatus());
            Assert.assertEquals((Object)"application/json; charset=UTF-8", (Object)response.getType());
            Assert.assertEquals((Object)"{}", (Object)response.getContent());
        }
        while (!WebFrontendITCase.getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
            Thread.sleep(20L);
        }
        client = new HttpTestClient("localhost", this.getRestPort());
        var8_8 = null;
        try {
            Duration timeout = Duration.ofSeconds(30L);
            client.sendGetRequest("/jobs/" + jid + "/config", timeout);
            HttpTestClient.SimpleHttpResponse response = client.getNextResponse(timeout);
            Assert.assertEquals((Object)HttpResponseStatus.OK, (Object)response.getStatus());
            Assert.assertEquals((Object)"application/json; charset=UTF-8", (Object)response.getType());
            Assert.assertEquals((Object)("{\"jid\":\"" + jid + "\",\"name\":\"Stoppable streaming test job\",\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"Cluster level default restart strategy\",\"job-parallelism\":1,\"object-reuse-mode\":false,\"user-config\":{}}}"), (Object)response.getContent());
        }
        catch (Throwable throwable) {
            var8_8 = throwable;
            throw throwable;
        }
        finally {
            if (client != null) {
                if (var8_8 != null) {
                    try {
                        client.close();
                    }
                    catch (Throwable throwable) {
                        var8_8.addSuppressed(throwable);
                    }
                } else {
                    client.close();
                }
            }
        }
        BlockingInvokable.reset();
    }

    @Test
    public void testJobOverviewHandler() throws Exception {
        Assert.assertTrue((boolean)WebFrontendITCase.getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
        JobVertex sender = new JobVertex("Sender");
        sender.setParallelism(2);
        sender.setInvokableClass(BlockingInvokable.class);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph((JobVertex[])new JobVertex[]{sender});
        ClusterClient clusterClient = CLUSTER.getClusterClient();
        clusterClient.submitJob(jobGraph).get();
        while (WebFrontendITCase.getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
            Thread.sleep(10L);
        }
        BlockingInvokable.latch.await();
        Duration testTimeout = Duration.ofMinutes(2L);
        String json = TestBaseUtils.getFromHTTP((String)("http://localhost:" + this.getRestPort() + "/jobs/overview"));
        ObjectMapper mapper = new ObjectMapper();
        JsonNode parsed = mapper.readTree(json);
        ArrayNode jsonJobs = (ArrayNode)parsed.get("jobs");
        Assert.assertEquals((long)1L, (long)jsonJobs.size());
        MatcherAssert.assertThat((String)"Duration must be positive", (jsonJobs.get(0).get("duration").asInt() > 0 ? 1 : 0) != 0);
        clusterClient.cancel(jobGraph.getJobID()).get();
        while (!WebFrontendITCase.getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
            Thread.sleep(20L);
        }
        BlockingInvokable.reset();
    }

    @Test
    public void testCancelYarn() throws Exception {
        Assert.assertTrue((boolean)WebFrontendITCase.getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
        JobVertex sender = new JobVertex("Sender");
        sender.setParallelism(2);
        sender.setInvokableClass(BlockingInvokable.class);
        JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph((JobVertex[])new JobVertex[]{sender});
        JobID jid = jobGraph.getJobID();
        ClusterClient clusterClient = CLUSTER.getClusterClient();
        clusterClient.submitJob(jobGraph).get();
        while (WebFrontendITCase.getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
            Thread.sleep(10L);
        }
        BlockingInvokable.latch.await();
        Duration testTimeout = Duration.ofMinutes(2L);
        Deadline deadline = Deadline.fromNow((Duration)testTimeout);
        try (HttpTestClient client = new HttpTestClient("localhost", this.getRestPort());){
            client.sendGetRequest("/jobs/" + jid + "/yarn-cancel", deadline.timeLeft());
            HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
            Assert.assertEquals((Object)HttpResponseStatus.ACCEPTED, (Object)response.getStatus());
            Assert.assertEquals((Object)"application/json; charset=UTF-8", (Object)response.getType());
            Assert.assertEquals((Object)"{}", (Object)response.getContent());
        }
        while (!WebFrontendITCase.getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
            Thread.sleep(20L);
        }
        BlockingInvokable.reset();
    }

    private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
        Collection statusMessages = (Collection)client.listJobs().get();
        return statusMessages.stream().filter(status -> !status.getJobState().isGloballyTerminalState()).map(JobStatusMessage::getJobId).collect(Collectors.toList());
    }

    public static class BlockingInvokable
    extends AbstractInvokable {
        private static CountDownLatch latch = new CountDownLatch(2);
        private volatile boolean isRunning = true;
        private final CompletableFuture<Void> terminationFuture = new CompletableFuture();

        public BlockingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            latch.countDown();
            try {
                while (this.isRunning) {
                    Thread.sleep(100L);
                }
            }
            finally {
                this.terminationFuture.complete(null);
            }
        }

        public Future<Void> cancel() {
            this.isRunning = false;
            return this.terminationFuture;
        }

        public static void reset() {
            latch = new CountDownLatch(2);
        }
    }
}

