package org.apache.flink.runtime.taskmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.types.IntValue;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelTest.class */
public class TaskCancelTest {

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelTest$AgnosticUnion.class */
    public static class AgnosticUnion extends AbstractInvokable {
        public void invoke() throws Exception {
            do {
            } while (new RecordReader(new UnionInputGate(getEnvironment().getAllInputGates()), IntValue.class).next() != null);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskCancelTest$InfiniteSource.class */
    public static class InfiniteSource extends AbstractInvokable {
        public void invoke() throws Exception {
            RecordWriter recordWriter = new RecordWriter(getEnvironment().getWriter(0));
            IntValue intValue = new IntValue();
            int i = 0;
            while (!Thread.interrupted()) {
                try {
                    intValue.setValue(i);
                    recordWriter.emit(intValue);
                    i++;
                } finally {
                    recordWriter.clearBuffers();
                }
            }
        }
    }

    /**
     * Starts a local cluster, runs a job in which eight {@link InfiniteSource} vertices feed a
     * single {@link AgnosticUnion} vertex, cancels the job after it has been running for a
     * while, and waits for the job to reach {@link JobStatus#CANCELED}.
     *
     * @throws Exception if the cluster cannot be started, the job does not start running, the
     *                   cancel request fails, or waiting for the CANCELED status fails
     */
    @Test
    public void testCancelUnion() throws Exception {
        final int numberOfSources = 8;
        final int sourceParallelism = 4;

        TestingCluster cluster = null;
        try {
            // Cluster setup: 2 task managers with 4 slots each.
            Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", 2);
            config.setInteger("taskmanager.numberOfTaskSlots", 4);
            config.setString("akka.ask.timeout", TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
            config.setInteger("taskmanager.memory.segment-size", 4096);
            config.setInteger("taskmanager.network.numberOfBuffers", 2048);

            cluster = new TestingCluster(config, false);
            cluster.start();

            JobGraph jobGraph = new JobGraph("Cancel Big Union");

            // All sources share one slot sharing group.
            SlotSharingGroup sharingGroup = new SlotSharingGroup();
            JobVertex[] sources = new JobVertex[numberOfSources];
            for (int i = 0; i < sources.length; i++) {
                JobVertex source = new JobVertex("Source " + i);
                source.setInvokableClass(InfiniteSource.class);
                source.setParallelism(sourceParallelism);
                source.setSlotSharingGroup(sharingGroup);
                sources[i] = source;

                jobGraph.addVertex(source);
                sharingGroup.addVertexToGroup(source.getID());
            }

            JobVertex union = new JobVertex("Union");
            union.setInvokableClass(AgnosticUnion.class);
            union.setParallelism(sourceParallelism);
            jobGraph.addVertex(union);

            // Every source is connected through its own pipelined, pointwise data set.
            for (JobVertex source : sources) {
                union.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            }

            cluster.submitJobDetached(jobGraph);

            // The leader gateway is looked up anew for each interaction, as before.
            awaitRunning(cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()), jobGraph.getJobID(), TestingUtils.TESTING_DURATION());

            // Let the running job make some progress before it is cancelled.
            Thread.sleep(5000L);

            cancelJob(cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()), jobGraph.getJobID(), TestingUtils.TESTING_DURATION());

            JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED, cluster.getLeaderGateway(TestingUtils.TESTING_DURATION()), TestingUtils.TESTING_DURATION());
        } finally {
            // Single shutdown path: runs once on success and once on failure, and is never
            // repeated if shutdown() itself throws.
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    /**
     * Sends a {@link JobManagerMessages.CancelJob} request for the given job and blocks until
     * the answer arrives.
     *
     * <p>Returns normally only if the answer is a
     * {@link JobManagerMessages.CancellationSuccess} carrying the requested job ID.
     *
     * @param actorGateway   gateway the cancel request is sent to; must not be null
     * @param jobID          ID of the job to cancel; must not be null
     * @param finiteDuration timeout used both for the ask and for awaiting its result; must not
     *                       be null
     * @throws Exception if the answer names a different job, reports a cancellation failure
     *                   (the reported cause is attached), is of an unexpected type, or if
     *                   awaiting the answer fails
     */
    public static void cancelJob(ActorGateway actorGateway, JobID jobID, FiniteDuration finiteDuration) throws Exception {
        Preconditions.checkNotNull(actorGateway);
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(finiteDuration);

        Object response = Await.result(
                actorGateway.ask(new JobManagerMessages.CancelJob(jobID), finiteDuration),
                finiteDuration);

        if (response instanceof JobManagerMessages.CancellationSuccess) {
            JobID acknowledgedId = ((JobManagerMessages.CancellationSuccess) response).jobID();
            if (acknowledgedId.equals(jobID)) {
                return;
            }
            throw new Exception("JobManager responded for wrong job ID. Request: " + jobID + ", response: " + acknowledgedId + ".");
        }

        if (response instanceof JobManagerMessages.CancellationFailure) {
            JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) response;
            throw new Exception("Failed to cancel job with ID " + failure.jobID() + ".", failure.cause());
        }

        throw new Exception("Unexpected response to cancel request: " + response);
    }

    /**
     * Polls the job status through the given gateway until the job is
     * {@link JobStatus#RUNNING}.
     *
     * <p>Each status request uses {@code finiteDuration} as its own timeout. The same duration
     * also bounds the polling as a whole: once it has elapsed since this method was entered and
     * the job is still neither running nor in a terminal state, the method gives up instead of
     * polling forever.
     *
     * @param actorGateway   gateway the status requests are sent to; must not be null
     * @param jobID          ID of the job to wait for; must not be null
     * @param finiteDuration per-request timeout and overall polling bound; must not be null
     * @throws java.util.concurrent.TimeoutException if the job did not reach RUNNING before the
     *                   polling bound elapsed
     * @throws Exception if the job is unknown, the answer names a different job or is of an
     *                   unexpected type, the job reaches a terminal state before running, or
     *                   awaiting an answer fails
     */
    public static void awaitRunning(ActorGateway actorGateway, JobID jobID, FiniteDuration finiteDuration) throws Exception {
        Preconditions.checkNotNull(actorGateway);
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(finiteDuration);

        final long deadlineNanos = System.nanoTime() + finiteDuration.toNanos();

        while (true) {
            Object response = Await.result(
                    actorGateway.ask(new JobManagerMessages.RequestJobStatus(jobID), finiteDuration),
                    finiteDuration);

            if (!(response instanceof JobManagerMessages.CurrentJobStatus)) {
                if (response instanceof JobManagerMessages.JobNotFound) {
                    throw new Exception("Cannot find job with ID " + jobID + ".");
                }
                // This is a status request, not a cancel request; the message says so.
                throw new Exception("Unexpected response to job status request: " + response);
            }

            JobManagerMessages.CurrentJobStatus current = (JobManagerMessages.CurrentJobStatus) response;
            if (!current.jobID().equals(jobID)) {
                throw new Exception("JobManager responded for wrong job ID. Request: " + jobID + ", response: " + current.jobID() + ".");
            }

            JobStatus status = current.status();
            if (status == JobStatus.RUNNING) {
                return;
            }
            if (status.isTerminalState()) {
                throw new Exception("JobStatus changed to " + status + " while waiting for job to start running.");
            }

            // Subtraction-based comparison stays correct even if nanoTime() wraps around.
            if (System.nanoTime() - deadlineNanos >= 0) {
                throw new java.util.concurrent.TimeoutException("Job " + jobID + " did not reach status RUNNING within " + finiteDuration + ". Last status: " + status + ".");
            }
        }
    }
}
