package org.apache.flink.runtime.webmonitor;

import java.io.File;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/WebFrontendITCase.class */
public class WebFrontendITCase extends TestLogger {
    private static final Configuration CLUSTER_CONFIGURATION = getClusterConfiguration();
    private static final int NUM_TASK_MANAGERS = 2;
    private static final int NUM_SLOTS = 4;

    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(CLUSTER_CONFIGURATION).setNumberTaskManagers(NUM_TASK_MANAGERS).setNumberSlotsPerTaskManager(NUM_SLOTS).build());

    /* loaded from: input_file:org/apache/flink/runtime/webmonitor/WebFrontendITCase$BlockingInvokable.class */
    public static class BlockingInvokable extends AbstractInvokable {
        private static CountDownLatch latch = new CountDownLatch(WebFrontendITCase.NUM_TASK_MANAGERS);
        private volatile boolean isRunning;

        public BlockingInvokable(Environment environment) {
            super(environment);
            this.isRunning = true;
        }

        public void invoke() throws Exception {
            latch.countDown();
            while (this.isRunning) {
                Thread.sleep(100L);
            }
        }

        public void cancel() {
            this.isRunning = false;
        }

        public static void reset() {
            latch = new CountDownLatch(WebFrontendITCase.NUM_TASK_MANAGERS);
        }
    }

    private static Configuration getClusterConfiguration() {
        Configuration configuration = new Configuration();
        try {
            File createTempFile = File.createTempFile("TestBaseUtils-logdir", null);
            Assert.assertTrue("Unable to delete temp file", createTempFile.delete());
            Assert.assertTrue("Unable to create temp directory", createTempFile.mkdir());
            File file = new File(createTempFile, "jobmanager.log");
            File file2 = new File(createTempFile, "jobmanager.out");
            Files.createFile(file.toPath(), new FileAttribute[0]);
            Files.createFile(file2.toPath(), new FileAttribute[0]);
            configuration.setString(WebOptions.LOG_PATH, file.getAbsolutePath());
            configuration.setString("taskmanager.log.path", file.getAbsolutePath());
            configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("12m"));
            return configuration;
        } catch (Exception e) {
            throw new AssertionError("Could not setup test.", e);
        }
    }

    @After
    public void tearDown() {
        BlockingInvokable.reset();
    }

    @Test
    public void getFrontPage() throws Exception {
        MatcherAssert.assertThat(TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/index.html"), CoreMatchers.containsString("Apache Flink Web Dashboard"));
    }

    private int getRestPort() {
        return CLUSTER.getRestAddres().getPort();
    }

    @Test
    public void testResponseHeaders() throws Exception {
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL("http://localhost:" + getRestPort() + "/taskmanagers").openConnection();
        httpURLConnection.setConnectTimeout(100000);
        httpURLConnection.connect();
        if (httpURLConnection.getResponseCode() >= 400) {
            Assert.fail(IOUtils.toString(httpURLConnection.getErrorStream(), ConfigConstants.DEFAULT_CHARSET));
        }
        Assert.assertNull(httpURLConnection.getContentEncoding());
        Assert.assertEquals("application/json; charset=UTF-8", httpURLConnection.getContentType());
        HttpURLConnection httpURLConnection2 = (HttpURLConnection) new URL("http://localhost:" + getRestPort() + "/jobs/dontexist").openConnection();
        httpURLConnection2.setConnectTimeout(100000);
        httpURLConnection2.connect();
        if (httpURLConnection2.getResponseCode() < 400) {
            Assert.fail("Request for non-existing job did not return an error.");
        } else {
            Assert.assertNull(httpURLConnection2.getContentEncoding());
            Assert.assertEquals("application/json; charset=UTF-8", httpURLConnection2.getContentType());
        }
    }

    @Test
    public void getNumberOfTaskManagers() throws Exception {
        Assert.assertNotNull(new ObjectMapper().readTree(TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/")).get("taskmanagers"));
        Assert.assertEquals(2L, r0.size());
    }

    @Test
    public void getTaskManagers() throws Exception {
        ArrayNode arrayNode = new ObjectMapper().readTree(TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/")).get("taskmanagers");
        Assert.assertNotNull(arrayNode);
        Assert.assertEquals(2L, arrayNode.size());
        JsonNode jsonNode = arrayNode.get(0);
        Assert.assertNotNull(jsonNode);
        Assert.assertEquals(4L, jsonNode.get("slotsNumber").asInt());
        Assert.assertTrue(jsonNode.get("freeSlots").asInt() <= NUM_SLOTS);
    }

    @Test
    public void getLogAndStdoutFiles() throws Exception {
        WebMonitorUtils.LogFileLocation find = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
        FileUtils.writeStringToFile(find.logFile, "job manager log");
        MatcherAssert.assertThat(TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/log"), CoreMatchers.containsString("job manager log"));
        FileUtils.writeStringToFile(find.stdOutFile, "job manager out");
        MatcherAssert.assertThat(TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/stdout"), CoreMatchers.containsString("job manager out"));
    }

    @Test
    public void getCustomLogFiles() throws Exception {
        FileUtils.writeStringToFile(new File(WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION).logFile.getParent(), "test.log"), "job manager custom log");
        MatcherAssert.assertThat(TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/logs/test.log"), CoreMatchers.containsString("job manager custom log"));
    }

    @Test
    public void getTaskManagerLogAndStdoutFiles() throws Exception {
        String asText = new ObjectMapper().readTree(TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/")).get("taskmanagers").get(0).get("id").asText();
        WebMonitorUtils.LogFileLocation find = WebMonitorUtils.LogFileLocation.find(CLUSTER_CONFIGURATION);
        FileUtils.writeStringToFile(find.logFile, "job manager log");
        MatcherAssert.assertThat(TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/" + asText + "/log"), CoreMatchers.containsString("job manager log"));
        FileUtils.writeStringToFile(find.stdOutFile, "job manager out");
        MatcherAssert.assertThat(TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/taskmanagers/" + asText + "/stdout"), CoreMatchers.containsString("job manager out"));
    }

    @Test
    public void getConfiguration() throws Exception {
        Assert.assertEquals((MemorySize) CLUSTER_CONFIGURATION.get(TaskManagerOptions.MANAGED_MEMORY_SIZE), MemorySize.parse((String) WebMonitorUtils.fromKeyValueJsonArray(TestBaseUtils.getFromHTTP("http://localhost:" + getRestPort() + "/jobmanager/config")).get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())));
    }

    @Test
    public void testCancel() throws Exception {
        Assert.assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
        JobVertex jobVertex = new JobVertex("Sender");
        jobVertex.setParallelism(NUM_TASK_MANAGERS);
        jobVertex.setInvokableClass(BlockingInvokable.class);
        JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new JobVertex[]{jobVertex});
        JobID jobID = jobGraph.getJobID();
        ClientUtils.submitJob(CLUSTER.getClusterClient(), jobGraph);
        while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
            Thread.sleep(10L);
        }
        BlockingInvokable.latch.await();
        Deadline fromNow = Deadline.fromNow(Duration.ofMinutes(2L));
        HttpTestClient httpTestClient = new HttpTestClient("localhost", getRestPort());
        Throwable th = null;
        try {
            try {
                httpTestClient.sendPatchRequest("/jobs/" + jobID + "/", fromNow.timeLeft());
                HttpTestClient.SimpleHttpResponse nextResponse = httpTestClient.getNextResponse(fromNow.timeLeft());
                Assert.assertEquals(HttpResponseStatus.ACCEPTED, nextResponse.getStatus());
                Assert.assertEquals("application/json; charset=UTF-8", nextResponse.getType());
                Assert.assertEquals("{}", nextResponse.getContent());
                if (httpTestClient != null) {
                    if (0 != 0) {
                        try {
                            httpTestClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        httpTestClient.close();
                    }
                }
                while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
                    Thread.sleep(20L);
                }
                HttpTestClient httpTestClient2 = new HttpTestClient("localhost", getRestPort());
                Throwable th3 = null;
                try {
                    Duration ofSeconds = Duration.ofSeconds(30L);
                    httpTestClient2.sendGetRequest("/jobs/" + jobID + "/config", ofSeconds);
                    HttpTestClient.SimpleHttpResponse nextResponse2 = httpTestClient2.getNextResponse(ofSeconds);
                    Assert.assertEquals(HttpResponseStatus.OK, nextResponse2.getStatus());
                    Assert.assertEquals("application/json; charset=UTF-8", nextResponse2.getType());
                    Assert.assertEquals("{\"jid\":\"" + jobID + "\",\"name\":\"Stoppable streaming test job\",\"execution-config\":{\"execution-mode\":\"PIPELINED\",\"restart-strategy\":\"Cluster level default restart strategy\",\"job-parallelism\":1,\"object-reuse-mode\":false,\"user-config\":{}}}", nextResponse2.getContent());
                    if (httpTestClient2 != null) {
                        if (0 != 0) {
                            try {
                                httpTestClient2.close();
                            } catch (Throwable th4) {
                                th3.addSuppressed(th4);
                            }
                        } else {
                            httpTestClient2.close();
                        }
                    }
                    BlockingInvokable.reset();
                } catch (Throwable th5) {
                    if (httpTestClient2 != null) {
                        if (0 != 0) {
                            try {
                                httpTestClient2.close();
                            } catch (Throwable th6) {
                                th3.addSuppressed(th6);
                            }
                        } else {
                            httpTestClient2.close();
                        }
                    }
                    throw th5;
                }
            } finally {
            }
        } catch (Throwable th7) {
            if (httpTestClient != null) {
                if (th != null) {
                    try {
                        httpTestClient.close();
                    } catch (Throwable th8) {
                        th.addSuppressed(th8);
                    }
                } else {
                    httpTestClient.close();
                }
            }
            throw th7;
        }
    }

    @Test
    public void testCancelYarn() throws Exception {
        Assert.assertTrue(getRunningJobs(CLUSTER.getClusterClient()).isEmpty());
        JobVertex jobVertex = new JobVertex("Sender");
        jobVertex.setParallelism(NUM_TASK_MANAGERS);
        jobVertex.setInvokableClass(BlockingInvokable.class);
        JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new JobVertex[]{jobVertex});
        JobID jobID = jobGraph.getJobID();
        ClientUtils.submitJob(CLUSTER.getClusterClient(), jobGraph);
        while (getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
            Thread.sleep(10L);
        }
        BlockingInvokable.latch.await();
        Deadline fromNow = Deadline.fromNow(Duration.ofMinutes(2L));
        HttpTestClient httpTestClient = new HttpTestClient("localhost", getRestPort());
        Throwable th = null;
        try {
            try {
                httpTestClient.sendGetRequest("/jobs/" + jobID + "/yarn-cancel", fromNow.timeLeft());
                HttpTestClient.SimpleHttpResponse nextResponse = httpTestClient.getNextResponse(fromNow.timeLeft());
                Assert.assertEquals(HttpResponseStatus.ACCEPTED, nextResponse.getStatus());
                Assert.assertEquals("application/json; charset=UTF-8", nextResponse.getType());
                Assert.assertEquals("{}", nextResponse.getContent());
                if (httpTestClient != null) {
                    if (0 != 0) {
                        try {
                            httpTestClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        httpTestClient.close();
                    }
                }
                while (!getRunningJobs(CLUSTER.getClusterClient()).isEmpty()) {
                    Thread.sleep(20L);
                }
                BlockingInvokable.reset();
            } finally {
            }
        } catch (Throwable th3) {
            if (httpTestClient != null) {
                if (th != null) {
                    try {
                        httpTestClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    httpTestClient.close();
                }
            }
            throw th3;
        }
    }

    private static List<JobID> getRunningJobs(ClusterClient<?> clusterClient) throws Exception {
        return (List) ((Collection) clusterClient.listJobs().get()).stream().filter(jobStatusMessage -> {
            return !jobStatusMessage.getJobState().isGloballyTerminalState();
        }).map((v0) -> {
            return v0.getJobId();
        }).collect(Collectors.toList());
    }
}
