package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.testkit.JavaTestKit;
import java.io.File;
import java.net.InetAddress;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.JobManagerHARecoveryTest;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.taskmanager.TaskManager;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingMemoryArchivist;
import org.apache.flink.runtime.testingUtils.TestingTaskManager;
import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.Tuple3;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/**
 * Tests for the JobManager: Akka hostname configuration, partition producer state requests,
 * stop signals, and triggering of / restoring from savepoints.
 */
public class JobManagerTest extends TestLogger {

    /** JUnit rule providing temporary folders; used by the savepoint test as savepoint directory. */
    @Rule
    public TemporaryFolder tmpFolder = new TemporaryFolder();
    /** Actor system backing the {@link JavaTestKit} instances; created in {@link #setup()}. */
    private static ActorSystem system;

    /** Creates the local actor system that the test kit based tests of this class run on. */
    @BeforeClass
    public static void setup() {
        final Configuration defaultConfiguration = new Configuration();
        system = AkkaUtils.createLocalActorSystem(defaultConfiguration);
    }

    /**
     * Shuts down the shared actor system.
     *
     * <p>Does nothing when the system was never created (for example because {@link #setup()}
     * failed), so that the teardown does not add a second failure on top of the original one.
     */
    @AfterClass
    public static void teardown() {
        if (system != null) {
            JavaTestKit.shutdownActorSystem(system);
            // Drop the reference so a stopped system cannot be reused by accident.
            system = null;
        }
    }

    /**
     * Checks that the Akka configuration built for an external address with a {@code null}
     * hostname contains a hostname that resolves to a loopback address.
     *
     * @throws Exception if the configuration cannot be built or the hostname cannot be resolved;
     *         the exception is propagated unchanged so the test report keeps the full cause
     */
    @Test
    public void testNullHostnameGoesToLocalhost() throws Exception {
        // External address with a null hostname and port 1772.
        Some<Tuple2<String, Object>> externalAddress =
                new Some<Tuple2<String, Object>>(new Tuple2<String, Object>(null, 1772));

        String hostname = AkkaUtils.getAkkaConfig(new Configuration(), externalAddress)
                .getString("akka.remote.netty.tcp.hostname");

        Assert.assertTrue(
                "Configured hostname is not a loopback address: " + hostname,
                InetAddress.getByName(hostname).isLoopbackAddress());
    }

    /**
     * Tests the JobManager's replies to {@code RequestPartitionProducerState} messages.
     *
     * <p>While a single-vertex blocking job is running, three cases are checked:
     * <ul>
     *   <li>known partition: for every {@link ExecutionState} set on the producing vertex, the
     *       reply carries that state on its left side;</li>
     *   <li>unknown partition of the known job: the right side is a {@link RuntimeException}
     *       whose cause is an {@link IllegalArgumentException};</li>
     *   <li>unknown job: the right side is an {@link IllegalArgumentException}.</li>
     * </ul>
     *
     * <p>The testing cluster is shut down exactly once in a {@code finally} block. Unexpected
     * exceptions are rethrown as {@link AssertionError} with the original exception as cause.
     *
     * @throws Exception declared for the test runner
     */
    @Test
    public void testRequestPartitionState() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            // Job with one blocking sender that produces a single pipelined result.
                            IntermediateDataSetID resultId = new IntermediateDataSetID();
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
                            sender.createAndAddResultDataSet(resultId, ResultPartitionType.PIPELINED);

                            JobGraph jobGraph = new JobGraph("Blocking test job", new JobVertex[]{sender});
                            JobID jobId = jobGraph.getJobID();

                            ActorGateway jobManager = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActor = new AkkaActorGateway(getTestActor(), (UUID) null);

                            // Submit the job and wait until its vertices are running.
                            jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActor);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            jobManager.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // Look up the sender's only execution vertex and the partition it produces.
                            jobManager.tell(new TestingJobManagerMessages.RequestExecutionGraph(jobId), testActor);
                            TestingJobManagerMessages.ExecutionGraphFound graphFound =
                                    (TestingJobManagerMessages.ExecutionGraphFound) expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class);
                            ExecutionVertex vertex = graphFound.executionGraph().getJobVertex(sender.getID()).getTaskVertices()[0];
                            IntermediateResultPartition partition =
                                    (IntermediateResultPartition) vertex.getProducedPartitions().values().iterator().next();
                            ResultPartitionID partitionId =
                                    new ResultPartitionID(partition.getPartitionId(), vertex.getCurrentExecutionAttempt().getAttemptId());

                            // Fresh ID standing in for the requesting (receiver) execution.
                            ExecutionAttemptID receiverId = new ExecutionAttemptID();

                            // Case 1: known partition, checked for every possible producer state.
                            JobManagerMessages.RequestPartitionProducerState knownRequest =
                                    new JobManagerMessages.RequestPartitionProducerState(jobId, receiverId, resultId, partitionId);
                            for (ExecutionState state : ExecutionState.values()) {
                                ExecutionGraphTestUtils.setVertexState(vertex, state);

                                Future<Object> reply = jobManager.ask(knownRequest, getRemainingTime());
                                JobManagerMessages.LeaderSessionMessage wrapped =
                                        (JobManagerMessages.LeaderSessionMessage) Await.result(reply, getRemainingTime());
                                TaskMessages.PartitionProducerState response = (TaskMessages.PartitionProducerState) wrapped.message();

                                Assert.assertEquals(receiverId, response.receiverExecutionId());
                                Assert.assertTrue("Responded with failure: " + response, response.result().isLeft());
                                Assert.assertEquals(state, ((Tuple3) response.result().left().get())._3());
                            }

                            // Case 2: known job, but a partition ID the JobManager has never seen.
                            Future<Object> unknownPartitionReply = jobManager.ask(
                                    new JobManagerMessages.RequestPartitionProducerState(jobId, receiverId, resultId, new ResultPartitionID()),
                                    getRemainingTime());
                            TaskMessages.PartitionProducerState unknownPartition = (TaskMessages.PartitionProducerState)
                                    ((JobManagerMessages.LeaderSessionMessage) Await.result(unknownPartitionReply, getRemainingTime())).message();

                            Assert.assertEquals(receiverId, unknownPartition.receiverExecutionId());
                            Assert.assertTrue("Responded with success: " + unknownPartition, unknownPartition.result().isRight());
                            Assert.assertTrue(unknownPartition.result().right().get() instanceof RuntimeException);
                            Assert.assertTrue(((Exception) unknownPartition.result().right().get()).getCause() instanceof IllegalArgumentException);

                            // Case 3: a job ID the JobManager has never seen.
                            Future<Object> unknownJobReply = jobManager.ask(
                                    new JobManagerMessages.RequestPartitionProducerState(new JobID(), receiverId, resultId, new ResultPartitionID()),
                                    getRemainingTime());
                            TaskMessages.PartitionProducerState unknownJob = (TaskMessages.PartitionProducerState)
                                    ((JobManagerMessages.LeaderSessionMessage) Await.result(unknownJobReply, getRemainingTime())).message();

                            Assert.assertEquals(receiverId, unknownJob.receiverExecutionId());
                            Assert.assertTrue("Responded with success: " + unknownJob, unknownJob.result().isRight());
                            Assert.assertTrue(unknownJob.result().right().get() instanceof IllegalArgumentException);
                        } catch (Exception e) {
                            // run() cannot throw checked exceptions; fail the test but keep the cause.
                            throw new AssertionError("Unexpected exception: " + e, e);
                        } finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Tests that the producer state of a partition is still reported after its producing vertex
     * has reached {@link ExecutionState#FINISHED}.
     *
     * <p>The job has a quickly finishing sender and a blocking sender. Once the first sender's
     * vertex is {@code FINISHED}, a {@code RequestPartitionProducerState} for its partition must
     * be answered with {@code FINISHED} on the left side of the result.
     *
     * <p>The testing cluster is shut down exactly once in a {@code finally} block. Unexpected
     * exceptions are rethrown as {@link AssertionError} with the original exception as cause.
     *
     * @throws Exception declared for the test runner
     */
    @Test
    public void testRequestPartitionStateUnregisteredExecution() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(4, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            // Sender whose partition is queried later on.
                            IntermediateDataSetID resultId = new IntermediateDataSetID();
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(Tasks.NoOpInvokable.class);
                            sender.createAndAddResultDataSet(resultId, ResultPartitionType.PIPELINED);

                            // Second, blocking vertex in the same job.
                            JobVertex blockingSender = new JobVertex("Blocking Sender");
                            blockingSender.setParallelism(1);
                            blockingSender.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
                            blockingSender.createAndAddResultDataSet(new IntermediateDataSetID(), ResultPartitionType.PIPELINED);

                            JobGraph jobGraph = new JobGraph("Fast finishing producer test job", new JobVertex[]{sender, blockingSender});
                            JobID jobId = jobGraph.getJobID();

                            ActorGateway jobManager = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActor = new AkkaActorGateway(getTestActor(), (UUID) null);

                            jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActor);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            jobManager.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // Fetch the execution graph and pick the first sender's only vertex.
                            Future<Object> graphReply = jobManager.ask(new TestingJobManagerMessages.RequestExecutionGraph(jobId), remaining());
                            TestingJobManagerMessages.ExecutionGraphFound graphFound =
                                    (TestingJobManagerMessages.ExecutionGraphFound) Await.result(graphReply, remaining());
                            ExecutionVertex vertex = graphFound.executionGraph().getJobVertex(sender.getID()).getTaskVertices()[0];

                            // Poll until the vertex is FINISHED.
                            // NOTE(review): this loop has no timeout of its own.
                            while (vertex.getExecutionState() != ExecutionState.FINISHED) {
                                Thread.sleep(1L);
                            }

                            IntermediateResultPartition partition =
                                    (IntermediateResultPartition) vertex.getProducedPartitions().values().iterator().next();
                            ResultPartitionID partitionId =
                                    new ResultPartitionID(partition.getPartitionId(), vertex.getCurrentExecutionAttempt().getAttemptId());

                            // Fresh ID standing in for the requesting (receiver) execution.
                            ExecutionAttemptID receiverId = new ExecutionAttemptID();

                            Future<Object> stateReply = jobManager.ask(
                                    new JobManagerMessages.RequestPartitionProducerState(jobId, receiverId, resultId, partitionId),
                                    getRemainingTime());
                            TaskMessages.PartitionProducerState response = (TaskMessages.PartitionProducerState)
                                    ((JobManagerMessages.LeaderSessionMessage) Await.result(stateReply, getRemainingTime())).message();

                            Assert.assertEquals(receiverId, response.receiverExecutionId());
                            Assert.assertTrue("Responded with failure: " + response, response.result().isLeft());
                            Assert.assertEquals(ExecutionState.FINISHED, ((Tuple3) response.result().left().get())._3());
                        } catch (Exception e) {
                            // run() cannot throw checked exceptions; fail the test but keep the cause.
                            throw new AssertionError("Unexpected exception: " + e, e);
                        } finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Tests the reply to a producer state request that refers to an execution attempt which has
     * since been replaced.
     *
     * <p>After the first sender's vertex is {@link ExecutionState#FINISHED}, the partition ID of
     * that attempt is recorded and the vertex is reset for a new execution. A request using the
     * recorded partition ID must then be answered with a
     * {@link PartitionProducerDisposedException} on the right side of the result.
     *
     * <p>The testing cluster is shut down exactly once in a {@code finally} block. Unexpected
     * exceptions are rethrown as {@link AssertionError} with the original exception as cause.
     *
     * @throws Exception declared for the test runner
     */
    @Test
    public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(4, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            // Sender whose partition is queried later on.
                            IntermediateDataSetID resultId = new IntermediateDataSetID();
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(Tasks.NoOpInvokable.class);
                            sender.createAndAddResultDataSet(resultId, ResultPartitionType.PIPELINED);

                            // Second, blocking vertex in the same job.
                            JobVertex blockingSender = new JobVertex("Blocking Sender");
                            blockingSender.setParallelism(1);
                            blockingSender.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
                            blockingSender.createAndAddResultDataSet(new IntermediateDataSetID(), ResultPartitionType.PIPELINED);

                            JobGraph jobGraph = new JobGraph("Fast finishing producer test job", new JobVertex[]{sender, blockingSender});
                            JobID jobId = jobGraph.getJobID();

                            ActorGateway jobManager = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActor = new AkkaActorGateway(getTestActor(), (UUID) null);

                            jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActor);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            jobManager.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // Fetch the execution graph and pick the first sender's only vertex.
                            Future<Object> graphReply = jobManager.ask(new TestingJobManagerMessages.RequestExecutionGraph(jobId), remaining());
                            TestingJobManagerMessages.ExecutionGraphFound graphFound =
                                    (TestingJobManagerMessages.ExecutionGraphFound) Await.result(graphReply, remaining());
                            ExecutionVertex vertex = graphFound.executionGraph().getJobVertex(sender.getID()).getTaskVertices()[0];

                            // Poll until the vertex is FINISHED.
                            // NOTE(review): this loop has no timeout of its own.
                            while (vertex.getExecutionState() != ExecutionState.FINISHED) {
                                Thread.sleep(1L);
                            }

                            // Record the partition ID of the finished attempt BEFORE resetting the vertex,
                            // so that the request below refers to the previous attempt.
                            IntermediateResultPartition partition =
                                    (IntermediateResultPartition) vertex.getProducedPartitions().values().iterator().next();
                            ResultPartitionID previousAttemptPartitionId =
                                    new ResultPartitionID(partition.getPartitionId(), vertex.getCurrentExecutionAttempt().getAttemptId());
                            vertex.resetForNewExecution();

                            // Fresh ID standing in for the requesting (receiver) execution.
                            ExecutionAttemptID receiverId = new ExecutionAttemptID();

                            Future<Object> stateReply = jobManager.ask(
                                    new JobManagerMessages.RequestPartitionProducerState(jobId, receiverId, resultId, previousAttemptPartitionId),
                                    getRemainingTime());
                            TaskMessages.PartitionProducerState response = (TaskMessages.PartitionProducerState)
                                    ((JobManagerMessages.LeaderSessionMessage) Await.result(stateReply, getRemainingTime())).message();

                            Assert.assertEquals(receiverId, response.receiverExecutionId());
                            Assert.assertTrue("Responded with success: " + response, response.result().isRight());
                            Assert.assertTrue(response.result().right().get() instanceof PartitionProducerDisposedException);
                        } catch (Exception e) {
                            // run() cannot throw checked exceptions; fail the test but keep the cause.
                            throw new AssertionError("Unexpected exception: " + e, e);
                        } finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Tests that a running job built from {@link StoppableInvokable} tasks can be stopped.
     *
     * <p>After all vertices are running, a {@code StopJob} message must be answered with
     * {@code StoppingSuccess}, followed by a {@code JobResultSuccess} for the job.
     *
     * <p>The testing cluster is shut down exactly once in a {@code finally} block.
     *
     * @throws Exception declared for the test runner
     */
    @Test
    public void testStopSignal() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            // Job consisting of two parallel stoppable tasks.
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(2);
                            sender.setInvokableClass(StoppableInvokable.class);

                            JobGraph jobGraph = new JobGraph("Stoppable streaming test job", new JobVertex[]{sender});
                            JobID jobId = jobGraph.getJobID();

                            ActorGateway jobManager = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActor = new AkkaActorGateway(getTestActor(), (UUID) null);

                            // Submit and wait until every vertex is running.
                            jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActor);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            jobManager.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // Stop the job: first the acknowledgement, then the job result.
                            jobManager.tell(new JobManagerMessages.StopJob(jobId), testActor);
                            expectMsgClass(JobManagerMessages.StoppingSuccess.class);
                            expectMsgClass(JobManagerMessages.JobResultSuccess.class);
                        } finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Tests that stopping a job built from {@code Tasks.BlockingNoOpInvokable} tasks fails.
     *
     * <p>After all vertices are running, a {@code StopJob} message must be answered with
     * {@code StoppingFailure}. Afterwards the job's execution graph must still be found by the
     * JobManager.
     *
     * <p>The testing cluster is shut down exactly once in a {@code finally} block.
     *
     * @throws Exception declared for the test runner
     */
    @Test
    public void testStopSignalFail() throws Exception {
        new JavaTestKit(system) {
            {
                new JavaTestKit.Within(duration("15 seconds")) {
                    @Override
                    protected void run() {
                        TestingCluster cluster = null;
                        try {
                            cluster = TestingUtils.startTestingCluster(2, 1, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());

                            // Job consisting of a single blocking task.
                            JobVertex sender = new JobVertex("Sender");
                            sender.setParallelism(1);
                            sender.setInvokableClass(Tasks.BlockingNoOpInvokable.class);

                            JobGraph jobGraph = new JobGraph("Non-Stoppable batching test job", new JobVertex[]{sender});
                            JobID jobId = jobGraph.getJobID();

                            ActorGateway jobManager = cluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
                            AkkaActorGateway testActor = new AkkaActorGateway(getTestActor(), (UUID) null);

                            // Submit and wait until every vertex is running.
                            jobManager.tell(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.EXECUTION_RESULT), testActor);
                            expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
                            jobManager.tell(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);

                            // The stop request is expected to be rejected.
                            jobManager.tell(new JobManagerMessages.StopJob(jobId), testActor);
                            expectMsgClass(JobManagerMessages.StoppingFailure.class);

                            // The JobManager must still know the job's execution graph afterwards.
                            jobManager.tell(new TestingJobManagerMessages.RequestExecutionGraph(jobId), testActor);
                            expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class);
                        } finally {
                            if (cluster != null) {
                                cluster.shutdown();
                            }
                        }
                    }
                };
            }
        };
    }

    /**
     * Tests that a savepoint can be triggered for a job whose snapshot settings use a checkpoint
     * interval of {@link Long#MAX_VALUE} (per the test name: deactivated periodic checkpointing).
     *
     * <p>The JobManager is configured with a filesystem savepoint backend pointing at a fresh
     * temporary folder. After a successful {@code TriggerSavepoint}, that folder must contain
     * exactly one entry.
     *
     * <p>Cleanup runs exactly once in a {@code finally} block: the actor system is shut down and
     * the archiver, JobManager and TaskManager actors are sent a {@link PoisonPill}.
     *
     * @throws Exception if starting the actors or any awaited request fails or times out
     */
    @Test
    public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {
        File savepointDir = tmpFolder.newFolder();
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);

        Configuration jobManagerConfig = new Configuration();
        jobManagerConfig.setString("savepoints.state.backend", "filesystem");
        jobManagerConfig.setString("savepoints.state.backend.fs.dir", savepointDir.getAbsolutePath());

        ActorSystem actorSystem = null;
        ActorGateway jobManager = null;
        ActorGateway archiver = null;
        ActorGateway taskManager = null;
        try {
            actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());

            // Start JobManager ("jm") and archiver ("arch") actors.
            Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
                    jobManagerConfig,
                    actorSystem,
                    actorSystem.dispatcher(),
                    actorSystem.dispatcher(),
                    Option.apply("jm"),
                    Option.apply("arch"),
                    TestingJobManager.class,
                    TestingMemoryArchivist.class);
            jobManager = new AkkaActorGateway(master._1(), (UUID) null);
            archiver = new AkkaActorGateway(master._2(), (UUID) null);

            // Start a TaskManager ("tm") and wait until it has registered at the JobManager.
            taskManager = new AkkaActorGateway(TaskManager.startTaskManagerComponentsAndActor(new Configuration(), ResourceID.generate(), actorSystem, "localhost", Option.apply("tm"), Option.apply(new StandaloneLeaderRetrievalService(jobManager.path())), true, TestingTaskManager.class), (UUID) null);
            Await.ready(taskManager.ask(new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()), timeout), timeout);

            // Single-vertex job; the vertex is used for all three vertex lists of the snapshot settings.
            JobVertex source = new JobVertex("Source");
            source.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            source.setParallelism(1);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{source});
            jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
                    Collections.singletonList(source.getID()),
                    Collections.singletonList(source.getID()),
                    Collections.singletonList(source.getID()),
                    Long.MAX_VALUE,
                    360000L,
                    0L,
                    Integer.MAX_VALUE));

            // Submit and wait until the job is running.
            Await.result(jobManager.ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
            Await.result(jobManager.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), timeout), timeout);

            Object savepointResponse = Await.result(jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID()), timeout), timeout);
            Assert.assertTrue("Did not trigger savepoint", savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);

            // listFiles() returns null when the directory cannot be listed; report that explicitly.
            File[] savepointFiles = savepointDir.listFiles();
            Assert.assertNotNull("Could not list savepoint directory " + savepointDir, savepointFiles);
            Assert.assertEquals(1L, savepointFiles.length);
        } finally {
            // NOTE(review): the pills are sent after shutdown of the system was requested;
            // this order is kept from the original code.
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (archiver != null) {
                archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
        }
    }

    /**
     * Tests how the {@code allowNonRestoredState} flag of {@link SavepointRestoreSettings} affects
     * job submission.
     *
     * <p>Steps:
     * <ol>
     *   <li>run a job, trigger a savepoint, cancel the job and wait for its removal;</li>
     *   <li>submit a second job graph (built with a new {@link JobVertex}) restoring from that
     *       savepoint with the flag set to {@code false}: submission must fail with an
     *       {@link IllegalStateException} whose message mentions {@code allowNonRestoredState};</li>
     *   <li>resubmit the second job graph with the flag set to {@code true}: submission must
     *       succeed.</li>
     * </ol>
     *
     * <p>Cleanup runs exactly once in a {@code finally} block: the actor system is shut down and
     * the archiver, JobManager and TaskManager actors are sent a {@link PoisonPill}.
     *
     * @throws Exception if starting the actors or any awaited request fails or times out
     */
    @Test
    public void testSavepointRestoreSettings() throws Exception {
        FiniteDuration timeout = new FiniteDuration(30L, TimeUnit.SECONDS);

        ActorSystem actorSystem = null;
        ActorGateway jobManager = null;
        ActorGateway archiver = null;
        ActorGateway taskManager = null;
        try {
            actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());

            // Start JobManager ("jm") and archiver ("arch") actors.
            Tuple2<ActorRef, ActorRef> master = JobManager.startJobManagerActors(
                    new Configuration(),
                    actorSystem,
                    actorSystem.dispatcher(),
                    actorSystem.dispatcher(),
                    Option.apply("jm"),
                    Option.apply("arch"),
                    TestingJobManager.class,
                    TestingMemoryArchivist.class);
            jobManager = new AkkaActorGateway(master._1(), (UUID) null);
            archiver = new AkkaActorGateway(master._2(), (UUID) null);

            // Start a TaskManager ("tm") with four slots and wait until it has registered.
            Configuration taskManagerConfig = new Configuration();
            taskManagerConfig.setInteger("taskmanager.numberOfTaskSlots", 4);
            taskManager = new AkkaActorGateway(TaskManager.startTaskManagerComponentsAndActor(taskManagerConfig, ResourceID.generate(), actorSystem, "localhost", Option.apply("tm"), Option.apply(new StandaloneLeaderRetrievalService(jobManager.path())), true, TestingTaskManager.class), (UUID) null);
            Await.ready(taskManager.ask(new TestingTaskManagerMessages.NotifyWhenRegisteredAtJobManager(jobManager.actor()), timeout), timeout);

            // --- Step 1: run the first job and take a savepoint -------------------------------
            JobVertex source = new JobVertex("Source");
            source.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            source.setParallelism(1);
            JobGraph jobGraph = new JobGraph("TestingJob", new JobVertex[]{source});
            jobGraph.setSnapshotSettings(new JobSnapshottingSettings(
                    Collections.singletonList(source.getID()),
                    Collections.singletonList(source.getID()),
                    Collections.singletonList(source.getID()),
                    Long.MAX_VALUE,
                    360000L,
                    0L,
                    Integer.MAX_VALUE));

            Await.result(jobManager.ask(new JobManagerMessages.SubmitJob(jobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
            Await.result(jobManager.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), timeout), timeout);

            Object savepointResponse = Await.result(jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobGraph.getJobID()), timeout), timeout);
            String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath();

            // Register for the removal notification before cancelling, then cancel and wait.
            Future<Object> jobRemoved = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), timeout);
            Object cancelResponse = Await.result(jobManager.ask(new JobManagerMessages.CancelJob(jobGraph.getJobID()), timeout), timeout);
            Assert.assertTrue("Unexpected response: " + cancelResponse, cancelResponse instanceof JobManagerMessages.CancellationSuccess);
            Await.ready(jobRemoved, timeout);

            // --- Step 2: restore a second job graph, non-restored state NOT allowed -----------
            JobVertex newSource = new JobVertex("Source");
            newSource.setInvokableClass(JobManagerHARecoveryTest.BlockingStatefulInvokable.class);
            newSource.setParallelism(1);
            JobGraph newJobGraph = new JobGraph("TestingJob", new JobVertex[]{newSource});
            newJobGraph.setSnapshotSettings(new JobSnapshottingSettings(
                    Collections.singletonList(newSource.getID()),
                    Collections.singletonList(newSource.getID()),
                    Collections.singletonList(newSource.getID()),
                    Long.MAX_VALUE,
                    360000L,
                    0L,
                    Integer.MAX_VALUE));
            newJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, false));

            Object strictResponse = Await.result(jobManager.ask(new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
            Assert.assertTrue("Unexpected response: " + strictResponse, strictResponse instanceof JobManagerMessages.JobResultFailure);

            Throwable failureCause = ((JobManagerMessages.JobResultFailure) strictResponse).cause().deserializeError(ClassLoader.getSystemClassLoader());
            Assert.assertTrue(failureCause instanceof IllegalStateException);
            Assert.assertTrue(failureCause.getMessage().contains("allowNonRestoredState"));

            // Wait until the failed job is removed before the same graph is submitted again.
            Await.ready(jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(newJobGraph.getJobID()), timeout), timeout);

            // --- Step 3: same graph, non-restored state allowed --------------------------------
            newJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath, true));
            Object lenientResponse = Await.result(jobManager.ask(new JobManagerMessages.SubmitJob(newJobGraph, ListeningBehaviour.DETACHED), timeout), timeout);
            Assert.assertTrue("Unexpected response: " + lenientResponse, lenientResponse instanceof JobManagerMessages.JobSubmitSuccess);
        } finally {
            // NOTE(review): the pills are sent after shutdown of the system was requested;
            // this order is kept from the original code.
            if (actorSystem != null) {
                actorSystem.shutdown();
            }
            if (archiver != null) {
                archiver.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (jobManager != null) {
                jobManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
            if (taskManager != null) {
                taskManager.actor().tell(PoisonPill.getInstance(), ActorRef.noSender());
            }
        }
    }
}
