package org.apache.flink.runtime.executiongraph;

import java.net.InetAddress;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.class */
/**
 * Tests the state transitions an {@link ExecutionVertex} goes through when it is deployed to a
 * slot: the successful {@code CREATED -> DEPLOYING} path, rejection of a repeated deployment,
 * failures reported by the task manager gateway, and failures triggered externally while the
 * deployment is still pending.
 *
 * <p>Test methods declare {@code throws Exception} so that an unexpected exception reaches the
 * test runner with its original type and stack trace.
 */
public class ExecutionVertexDeploymentTest extends TestLogger {
    private static final String ERROR_MESSAGE = "test_failure_error_message";

    /** Gateway whose {@code submitTask} returns a future that is never completed. */
    private static class SubmitBlockingSimpleAckingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
        @Override
        public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor taskDeploymentDescriptor, Time time) {
            return new CompletableFuture<>();
        }
    }

    /** Gateway whose {@code submitTask} returns a future already failed with {@link #ERROR_MESSAGE}. */
    private static class SubmitFailingSimpleAckingTaskManagerGateway extends SimpleAckingTaskManagerGateway {
        @Override
        public CompletableFuture<Acknowledge> submitTask(TaskDeploymentDescriptor taskDeploymentDescriptor, Time time) {
            CompletableFuture<Acknowledge> failedSubmission = new CompletableFuture<>();
            failedSubmission.completeExceptionally(new Exception(ERROR_MESSAGE));
            return failedSubmission;
        }
    }

    /**
     * Deploying a freshly created vertex moves it to {@code DEPLOYING}; a second deployment is
     * rejected and no failure cause is recorded.
     */
    @Test
    public void testDeployCall() throws Exception {
        ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());
        TestingLogicalSlot slot = new TestingLogicalSlot();
        ExecutionVertex vertex = createVertex(jobVertex);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        assertRedeployRejected(vertex, slot);

        Assert.assertNull(vertex.getFailureCause());
        assertTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING);
    }

    /**
     * Same scenario as {@link #testDeployCall()}, but the job vertex is built with a
     * {@link DirectScheduledExecutorService}; additionally checks that no {@code RUNNING}
     * timestamp has been recorded.
     */
    @Test
    public void testDeployWithSynchronousAnswer() throws Exception {
        ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService());
        TestingLogicalSlot slot = new TestingLogicalSlot();
        ExecutionVertex vertex = createVertex(jobVertex);

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        assertRedeployRejected(vertex, slot);

        Assert.assertNull(vertex.getFailureCause());
        assertTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING);
        assertNeverRunning(vertex);
    }

    /**
     * Deploys with the default executor and checks that a repeated deployment is rejected both
     * before and after the {@code DEPLOYING} state has been observed.
     */
    @Test
    public void testDeployWithAsynchronousAnswer() throws Exception {
        ExecutionVertex vertex = createVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()));
        TestingLogicalSlot slot = new TestingLogicalSlot();

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);

        assertRedeployRejected(vertex, slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());
        assertRedeployRejected(vertex, slot);

        assertTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING);
        assertNeverRunning(vertex);
    }

    /**
     * With a gateway that fails the submission and a {@link DirectScheduledExecutorService}, the
     * vertex is expected to be {@code FAILED} as soon as {@code deployToSlot} returns.
     */
    @Test
    public void testDeployFailedSynchronous() throws Exception {
        ExecutionVertex vertex = createVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService()));
        TestingLogicalSlot slot = new TestingLogicalSlot(new SubmitFailingSimpleAckingTaskManagerGateway());

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);

        assertFailedWithSubmissionError(vertex);
    }

    /**
     * With a gateway that fails the submission and the default executor, the vertex is expected
     * to become {@code FAILED} within roughly one second after {@code deployToSlot}.
     */
    @Test
    public void testDeployFailedAsynchronously() throws Exception {
        ExecutionVertex vertex = createVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()));
        TestingLogicalSlot slot = new TestingLogicalSlot(new SubmitFailingSimpleAckingTaskManagerGateway());

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);

        // Poll (at most 100 x 10 ms) until both the FAILED state and the failure cause are visible.
        for (int attempt = 0; attempt < 100; attempt++) {
            boolean failureVisible = vertex.getExecutionState() == ExecutionState.FAILED && vertex.getFailureCause() != null;
            if (failureVisible) {
                break;
            }
            Thread.sleep(10L);
        }

        assertFailedWithSubmissionError(vertex);
    }

    /**
     * While the submission future is still pending (it never completes with the blocking
     * gateway), an external {@code fail} call moves the vertex to {@code FAILED} and records the
     * given exception as the failure cause.
     */
    @Test
    public void testFailExternallyDuringDeploy() throws Exception {
        ExecutionVertex vertex = createVertex(ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService()));
        TestingLogicalSlot slot = new TestingLogicalSlot(new SubmitBlockingSimpleAckingTaskManagerGateway());

        Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
        vertex.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

        Exception externalFailure = new Exception("test error");
        vertex.fail(externalFailure);

        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertEquals(externalFailure, vertex.getFailureCause());
        assertTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING, ExecutionState.FAILED);
    }

    /**
     * For every {@link ScheduleMode}, builds a deployment descriptor for a vertex with one
     * pipelined result partition and checks that the partition's
     * {@code sendScheduleOrUpdateConsumersMessage} flag equals the mode's
     * {@code allowLazyDeployment()} value.
     */
    @Test
    public void testTddProducedPartitionsLazyScheduling() throws Exception {
        for (ScheduleMode scheduleMode : ScheduleMode.values()) {
            ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID(), new DirectScheduledExecutorService(), scheduleMode);
            IntermediateResult result = new IntermediateResult(new IntermediateDataSetID(), jobVertex, 1, ResultPartitionType.PIPELINED);
            ExecutionAttemptID attemptId = new ExecutionAttemptID();
            ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new IntermediateResult[]{result}, Time.minutes(1L));
            TaskDeploymentDescriptorFactory descriptorFactory = TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex, 1);

            // Give the single partition one consumer group with one (mocked) consumer edge.
            ExecutionEdge consumerEdge = createMockExecutionEdge(1);
            result.getPartitions()[0].addConsumerGroup();
            result.getPartitions()[0].addConsumer(consumerEdge, 0);

            TaskManagerLocation location = new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 1);

            // NOTE(review): the raw Map cast is kept because the generic signature of
            // registerProducedPartitions is not visible here; drop it if the future is typed.
            Collection<?> producedPartitions = descriptorFactory
                    .createDeploymentDescriptor(
                            new AllocationID(),
                            0,
                            (JobManagerTaskRestore) null,
                            ((Map) Execution.registerProducedPartitions(vertex, location, attemptId).get()).values())
                    .getProducedPartitions();

            Assert.assertEquals(1L, producedPartitions.size());
            ResultPartitionDeploymentDescriptor partitionDescriptor = (ResultPartitionDeploymentDescriptor) producedPartitions.iterator().next();
            Assert.assertEquals(scheduleMode.allowLazyDeployment(), partitionDescriptor.sendScheduleOrUpdateConsumersMessage());
        }
    }

    /**
     * Builds a mocked {@link ExecutionEdge} whose target vertex belongs to a mocked job vertex
     * reporting the given max parallelism.
     *
     * @param maxParallelism value returned by the mocked job vertex's {@code getMaxParallelism()}
     * @return the mocked edge
     */
    private ExecutionEdge createMockExecutionEdge(int maxParallelism) {
        ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class);
        Mockito.when(jobVertex.getMaxParallelism()).thenReturn(maxParallelism);

        ExecutionVertex targetVertex = Mockito.mock(ExecutionVertex.class);
        Mockito.when(targetVertex.getJobVertex()).thenReturn(jobVertex);

        ExecutionEdge edge = Mockito.mock(ExecutionEdge.class);
        Mockito.when(edge.getTarget()).thenReturn(targetVertex);
        return edge;
    }

    /** Creates subtask 0 of the given job vertex with no produced results and the default timeout. */
    private static ExecutionVertex createVertex(ExecutionJobVertex jobVertex) throws Exception {
        return new ExecutionVertex(jobVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
    }

    /** Asserts that deploying the vertex again is rejected with an {@link IllegalStateException}. */
    private static void assertRedeployRejected(ExecutionVertex vertex, TestingLogicalSlot slot) throws Exception {
        try {
            vertex.deployToSlot(slot);
        } catch (IllegalStateException expected) {
            // This is the outcome under test: the vertex has already left the CREATED state.
            return;
        }
        Assert.fail("Scheduled from wrong state");
    }

    /** Asserts that a positive timestamp has been recorded for each of the given states. */
    private static void assertTimestampsSet(ExecutionVertex vertex, ExecutionState... states) {
        for (ExecutionState state : states) {
            Assert.assertTrue("no timestamp recorded for state " + state, vertex.getStateTimestamp(state) > 0);
        }
    }

    /** Asserts that the vertex has no timestamp recorded for the {@code RUNNING} state. */
    private static void assertNeverRunning(ExecutionVertex vertex) {
        Assert.assertEquals("unexpected RUNNING timestamp", 0L, vertex.getStateTimestamp(ExecutionState.RUNNING));
    }

    /**
     * Asserts that the vertex is {@code FAILED} with a cause whose message contains
     * {@link #ERROR_MESSAGE}, and that it passed through {@code CREATED} and {@code DEPLOYING}.
     */
    private static void assertFailedWithSubmissionError(ExecutionVertex vertex) {
        Assert.assertEquals(ExecutionState.FAILED, vertex.getExecutionState());
        Assert.assertNotNull(vertex.getFailureCause());
        Assert.assertTrue(vertex.getFailureCause().getMessage().contains(ERROR_MESSAGE));
        assertTimestampsSet(vertex, ExecutionState.CREATED, ExecutionState.DEPLOYING, ExecutionState.FAILED);
    }
}
