/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.cancelling;

import java.util.concurrent.atomic.AtomicReference;
import org.apache.flink.api.common.Plan;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
import org.apache.hadoop.fs.FileSystem;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for tests that submit a job to a local {@link ForkableFlinkMiniCluster} and
 * cancel it while it is running.
 *
 * <p>A cluster is created before every test and stopped after it. Subclasses build a
 * {@link Plan} and hand it to {@link #runAndCancelJob(Plan, int)}.
 */
public abstract class CancellingTestBase extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(CancellingTestBase.class);

    /** Heap size (in MB) recommended in the failure message of {@link #verifyJvmOptions()}. */
    private static final int MINIMUM_HEAP_SIZE_MB = 192;

    /** Tolerance (in MB) subtracted from {@link #MINIMUM_HEAP_SIZE_MB} before the heap check fails. */
    private static final int HEAP_SIZE_TOLERANCE_MB = 50;

    /** Default value for the {@code maxTimeTillCanceled} argument of {@code runAndCancelJob}. */
    private static final int DEFAULT_CANCEL_FINISHED_INTERVAL = 10000;

    /** Initial value of {@link #taskManagerNumSlots}. */
    private static final int DEFAULT_TASK_MANAGER_NUM_SLOTS = 1;

    /** The cluster the jobs are submitted to; {@code null} outside of a running test. */
    protected ForkableFlinkMiniCluster executor;

    /** Number of task manager slots, exposed through the getter/setter below. */
    protected int taskManagerNumSlots = DEFAULT_TASK_MANAGER_NUM_SLOTS;

    /**
     * Fails the test when the JVM's maximum heap is too small.
     *
     * <p>The check tolerates {@link #HEAP_SIZE_TOLERANCE_MB} MB less than the advertised
     * minimum, since {@code Runtime.maxMemory()} typically reports less than {@code -Xmx}.
     */
    private void verifyJvmOptions() {
        long heap = Runtime.getRuntime().maxMemory() >> 20;
        Assert.assertTrue(
                "Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB + "m",
                heap > MINIMUM_HEAP_SIZE_MB - HEAP_SIZE_TOLERANCE_MB);
    }

    /**
     * Verifies the JVM heap settings and creates the cluster used by the test.
     *
     * @throws Exception if the cluster cannot be created
     */
    @Before
    public void startCluster() throws Exception {
        verifyJvmOptions();
        Configuration config = new Configuration();
        config.setBoolean("fs.overwrite-files", true);
        this.executor = new ForkableFlinkMiniCluster(config);
    }

    /**
     * Stops the cluster (if one was created), closes all file systems and requests a GC.
     *
     * @throws Exception if stopping the cluster or closing the file systems fails
     */
    @After
    public void stopCluster() throws Exception {
        if (this.executor != null) {
            this.executor.stop();
            this.executor = null;
            FileSystem.closeAll();
            System.gc();
        }
    }

    /**
     * Runs the given plan and cancels it after {@code msecsTillCanceling} milliseconds, using
     * {@link #DEFAULT_CANCEL_FINISHED_INTERVAL} as {@code maxTimeTillCanceled}.
     *
     * @param plan the program to run
     * @param msecsTillCanceling delay in milliseconds before the cancel request is sent
     * @throws Exception declared for subclasses; failures are reported through {@code Assert.fail}
     */
    public void runAndCancelJob(Plan plan, int msecsTillCanceling) throws Exception {
        runAndCancelJob(plan, msecsTillCanceling, DEFAULT_CANCEL_FINISHED_INTERVAL);
    }

    /**
     * Submits the given plan, sends a cancel request from a separate thread after
     * {@code msecsTillCanceling} milliseconds and fails the test unless the job ends with a
     * {@link JobCancellationException}.
     *
     * @param plan the program to run
     * @param msecsTillCanceling delay in milliseconds before the cancel request is sent
     * @param maxTimeTillCanceled currently unused by this implementation; kept for callers
     * @throws Exception declared for subclasses; failures are reported through {@code Assert.fail}
     */
    public void runAndCancelJob(Plan plan, final int msecsTillCanceling, int maxTimeTillCanceled) throws Exception {
        try {
            final JobGraph jobGraph = getJobGraph(plan);
            final Thread currentThread = Thread.currentThread();
            final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
            boolean jobSuccessfullyCancelled = false;

            Thread canceller = new Thread("Job canceller"){

                @Override
                public void run() {
                    try {
                        Thread.sleep(msecsTillCanceling);
                        CancellingTestBase.this.executor
                                .getLeaderGateway(TestingUtils.TESTING_DURATION())
                                .tell(new JobManagerMessages.CancelJob(jobGraph.getJobID()));
                    }
                    catch (InterruptedException ignored) {
                        // The job ended before the delay elapsed; no cancel request is needed.
                    }
                    catch (Throwable t) {
                        // Wake up the test thread, which is blocked waiting for the job.
                        error.set(t);
                        currentThread.interrupt();
                    }
                }
            };
            canceller.setDaemon(true);
            // Must be start(), not run(): run() would sleep and send the cancel request on
            // this thread before the job has even been submitted.
            canceller.start();

            try {
                this.executor.submitJobAndWait(jobGraph, false);
            }
            catch (JobCancellationException exception) {
                jobSuccessfullyCancelled = true;
            }
            catch (Exception e) {
                Throwable cancelFailure = error.get();
                if (cancelFailure != null) {
                    throw new IllegalStateException("Sending the cancel request failed.", cancelFailure);
                }
                throw new IllegalStateException("Job failed.", e);
            }
            finally {
                // Stop a canceller that is still sleeping so it cannot touch a later test.
                canceller.interrupt();
            }

            if (!jobSuccessfullyCancelled) {
                Throwable cancelFailure = error.get();
                if (cancelFailure != null) {
                    throw new IllegalStateException("Sending the cancel request failed.", cancelFailure);
                }
                throw new IllegalStateException("Job was not successfully cancelled.");
            }
        }
        catch (Exception e) {
            LOG.error("Exception found in runAndCancelJob.", e);
            Assert.fail(StringUtils.stringifyException(e));
        }
    }

    /**
     * Compiles the plan with the optimizer and translates the result into a job graph.
     *
     * @param plan the program to compile
     * @return the job graph generated from the optimized plan
     * @throws Exception if optimization or job graph generation fails
     */
    private JobGraph getJobGraph(Plan plan) throws Exception {
        Optimizer pc = new Optimizer(new DataStatistics(), this.executor.configuration());
        OptimizedPlan op = pc.compile(plan);
        JobGraphGenerator jgg = new JobGraphGenerator();
        return jgg.compileJobGraph(op);
    }

    /**
     * Sets the number of task manager slots.
     *
     * @param taskManagerNumSlots the new slot count
     */
    public void setTaskManagerNumSlots(int taskManagerNumSlots) {
        this.taskManagerNumSlots = taskManagerNumSlots;
    }

    /**
     * Returns the number of task manager slots.
     *
     * @return the current slot count
     */
    public int getTaskManagerNumSlots() {
        return this.taskManagerNumSlots;
    }
}

