package org.apache.flink.runtime.executiongraph;

import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.StoppableInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphStopTest.class */
public class ExecutionGraphStopTest extends TestLogger {
    @Test
    public void testStopIfSourcesNotStoppable() throws Exception {
        try {
            ExecutionGraphTestUtils.createSimpleTestGraph().stop();
            Assert.fail("exception expected");
        } catch (StoppingException e) {
        }
    }

    @Test
    public void testStop() throws Exception {
        JobVertex jobVertex = new JobVertex("source 1");
        jobVertex.setInvokableClass(StoppableInvokable.class);
        jobVertex.setParallelism(11);
        JobVertex jobVertex2 = new JobVertex("source 2");
        jobVertex2.setInvokableClass(StoppableInvokable.class);
        jobVertex2.setParallelism(7);
        JobVertex jobVertex3 = new JobVertex("non-source-1");
        jobVertex3.setInvokableClass(NoOpInvokable.class);
        jobVertex3.setParallelism(10);
        JobVertex jobVertex4 = new JobVertex("non-source-2");
        jobVertex4.setInvokableClass(NoOpInvokable.class);
        jobVertex4.setParallelism(10);
        jobVertex3.connectNewDataSetAsInput(jobVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
        jobVertex3.connectNewDataSetAsInput(jobVertex2, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        jobVertex4.connectNewDataSetAsInput(jobVertex3, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        JobID jobID = new JobID();
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(jobID, jobVertex, jobVertex2, jobVertex3, jobVertex4);
        TaskManagerGateway taskManagerGateway = (TaskManagerGateway) Mockito.spy(new SimpleAckingTaskManagerGateway());
        TaskManagerGateway taskManagerGateway2 = (TaskManagerGateway) Mockito.spy(new SimpleAckingTaskManagerGateway());
        for (ExecutionVertex executionVertex : createSimpleTestGraph.getJobVertex(jobVertex.getID()).getTaskVertices()) {
            executionVertex.getCurrentExecutionAttempt().tryAssignResource(ExecutionGraphTestUtils.createMockSimpleSlot(jobID, taskManagerGateway));
            executionVertex.getCurrentExecutionAttempt().deploy();
        }
        for (ExecutionVertex executionVertex2 : createSimpleTestGraph.getJobVertex(jobVertex2.getID()).getTaskVertices()) {
            executionVertex2.getCurrentExecutionAttempt().tryAssignResource(ExecutionGraphTestUtils.createMockSimpleSlot(jobID, taskManagerGateway));
            executionVertex2.getCurrentExecutionAttempt().deploy();
        }
        for (ExecutionVertex executionVertex3 : createSimpleTestGraph.getJobVertex(jobVertex3.getID()).getTaskVertices()) {
            executionVertex3.getCurrentExecutionAttempt().tryAssignResource(ExecutionGraphTestUtils.createMockSimpleSlot(jobID, taskManagerGateway2));
            executionVertex3.getCurrentExecutionAttempt().deploy();
        }
        for (ExecutionVertex executionVertex4 : createSimpleTestGraph.getJobVertex(jobVertex4.getID()).getTaskVertices()) {
            executionVertex4.getCurrentExecutionAttempt().tryAssignResource(ExecutionGraphTestUtils.createMockSimpleSlot(jobID, taskManagerGateway2));
            executionVertex4.getCurrentExecutionAttempt().deploy();
        }
        createSimpleTestGraph.stop();
        ((TaskManagerGateway) Mockito.verify(taskManagerGateway, Mockito.timeout(1000L).times(18))).stopTask((ExecutionAttemptID) Mockito.any(ExecutionAttemptID.class), (Time) Mockito.any(Time.class));
        ((TaskManagerGateway) Mockito.verify(taskManagerGateway2, Mockito.times(0))).stopTask((ExecutionAttemptID) Mockito.any(ExecutionAttemptID.class), (Time) Mockito.any(Time.class));
        ExecutionGraphTestUtils.finishAllVertices(createSimpleTestGraph);
    }

    @Test
    public void testStopRpc() throws Exception {
        JobID jobID = new JobID();
        JobVertex jobVertex = new JobVertex("vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(5);
        Execution currentExecutionAttempt = ExecutionGraphTestUtils.createSimpleTestGraph(jobID, jobVertex).getJobVertex(jobVertex.getID()).getTaskVertices()[0].getCurrentExecutionAttempt();
        TaskManagerGateway taskManagerGateway = (TaskManagerGateway) Mockito.mock(TaskManagerGateway.class);
        Mockito.when(taskManagerGateway.submitTask((TaskDeploymentDescriptor) Mockito.any(TaskDeploymentDescriptor.class), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        Mockito.when(taskManagerGateway.stopTask((ExecutionAttemptID) Mockito.any(ExecutionAttemptID.class), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        currentExecutionAttempt.tryAssignResource(ExecutionGraphTestUtils.createMockSimpleSlot(jobID, taskManagerGateway));
        currentExecutionAttempt.deploy();
        currentExecutionAttempt.switchToRunning();
        Assert.assertEquals(ExecutionState.RUNNING, currentExecutionAttempt.getState());
        currentExecutionAttempt.stop();
        Assert.assertEquals(ExecutionState.RUNNING, currentExecutionAttempt.getState());
        ((TaskManagerGateway) Mockito.verify(taskManagerGateway, Mockito.times(1))).stopTask((ExecutionAttemptID) Mockito.any(ExecutionAttemptID.class), (Time) Mockito.any(Time.class));
        currentExecutionAttempt.markFinished();
        Assert.assertEquals(ExecutionState.FINISHED, currentExecutionAttempt.getState());
    }
}
