package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.net.InetAddress;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Some;
import scala.Tuple2;

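/**
 * Tests that exercise the job manager through actor messages: the Akka hostname configuration,
 * partition state requests, and the stop signal for stoppable and non-stoppable jobs.
 *
 * <p>Loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerTest.class
 */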
public class JobManagerTest {
    private static ActorSystem system;

    /**
     * Creates the local actor system shared by all tests in this class, using a default
     * (empty) {@link Configuration}.
     */
    @BeforeClass
    public static void setup() {
        final Configuration defaultConfiguration = new Configuration();
        system = AkkaUtils.createLocalActorSystem(defaultConfiguration);
    }

    /**
     * Shuts down the shared actor system created in {@link #setup()}.
     *
     * <p>JUnit runs {@code @AfterClass} methods even when a {@code @BeforeClass} method threw, so
     * the actor system may never have been created. In that case there is nothing to shut down,
     * and skipping the call avoids masking the original setup failure with a second error.
     */
    @AfterClass
    public static void teardown() {
        if (system != null) {
            JavaTestKit.shutdownActorSystem(system);
            // Drop the reference so a stopped actor system cannot be reused by accident.
            system = null;
        }
    }

    /**
     * Verifies that an Akka configuration built for a listening address whose hostname is
     * {@code null} (port 1772) contains an {@code akka.remote.netty.tcp.hostname} entry that
     * resolves to a loopback address.
     *
     * @throws Exception if building the configuration or resolving the hostname fails; the
     *     exception is propagated so the test report shows its full stack trace and cause
     *     instead of only its (possibly null) message
     */
    @Test
    public void testNullHostnameGoesToLocalhost() throws Exception {
        // The port is boxed to Object because the tuple's second type parameter is declared as Object.
        Tuple2<String, Object> listeningAddress = new Tuple2<String, Object>(null, 1772);

        String configuredHostname = AkkaUtils
                .getAkkaConfig(new Configuration(), new Some<Tuple2<String, Object>>(listeningAddress))
                .getString("akka.remote.netty.tcp.hostname");

        Assert.assertTrue(InetAddress.getByName(configuredHostname).isLoopbackAddress());
    }

    /**
     * Verifies the replies the leader sends for {@code RequestPartitionState} messages.
     *
     * <p>A single-vertex job with one pipelined result is submitted and brought to the running
     * state. The test then checks three cases:
     * <ol>
     *   <li>for every {@link ExecutionState} value, the producing vertex is forced into that
     *       state and the reply must report exactly that state;</li>
     *   <li>a request for a freshly created (hence unknown) partition ID of the known job must
     *       be answered with a {@code null} state;</li>
     *   <li>a request for a freshly created (hence unknown) job ID must be answered with a
     *       {@code null} state.</li>
     * </ol>
     * In every case the reply must echo the task execution ID, the result ID and the partition
     * ID of the request.
     */
    @Test
    public void testRequestPartitionState() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster testingCluster = null;
                        try {
                            testingCluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            // Build a job consisting of one blocking sender that produces a pipelined result.
                            IntermediateDataSetID intermediateDataSetID = new IntermediateDataSetID();
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
                            sender.createAndAddResultDataSet(intermediateDataSetID, ResultPartitionType.PIPELINED);
                            JobGraph jobGraph = new JobGraph("Blocking test job", new JobVertex[]{sender});
                            JobID jobID = jobGraph.getJobID();

                            ActorGateway leaderGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), (UUID) null);

                            // Submit the job and wait until its vertex is up.
                            leaderGateway.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActorGateway);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            leaderGateway.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jobID), testActorGateway);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // Identifies the (fictitious) task on whose behalf the partition state is requested.
                            ExecutionAttemptID requestingAttemptID = new ExecutionAttemptID();

                            // Look up the producing vertex and its single produced partition.
                            leaderGateway.tell(new TestingJobManagerMessages.RequestExecutionGraph(jobID), testActorGateway);
                            TestingJobManagerMessages.ExecutionGraphFound graphFound =
                                    expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class);
                            ExecutionVertex producerVertex =
                                    graphFound.executionGraph().getJobVertex(sender.getID()).getTaskVertices()[0];
                            IntermediateResultPartition producedPartition =
                                    producerVertex.getProducedPartitions().values().iterator().next();
                            ResultPartitionID knownPartitionID = new ResultPartitionID(
                                    producedPartition.getPartitionId(),
                                    producerVertex.getCurrentExecutionAttempt().getAttemptId());

                            // Case 1: known job and partition - the reply must mirror whatever state the producer is in.
                            JobManagerMessages.RequestPartitionState knownPartitionRequest =
                                    new JobManagerMessages.RequestPartitionState(jobID, knownPartitionID, requestingAttemptID, intermediateDataSetID);
                            for (ExecutionState executionState : ExecutionState.values()) {
                                ExecutionGraphTestUtils.setVertexState(producerVertex, executionState);
                                leaderGateway.tell(knownPartitionRequest, testActorGateway);
                                assertPartitionStateReply(knownPartitionRequest, executionState);
                            }

                            // Case 2: known job, but a freshly generated partition ID - no state is expected.
                            JobManagerMessages.RequestPartitionState unknownPartitionRequest =
                                    new JobManagerMessages.RequestPartitionState(jobID, new ResultPartitionID(), requestingAttemptID, intermediateDataSetID);
                            leaderGateway.tell(unknownPartitionRequest, testActorGateway);
                            assertPartitionStateReply(unknownPartitionRequest, null);

                            // Case 3: freshly generated job ID - no state is expected.
                            JobManagerMessages.RequestPartitionState unknownJobRequest =
                                    new JobManagerMessages.RequestPartitionState(new JobID(), new ResultPartitionID(), requestingAttemptID, intermediateDataSetID);
                            leaderGateway.tell(unknownJobRequest, testActorGateway);
                            assertPartitionStateReply(unknownJobRequest, null);
                        } catch (Exception e) {
                            // Fail the test, but keep the original exception as the cause so that
                            // its stack trace ends up in the test report.
                            throw new AssertionError("Unexpected exception: " + e, e);
                        } finally {
                            if (testingCluster != null) {
                                testingCluster.shutdown();
                            }
                        }
                    }

                    /**
                     * Waits for the next {@code LeaderSessionMessage}, checks that it wraps a
                     * {@code PartitionState} echoing the IDs of {@code request}, and that the
                     * reported state equals {@code expectedState} ({@code null} meaning that no
                     * state must be reported).
                     */
                    private void assertPartitionStateReply(
                            JobManagerMessages.RequestPartitionState request,
                            ExecutionState expectedState) {
                        JobManagerMessages.LeaderSessionMessage reply =
                                expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
                        Assert.assertEquals(TaskMessages.PartitionState.class, reply.message().getClass());

                        TaskMessages.PartitionState partitionState = (TaskMessages.PartitionState) reply.message();
                        Assert.assertEquals(request.taskExecutionId(), partitionState.taskExecutionId());
                        Assert.assertEquals(request.taskResultId(), partitionState.taskResultId());
                        Assert.assertEquals(request.partitionId().getPartitionId(), partitionState.partitionId());
                        Assert.assertEquals(expectedState, partitionState.state());
                    }
                };
            }
        };
    }

    /**
     * Verifies that a job made of stoppable tasks can be stopped.
     *
     * <p>A job with one {@link StoppableInvokable} vertex (parallelism 2) is submitted. Once all
     * vertices are running, a {@code StopJob} message is sent; the test expects a
     * {@code StoppingSuccess} reply followed by a {@code JobResultSuccess} message.
     */
    @Test
    public void testStopSignal() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster testingCluster = null;
                        try {
                            testingCluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(2);
                            sender.setInvokableClass(StoppableInvokable.class);
                            JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new JobVertex[]{sender});
                            JobID jobID = jobGraph.getJobID();

                            ActorGateway leaderGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), (UUID) null);

                            // Submit the job and wait until every vertex is running.
                            leaderGateway.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActorGateway);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            leaderGateway.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobID), testActorGateway);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // Stopping must be acknowledged, and the job must then report a successful result.
                            leaderGateway.tell(new JobManagerMessages.StopJob(jobID), testActorGateway);
                            expectMsgClass(JobManagerMessages.StoppingSuccess.class);
                            expectMsgClass(JobManagerMessages.JobResultSuccess.class);
                        } finally {
                            // Single cleanup path: runs exactly once, whether the test passed or failed.
                            if (testingCluster != null) {
                                testingCluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Verifies that stopping a job whose tasks are not stoppable is rejected.
     *
     * <p>A job with one {@code Tasks.BlockingNoOpInvokable} vertex (parallelism 1) is submitted.
     * Once all vertices are running, a {@code StopJob} message is sent and a
     * {@code StoppingFailure} reply is expected. Afterwards the execution graph is requested and
     * must still be found (presumably to confirm that the failed stop attempt did not remove the
     * job - the intent is not stated in the code).
     */
    @Test
    public void testStopSignalFail() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster testingCluster = null;
                        try {
                            testingCluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
                            JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", new JobVertex[]{sender});
                            JobID jobID = jobGraph.getJobID();

                            ActorGateway leaderGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), (UUID) null);

                            // Submit the job and wait until every vertex is running.
                            leaderGateway.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActorGateway);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            leaderGateway.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobID), testActorGateway);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // The stop request must be answered with a failure.
                            leaderGateway.tell(new JobManagerMessages.StopJob(jobID), testActorGateway);
                            expectMsgClass(JobManagerMessages.StoppingFailure.class);

                            // The execution graph must still be retrievable after the rejected stop.
                            leaderGateway.tell(new TestingJobManagerMessages.RequestExecutionGraph(jobID), testActorGateway);
                            expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class);
                        } finally {
                            // Single cleanup path: runs exactly once, whether the test passed or failed.
                            if (testingCluster != null) {
                                testingCluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }
}
