package org.apache.flink.runtime.executiongraph;

import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import akka.testkit.TestActorRef;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertexCancelTest;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.class */
public class ExecutionVertexDeploymentTest {
    private static ActorSystem system;

    @BeforeClass
    public static void setup() {
        system = ActorSystem.create("TestingActorSystem", TestingUtils.testConfig());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }

    @Test
    public void testDeployCall() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            TestingUtils.setCallingThreadDispatcher(system);
            TestActorRef create = TestActorRef.create(system, Props.create((Class<?>) ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class, new Object[0]));
            ExecutionJobVertex executionVertex = ExecutionGraphTestUtils.getExecutionVertex(jobVertexID);
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(create).allocateSimpleSlot(executionVertex.getJobId());
            ExecutionVertex executionVertex2 = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            Assert.assertEquals(ExecutionState.CREATED, executionVertex2.getExecutionState());
            executionVertex2.deployToSlot(allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex2.getExecutionState());
            try {
                executionVertex2.deployToSlot(allocateSimpleSlot);
                Assert.fail("Scheduled from wrong state");
            } catch (IllegalStateException e) {
            }
            Assert.assertNull(executionVertex2.getFailureCause());
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        } finally {
            TestingUtils.setGlobalExecutionContext();
        }
    }

    @Test
    public void testDeployWithSynchronousAnswer() {
        try {
            TestingUtils.setCallingThreadDispatcher(system);
            JobVertexID jobVertexID = new JobVertexID();
            TestActorRef create = TestActorRef.create(system, Props.create((Class<?>) ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class, new Object[0]));
            ExecutionJobVertex executionVertex = ExecutionGraphTestUtils.getExecutionVertex(jobVertexID);
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(create).allocateSimpleSlot(executionVertex.getJobId());
            ExecutionVertex executionVertex2 = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            Assert.assertEquals(ExecutionState.CREATED, executionVertex2.getExecutionState());
            executionVertex2.deployToSlot(allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex2.getExecutionState());
            try {
                executionVertex2.deployToSlot(allocateSimpleSlot);
                Assert.fail("Scheduled from wrong state");
            } catch (IllegalStateException e) {
            }
            Assert.assertNull(executionVertex2.getFailureCause());
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.RUNNING) == 0);
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        } finally {
            TestingUtils.setGlobalExecutionContext();
        }
    }

    @Test
    public void testDeployWithAsynchronousAnswer() {
        try {
            ExecutionJobVertex executionVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());
            ExecutionVertex executionVertex2 = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(TestActorRef.create(system, Props.create((Class<?>) ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class, new Object[0]))).allocateSimpleSlot(executionVertex.getJobId());
            Assert.assertEquals(ExecutionState.CREATED, executionVertex2.getExecutionState());
            executionVertex2.deployToSlot(allocateSimpleSlot);
            try {
                executionVertex2.deployToSlot(allocateSimpleSlot);
                Assert.fail("Scheduled from wrong state");
            } catch (IllegalStateException e) {
            }
            Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex2.getExecutionState());
            try {
                executionVertex2.deployToSlot(allocateSimpleSlot);
                Assert.fail("Scheduled from wrong state");
            } catch (IllegalStateException e2) {
            }
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.RUNNING) == 0);
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }

    @Test
    public void testDeployFailedSynchronous() {
        try {
            TestingUtils.setCallingThreadDispatcher(system);
            ExecutionJobVertex executionVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());
            ExecutionVertex executionVertex2 = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(TestActorRef.create(system, Props.create((Class<?>) ExecutionGraphTestUtils.SimpleFailingTaskManager.class, new Object[0]))).allocateSimpleSlot(executionVertex.getJobId());
            Assert.assertEquals(ExecutionState.CREATED, executionVertex2.getExecutionState());
            executionVertex2.deployToSlot(allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.FAILED, executionVertex2.getExecutionState());
            Assert.assertNotNull(executionVertex2.getFailureCause());
            Assert.assertTrue(executionVertex2.getFailureCause().getMessage().contains(ExecutionGraphTestUtils.ERROR_MESSAGE));
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.FAILED) > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            TestingUtils.setGlobalExecutionContext();
        }
    }

    @Test
    public void testDeployFailedAsynchronously() {
        try {
            ExecutionJobVertex executionVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());
            ExecutionVertex executionVertex2 = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(TestActorRef.create(system, Props.create((Class<?>) ExecutionGraphTestUtils.SimpleFailingTaskManager.class, new Object[0]))).allocateSimpleSlot(executionVertex.getJobId());
            Assert.assertEquals(ExecutionState.CREATED, executionVertex2.getExecutionState());
            executionVertex2.deployToSlot(allocateSimpleSlot);
            for (int i = 0; i < 100 && (executionVertex2.getExecutionState() != ExecutionState.FAILED || executionVertex2.getFailureCause() == null); i++) {
                Thread.sleep(10L);
            }
            Assert.assertEquals(ExecutionState.FAILED, executionVertex2.getExecutionState());
            Assert.assertNotNull(executionVertex2.getFailureCause());
            Assert.assertTrue(executionVertex2.getFailureCause().getMessage().contains(ExecutionGraphTestUtils.ERROR_MESSAGE));
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.FAILED) > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testFailExternallyDuringDeploy() {
        try {
            try {
                ExecutionJobVertex executionVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());
                ExecutionVertex executionVertex2 = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
                ExecutionGraphTestUtils.ActionQueue actionQueue = new ExecutionGraphTestUtils.ActionQueue();
                TestingUtils.setExecutionContext(new TestingUtils.QueuedActionExecutionContext(actionQueue));
                SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(TestActorRef.create(system, Props.create((Class<?>) ExecutionGraphTestUtils.SimpleAcknowledgingTaskManager.class, new Object[0]))).allocateSimpleSlot(executionVertex.getJobId());
                Assert.assertEquals(ExecutionState.CREATED, executionVertex2.getExecutionState());
                executionVertex2.deployToSlot(allocateSimpleSlot);
                Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex2.getExecutionState());
                Exception exc = new Exception("test error");
                executionVertex2.fail(exc);
                Assert.assertEquals(ExecutionState.FAILED, executionVertex2.getExecutionState());
                Assert.assertEquals(exc, executionVertex2.getFailureCause());
                actionQueue.triggerNextAction();
                Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.CREATED) > 0);
                Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
                Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.FAILED) > 0);
                TestingUtils.setGlobalExecutionContext();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail(e.getMessage());
                TestingUtils.setGlobalExecutionContext();
            }
        } catch (Throwable th) {
            TestingUtils.setGlobalExecutionContext();
            throw th;
        }
    }

    @Test
    public void testFailCallOvertakesDeploymentAnswer() {
        try {
            ExecutionGraphTestUtils.ActionQueue actionQueue = new ExecutionGraphTestUtils.ActionQueue();
            TestingUtils.setExecutionContext(new TestingUtils.QueuedActionExecutionContext(actionQueue));
            ExecutionJobVertex executionVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());
            ExecutionVertex executionVertex2 = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            ExecutionAttemptID attemptId = executionVertex2.getCurrentExecutionAttempt().getAttemptId();
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(TestActorRef.create(system, Props.create(new ExecutionVertexCancelTest.CancelSequenceTaskManagerCreator(new TaskMessages.TaskOperationResult(attemptId, false), new TaskMessages.TaskOperationResult(attemptId, true))))).allocateSimpleSlot(executionVertex.getJobId());
            Assert.assertEquals(ExecutionState.CREATED, executionVertex2.getExecutionState());
            executionVertex2.deployToSlot(allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex2.getExecutionState());
            Exception exc = new Exception("test error");
            executionVertex2.fail(exc);
            Assert.assertEquals(ExecutionState.FAILED, executionVertex2.getExecutionState());
            Runnable popNextAction = actionQueue.popNextAction();
            actionQueue.popNextAction().run();
            actionQueue.triggerNextAction();
            popNextAction.run();
            Assert.assertEquals(ExecutionState.FAILED, executionVertex2.getExecutionState());
            Assert.assertEquals(exc, executionVertex2.getFailureCause());
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.DEPLOYING) > 0);
            Assert.assertTrue(executionVertex2.getStateTimestamp(ExecutionState.FAILED) > 0);
            Assert.assertTrue(actionQueue.isEmpty());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        } finally {
            TestingUtils.setGlobalExecutionContext();
        }
    }
}
