package org.apache.flink.test.cancelling;

import java.time.Duration;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.Plan;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.ClassRule;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/test/cancelling/CancelingTestBase.class */
/**
 * Base class for tests that submit a job to a shared mini cluster, wait until it is running,
 * cancel it, and verify that it reaches the {@link JobStatus#CANCELED} state.
 *
 * <p>The cluster is created once per test class through the {@link #CLUSTER} class rule with two
 * task managers offering {@link #PARALLELISM} slots each.
 */
public abstract class CancelingTestBase extends TestLogger {
    /** Heap size (in MB) that the failure message of the heap check asks the user to configure. */
    private static final int MINIMUM_HEAP_SIZE_MB = 192;

    /**
     * Slack (in MB) subtracted from {@link #MINIMUM_HEAP_SIZE_MB} before comparing it against the
     * heap size reported by the JVM, i.e. the effective threshold is 142 MB.
     */
    private static final int HEAP_SIZE_TOLERANCE_MB = 50;

    /** Maximum time (in minutes) to wait for a submitted job to reach the RUNNING state. */
    private static final long JOB_STARTUP_TIMEOUT_MINUTES = 2L;

    /** Pause (in milliseconds) between two consecutive job status queries. */
    private static final long STATUS_POLL_INTERVAL_MILLIS = 50L;

    /** Cluster configuration; also the source of the RPC ask timeout used for status queries. */
    private static final Configuration configuration = getConfiguration();

    /** Number of slots offered by each task manager of the test cluster. */
    protected static final int PARALLELISM = 4;

    /** Mini cluster shared by all tests of the concrete test class. */
    @ClassRule
    public static final MiniClusterWithClientResource CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(configuration)
                            .setNumberTaskManagers(2)
                            .setNumberSlotsPerTaskManager(PARALLELISM)
                            .build());

    /**
     * Fails the test (via a JUnit assertion) when the maximum heap size reported by the JVM is not
     * greater than {@code MINIMUM_HEAP_SIZE_MB - HEAP_SIZE_TOLERANCE_MB} megabytes.
     */
    private static void verifyJvmOptions() {
        // ">> 20" converts bytes to megabytes.
        final long heapMb = Runtime.getRuntime().maxMemory() >> 20;
        Assert.assertTrue(
                "Insufficient java heap space "
                        + heapMb
                        + "mb - set JVM option: -Xmx"
                        + MINIMUM_HEAP_SIZE_MB
                        + "m",
                heapMb > MINIMUM_HEAP_SIZE_MB - HEAP_SIZE_TOLERANCE_MB);
    }

    /**
     * Verifies the JVM heap settings and builds a fresh configuration for the test cluster.
     *
     * @return a new configuration with the file system default override enabled, the ask timeout
     *     set to the testing default, the memory segment size parsed from {@code "4096"} and 2048
     *     network buffers
     */
    private static Configuration getConfiguration() {
        verifyJvmOptions();
        final Configuration config = new Configuration();
        config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
        config.set(AkkaOptions.ASK_TIMEOUT_DURATION, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT);
        config.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.parse("4096"));
        config.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 2048);
        return config;
    }

    /**
     * Submits the given plan as a job, waits until it is running, cancels it after a delay and
     * fails the test unless the job ends up in the {@link JobStatus#CANCELED} state in time.
     *
     * <p>Decompiler note: this method was originally declared {@code protected}; it is kept
     * {@code public} here so that existing callers are unaffected.
     *
     * @param plan the program plan to compile and submit
     * @param i time in milliseconds to let the job run before it is cancelled
     * @param i2 maximum time in milliseconds to wait for the CANCELED state after cancelling
     * @throws Exception if submitting, querying or cancelling the job fails or times out, or if
     *     the waiting thread is interrupted
     */
    public void runAndCancelJob(Plan plan, int i, int i2) throws Exception {
        final JobGraph jobGraph = getJobGraph(plan);
        final long rpcTimeoutMillis = configuration.get(AkkaOptions.ASK_TIMEOUT_DURATION).toMillis();

        final ClusterClient<?> client = CLUSTER.getClusterClient();
        final JobID jobId = client.submitJob(jobGraph).get();

        final Deadline startupDeadline =
                new FiniteDuration(JOB_STARTUP_TIMEOUT_MINUTES, TimeUnit.MINUTES).fromNow();
        final JobStatus statusBeforeCancel =
                awaitJobStatus(client, jobId, JobStatus.RUNNING, startupDeadline, rpcTimeoutMillis);
        if (statusBeforeCancel != JobStatus.RUNNING) {
            Assert.fail("Job not in state RUNNING.");
        }

        // Let the job make some progress before pulling the plug.
        Thread.sleep(i);
        client.cancel(jobId).get();

        // The cancellation deadline starts only once the cancel request has been acknowledged.
        final Deadline cancelDeadline = new FiniteDuration(i2, TimeUnit.MILLISECONDS).fromNow();
        final JobStatus statusAfterCancel =
                awaitJobStatus(client, jobId, JobStatus.CANCELED, cancelDeadline, rpcTimeoutMillis);
        if (statusAfterCancel != JobStatus.CANCELED) {
            Assert.fail("Failed to cancel job with ID " + jobId + '.');
        }
    }

    /**
     * Polls the job status until it equals {@code expected}, the job reaches a globally terminal
     * state, or the deadline expires, whichever happens first.
     *
     * <p>The status is always queried at least once, even if the deadline has already passed.
     * Stopping on a globally terminal state avoids waiting out the whole deadline for a job that
     * can no longer reach the expected state.
     *
     * @param client client used to query the cluster
     * @param jobId id of the job to observe
     * @param expected status to wait for
     * @param deadline point in time after which polling stops
     * @param rpcTimeoutMillis timeout in milliseconds for each individual status query
     * @return the last status observed; callers compare it against {@code expected}
     * @throws Exception if a status query fails or times out, or if the thread is interrupted
     */
    private static JobStatus awaitJobStatus(
            ClusterClient<?> client,
            JobID jobId,
            JobStatus expected,
            Deadline deadline,
            long rpcTimeoutMillis)
            throws Exception {
        JobStatus status = client.getJobStatus(jobId).get(rpcTimeoutMillis, TimeUnit.MILLISECONDS);
        while (status != expected && !status.isGloballyTerminalState() && deadline.hasTimeLeft()) {
            Thread.sleep(STATUS_POLL_INTERVAL_MILLIS);
            status = client.getJobStatus(jobId).get(rpcTimeoutMillis, TimeUnit.MILLISECONDS);
        }
        return status;
    }

    /**
     * Compiles the given plan with the optimizer and translates the result into a job graph.
     *
     * @param plan the program plan to compile
     * @return the job graph generated from the optimized plan
     */
    private JobGraph getJobGraph(Plan plan) {
        // A fresh configuration is built for every compilation, which also re-checks the heap size.
        final Optimizer optimizer = new Optimizer(new DataStatistics(), getConfiguration());
        return new JobGraphGenerator().compileJobGraph(optimizer.compile(plan));
    }
}
