package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.instance.DummyActorGateway;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.Messages$Acknowledge$;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import scala.concurrent.ExecutionContext;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest.class */
public class ExecutionVertexCancelTest {

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexCancelTest$CancelSequenceActorGateway.class */
    public static class CancelSequenceActorGateway extends BaseTestingActorGateway {
        private final TaskMessages.TaskOperationResult[] results;
        private int index;

        public CancelSequenceActorGateway(ExecutionContext executionContext, TaskMessages.TaskOperationResult... taskOperationResultArr) {
            super(executionContext);
            this.index = -1;
            this.results = taskOperationResultArr;
        }

        @Override // org.apache.flink.runtime.instance.BaseTestingActorGateway
        public Object handleMessage(Object obj) throws Exception {
            Messages$Acknowledge$ messages$Acknowledge$;
            if (obj instanceof TaskMessages.SubmitTask) {
                messages$Acknowledge$ = Messages.getAcknowledge();
            } else if (obj instanceof TaskMessages.CancelTask) {
                this.index++;
                if (this.index >= this.results.length) {
                    throw new IOException("RPC call failed.");
                }
                messages$Acknowledge$ = this.results[this.index];
            } else {
                messages$Acknowledge$ = null;
            }
            return messages$Acknowledge$;
        }
    }

    @Test
    public void testCancelFromCreated() {
        try {
            ExecutionVertex executionVertex = new ExecutionVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()), 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            Assert.assertEquals(ExecutionState.CREATED, executionVertex.getExecutionState());
            executionVertex.cancel();
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            Assert.assertNull(executionVertex.getFailureCause());
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCancelFromScheduled() {
        try {
            ExecutionVertex executionVertex = new ExecutionVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()), 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            ExecutionGraphTestUtils.setVertexState(executionVertex, ExecutionState.SCHEDULED);
            Assert.assertEquals(ExecutionState.SCHEDULED, executionVertex.getExecutionState());
            executionVertex.cancel();
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            Assert.assertNull(executionVertex.getFailureCause());
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCancelConcurrentlyToDeploying_CallsNotOvertaking() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            TestingUtils.QueuedActionExecutionContext queuedActionExecutionContext = TestingUtils.queuedActionExecutionContext();
            TestingUtils.ActionQueue actionQueue = queuedActionExecutionContext.actionQueue();
            ExecutionVertex executionVertex = new ExecutionVertex(ExecutionGraphTestUtils.getExecutionVertex(jobVertexID, queuedActionExecutionContext), 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
            ExecutionGraphTestUtils.setVertexState(executionVertex, ExecutionState.SCHEDULED);
            Assert.assertEquals(ExecutionState.SCHEDULED, executionVertex.getExecutionState());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(new CancelSequenceActorGateway(queuedActionExecutionContext, new TaskMessages.TaskOperationResult(attemptId, true), new TaskMessages.TaskOperationResult(attemptId, false))).allocateSimpleSlot(new JobID());
            executionVertex.deployToSlot(allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex.getExecutionState());
            executionVertex.cancel();
            Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getExecutionState());
            actionQueue.triggerNextAction();
            Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getExecutionState());
            Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getExecutionState());
            actionQueue.triggerNextAction();
            executionVertex.getCurrentExecutionAttempt().cancelingComplete();
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            actionQueue.triggerNextAction();
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            Assert.assertTrue(allocateSimpleSlot.isReleased());
            Assert.assertNull(executionVertex.getFailureCause());
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCancelConcurrentlyToDeploying_CallsOvertaking() {
        try {
            JobVertexID jobVertexID = new JobVertexID();
            TestingUtils.QueuedActionExecutionContext queuedActionExecutionContext = TestingUtils.queuedActionExecutionContext();
            TestingUtils.ActionQueue actionQueue = queuedActionExecutionContext.actionQueue();
            ExecutionVertex executionVertex = new ExecutionVertex(ExecutionGraphTestUtils.getExecutionVertex(jobVertexID, queuedActionExecutionContext), 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
            ExecutionGraphTestUtils.setVertexState(executionVertex, ExecutionState.SCHEDULED);
            Assert.assertEquals(ExecutionState.SCHEDULED, executionVertex.getExecutionState());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(new CancelSequenceActorGateway(queuedActionExecutionContext, new TaskMessages.TaskOperationResult(attemptId, false), new TaskMessages.TaskOperationResult(attemptId, true))).allocateSimpleSlot(new JobID());
            executionVertex.deployToSlot(allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.DEPLOYING, executionVertex.getExecutionState());
            executionVertex.cancel();
            Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getExecutionState());
            Runnable popNextAction = actionQueue.popNextAction();
            actionQueue.popNextAction().run();
            actionQueue.triggerNextAction();
            Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getExecutionState());
            popNextAction.run();
            Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getExecutionState());
            executionVertex.getCurrentExecutionAttempt().cancelingComplete();
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            Assert.assertTrue(allocateSimpleSlot.isReleased());
            Assert.assertNull(executionVertex.getFailureCause());
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCancelFromRunning() {
        try {
            ExecutionVertex executionVertex = new ExecutionVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), TestingUtils.directExecutionContext()), 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), new TaskMessages.TaskOperationResult(executionVertex.getCurrentExecutionAttempt().getAttemptId(), true))).allocateSimpleSlot(new JobID());
            ExecutionGraphTestUtils.setVertexState(executionVertex, ExecutionState.RUNNING);
            ExecutionGraphTestUtils.setVertexResource(executionVertex, allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.RUNNING, executionVertex.getExecutionState());
            executionVertex.cancel();
            executionVertex.getCurrentExecutionAttempt().cancelingComplete();
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            Assert.assertTrue(allocateSimpleSlot.isReleased());
            Assert.assertNull(executionVertex.getFailureCause());
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testRepeatedCancelFromRunning() {
        try {
            ExecutionVertex executionVertex = new ExecutionVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), TestingUtils.directExecutionContext()), 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), new TaskMessages.TaskOperationResult(executionVertex.getCurrentExecutionAttempt().getAttemptId(), true))).allocateSimpleSlot(new JobID());
            ExecutionGraphTestUtils.setVertexState(executionVertex, ExecutionState.RUNNING);
            ExecutionGraphTestUtils.setVertexResource(executionVertex, allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.RUNNING, executionVertex.getExecutionState());
            executionVertex.cancel();
            Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getExecutionState());
            executionVertex.cancel();
            Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getExecutionState());
            executionVertex.getCurrentExecutionAttempt().cancelingComplete();
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            Assert.assertTrue(allocateSimpleSlot.isReleased());
            Assert.assertNull(executionVertex.getFailureCause());
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELED) > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCancelFromRunningDidNotFindTask() {
        try {
            ExecutionVertex executionVertex = new ExecutionVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), TestingUtils.directExecutionContext()), 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), new TaskMessages.TaskOperationResult(executionVertex.getCurrentExecutionAttempt().getAttemptId(), false))).allocateSimpleSlot(new JobID());
            ExecutionGraphTestUtils.setVertexState(executionVertex, ExecutionState.RUNNING);
            ExecutionGraphTestUtils.setVertexResource(executionVertex, allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.RUNNING, executionVertex.getExecutionState());
            executionVertex.cancel();
            Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getExecutionState());
            Assert.assertNull(executionVertex.getFailureCause());
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testCancelCallFails() {
        try {
            ExecutionVertex executionVertex = new ExecutionVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), TestingUtils.directExecutionContext()), 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(new CancelSequenceActorGateway(TestingUtils.directExecutionContext(), new TaskMessages.TaskOperationResult[0])).allocateSimpleSlot(new JobID());
            ExecutionGraphTestUtils.setVertexState(executionVertex, ExecutionState.RUNNING);
            ExecutionGraphTestUtils.setVertexResource(executionVertex, allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.RUNNING, executionVertex.getExecutionState());
            executionVertex.cancel();
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            Assert.assertTrue(allocateSimpleSlot.isReleased());
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CREATED) > 0);
            Assert.assertTrue(executionVertex.getStateTimestamp(ExecutionState.CANCELING) > 0);
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testSendCancelAndReceiveFail() {
        try {
            ExecutionVertex executionVertex = new ExecutionVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()), 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(new CancelSequenceActorGateway(TestingUtils.defaultExecutionContext(), new TaskMessages.TaskOperationResult(executionVertex.getCurrentExecutionAttempt().getAttemptId(), true))).allocateSimpleSlot(new JobID());
            ExecutionGraphTestUtils.setVertexState(executionVertex, ExecutionState.RUNNING);
            ExecutionGraphTestUtils.setVertexResource(executionVertex, allocateSimpleSlot);
            Assert.assertEquals(ExecutionState.RUNNING, executionVertex.getExecutionState());
            executionVertex.cancel();
            Assert.assertTrue(executionVertex.getExecutionState() == ExecutionState.CANCELING || executionVertex.getExecutionState() == ExecutionState.FAILED);
            executionVertex.getCurrentExecutionAttempt().markFailed(new Throwable("test"));
            Assert.assertTrue(executionVertex.getExecutionState() == ExecutionState.CANCELED || executionVertex.getExecutionState() == ExecutionState.FAILED);
            Assert.assertTrue(allocateSimpleSlot.isReleased());
            Assert.assertEquals(0L, executionVertex.getExecutionGraph().getRegisteredExecutions().size());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testScheduleOrDeployAfterCancel() {
        try {
            ExecutionVertex executionVertex = new ExecutionVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()), 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            ExecutionGraphTestUtils.setVertexState(executionVertex, ExecutionState.CANCELED);
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            executionVertex.scheduleForExecution((Scheduler) Mockito.mock(Scheduler.class), false);
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            try {
                executionVertex.deployToSlot(ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE).allocateSimpleSlot(new JobID()));
                Assert.fail("Method should throw an exception");
            } catch (IllegalStateException e) {
                Assert.assertEquals(ExecutionState.CANCELED, executionVertex.getExecutionState());
            }
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testActionsWhileCancelling() {
        try {
            ExecutionJobVertex executionVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());
            try {
                ExecutionVertex executionVertex2 = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
                ExecutionGraphTestUtils.setVertexState(executionVertex2, ExecutionState.CANCELING);
                executionVertex2.scheduleForExecution((Scheduler) Mockito.mock(Scheduler.class), false);
            } catch (Exception e) {
                Assert.fail("should not throw an exception");
            }
            try {
                ExecutionVertex executionVertex3 = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
                ExecutionGraphTestUtils.setVertexState(executionVertex3, ExecutionState.CANCELING);
                executionVertex3.deployToSlot(ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE).allocateSimpleSlot(new JobID()));
                Assert.fail("Method should throw an exception");
            } catch (IllegalStateException e2) {
            }
            ExecutionVertex executionVertex4 = new ExecutionVertex(executionVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
            SimpleSlot allocateSimpleSlot = ExecutionGraphTestUtils.getInstance(DummyActorGateway.INSTANCE).allocateSimpleSlot(new JobID());
            ExecutionGraphTestUtils.setVertexResource(executionVertex4, allocateSimpleSlot);
            ExecutionGraphTestUtils.setVertexState(executionVertex4, ExecutionState.CANCELING);
            executionVertex4.fail(new Exception("test exception"));
            Assert.assertEquals(ExecutionState.CANCELED, executionVertex4.getExecutionState());
            Assert.assertTrue(allocateSimpleSlot.isReleased());
        } catch (Exception e3) {
            e3.printStackTrace();
            Assert.fail(e3.getMessage());
        }
    }
}
