package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Status;
import akka.testkit.JavaTestKit;
import akka.testkit.TestProbe;
import java.io.File;
import java.net.InetAddress;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.checkpoint.CheckpointDeclineReason;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.query.KvStateID;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage;
import org.apache.flink.runtime.query.KvStateServerAddress;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.reflect.ClassTag$;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/JobManagerTest.class */
public class JobManagerTest extends TestLogger {

    /** JUnit rule supplying temporary folders; used by tests that need a directory on disk. */
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    /** Actor system shared by all tests; created in {@code setup()} and shut down in {@code teardown()}. */
    private static ActorSystem system;
    /** High-availability services; a fresh instance is created before each test and cleaned up afterwards. */
    private HighAvailabilityServices highAvailabilityServices;

    /** Creates the local actor system that is shared by the tests of this class. */
    @BeforeClass
    public static void setup() {
        final Configuration defaultConfig = new Configuration();
        system = AkkaUtils.createLocalActorSystem(defaultConfig);
    }

    /** Shuts down the shared actor system after the tests of this class have run. */
    @AfterClass
    public static void teardown() {
        final ActorSystem sharedSystem = system;
        JavaTestKit.shutdownActorSystem(sharedSystem);
    }

    /** Creates a fresh embedded high-availability services instance for the upcoming test. */
    @Before
    public void setupTest() {
        final HighAvailabilityServices embeddedServices =
                new EmbeddedHaServices(TestingUtils.defaultExecutor());
        this.highAvailabilityServices = embeddedServices;
    }

    /**
     * Closes the high-availability services used by the finished test, cleans up their data and
     * drops the reference.
     *
     * @throws Exception if closing or cleaning up the services fails
     */
    @After
    public void tearDownTest() throws Exception {
        final HighAvailabilityServices servicesToClose = this.highAvailabilityServices;
        servicesToClose.closeAndCleanupAllData();
        this.highAvailabilityServices = null;
    }

    /**
     * Verifies that an Akka configuration built for a {@code null} hostname (port 1772) ends up
     * with a loopback address under {@code akka.remote.netty.tcp.hostname}.
     *
     * <p>Exceptions are propagated to JUnit so that the full stack trace and cause are reported
     * instead of only the (possibly {@code null}) exception message.
     *
     * @throws Exception if the configuration cannot be built or the hostname cannot be resolved
     */
    @Test
    public void testNullHostnameGoesToLocalhost() throws Exception {
        final Some<Tuple2<String, Object>> listenAddress =
                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>(null, 1772));

        final String hostname = AkkaUtils.getAkkaConfig(new Configuration(), listenAddress)
                .getString("akka.remote.netty.tcp.hostname");

        Assert.assertTrue(
                "Configured hostname '" + hostname + "' is not a loopback address",
                InetAddress.getByName(hostname).isLoopbackAddress());
    }

    /**
     * Checks the replies to {@code RequestPartitionProducerState} messages sent to the leading
     * JobManager of a testing cluster.
     *
     * <p>A job with a single blocking "Sender" vertex is submitted. Once it is running, the test
     * verifies that:
     * <ol>
     *   <li>for every {@link ExecutionState} value forced onto the vertex, the request is answered
     *       with exactly that state,</li>
     *   <li>a request with a freshly created (unknown) partition id fails with a
     *       {@link RuntimeException} whose cause is an {@link IllegalArgumentException},</li>
     *   <li>a request for a freshly created (unknown) job id fails with an
     *       {@link IllegalArgumentException}.</li>
     * </ol>
     *
     * <p>The cluster is shut down in a single {@code finally} block, and unexpected exceptions
     * are rethrown as an {@link AssertionError} carrying the original cause.
     */
    @Test
    public void testRequestPartitionState() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            final IntermediateDataSetID resultId = new IntermediateDataSetID();

                            final JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(BlockingNoOpInvokable.class);
                            sender.createAndAddResultDataSet(resultId, ResultPartitionType.PIPELINED);

                            final JobGraph jobGraph = new JobGraph("Blocking test job", sender);
                            final JobID jobId = jobGraph.getJobID();

                            final ActorGateway jobManager = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            final ActorGateway testActor =
                                    new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);

                            // Submit the job and wait until its vertex is running.
                            jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActor);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);

                            jobManager.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // Fetch the execution vertex of the sender from the execution graph.
                            jobManager.tell(new TestingJobManagerMessages.RequestExecutionGraph(jobId), testActor);
                            final ExecutionVertex vertex = expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class)
                                    .executionGraph()
                                    .getJobVertex(sender.getID())
                                    .getTaskVertices()[0];

                            final IntermediateResultPartition partition =
                                    vertex.getProducedPartitions().values().iterator().next();
                            final ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();
                            final ResultPartitionID partitionId = new ResultPartitionID(partition.getPartitionId(), attemptId);

                            final JobManagerMessages.RequestPartitionProducerState request =
                                    new JobManagerMessages.RequestPartitionProducerState(jobId, resultId, partitionId);

                            // 1. Every state forced onto the vertex must be reported back unchanged.
                            for (ExecutionState state : ExecutionState.values()) {
                                ExecutionGraphTestUtils.setVertexState(vertex, state);

                                final Future<ExecutionState> reply = jobManager
                                        .ask(request, getRemainingTime())
                                        .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));

                                Assert.assertEquals(state, Await.result(reply, getRemainingTime()));
                            }

                            // 2. Unknown partition/execution for an existing job.
                            try {
                                Await.result(
                                        jobManager.ask(
                                                new JobManagerMessages.RequestPartitionProducerState(jobId, resultId, new ResultPartitionID()),
                                                getRemainingTime()),
                                        getRemainingTime());
                                Assert.fail("Did not fail with expected RuntimeException");
                            } catch (RuntimeException e) {
                                Assert.assertEquals(IllegalArgumentException.class, e.getCause().getClass());
                            }

                            // 3. Unknown job.
                            try {
                                Await.result(
                                        jobManager.ask(
                                                new JobManagerMessages.RequestPartitionProducerState(new JobID(), resultId, new ResultPartitionID()),
                                                getRemainingTime()),
                                        getRemainingTime());
                                Assert.fail("Did not fail with expected IllegalArgumentException");
                            } catch (IllegalArgumentException expected) {
                                // expected: the job is unknown to the JobManager
                            }
                        } catch (Exception e) {
                            // Keep the cause so the report shows where the failure originated.
                            throw new AssertionError("Unexpected exception: " + e, e);
                        } finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Checks that the producer state of a partition can still be requested after its producing
     * task has finished.
     *
     * <p>The job consists of a "Sender" vertex running {@code NoOpInvokable} and a "Blocking
     * Sender" vertex running {@code BlockingNoOpInvokable}. The test waits until the first
     * vertex reports {@link ExecutionState#FINISHED} and then expects
     * {@code RequestPartitionProducerState} for its partition to be answered with
     * {@code FINISHED}.
     * NOTE(review): judging by the test name, the finished execution is presumably no longer
     * registered at the JobManager at that point -- confirm against the JobManager implementation.
     *
     * <p>The wait for the sender is bounded by the remaining time of the enclosing
     * {@code Within} block so that a sender that never finishes fails the test instead of
     * hanging it. The cluster is shut down in a single {@code finally} block and unexpected
     * exceptions are rethrown as an {@link AssertionError} carrying the original cause.
     */
    @Test
    public void testRequestPartitionStateUnregisteredExecution() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(4, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            final IntermediateDataSetID resultId = new IntermediateDataSetID();

                            final JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(NoOpInvokable.class);
                            sender.createAndAddResultDataSet(resultId, ResultPartitionType.PIPELINED);

                            final JobVertex blockingSender = new JobVertex("Blocking Sender");
                            blockingSender.setParallelism(1);
                            blockingSender.setInvokableClass(BlockingNoOpInvokable.class);
                            blockingSender.createAndAddResultDataSet(new IntermediateDataSetID(), ResultPartitionType.PIPELINED);

                            final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, blockingSender);
                            final JobID jobId = jobGraph.getJobID();

                            final ActorGateway jobManager = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            final ActorGateway testActor =
                                    new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);

                            // Submit the job and wait until all vertices are running or finished.
                            jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActor);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);

                            jobManager.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            final Object graphReply = Await.result(
                                    jobManager.ask(new TestingJobManagerMessages.RequestExecutionGraph(jobGraph.getJobID()), remaining()),
                                    remaining());
                            final ExecutionVertex vertex = ((TestingJobManagerMessages.ExecutionGraphFound) graphReply)
                                    .executionGraph()
                                    .getJobVertex(sender.getID())
                                    .getTaskVertices()[0];

                            // Wait for the non-blocking sender to finish, but never past the deadline.
                            while (vertex.getExecutionState() != ExecutionState.FINISHED) {
                                if (getRemainingTime().toMillis() <= 0L) {
                                    Assert.fail("Sender did not reach state FINISHED within the test deadline");
                                }
                                Thread.sleep(1L);
                            }

                            final IntermediateResultPartition partition =
                                    vertex.getProducedPartitions().values().iterator().next();
                            final ResultPartitionID partitionId = new ResultPartitionID(
                                    partition.getPartitionId(),
                                    vertex.getCurrentExecutionAttempt().getAttemptId());

                            final Future<ExecutionState> reply = jobManager
                                    .ask(new JobManagerMessages.RequestPartitionProducerState(jobId, resultId, partitionId), getRemainingTime())
                                    .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));

                            Assert.assertEquals(ExecutionState.FINISHED, Await.result(reply, getRemainingTime()));
                        } catch (Exception e) {
                            // Keep the cause so the report shows where the failure originated.
                            throw new AssertionError("Unexpected exception: " + e, e);
                        } finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Checks that a partition state request referring to an outdated execution attempt fails
     * with a {@code PartitionProducerDisposedException}.
     *
     * <p>The job consists of a "Sender" vertex running {@code NoOpInvokable} and a "Blocking
     * Sender" vertex running {@code BlockingNoOpInvokable}. After the first vertex has
     * finished, the test remembers the partition id built from its current attempt, resets the
     * vertex for a new execution and then sends {@code RequestPartitionProducerState} with the
     * remembered (now older) id.
     *
     * <p>The wait for the sender is bounded by the remaining time of the enclosing
     * {@code Within} block so that a sender that never finishes fails the test instead of
     * hanging it. The cluster is shut down in a single {@code finally} block and unexpected
     * exceptions are rethrown as an {@link AssertionError} carrying the original cause.
     */
    @Test
    public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(4, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            final IntermediateDataSetID resultId = new IntermediateDataSetID();

                            final JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(NoOpInvokable.class);
                            sender.createAndAddResultDataSet(resultId, ResultPartitionType.PIPELINED);

                            final JobVertex blockingSender = new JobVertex("Blocking Sender");
                            blockingSender.setParallelism(1);
                            blockingSender.setInvokableClass(BlockingNoOpInvokable.class);
                            blockingSender.createAndAddResultDataSet(new IntermediateDataSetID(), ResultPartitionType.PIPELINED);

                            final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, blockingSender);
                            final JobID jobId = jobGraph.getJobID();

                            final ActorGateway jobManager = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            final ActorGateway testActor =
                                    new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);

                            // Submit the job and wait until all vertices are running or finished.
                            jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActor);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);

                            jobManager.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            final Object graphReply = Await.result(
                                    jobManager.ask(new TestingJobManagerMessages.RequestExecutionGraph(jobGraph.getJobID()), remaining()),
                                    remaining());
                            final ExecutionVertex vertex = ((TestingJobManagerMessages.ExecutionGraphFound) graphReply)
                                    .executionGraph()
                                    .getJobVertex(sender.getID())
                                    .getTaskVertices()[0];

                            // Wait for the non-blocking sender to finish, but never past the deadline.
                            while (vertex.getExecutionState() != ExecutionState.FINISHED) {
                                if (getRemainingTime().toMillis() <= 0L) {
                                    Assert.fail("Sender did not reach state FINISHED within the test deadline");
                                }
                                Thread.sleep(1L);
                            }

                            // Capture the partition id of the finished attempt BEFORE resetting the vertex.
                            final IntermediateResultPartition partition =
                                    vertex.getProducedPartitions().values().iterator().next();
                            final ResultPartitionID outdatedPartitionId = new ResultPartitionID(
                                    partition.getPartitionId(),
                                    vertex.getCurrentExecutionAttempt().getAttemptId());

                            // Reset the vertex so that its current attempt is newer than the captured one.
                            vertex.resetForNewExecution(System.currentTimeMillis(), 1L);

                            try {
                                Await.result(
                                        jobManager.ask(
                                                new JobManagerMessages.RequestPartitionProducerState(jobId, resultId, outdatedPartitionId),
                                                getRemainingTime()),
                                        getRemainingTime());
                                Assert.fail("Did not fail with expected Exception");
                            } catch (PartitionProducerDisposedException expected) {
                                // expected: the requested attempt is older than the current one
                            }
                        } catch (Exception e) {
                            // Keep the cause so the report shows where the failure originated.
                            throw new AssertionError("Unexpected exception: " + e, e);
                        } finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Checks that a job whose tasks run {@code StoppableInvokable} can be stopped.
     *
     * <p>After the job (parallelism 2) is running, a {@code StopJob} message is sent and the test
     * expects a {@code StoppingSuccess} reply followed by a {@code JobResultSuccess} message.
     *
     * <p>The cluster is shut down exactly once, in a {@code finally} block.
     */
    @Test
    public void testStopSignal() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            final JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(2);
                            sender.setInvokableClass(StoppableInvokable.class);

                            final JobGraph jobGraph = new JobGraph("Stoppable streaming test job", sender);
                            final JobID jobId = jobGraph.getJobID();

                            final ActorGateway jobManager = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            final ActorGateway testActor =
                                    new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);

                            // Submit the job and wait until all vertices are running.
                            jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActor);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);

                            jobManager.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // Stop the job: first the acknowledgement, then the job result.
                            jobManager.tell(new JobManagerMessages.StopJob(jobId), testActor);
                            expectMsgClass(JobManagerMessages.StoppingSuccess.class);
                            expectMsgClass(JobManagerMessages.JobResultSuccess.class);
                        } finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Checks that stopping a job whose task runs {@code BlockingNoOpInvokable} is rejected.
     *
     * <p>After the job is running, a {@code StopJob} message is sent and the test expects a
     * {@code StoppingFailure} reply. It then requests the execution graph and expects an
     * {@code ExecutionGraphFound} reply, i.e. the job is still known to the JobManager.
     *
     * <p>The cluster is shut down exactly once, in a {@code finally} block.
     */
    @Test
    public void testStopSignalFail() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            final JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(BlockingNoOpInvokable.class);

                            final JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", sender);
                            final JobID jobId = jobGraph.getJobID();

                            final ActorGateway jobManager = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            final ActorGateway testActor =
                                    new AkkaActorGateway(getTestActor(), HighAvailabilityServices.DEFAULT_LEADER_ID);

                            // Submit the job and wait until all vertices are running.
                            jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActor);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);

                            jobManager.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // The stop request must be refused.
                            jobManager.tell(new JobManagerMessages.StopJob(jobId), testActor);
                            expectMsgClass(JobManagerMessages.StoppingFailure.class);

                            // The execution graph of the job must still be available afterwards.
                            jobManager.tell(new TestingJobManagerMessages.RequestExecutionGraph(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class);
                        } finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Exercises the KvState lookup, registration and unregistration messages of the JobManager.
     *
     * <p>A testing JobManager and a testing TaskManager are started in the shared actor system.
     * The test then walks through these steps, all bounded by a 100 second deadline:
     * <ol>
     *   <li>lookup for an unknown job: expects {@link IllegalStateException},</li>
     *   <li>lookup of an unknown name for a submitted job: expects
     *       {@link UnknownKvStateLocation},</li>
     *   <li>registration for an unknown job followed by a lookup: expects
     *       {@link IllegalStateException},</li>
     *   <li>registration for the submitted job followed by a lookup: checks the returned
     *       {@link KvStateLocation},</li>
     *   <li>unregistration followed by a lookup: expects {@link UnknownKvStateLocation},</li>
     *   <li>registering the same name for two different vertices: expects the job to reach
     *       {@link JobStatus#FAILED}.</li>
     * </ol>
     *
     * @throws Exception if any step times out or fails unexpectedly
     */
    @Test
    public void testKvStateMessages() throws Exception {
        final Deadline deadline = new FiniteDuration(100L, TimeUnit.SECONDS).fromNow();

        // Start the JobManager first, then resolve its leader session id.
        final Configuration jmConfig = new Configuration();
        jmConfig.setString("akka.ask.timeout", "100ms");

        final ActorRef jobManagerActor = (ActorRef) JobManager.startJobManagerActors(
                jmConfig,
                system,
                TestingUtils.defaultExecutor(),
                TestingUtils.defaultExecutor(),
                this.highAvailabilityServices,
                TestingJobManager.class,
                MemoryArchivist.class)._1();
        final UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
                this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                TestingUtils.TESTING_TIMEOUT());
        final ActorGateway jobManager = new AkkaActorGateway(jobManagerActor, leaderId);

        // Start a TaskManager with 8 slots and wait until it has registered.
        final Configuration tmConfig = new Configuration();
        tmConfig.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
        tmConfig.setInteger("taskmanager.numberOfTaskSlots", 8);

        TaskManager.startTaskManagerComponentsAndActor(
                tmConfig,
                ResourceID.generate(),
                system,
                this.highAvailabilityServices,
                "localhost",
                Option.<String>empty(),
                true,
                TestingTaskManager.class);

        Await.ready(
                jobManager.ask(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), deadline.timeLeft()),
                deadline.timeLeft());

        // --- Lookup for a job the JobManager does not know ---
        final KvStateMessage.LookupKvStateLocation lookupUnknownJob =
                new KvStateMessage.LookupKvStateLocation(new JobID(), "any-name");
        try {
            final Future<KvStateLocation> reply = jobManager
                    .ask(lookupUnknownJob, deadline.timeLeft())
                    .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
            Await.result(reply, deadline.timeLeft());
            Assert.fail("Did not throw expected Exception");
        } catch (IllegalStateException expected) {
            // expected
        }

        // --- Submit a job with two blocking vertices ---
        final JobGraph jobGraph = new JobGraph("croissant");

        final JobVertex firstVertex = new JobVertex("cappuccino");
        firstVertex.setParallelism(4);
        firstVertex.setMaxParallelism(16);
        firstVertex.setInvokableClass(BlockingNoOpInvokable.class);

        final JobVertex secondVertex = new JobVertex("americano");
        secondVertex.setParallelism(4);
        secondVertex.setMaxParallelism(16);
        secondVertex.setInvokableClass(BlockingNoOpInvokable.class);

        jobGraph.addVertex(firstVertex);
        jobGraph.addVertex(secondVertex);

        final Future<JobManagerMessages.JobSubmitSuccess> submission = jobManager
                .ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), deadline.timeLeft())
                .mapTo(ClassTag$.MODULE$.<JobManagerMessages.JobSubmitSuccess>apply(JobManagerMessages.JobSubmitSuccess.class));
        Await.result(submission, deadline.timeLeft());

        // --- Lookup of a name that was never registered for the submitted job ---
        final KvStateMessage.LookupKvStateLocation lookupUnknownName =
                new KvStateMessage.LookupKvStateLocation(jobGraph.getJobID(), "unknown");
        try {
            final Future<KvStateLocation> reply = jobManager
                    .ask(lookupUnknownName, deadline.timeLeft())
                    .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
            Await.result(reply, deadline.timeLeft());
            Assert.fail("Did not throw expected Exception");
        } catch (UnknownKvStateLocation expected) {
            // expected
        }

        // --- Registration for a job the JobManager does not know ---
        final KvStateMessage.NotifyKvStateRegistered registerUnknownJob = new KvStateMessage.NotifyKvStateRegistered(
                new JobID(),
                new JobVertexID(),
                new KeyGroupRange(0, 0),
                "any-name",
                new KvStateID(),
                new KvStateServerAddress(InetAddress.getLocalHost(), 1233));
        jobManager.tell(registerUnknownJob);

        final KvStateMessage.LookupKvStateLocation lookupAfterUnknownRegistration =
                new KvStateMessage.LookupKvStateLocation(registerUnknownJob.getJobId(), registerUnknownJob.getRegistrationName());
        try {
            final Future<KvStateLocation> reply = jobManager
                    .ask(lookupAfterUnknownRegistration, deadline.timeLeft())
                    .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
            Await.result(reply, deadline.timeLeft());
            Assert.fail("Did not throw expected Exception");
        } catch (IllegalStateException expected) {
            // expected
        }

        // --- Registration for the submitted job, followed by a successful lookup ---
        final KvStateMessage.NotifyKvStateRegistered registration = new KvStateMessage.NotifyKvStateRegistered(
                jobGraph.getJobID(),
                firstVertex.getID(),
                new KeyGroupRange(0, 0),
                "register-me",
                new KvStateID(),
                new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
        jobManager.tell(registration);

        final KvStateMessage.LookupKvStateLocation lookupRegistered =
                new KvStateMessage.LookupKvStateLocation(registration.getJobId(), registration.getRegistrationName());
        final Future<KvStateLocation> locationReply = jobManager
                .ask(lookupRegistered, deadline.timeLeft())
                .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
        final KvStateLocation location = Await.result(locationReply, deadline.timeLeft());

        Assert.assertNotNull(location);
        Assert.assertEquals(jobGraph.getJobID(), location.getJobId());
        Assert.assertEquals(firstVertex.getID(), location.getJobVertexId());
        Assert.assertEquals(firstVertex.getMaxParallelism(), location.getNumKeyGroups());
        Assert.assertEquals(1L, location.getNumRegisteredKeyGroups());

        final KeyGroupRange registeredRange = registration.getKeyGroupRange();
        Assert.assertEquals(1L, registeredRange.getNumberOfKeyGroups());
        Assert.assertEquals(registration.getKvStateId(), location.getKvStateID(registeredRange.getStartKeyGroup()));
        Assert.assertEquals(
                registration.getKvStateServerAddress(),
                location.getKvStateServerAddress(registeredRange.getStartKeyGroup()));

        // --- Unregistration: the same lookup must fail again ---
        jobManager.tell(new KvStateMessage.NotifyKvStateUnregistered(
                registration.getJobId(),
                registration.getJobVertexId(),
                registration.getKeyGroupRange(),
                registration.getRegistrationName()));
        try {
            final Future<KvStateLocation> reply = jobManager
                    .ask(lookupRegistered, deadline.timeLeft())
                    .mapTo(ClassTag$.MODULE$.<KvStateLocation>apply(KvStateLocation.class));
            Await.result(reply, deadline.timeLeft());
            Assert.fail("Did not throw expected Exception");
        } catch (UnknownKvStateLocation expected) {
            // expected
        }

        // --- Same registration name for two different vertices: the job must fail ---
        final KvStateMessage.NotifyKvStateRegistered duplicateForFirstVertex = new KvStateMessage.NotifyKvStateRegistered(
                jobGraph.getJobID(),
                firstVertex.getID(),
                new KeyGroupRange(0, 0),
                "duplicate-me",
                new KvStateID(),
                new KvStateServerAddress(InetAddress.getLocalHost(), 1293));
        final KvStateMessage.NotifyKvStateRegistered duplicateForSecondVertex = new KvStateMessage.NotifyKvStateRegistered(
                jobGraph.getJobID(),
                secondVertex.getID(),
                new KeyGroupRange(0, 0),
                "duplicate-me",
                new KvStateID(),
                new KvStateServerAddress(InetAddress.getLocalHost(), 1293));

        // Subscribe to the FAILED status before sending the conflicting registrations.
        final Future<TestingJobManagerMessages.JobStatusIs> failedStatus = jobManager
                .ask(new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.FAILED), deadline.timeLeft())
                .mapTo(ClassTag$.MODULE$.<TestingJobManagerMessages.JobStatusIs>apply(TestingJobManagerMessages.JobStatusIs.class));

        jobManager.tell(duplicateForFirstVertex);
        jobManager.tell(duplicateForSecondVertex);

        Assert.assertEquals(JobStatus.FAILED, Await.result(failedStatus, deadline.timeLeft()).state());
    }

    /**
     * Starts a JobManager, archivist and TaskManager, submits a single-vertex blocking job and
     * cancels it via {@code CancelJobWithSavepoint} (passing {@code null} as the target
     * directory while {@code state.savepoints.dir} is configured).
     *
     * <p>The cancel request is retried up to 10 times while the failure cause reports
     * {@link CheckpointDeclineReason#NOT_ALL_REQUIRED_TASKS_RUNNING}; any other failure fails the
     * test. Afterwards the job must reach {@code CANCELED} and the returned savepoint path must
     * exist on disk.
     */
    @Test
    public void testCancelWithSavepoint() throws Exception {
        File defaultSavepointDir = this.tmpFolder.newFolder();
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        Configuration config = new Configuration();
        config.setString("state.savepoints.dir", defaultSavepointDir.getAbsolutePath());

        ActorSystem actorSystem = null;
        ActorGateway jobManager = null;
        ActorGateway archiver = null;
        ActorGateway taskManager = null;
        try {
            actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
            Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
                    config,
                    actorSystem,
                    TestingUtils.defaultExecutor(),
                    TestingUtils.defaultExecutor(),
                    this.highAvailabilityServices,
                    Option.apply("jm"),
                    Option.apply("arch"),
                    TestingJobManager.class,
                    TestingMemoryArchivist.class);
            UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
                    this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                    TestingUtils.TESTING_TIMEOUT());
            jobManager = new AkkaActorGateway(master._1(), leaderId);
            archiver = new AkkaActorGateway(master._2(), leaderId);
            ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
                    config,
                    ResourceID.generate(),
                    actorSystem,
                    this.highAvailabilityServices,
                    "localhost",
                    Option.apply("tm"),
                    true,
                    TestingTaskManager.class);
            taskManager = new AkkaActorGateway(taskManagerRef, leaderId);

            // Wait until the TaskManager has registered at the JobManager.
            Await.ready(
                    taskManager.ask(new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()), timeout),
                    timeout);

            JobVertex sourceVertex = new JobVertex("Source");
            sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            sourceVertex.setParallelism(1);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceVertex});
            jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
                    Collections.singletonList(sourceVertex.getID()),
                    Collections.singletonList(sourceVertex.getID()),
                    Collections.singletonList(sourceVertex.getID()),
                    3600000L,
                    3600000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    (SerializedValue) null,
                    true));

            Await.result(jobManager.ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
            Await.result(jobManager.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), timeout), timeout);

            // Register for the CANCELED notification before issuing the cancel request.
            Future<Object> cancelled = jobManager.ask(
                    new TestingJobManagerMessages.NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED), timeout);

            String savepointPath = null;
            for (int attempt = 0; attempt < 10; attempt++) {
                JobManagerMessages.CancellationResponse response = (JobManagerMessages.CancellationResponse) Await.result(
                        jobManager.ask(new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), (String) null), timeout),
                        timeout);
                if (!(response instanceof JobManagerMessages.CancellationFailure)) {
                    savepointPath = ((JobManagerMessages.CancellationSuccess) response).savepointPath();
                    break;
                }

                Throwable cause = ((JobManagerMessages.CancellationFailure) response).cause();
                if (cause.getMessage().contains(CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING.message())) {
                    // Declined because tasks were not all running yet: back off and retry.
                    Thread.sleep(200L);
                } else {
                    // Attach the cause so the full stack trace shows up in the test report.
                    throw new AssertionError("Failed to cancel job: " + cause.getMessage(), cause);
                }
            }

            Assert.assertNotNull("Savepoint not triggered", savepointPath);
            Await.ready(cancelled, timeout);
            Assert.assertTrue(new File(savepointPath).exists());
        } finally {
            // Single teardown path for both success and failure.
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (archiver != null) {
                archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (actorSystem != null) {
                actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
            }
        }
    }

    /**
     * Submits a blocking job on a cluster whose configuration sets no savepoint directory and
     * sends {@code CancelJobWithSavepoint} with a {@code null} target directory.
     *
     * <p>The JobManager must answer with a {@code CancellationFailure} whose cause is an
     * {@link IllegalStateException} with a message containing {@code "savepoint directory"}.
     */
    @Test
    public void testCancelWithSavepointNoDirectoriesConfigured() throws Exception {
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        Configuration config = new Configuration();

        ActorSystem actorSystem = null;
        ActorGateway jobManager = null;
        ActorGateway archiver = null;
        ActorGateway taskManager = null;
        try {
            // Assign to the outer variables so the finally block can actually release them.
            actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
            Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
                    config,
                    actorSystem,
                    TestingUtils.defaultExecutor(),
                    TestingUtils.defaultExecutor(),
                    this.highAvailabilityServices,
                    Option.apply("jm"),
                    Option.apply("arch"),
                    TestingJobManager.class,
                    TestingMemoryArchivist.class);
            UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
                    this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                    TestingUtils.TESTING_TIMEOUT());
            jobManager = new AkkaActorGateway(master._1(), leaderId);
            archiver = new AkkaActorGateway(master._2(), leaderId);
            ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
                    config,
                    ResourceID.generate(),
                    actorSystem,
                    this.highAvailabilityServices,
                    "localhost",
                    Option.apply("tm"),
                    true,
                    TestingTaskManager.class);
            taskManager = new AkkaActorGateway(taskManagerRef, leaderId);

            // Wait until the TaskManager has registered at the JobManager.
            Await.ready(
                    taskManager.ask(new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()), timeout),
                    timeout);

            JobVertex sourceVertex = new JobVertex("Source");
            sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            sourceVertex.setParallelism(1);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceVertex});
            jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
                    Collections.singletonList(sourceVertex.getID()),
                    Collections.singletonList(sourceVertex.getID()),
                    Collections.singletonList(sourceVertex.getID()),
                    3600000L,
                    3600000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    (SerializedValue) null,
                    true));

            Await.result(jobManager.ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
            Await.result(jobManager.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), timeout), timeout);

            JobManagerMessages.CancellationResponse response = (JobManagerMessages.CancellationResponse) Await.result(
                    jobManager.ask(new JobManagerMessages.CancelJobWithSavepoint(jobGraph.getJobID(), (String) null), timeout),
                    timeout);
            if (response instanceof JobManagerMessages.CancellationFailure) {
                Throwable cause = ((JobManagerMessages.CancellationFailure) response).cause();
                Assert.assertTrue(cause instanceof IllegalStateException);
                Assert.assertTrue(cause.getMessage().contains("savepoint directory"));
            } else {
                Assert.fail("Unexpected cancellation response from JobManager: " + response);
            }
        } finally {
            // Runs on success and on failure, so a failing assertion no longer leaks the actors.
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (archiver != null) {
                archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
        }
    }

    /**
     * Submits a blocking job whose checkpoint interval is {@code Long.MAX_VALUE}, then triggers
     * a savepoint into a fresh temporary directory.
     *
     * <p>The trigger must be answered with {@code TriggerSavepointSuccess} and the target
     * directory must afterwards contain exactly one entry.
     */
    @Test
    public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {
        File defaultSavepointDir = this.tmpFolder.newFolder();
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        Configuration config = new Configuration();
        config.setString("state.savepoints.dir", defaultSavepointDir.getAbsolutePath());

        ActorSystem actorSystem = null;
        ActorGateway jobManager = null;
        ActorGateway archiver = null;
        ActorGateway taskManager = null;
        try {
            actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
            Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
                    config,
                    actorSystem,
                    TestingUtils.defaultExecutor(),
                    TestingUtils.defaultExecutor(),
                    this.highAvailabilityServices,
                    Option.apply("jm"),
                    Option.apply("arch"),
                    TestingJobManager.class,
                    TestingMemoryArchivist.class);
            UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
                    this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                    TestingUtils.TESTING_TIMEOUT());
            jobManager = new AkkaActorGateway(master._1(), leaderId);
            archiver = new AkkaActorGateway(master._2(), leaderId);
            ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
                    config,
                    ResourceID.generate(),
                    actorSystem,
                    this.highAvailabilityServices,
                    "localhost",
                    Option.apply("tm"),
                    true,
                    TestingTaskManager.class);
            taskManager = new AkkaActorGateway(taskManagerRef, leaderId);

            // Wait until the TaskManager has registered at the JobManager.
            Await.ready(
                    taskManager.ask(new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()), timeout),
                    timeout);

            JobVertex sourceVertex = new JobVertex("Source");
            sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            sourceVertex.setParallelism(1);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceVertex});
            jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
                    Collections.singletonList(sourceVertex.getID()),
                    Collections.singletonList(sourceVertex.getID()),
                    Collections.singletonList(sourceVertex.getID()),
                    Long.MAX_VALUE,
                    360000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    (SerializedValue) null,
                    true));

            Await.result(jobManager.ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
            Await.result(jobManager.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), timeout), timeout);

            // Keep a handle on the target directory so its contents can be inspected afterwards.
            File targetDirectory = this.tmpFolder.newFolder();
            Object savepointResponse = Await.result(
                    jobManager.ask(
                            new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID(), Option.apply(targetDirectory.getAbsolutePath())),
                            timeout),
                    timeout);
            Assert.assertTrue("Did not trigger savepoint", savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);

            // File.listFiles() returns null when the path is not a readable directory.
            File[] savepointEntries = targetDirectory.listFiles();
            Assert.assertNotNull("Could not list savepoint target directory", savepointEntries);
            Assert.assertEquals(1, savepointEntries.length);
        } finally {
            // Single teardown path for both success and failure.
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (archiver != null) {
                archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (actorSystem != null) {
                actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
            }
        }
    }

    /**
     * Exercises {@link SavepointRestoreSettings} on job submission.
     *
     * <p>Steps: run a job, take a savepoint, cancel the job and wait for its removal. Then
     * submit a second job with a different vertex, restoring from that savepoint:
     * <ul>
     *   <li>with the second {@code forPath} argument {@code false}, submission must fail with an
     *       {@link IllegalStateException} whose message mentions {@code allowNonRestoredState};</li>
     *   <li>with it set to {@code true}, submission must succeed.</li>
     * </ul>
     */
    @Test
    public void testSavepointRestoreSettings() throws Exception {
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);

        ActorSystem actorSystem = null;
        ActorGateway jobManager = null;
        ActorGateway archiver = null;
        ActorGateway taskManager = null;
        try {
            actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
            Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
                    new Configuration(),
                    actorSystem,
                    TestingUtils.defaultExecutor(),
                    TestingUtils.defaultExecutor(),
                    this.highAvailabilityServices,
                    Option.apply("jm"),
                    Option.apply("arch"),
                    TestingJobManager.class,
                    TestingMemoryArchivist.class);
            UUID leaderId = LeaderRetrievalUtils.retrieveLeaderSessionId(
                    this.highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                    TestingUtils.TESTING_TIMEOUT());
            jobManager = new AkkaActorGateway(master._1(), leaderId);
            archiver = new AkkaActorGateway(master._2(), leaderId);

            Configuration taskManagerConfig = new Configuration();
            taskManagerConfig.setInteger("taskmanager.numberOfTaskSlots", 4);
            ActorRef taskManagerRef = TaskManager.startTaskManagerComponentsAndActor(
                    taskManagerConfig,
                    ResourceID.generate(),
                    actorSystem,
                    this.highAvailabilityServices,
                    "localhost",
                    Option.apply("tm"),
                    true,
                    TestingTaskManager.class);
            taskManager = new AkkaActorGateway(taskManagerRef, leaderId);

            // Wait until the TaskManager has registered at the JobManager.
            Await.ready(
                    taskManager.ask(new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()), timeout),
                    timeout);

            // --- First job: run it and take a savepoint. ---
            JobVertex sourceVertex = new JobVertex("Source");
            sourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            sourceVertex.setParallelism(1);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{sourceVertex});
            jobGraph.setSnapshotSettings(new JobCheckpointingSettings(
                    Collections.singletonList(sourceVertex.getID()),
                    Collections.singletonList(sourceVertex.getID()),
                    Collections.singletonList(sourceVertex.getID()),
                    Long.MAX_VALUE,
                    360000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    (SerializedValue) null,
                    true));

            Await.result(jobManager.ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
            Await.result(jobManager.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), timeout), timeout);

            File targetDirectory = this.tmpFolder.newFolder();
            Object savepointResponse = Await.result(
                    jobManager.ask(
                            new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID(), Option.apply(targetDirectory.getAbsolutePath())),
                            timeout),
                    timeout);
            // Fail with a readable message instead of a ClassCastException on the cast below.
            Assert.assertTrue(
                    "Unexpected response: " + savepointResponse,
                    savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
            String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath();

            // Register for the removal notification before cancelling.
            Future<Object> removed = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), timeout);
            Object cancelResponse = Await.result(jobManager.ask(new JobManagerMessages.CancelJob(jobGraph.getJobID()), timeout), timeout);
            Assert.assertTrue("Unexpected response: " + cancelResponse, cancelResponse instanceof JobManagerMessages.CancellationSuccess);
            Await.ready(removed, timeout);

            // --- Second job: a different vertex, restored from the first job's savepoint. ---
            JobVertex newSourceVertex = new JobVertex("NewSource");
            newSourceVertex.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            newSourceVertex.setParallelism(1);
            JobGraph newJobGraph = new JobGraph("NewTestingJob", new JobVertex[]{newSourceVertex});
            newJobGraph.setSnapshotSettings(new JobCheckpointingSettings(
                    Collections.singletonList(newSourceVertex.getID()),
                    Collections.singletonList(newSourceVertex.getID()),
                    Collections.singletonList(newSourceVertex.getID()),
                    Long.MAX_VALUE,
                    360000L,
                    0L,
                    Integer.MAX_VALUE,
                    ExternalizedCheckpointSettings.none(),
                    (SerializedValue) null,
                    true));

            // Restore with the flag set to false: submission is expected to fail.
            newJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));
            Object strictResponse = Await.result(
                    jobManager.ask(new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
            Assert.assertTrue("Unexpected response: " + strictResponse, strictResponse instanceof JobManagerMessages.JobResultFailure);
            Throwable failureCause = ((JobManagerMessages.JobResultFailure) strictResponse)
                    .cause()
                    .deserializeError(ClassLoader.getSystemClassLoader());
            Assert.assertTrue(failureCause instanceof IllegalStateException);
            Assert.assertTrue(failureCause.getMessage().contains("allowNonRestoredState"));

            // Wait for the failed job to be removed before resubmitting the same graph.
            Await.ready(jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(newJobGraph.getJobID()), timeout), timeout);

            // Restore with the flag set to true: submission is expected to succeed.
            newJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
            Object lenientResponse = Await.result(
                    jobManager.ask(new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
            Assert.assertTrue("Unexpected response: " + lenientResponse, lenientResponse instanceof JobManagerMessages.JobSubmitSuccess);
        } finally {
            // Single teardown path for both success and failure.
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (archiver != null) {
                archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (actorSystem != null) {
                actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
            }
        }
    }

    /**
     * Checks the JobManager's connection handling towards a resource manager, which is played
     * here by a {@link TestProbe}.
     *
     * <p>The probe registers and must receive {@code RegisterResourceManagerSuccessful}. A task
     * manager registration must then be forwarded as {@code NotifyResourceStarted}, which the
     * probe answers with a {@link Status.Failure}. For the following second the probe answers
     * every {@code TriggerRegistrationAtJobManager} with a new registration and expects to see
     * {@code RegisterResourceManagerSuccessful}; a further trigger after a successful
     * registration, or any other message type, fails the test.
     */
    @Test
    public void testResourceManagerConnection() throws TimeoutException, InterruptedException {
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);
        Configuration config = new Configuration();
        config.setLong(JobManagerOptions.RESOURCE_MANAGER_RECONNECT_INTERVAL, 200L);
        ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(config);
        try {
            ActorGateway jobManager = TestingUtils.createJobManager(
                    actorSystem,
                    TestingUtils.defaultExecutor(),
                    TestingUtils.defaultExecutor(),
                    config,
                    this.highAvailabilityServices);
            TestProbe probe = TestProbe.apply(actorSystem);
            AkkaActorGateway rmGateway = new AkkaActorGateway(probe.ref(), HighAvailabilityServices.DEFAULT_LEADER_ID);

            // Wait until the JobManager is the leader.
            Await.ready(jobManager.ask(TestingJobManagerMessages.getNotifyWhenLeader(), timeout), timeout);

            // Initial registration of the fake resource manager.
            jobManager.tell(new RegisterResourceManager(probe.ref()), rmGateway);
            JobManagerMessages.LeaderSessionMessage registrationReply =
                    (JobManagerMessages.LeaderSessionMessage) probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
            Assert.assertEquals(jobManager.leaderSessionID(), registrationReply.leaderSessionID());
            Assert.assertTrue(registrationReply.message() instanceof RegisterResourceManagerSuccessful);

            // A task manager registration must be reported to the resource manager.
            jobManager.tell(new RegistrationMessages.RegisterTaskManager(
                    ResourceID.generate(),
                    (TaskManagerLocation) Mockito.mock(TaskManagerLocation.class),
                    new HardwareDescription(1, 1L, 1L, 1L),
                    1));
            JobManagerMessages.LeaderSessionMessage resourceStarted =
                    (JobManagerMessages.LeaderSessionMessage) probe.expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
            Assert.assertTrue(resourceStarted.message() instanceof NotifyResourceStarted);

            // Answer with a failure; the loop below expects the JobManager to ask for re-registration.
            probe.lastSender().tell(new Status.Failure(new Exception("Test exception")), ActorRef.noSender());

            Deadline deadline = new FiniteDuration(1000L, TimeUnit.MILLISECONDS).fromNow();
            boolean registered = false;
            while (deadline.hasTimeLeft()) {
                JobManagerMessages.LeaderSessionMessage received;
                try {
                    received = (JobManagerMessages.LeaderSessionMessage) probe.expectMsgClass(
                            deadline.timeLeft(), JobManagerMessages.LeaderSessionMessage.class);
                } catch (AssertionError ignored) {
                    // Only the probe's own "no matching message" error is tolerated here; the
                    // checks below stay outside this try so their failures are not swallowed.
                    continue;
                }

                if (received.message() instanceof TriggerRegistrationAtJobManager) {
                    if (registered) {
                        Assert.fail("A successful registration should not be followed by another TriggerRegistrationAtJobManager message.");
                    }
                    jobManager.tell(new RegisterResourceManager(probe.ref()), rmGateway);
                } else if (received.message() instanceof RegisterResourceManagerSuccessful) {
                    registered = true;
                } else {
                    Assert.fail("Received unknown message: " + received.message() + '.');
                }
            }
            Assert.assertTrue(registered);
        } finally {
            actorSystem.shutdown();
            actorSystem.awaitTermination();
        }
    }
}
