package org.apache.flink.test.cancelling;

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/cancelling/CancelingTestBase.class */
public abstract class CancelingTestBase extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(CancelingTestBase.class);
    private static final int MINIMUM_HEAP_SIZE_MB = 192;
    private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10000;
    private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;
    protected LocalFlinkMiniCluster executor;
    protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;

    private void verifyJvmOptions() {
        long maxMemory = Runtime.getRuntime().maxMemory() >> 20;
        Assert.assertTrue("Insufficient java heap space " + maxMemory + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB + "m", maxMemory > 142);
    }

    @Before
    public void startCluster() throws Exception {
        verifyJvmOptions();
        Configuration configuration = new Configuration();
        configuration.setBoolean("fs.overwrite-files", true);
        configuration.setInteger("local.number-taskmanager", 2);
        configuration.setInteger("taskmanager.numberOfTaskSlots", 4);
        configuration.setString("akka.ask.timeout", TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
        configuration.setInteger("taskmanager.memory.segment-size", 4096);
        configuration.setInteger("taskmanager.network.numberOfBuffers", 2048);
        this.executor = new LocalFlinkMiniCluster(configuration, false);
        this.executor.start();
    }

    @After
    public void stopCluster() throws Exception {
        if (this.executor != null) {
            this.executor.stop();
            this.executor = null;
            FileSystem.closeAll();
            System.gc();
        }
    }

    public void runAndCancelJob(Plan plan, int i) throws Exception {
        runAndCancelJob(plan, i, DEFAULT_CANCEL_FINISHED_INTERVAL);
    }

    public void runAndCancelJob(Plan plan, int i, int i2) throws Exception {
        try {
            JobGraph jobGraph = getJobGraph(plan);
            this.executor.submitJobDetached(jobGraph);
            JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.RUNNING, this.executor.getLeaderGateway(TestingUtils.TESTING_DURATION()), TestingUtils.TESTING_DURATION());
            Thread.sleep(i);
            FiniteDuration finiteDuration = new FiniteDuration(i2, TimeUnit.MILLISECONDS);
            Object result = Await.result(this.executor.getLeaderGateway(TestingUtils.TESTING_DURATION()).ask(new JobManagerMessages.CancelJob(jobGraph.getJobID()), finiteDuration), finiteDuration);
            if (result instanceof JobManagerMessages.CancellationSuccess) {
                JobManagerActorTestUtils.waitForJobStatus(jobGraph.getJobID(), JobStatus.CANCELED, this.executor.getLeaderGateway(TestingUtils.TESTING_DURATION()), TestingUtils.TESTING_DURATION());
            } else {
                if (!(result instanceof JobManagerMessages.CancellationFailure)) {
                    throw new Exception("Unexpected response to cancel request: " + result);
                }
                JobManagerMessages.CancellationFailure cancellationFailure = (JobManagerMessages.CancellationFailure) result;
                throw new Exception("Failed to cancel job with ID " + cancellationFailure.jobID() + ".", cancellationFailure.cause());
            }
        } catch (Exception e) {
            LOG.error("Exception found in runAndCancelJob.", e);
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    private JobGraph getJobGraph(Plan plan) throws Exception {
        return new JobGraphGenerator().compileJobGraph(new Optimizer(new DataStatistics(), this.executor.configuration()).compile(plan));
    }

    public void setTaskManagerNumSlots(int i) {
        this.taskManagerNumSlots = i;
    }

    public int getTaskManagerNumSlots() {
        return this.taskManagerNumSlots;
    }
}
