/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import io.netty.handler.codec.http.HttpResponseStatus;
import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Scanner;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.leaderelection.TestingListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.JobManagerRetriever;
import org.apache.flink.runtime.webmonitor.WebRuntimeMonitor;
import org.apache.flink.runtime.webmonitor.files.MimeTypes;
import org.apache.flink.runtime.webmonitor.testutils.HttpTestClient;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import org.powermock.reflect.Whitebox;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.collection.Seq;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

public class WebRuntimeMonitorITCase
extends TestLogger {
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final FiniteDuration TestTimeout = new FiniteDuration(2L, TimeUnit.MINUTES);
    private final String MAIN_RESOURCES_PATH = ((Object)((Object)this)).getClass().getResource("/web").getPath();

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testStandaloneWebRuntimeMonitor() throws Exception {
        Deadline deadline = TestTimeout.fromNow();
        TestingCluster flink = null;
        WebRuntimeMonitor webMonitor = null;
        try {
            flink = new TestingCluster(new Configuration());
            flink.start(true);
            ActorSystem jmActorSystem = (ActorSystem)((Seq)flink.jobManagerActorSystems().get()).head();
            ActorRef jmActor = (ActorRef)((Seq)flink.jobManagerActors().get()).head();
            File logDir = this.temporaryFolder.newFolder("log");
            Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath(), new FileAttribute[0]);
            Files.createFile(new File(logDir, "jobmanager.out").toPath(), new FileAttribute[0]);
            Configuration monitorConfig = new Configuration();
            monitorConfig.setInteger("jobmanager.web.port", 0);
            monitorConfig.setString("jobmanager.web.log.path", logFile.toString());
            String jobManagerAddress = AkkaUtils.getAkkaURL((ActorSystem)jmActorSystem, (ActorRef)jmActor);
            webMonitor = new WebRuntimeMonitor(monitorConfig, flink.createLeaderRetrievalService(), jmActorSystem);
            webMonitor.start(jobManagerAddress);
            try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort());){
                String expected = new Scanner(new File(this.MAIN_RESOURCES_PATH + "/index.html")).useDelimiter("\\A").next();
                client.sendGetRequest("index.html", deadline.timeLeft());
                HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
                Assert.assertEquals((Object)HttpResponseStatus.OK, (Object)response.getStatus());
                Assert.assertEquals((Object)response.getType(), (Object)MimeTypes.getMimeTypeForExtension((String)"html"));
                Assert.assertEquals((Object)expected, (Object)response.getContent());
                client.sendGetRequest("/overview", deadline.timeLeft());
                response = client.getNextResponse(deadline.timeLeft());
                Assert.assertEquals((Object)HttpResponseStatus.OK, (Object)response.getStatus());
                Assert.assertEquals((Object)response.getType(), (Object)MimeTypes.getMimeTypeForExtension((String)"json"));
                Assert.assertTrue((boolean)response.getContent().contains("\"taskmanagers\":1"));
            }
        }
        finally {
            if (flink != null) {
                flink.shutdown();
            }
            if (webMonitor != null) {
                webMonitor.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testRedirectToLeader() throws Exception {
        Deadline deadline = TestTimeout.fromNow();
        ActorSystem[] jobManagerSystem = new ActorSystem[2];
        WebRuntimeMonitor[] webMonitor = new WebRuntimeMonitor[2];
        ArrayList<ZooKeeperLeaderRetrievalService> leaderRetrievalServices = new ArrayList<ZooKeeperLeaderRetrievalService>();
        try {
            Throwable throwable = null;
            try (TestingServer zooKeeper = new TestingServer();){
                int i;
                Configuration config = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig((String)zooKeeper.getConnectString(), (String)this.temporaryFolder.getRoot().getPath());
                File file = this.temporaryFolder.newFolder();
                Path logFile = Files.createFile(new File(file, "jobmanager.log").toPath(), new FileAttribute[0]);
                Files.createFile(new File(file, "jobmanager.out").toPath(), new FileAttribute[0]);
                config.setInteger("jobmanager.web.port", 0);
                config.setString("jobmanager.web.log.path", logFile.toString());
                for (i = 0; i < jobManagerSystem.length; ++i) {
                    jobManagerSystem[i] = AkkaUtils.createActorSystem((Configuration)new Configuration(), (Option)new Some((Object)new Tuple2((Object)"localhost", (Object)0)));
                }
                for (i = 0; i < webMonitor.length; ++i) {
                    ZooKeeperLeaderRetrievalService lrs = ZooKeeperUtils.createLeaderRetrievalService((Configuration)config);
                    leaderRetrievalServices.add(lrs);
                    webMonitor[i] = new WebRuntimeMonitor(config, (LeaderRetrievalService)lrs, jobManagerSystem[i]);
                }
                ActorRef[] jobManager = new ActorRef[2];
                String[] jobManagerAddress = new String[2];
                for (int i2 = 0; i2 < jobManager.length; ++i2) {
                    Configuration jmConfig = config.clone();
                    jmConfig.setInteger("jobmanager.web.port", webMonitor[i2].getServerPort());
                    jobManager[i2] = (ActorRef)JobManager.startJobManagerActors((Configuration)jmConfig, (ActorSystem)jobManagerSystem[i2], JobManager.class, MemoryArchivist.class)._1();
                    jobManagerAddress[i2] = AkkaUtils.getAkkaURL((ActorSystem)jobManagerSystem[i2], (ActorRef)jobManager[i2]);
                    webMonitor[i2].start(jobManagerAddress[i2]);
                }
                ZooKeeperLeaderRetrievalService lrs = ZooKeeperUtils.createLeaderRetrievalService((Configuration)config);
                leaderRetrievalServices.add(lrs);
                TestingListener leaderListener = new TestingListener();
                lrs.start((LeaderRetrievalListener)leaderListener);
                leaderListener.waitForNewLeader(deadline.timeLeft().toMillis());
                String leaderAddress = leaderListener.getAddress();
                int leaderIndex = leaderAddress.equals(jobManagerAddress[0]) ? 0 : 1;
                int followerIndex = (leaderIndex + 1) % 2;
                ActorSystem leadingSystem = jobManagerSystem[leaderIndex];
                ActorSystem followerSystem = jobManagerSystem[followerIndex];
                WebRuntimeMonitor leadingWebMonitor = webMonitor[leaderIndex];
                WebRuntimeMonitor followerWebMonitor = webMonitor[followerIndex];
                JobManagerRetriever leadingRetriever = (JobManagerRetriever)Whitebox.getInternalState((Object)leadingWebMonitor, (String)"retriever");
                JobManagerRetriever followerRetriever = (JobManagerRetriever)Whitebox.getInternalState((Object)followerWebMonitor, (String)"retriever");
                this.waitForLeaderNotification(leadingSystem, jobManager[leaderIndex], leadingRetriever, deadline);
                this.waitForLeaderNotification(leadingSystem, jobManager[leaderIndex], followerRetriever, deadline);
                try (HttpTestClient leaderClient = new HttpTestClient("localhost", leadingWebMonitor.getServerPort());
                     HttpTestClient followingClient = new HttpTestClient("localhost", followerWebMonitor.getServerPort());){
                    String expected = new Scanner(new File(this.MAIN_RESOURCES_PATH + "/index.html")).useDelimiter("\\A").next();
                    leaderClient.sendGetRequest("index.html", deadline.timeLeft());
                    HttpTestClient.SimpleHttpResponse response = leaderClient.getNextResponse(deadline.timeLeft());
                    Assert.assertEquals((Object)HttpResponseStatus.OK, (Object)response.getStatus());
                    Assert.assertEquals((Object)response.getType(), (Object)MimeTypes.getMimeTypeForExtension((String)"html"));
                    Assert.assertEquals((Object)expected, (Object)response.getContent());
                    followingClient.sendGetRequest("index.html", deadline.timeLeft());
                    response = followingClient.getNextResponse(deadline.timeLeft());
                    Assert.assertEquals((Object)HttpResponseStatus.TEMPORARY_REDIRECT, (Object)response.getStatus());
                    Assert.assertTrue((boolean)response.getLocation().contains("" + leadingWebMonitor.getServerPort()));
                    leadingSystem.shutdown();
                    this.waitForLeaderNotification(followerSystem, jobManager[followerIndex], followerRetriever, deadline);
                    followingClient.sendGetRequest("index.html", deadline.timeLeft());
                    response = followingClient.getNextResponse(deadline.timeLeft());
                    Assert.assertEquals((Object)HttpResponseStatus.OK, (Object)response.getStatus());
                    Assert.assertEquals((Object)response.getType(), (Object)MimeTypes.getMimeTypeForExtension((String)"html"));
                    Assert.assertEquals((Object)expected, (Object)response.getContent());
                    followingClient.sendGetRequest("/overview", deadline.timeLeft());
                    response = followingClient.getNextResponse(deadline.timeLeft());
                    Assert.assertEquals((Object)HttpResponseStatus.OK, (Object)response.getStatus());
                    Assert.assertEquals((Object)response.getType(), (Object)MimeTypes.getMimeTypeForExtension((String)"json"));
                    Assert.assertTrue((response.getContent().contains("\"taskmanagers\":1") || response.getContent().contains("\"taskmanagers\":0") ? 1 : 0) != 0);
                }
            }
            catch (Throwable throwable2) {
                Throwable throwable3 = throwable2;
                throw throwable2;
            }
        }
        catch (Throwable throwable) {
            for (ActorSystem system : jobManagerSystem) {
                if (system == null) continue;
                system.shutdown();
            }
            for (ActorSystem monitor : webMonitor) {
                monitor.stop();
            }
            for (LeaderRetrievalService lrs : leaderRetrievalServices) {
                lrs.stop();
            }
            throw throwable;
        }
        for (ActorSystem actorSystem : jobManagerSystem) {
            if (actorSystem == null) continue;
            actorSystem.shutdown();
        }
        for (ActorSystem actorSystem : webMonitor) {
            actorSystem.stop();
        }
        for (LeaderRetrievalService leaderRetrievalService : leaderRetrievalServices) {
            leaderRetrievalService.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testLeaderNotAvailable() throws Exception {
        Deadline deadline = TestTimeout.fromNow();
        ActorSystem actorSystem = null;
        WebRuntimeMonitor webRuntimeMonitor = null;
        try (TestingServer zooKeeper = new TestingServer();){
            File logDir = this.temporaryFolder.newFolder();
            Path logFile = Files.createFile(new File(logDir, "jobmanager.log").toPath(), new FileAttribute[0]);
            Files.createFile(new File(logDir, "jobmanager.out").toPath(), new FileAttribute[0]);
            Configuration config = new Configuration();
            config.setInteger("jobmanager.web.port", 0);
            config.setString("jobmanager.web.log.path", logFile.toString());
            config.setString("recovery.mode", "ZOOKEEPER");
            config.setString("recovery.zookeeper.quorum", zooKeeper.getConnectString());
            actorSystem = AkkaUtils.createDefaultActorSystem();
            LeaderRetrievalService leaderRetrievalService = (LeaderRetrievalService)Mockito.mock(LeaderRetrievalService.class);
            webRuntimeMonitor = new WebRuntimeMonitor(config, leaderRetrievalService, actorSystem);
            webRuntimeMonitor.start("akka://schmakka");
            try (HttpTestClient client = new HttpTestClient("localhost", webRuntimeMonitor.getServerPort());){
                client.sendGetRequest("index.html", deadline.timeLeft());
                HttpTestClient.SimpleHttpResponse response = client.getNextResponse();
                Assert.assertEquals((Object)HttpResponseStatus.SERVICE_UNAVAILABLE, (Object)response.getStatus());
                Assert.assertEquals((Object)MimeTypes.getMimeTypeForExtension((String)"txt"), (Object)response.getType());
                Assert.assertTrue((boolean)response.getContent().contains("refresh"));
            }
        }
        finally {
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (webRuntimeMonitor != null) {
                webRuntimeMonitor.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNoEscape() throws Exception {
        Deadline deadline = TestTimeout.fromNow();
        TestingCluster flink = null;
        WebRuntimeMonitor webMonitor = null;
        try {
            flink = new TestingCluster(new Configuration());
            flink.start(true);
            ActorSystem jmActorSystem = (ActorSystem)((Seq)flink.jobManagerActorSystems().get()).head();
            ActorRef jmActor = (ActorRef)((Seq)flink.jobManagerActors().get()).head();
            String jobManagerAddress = AkkaUtils.getAkkaURL((ActorSystem)jmActorSystem, (ActorRef)jmActor);
            Configuration config = new Configuration();
            config.setInteger("jobmanager.web.port", 0);
            webMonitor = new WebRuntimeMonitor(config, flink.createLeaderRetrievalService(), jmActorSystem);
            webMonitor.start(jobManagerAddress);
            try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort());){
                String expectedIndex = new Scanner(new File(this.MAIN_RESOURCES_PATH + "/index.html")).useDelimiter("\\A").next();
                client.sendGetRequest("index.html", deadline.timeLeft());
                HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
                Assert.assertEquals((Object)HttpResponseStatus.OK, (Object)response.getStatus());
                Assert.assertEquals((Object)response.getType(), (Object)MimeTypes.getMimeTypeForExtension((String)"html"));
                Assert.assertEquals((Object)expectedIndex, (Object)response.getContent());
                File illegalFile = new File(webMonitor.getBaseDir(new Configuration()), "test-file-" + UUID.randomUUID());
                illegalFile.deleteOnExit();
                Assert.assertTrue((String)"Failed to create test file", (boolean)illegalFile.createNewFile());
                client.sendGetRequest("../" + illegalFile.getName(), deadline.timeLeft());
                response = client.getNextResponse(deadline.timeLeft());
                Assert.assertEquals((String)("Unexpected status code " + response.getStatus() + " for file outside of web root."), (Object)HttpResponseStatus.NOT_FOUND, (Object)response.getStatus());
                client.sendGetRequest("not-existing-resource", deadline.timeLeft());
                response = client.getNextResponse(deadline.timeLeft());
                Assert.assertEquals((String)("Unexpected status code " + response.getStatus() + " for file outside of web root."), (Object)HttpResponseStatus.NOT_FOUND, (Object)response.getStatus());
            }
        }
        finally {
            if (flink != null) {
                flink.shutdown();
            }
            if (webMonitor != null) {
                webMonitor.stop();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testNoCopyFromJar() throws Exception {
        Deadline deadline = TestTimeout.fromNow();
        TestingCluster flink = null;
        WebRuntimeMonitor webMonitor = null;
        try {
            flink = new TestingCluster(new Configuration());
            flink.start(true);
            ActorSystem jmActorSystem = (ActorSystem)((Seq)flink.jobManagerActorSystems().get()).head();
            ActorRef jmActor = (ActorRef)((Seq)flink.jobManagerActors().get()).head();
            String jobManagerAddress = AkkaUtils.getAkkaURL((ActorSystem)jmActorSystem, (ActorRef)jmActor);
            Configuration config = new Configuration();
            config.setInteger("jobmanager.web.port", 0);
            webMonitor = new WebRuntimeMonitor(config, flink.createLeaderRetrievalService(), jmActorSystem);
            webMonitor.start(jobManagerAddress);
            try (HttpTestClient client = new HttpTestClient("localhost", webMonitor.getServerPort());){
                String expectedIndex = new Scanner(new File(this.MAIN_RESOURCES_PATH + "/index.html")).useDelimiter("\\A").next();
                client.sendGetRequest("index.html", deadline.timeLeft());
                HttpTestClient.SimpleHttpResponse response = client.getNextResponse(deadline.timeLeft());
                Assert.assertEquals((Object)HttpResponseStatus.OK, (Object)response.getStatus());
                Assert.assertEquals((Object)response.getType(), (Object)MimeTypes.getMimeTypeForExtension((String)"html"));
                Assert.assertEquals((Object)expectedIndex, (Object)response.getContent());
                client.sendGetRequest("../log4j-test.properties", deadline.timeLeft());
                response = client.getNextResponse(deadline.timeLeft());
                Assert.assertEquals((String)("Returned status code " + response.getStatus() + " for file outside of web root."), (Object)HttpResponseStatus.NOT_FOUND, (Object)response.getStatus());
                Assert.assertFalse((String)"Did not respond with the file, but still copied it from the JAR.", (boolean)new File(webMonitor.getBaseDir(new Configuration()), "log4j-test.properties").exists());
                client.sendGetRequest("not-existing-resource", deadline.timeLeft());
                response = client.getNextResponse(deadline.timeLeft());
                Assert.assertEquals((String)("Unexpected status code " + response.getStatus() + " for file outside of web root."), (Object)HttpResponseStatus.NOT_FOUND, (Object)response.getStatus());
            }
        }
        finally {
            if (flink != null) {
                flink.shutdown();
            }
            if (webMonitor != null) {
                webMonitor.stop();
            }
        }
    }

    private void waitForLeaderNotification(ActorSystem system, ActorRef expectedLeader, JobManagerRetriever retriever, Deadline deadline) throws Exception {
        String expectedJobManagerUrl = AkkaUtils.getAkkaURL((ActorSystem)system, (ActorRef)expectedLeader);
        while (deadline.hasTimeLeft()) {
            ActorRef leaderRef = ((ActorGateway)retriever.awaitJobManagerGatewayAndWebPort()._1()).actor();
            if (AkkaUtils.getAkkaURL((ActorSystem)system, (ActorRef)leaderRef).equals(expectedJobManagerUrl)) {
                return;
            }
            Thread.sleep(100L);
        }
    }
}

