package org.apache.flink.runtime.executiongraph;

import java.util.Collection;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionVertexCancelTest;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.class */
/**
 * Tests for deploying an {@link ExecutionVertex} to a slot: the execution state the vertex ends
 * up in, the state timestamps it records, and how it reacts to failed deployments and to
 * {@code fail(...)} calls issued while a deployment is in progress.
 *
 * <p>Test methods declare {@code throws Exception} so that an unexpected exception reaches JUnit
 * with its original type and stack trace instead of being flattened into a message-only failure.
 */
public class ExecutionVertexDeploymentTest extends TestLogger {

    /** Upper bound on the number of polls while waiting for an asynchronous failure. */
    private static final int MAX_FAILURE_POLLS = 100;

    /** Pause between two polls, in milliseconds. */
    private static final long FAILURE_POLL_INTERVAL_MILLIS = 10L;

    /**
     * Deploys a vertex against a gateway backed by the direct execution context and checks that
     * it moves from CREATED to DEPLOYING, rejects a second deployment and records no failure.
     */
    @Test
    public void testDeployCall() throws Exception {
        ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());
        SimpleSlot slot = allocateSlot(
                new ActorTaskManagerGateway(
                        new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
                jobVertex);
        ExecutionVertex vertex = newVertex(jobVertex);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        assertRepeatedDeployRejected(vertex, slot);

        Assert.assertNull(vertex.getFailureCause());
        assertTimestampsRecorded(vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING);
    }

    /**
     * Same scenario as {@link #testDeployCall()}, but the job vertex is created with a
     * {@link DirectScheduledExecutorService}; additionally checks that no RUNNING timestamp
     * has been recorded.
     */
    @Test
    public void testDeployWithSynchronousAnswer() throws Exception {
        ExecutionJobVertex jobVertex =
                ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
        SimpleSlot slot = allocateSlot(
                new ActorTaskManagerGateway(
                        new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
                jobVertex);
        ExecutionVertex vertex = newVertex(jobVertex);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        assertRepeatedDeployRejected(vertex, slot);

        Assert.assertNull(vertex.getFailureCause());
        assertTimestampsRecorded(vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING);
        Assert.assertEquals(0L, vertex.getStateTimestamp(ExecutionState.RUNNING));
    }

    /**
     * Deploys against a gateway backed by the default execution context and checks that the
     * vertex stays in DEPLOYING and keeps rejecting repeated deployments.
     */
    @Test
    public void testDeployWithAsynchronousAnswer() throws Exception {
        ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());
        ExecutionVertex vertex = newVertex(jobVertex);
        SimpleSlot slot = allocateSlot(
                new ActorTaskManagerGateway(
                        new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.defaultExecutionContext())),
                jobVertex);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);

        // The rejection is checked both before and after asserting the DEPLOYING state.
        assertRepeatedDeployRejected(vertex, slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
        assertRepeatedDeployRejected(vertex, slot);

        assertTimestampsRecorded(vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING);
        Assert.assertEquals(0L, vertex.getStateTimestamp(ExecutionState.RUNNING));
    }

    /**
     * Deploys against a failing gateway, with the job vertex created on a
     * {@link DirectScheduledExecutorService}, and expects the vertex to be FAILED with the
     * gateway's error message immediately after the deploy call returns.
     */
    @Test
    public void testDeployFailedSynchronous() throws Exception {
        ExecutionJobVertex jobVertex =
                ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
        ExecutionVertex vertex = newVertex(jobVertex);
        SimpleSlot slot = allocateSlot(
                new ActorTaskManagerGateway(
                        new ExecutionGraphTestUtils.SimpleFailingActorGateway(TestingUtils.directExecutionContext())),
                jobVertex);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertNotNull(vertex.getFailureCause());
        Assert.assertTrue(vertex.getFailureCause().getMessage().contains(ExecutionGraphTestUtils.ERROR_MESSAGE));
        assertTimestampsRecorded(
                vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING, ExecutionState.FAILED);
    }

    /**
     * Deploys against a failing gateway and polls (bounded) until the vertex reports FAILED
     * together with a failure cause, then verifies the cause message and the timestamps.
     */
    @Test
    public void testDeployFailedAsynchronously() throws Exception {
        ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());
        ExecutionVertex vertex = newVertex(jobVertex);
        SimpleSlot slot = allocateSlot(
                new ActorTaskManagerGateway(
                        new ExecutionGraphTestUtils.SimpleFailingActorGateway(TestingUtils.directExecutionContext())),
                jobVertex);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);

        // Wait until both the FAILED state and the failure cause are visible, but never longer
        // than MAX_FAILURE_POLLS * FAILURE_POLL_INTERVAL_MILLIS.
        for (int poll = 0; poll < MAX_FAILURE_POLLS; poll++) {
            boolean failureVisible =
                    vertex.getExecutionState() == ExecutionState.FAILED && vertex.getFailureCause() != null;
            if (failureVisible) {
                break;
            }
            Thread.sleep(FAILURE_POLL_INTERVAL_MILLIS);
        }

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertNotNull(vertex.getFailureCause());
        Assert.assertTrue(vertex.getFailureCause().getMessage().contains(ExecutionGraphTestUtils.ERROR_MESSAGE));
        assertTimestampsRecorded(
                vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING, ExecutionState.FAILED);
    }

    /**
     * Calls {@code fail(...)} on a vertex that is in DEPLOYING and checks that it switches to
     * FAILED with exactly the supplied cause. The job vertex runs on a queued-action execution
     * context whose queue is advanced manually afterwards.
     */
    @Test
    public void testFailExternallyDuringDeploy() throws Exception {
        JobVertexID jobVertexId = new JobVertexID();
        TestingUtils.QueuedActionExecutionContext executionContext = TestingUtils.queuedActionExecutionContext();
        TestingUtils.ActionQueue queue = executionContext.actionQueue();

        ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionVertex(jobVertexId, executionContext);
        ExecutionVertex vertex = newVertex(jobVertex);
        SimpleSlot slot = allocateSlot(
                new ActorTaskManagerGateway(
                        new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())),
                jobVertex);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        Exception testError = new Exception("test error");
        vertex.fail(testError);

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertEquals(testError, vertex.getFailureCause());

        // Run the two actions that are expected to be queued at this point.
        queue.triggerNextAction();
        queue.triggerNextAction();

        assertTimestampsRecorded(
                vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING, ExecutionState.FAILED);
    }

    /**
     * Fails a vertex during deployment while all asynchronous actions are queued, then runs the
     * queued actions out of their enqueue order and checks that the vertex stays FAILED with the
     * original cause and that the queue ends up empty.
     */
    @Test
    public void testFailCallOvertakesDeploymentAnswer() throws Exception {
        TestingUtils.QueuedActionExecutionContext executionContext = TestingUtils.queuedActionExecutionContext();
        TestingUtils.ActionQueue queue = executionContext.actionQueue();

        ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), executionContext);
        ExecutionVertex vertex = newVertex(jobVertex);
        SimpleSlot slot = allocateSlot(
                new ActorTaskManagerGateway(
                        new ExecutionVertexCancelTest.CancelSequenceActorGateway(executionContext, 2)),
                jobVertex);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        Exception testError = new Exception("test error");
        vertex.fail(testError);
        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());

        // Hold back the first queued action and let later ones run before it.
        // NOTE(review): presumably the held-back action is the deployment answer and the
        // actions run ahead of it belong to the fail/cancel path; confirm against the gateway.
        Runnable heldBackAction = queue.popNextAction();
        queue.popNextAction().run();
        queue.triggerNextAction();
        queue.triggerNextAction();
        heldBackAction.run();
        queue.triggerNextAction();

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertEquals(testError, vertex.getFailureCause());
        assertTimestampsRecorded(
                vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING, ExecutionState.FAILED);
        Assert.assertTrue(queue.isEmpty());
    }

    /**
     * For every {@link ScheduleMode}, builds a deployment descriptor for a vertex with one
     * pipelined result partition and checks that the partition's
     * {@code sendScheduleOrUpdateConsumersMessage()} flag equals the mode's
     * {@code allowLazyDeployment()}.
     */
    @Test
    public void testTddProducedPartitionsLazyScheduling() throws Exception {
        ExecutionJobVertex jobVertex =
                ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), TestingUtils.queuedActionExecutionContext());
        IntermediateResult result =
                new IntermediateResult(new IntermediateDataSetID(), jobVertex, 1, ResultPartitionType.PIPELINED);
        ExecutionVertex vertex =
                new ExecutionVertex(jobVertex, 0, new IntermediateResult[] {result}, Time.minutes(1L));

        // Register a single (mocked) consumer on the only partition of the result.
        ExecutionEdge consumerEdge = createMockExecutionEdge(1);
        result.getPartitions()[0].addConsumerGroup();
        result.getPartitions()[0].addConsumer(consumerEdge, 0);

        AllocatedSlot allocatedSlot = Mockito.mock(AllocatedSlot.class);
        Mockito.when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());

        Slot rootSlot = Mockito.mock(Slot.class);
        Mockito.when(rootSlot.getSlotNumber()).thenReturn(1);

        SimpleSlot slot = Mockito.mock(SimpleSlot.class);
        Mockito.when(slot.getRoot()).thenReturn(rootSlot);
        Mockito.when(slot.getAllocatedSlot()).thenReturn(allocatedSlot);
        Mockito.when(rootSlot.getAllocatedSlot()).thenReturn(allocatedSlot);

        for (ScheduleMode mode : ScheduleMode.values()) {
            vertex.getExecutionGraph().setScheduleMode(mode);

            Collection<?> producedPartitions = vertex
                    .createDeploymentDescriptor(new ExecutionAttemptID(), slot, (TaskStateSnapshot) null, 1)
                    .getProducedPartitions();
            Assert.assertEquals(1, producedPartitions.size());

            ResultPartitionDeploymentDescriptor partition =
                    (ResultPartitionDeploymentDescriptor) producedPartitions.iterator().next();
            Assert.assertEquals(mode.allowLazyDeployment(), partition.sendScheduleOrUpdateConsumersMessage());
        }
    }

    /**
     * Creates a mocked {@link ExecutionEdge} whose target vertex belongs to a mocked job vertex
     * reporting the given maximum parallelism.
     *
     * @param maxParallelism value returned by the mocked job vertex's {@code getMaxParallelism()}
     * @return the mocked edge
     */
    private ExecutionEdge createMockExecutionEdge(int maxParallelism) {
        ExecutionVertex targetVertex = Mockito.mock(ExecutionVertex.class);
        ExecutionJobVertex targetJobVertex = Mockito.mock(ExecutionJobVertex.class);
        Mockito.when(targetVertex.getJobVertex()).thenReturn(targetJobVertex);
        Mockito.when(targetJobVertex.getMaxParallelism()).thenReturn(maxParallelism);

        ExecutionEdge edge = Mockito.mock(ExecutionEdge.class);
        Mockito.when(edge.getTarget()).thenReturn(targetVertex);
        return edge;
    }

    /** Creates subtask 0 of the given job vertex with no produced results and the default timeout. */
    private static ExecutionVertex newVertex(ExecutionJobVertex jobVertex) {
        return new ExecutionVertex(jobVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
    }

    /** Allocates a simple slot for the job of {@code jobVertex} on a test instance using {@code gateway}. */
    private static SimpleSlot allocateSlot(ActorTaskManagerGateway gateway, ExecutionJobVertex jobVertex)
            throws Exception {
        return ExecutionGraphTestUtils.getInstance(gateway).allocateSimpleSlot(jobVertex.getJobId());
    }

    /**
     * Asserts that deploying {@code vertex} again is rejected with an
     * {@link IllegalStateException}.
     */
    private static void assertRepeatedDeployRejected(ExecutionVertex vertex, SimpleSlot slot) throws Exception {
        try {
            vertex.deployToSlot(slot);
            Assert.fail("Scheduled from wrong state");
        } catch (IllegalStateException expected) {
            // Expected: a second deployment attempt must be refused.
        }
    }

    /** Asserts that {@code vertex} has a positive timestamp for each of the given states. */
    private static void assertTimestampsRecorded(ExecutionVertex vertex, ExecutionState... states) {
        for (ExecutionState state : states) {
            Assert.assertTrue(
                    "No timestamp recorded for state " + state, vertex.getStateTimestamp(state) > 0);
        }
    }
}
