package org.apache.flink.test.cancelling;

import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/cancelling/CancelingTestBase.class */
/**
 * Base class for tests that submit a job to a mini cluster, wait until it is running and then
 * cancel it.
 *
 * <p>The cluster is provided by the class-level {@link #CLUSTER} rule, configured with two task
 * managers offering {@link #PARALLELISM} slots each.
 */
public abstract class CancelingTestBase extends TestLogger {
    /** Heap size (in MB) recommended to the user via the {@code -Xmx} hint in the failure message. */
    private static final int MINIMUM_HEAP_SIZE_MB = 192;

    /** Slack (in MB) subtracted from {@link #MINIMUM_HEAP_SIZE_MB} before checking the reported max heap. */
    private static final int HEAP_SIZE_TOLERANCE_MB = 50;

    /** Pause between two consecutive job status queries, in milliseconds. */
    private static final long STATUS_POLL_INTERVAL_MS = 50L;

    /** Maximum time to wait for a submitted job to reach {@link JobStatus#RUNNING}, in minutes. */
    private static final long SUBMISSION_TIMEOUT_MINUTES = 2L;

    protected static final int PARALLELISM = 4;

    @ClassRule
    public static final MiniClusterResource CLUSTER = new MiniClusterResource(
            new MiniClusterResourceConfiguration.Builder()
                    .setConfiguration(getConfiguration())
                    .setNumberTaskManagers(2)
                    .setNumberSlotsPerTaskManager(PARALLELISM)
                    .build());

    /**
     * Fails the test if the JVM's maximum heap (in MB) is not larger than
     * {@code MINIMUM_HEAP_SIZE_MB - HEAP_SIZE_TOLERANCE_MB}.
     */
    private static void verifyJvmOptions() {
        final long maxHeapMb = Runtime.getRuntime().maxMemory() >> 20;
        Assert.assertTrue(
                "Insufficient java heap space " + maxHeapMb + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB + "m",
                maxHeapMb > MINIMUM_HEAP_SIZE_MB - HEAP_SIZE_TOLERANCE_MB);
    }

    /**
     * Builds the configuration used for both the mini cluster and the optimizer.
     *
     * <p>Verifies the JVM heap size first, so an undersized JVM is reported before any cluster
     * or job setup happens.
     *
     * @return a fresh configuration instance
     */
    private static Configuration getConfiguration() {
        verifyJvmOptions();
        final Configuration config = new Configuration();
        config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
        config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
        config.setString(TaskManagerOptions.MEMORY_SEGMENT_SIZE, "4096");
        config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 2048);
        return config;
    }

    /**
     * Submits the given plan in detached mode, waits until the job is {@link JobStatus#RUNNING},
     * cancels it after a delay and verifies that it reaches {@link JobStatus#CANCELED} in time.
     *
     * <p>The test fails if the job is not running within {@link #SUBMISSION_TIMEOUT_MINUTES}
     * minutes, or if it is not canceled within {@code i2} milliseconds of the cancel request.
     *
     * @param plan the program plan to compile and submit
     * @param i    time in milliseconds to let the job run before cancelling it
     * @param i2   maximum time in milliseconds the job may take to reach the canceled state
     * @throws Exception if submission, a status query or the cancel request fails
     */
    public void runAndCancelJob(Plan plan, int i, int i2) throws Exception {
        final JobGraph jobGraph = getJobGraph(plan);
        final ClusterClient<?> client = CLUSTER.getClusterClient();
        client.setDetached(true);
        final JobSubmissionResult submission =
                client.submitJob(jobGraph, CancelingTestBase.class.getClassLoader());

        final Deadline submissionDeadline =
                new FiniteDuration(SUBMISSION_TIMEOUT_MINUTES, TimeUnit.MINUTES).fromNow();
        if (awaitJobStatus(client, submission, JobStatus.RUNNING, submissionDeadline) != JobStatus.RUNNING) {
            Assert.fail("Job not in state RUNNING.");
        }

        // Give the job some time to make progress before it is cancelled.
        Thread.sleep(i);
        client.cancel(submission.getJobID());

        final Deadline cancelDeadline = new FiniteDuration(i2, TimeUnit.MILLISECONDS).fromNow();
        if (awaitJobStatus(client, submission, JobStatus.CANCELED, cancelDeadline) != JobStatus.CANCELED) {
            Assert.fail("Failed to cancel job with ID " + submission.getJobID() + '.');
        }
    }

    /**
     * Polls the status of the submitted job until it equals {@code expected} or the deadline
     * has passed.
     *
     * @param client     client used to query the job status
     * @param submission submission result identifying the job
     * @param expected   status to wait for
     * @param deadline   point in time after which polling stops
     * @return the last status observed, which may differ from {@code expected} on timeout
     * @throws Exception if a status query fails or does not complete before the deadline
     */
    private static JobStatus awaitJobStatus(
            ClusterClient<?> client,
            JobSubmissionResult submission,
            JobStatus expected,
            Deadline deadline) throws Exception {
        JobStatus current = queryJobStatus(client, submission, deadline);
        while (current != expected && deadline.hasTimeLeft()) {
            Thread.sleep(STATUS_POLL_INTERVAL_MS);
            // The deadline may have passed while sleeping. Querying with a non-positive timeout
            // would end in a TimeoutException instead of the caller's descriptive failure.
            if (!deadline.hasTimeLeft()) {
                break;
            }
            current = queryJobStatus(client, submission, deadline);
        }
        return current;
    }

    /** Fetches the current job status, waiting at most for the time left on the deadline. */
    private static JobStatus queryJobStatus(
            ClusterClient<?> client,
            JobSubmissionResult submission,
            Deadline deadline) throws Exception {
        return client.getJobStatus(submission.getJobID())
                .get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Compiles the plan with the optimizer and translates the result into a job graph.
     *
     * @param plan the program plan to compile
     * @return the job graph generated from the optimized plan
     */
    private JobGraph getJobGraph(Plan plan) {
        final Optimizer optimizer = new Optimizer(new DataStatistics(), getConfiguration());
        return new JobGraphGenerator().compileJobGraph(optimizer.compile(plan));
    }
}
