package org.apache.flink.runtime.executiongraph;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.messages.Messages$Acknowledge$;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import scala.concurrent.ExecutionContext;

/**
 * Tests that calling {@code stop()} on an {@link ExecutionVertex} is forwarded to its
 * current {@link Execution} and, once deployed, results in a {@code StopTask} message
 * being sent to the task manager gateway.
 *
 * <p>{@link ExecutionVertex} is prepared for PowerMock so that the {@code new Execution(...)}
 * call made while constructing the vertex can be intercepted in {@link #testStop()}.
 */
@PrepareForTest({ExecutionVertex.class})
@RunWith(PowerMockRunner.class)
public class ExecutionVertexStopTest extends TestLogger {

    /**
     * Actor system shut down in {@link #teardown()}. NOTE(review): nothing in this class
     * assigns it, so the teardown is effectively a no-op unless it is set elsewhere.
     */
    private static ActorSystem system;

    /** Set to {@code true} by {@link StopSequenceInstanceGateway} when it sees a {@code StopTask}. */
    private static boolean receivedStopSignal;

    /**
     * Test gateway that acknowledges task submissions and records whether a stop request
     * has been received.
     */
    public static class StopSequenceInstanceGateway extends BaseTestingActorGateway {
        private static final long serialVersionUID = 7611571264006653627L;

        /** Reply returned for every {@code StopTask} message. */
        private final TaskMessages.TaskOperationResult result;

        /**
         * @param executionContext context handed to the base testing gateway
         * @param taskOperationResult reply to return when a {@code StopTask} message arrives
         */
        public StopSequenceInstanceGateway(ExecutionContext executionContext, TaskMessages.TaskOperationResult taskOperationResult) {
            super(executionContext);
            this.result = taskOperationResult;
        }

        /**
         * Produces the reply for an incoming message.
         *
         * @param obj the message sent to this gateway
         * @return the acknowledge singleton for {@code SubmitTask}, the configured
         *         {@code TaskOperationResult} for {@code StopTask}, and {@code null} for
         *         any other message
         */
        @Override // org.apache.flink.runtime.instance.BaseTestingActorGateway
        public Object handleMessage(Object obj) throws Exception {
            // The two replies have unrelated types, so the local must be a plain Object.
            Object reply = null;
            if (obj instanceof TaskMessages.SubmitTask) {
                reply = Messages.getAcknowledge();
            } else if (obj instanceof TaskMessages.StopTask) {
                reply = this.result;
                // Let testStopRpc() observe that the stop request reached the gateway.
                receivedStopSignal = true;
            }
            return reply;
        }
    }

    /** Shuts the shared actor system down, if one was ever created. */
    @AfterClass
    public static void teardown() {
        if (system != null) {
            JavaTestKit.shutdownActorSystem(system);
            system = null;
        }
    }

    /**
     * Verifies that {@code ExecutionVertex.stop()} delegates to {@code Execution.stop()}.
     * The {@link Execution} created inside the vertex is replaced by a mock via PowerMock.
     */
    @Test
    public void testStop() throws Exception {
        ExecutionJobVertex jobVertex = ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID());

        Execution executionMock = Mockito.mock(Execution.class);
        PowerMockito.whenNew(Execution.class).withAnyArguments().thenReturn(executionMock);

        ExecutionVertex vertex = new ExecutionVertex(
                jobVertex, 0, new IntermediateResult[0], AkkaUtils.getDefaultTimeout());
        vertex.stop();

        Mockito.verify(executionMock).stop();
    }

    /**
     * Verifies that stopping a vertex that was deployed to a slot sends a {@code StopTask}
     * message to the gateway backing that slot.
     */
    @Test
    public void testStopRpc() throws Exception {
        ExecutionVertex vertex = new ExecutionVertex(
                ExecutionGraphTestUtils.getExecutionVertex(new JobVertexID()),
                0,
                new IntermediateResult[0],
                AkkaUtils.getDefaultTimeout());
        ExecutionAttemptID attemptId = vertex.getCurrentExecutionAttempt().getAttemptId();

        ExecutionGraphTestUtils.setVertexState(vertex, ExecutionState.SCHEDULED);
        Assert.assertEquals(ExecutionState.SCHEDULED, vertex.getExecutionState());

        StopSequenceInstanceGateway gateway = new StopSequenceInstanceGateway(
                TestingUtils.defaultExecutionContext(),
                new TaskMessages.TaskOperationResult(attemptId, true));
        vertex.deployToSlot(
                ExecutionGraphTestUtils.getInstance(gateway).allocateSimpleSlot(new JobID()));

        // Reset right before stop() so only the stop request can flip the flag.
        receivedStopSignal = false;
        vertex.stop();
        Assert.assertTrue(receivedStopSignal);
    }
}
