/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.taskmanager;

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.api.reader.RecordReader;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.JobManagerActorTestUtils;
import org.apache.flink.types.IntValue;
import org.apache.flink.util.Preconditions;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

/**
 * Test that submits a job made of several never-ending source vertices feeding a single
 * union vertex, cancels it while it is running, and waits for the job to reach
 * {@link JobStatus#CANCELED}.
 */
public class TaskCancelTest {

    /**
     * Builds a job with eight {@link InfiniteSource} vertices that are all connected
     * point-wise (pipelined) to one {@link AgnosticUnion} vertex, submits it detached to a
     * freshly started {@link TestingCluster}, waits until the job is running, sleeps for
     * five seconds, cancels the job and finally waits for the CANCELED status.
     *
     * <p>The cluster is always shut down in the {@code finally} block if it was created.
     */
    @Test
    public void testCancelUnion() throws Exception {
        final int numberOfSources = 8;
        final int sourceParallelism = 4;

        TestingCluster flink = null;
        try {
            Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", 2);
            config.setInteger("taskmanager.numberOfTaskSlots", sourceParallelism);
            config.setString("akka.ask.timeout", TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
            config.setInteger("taskmanager.memory.segment-size", 4096);
            config.setInteger("taskmanager.network.numberOfBuffers", 2048);

            flink = new TestingCluster(config, false);
            flink.start();

            JobGraph jobGraph = new JobGraph("Cancel Big Union");

            // All sources are placed into one slot sharing group; the union vertex is not.
            JobVertex[] sources = new JobVertex[numberOfSources];
            SlotSharingGroup group = new SlotSharingGroup();
            for (int i = 0; i < sources.length; ++i) {
                sources[i] = new JobVertex("Source " + i);
                sources[i].setInvokableClass(InfiniteSource.class);
                sources[i].setParallelism(sourceParallelism);
                sources[i].setSlotSharingGroup(group);
                jobGraph.addVertex(sources[i]);
                group.addVertexToGroup(sources[i].getID());
            }

            JobVertex union = new JobVertex("Union");
            union.setInvokableClass(AgnosticUnion.class);
            union.setParallelism(sourceParallelism);
            jobGraph.addVertex(union);

            // Every source becomes one input of the union vertex.
            for (JobVertex source : sources) {
                union.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
            }

            flink.submitJobDetached(jobGraph);

            // Wait until the job is running, let it run for a while, then cancel it.
            TaskCancelTest.awaitRunning(
                    flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
                    jobGraph.getJobID(),
                    TestingUtils.TESTING_DURATION());

            Thread.sleep(5000L);

            TaskCancelTest.cancelJob(
                    flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
                    jobGraph.getJobID(),
                    TestingUtils.TESTING_DURATION());

            // Wait for the cancellation to be reflected in the job status.
            JobManagerActorTestUtils.waitForJobStatus(
                    jobGraph.getJobID(),
                    JobStatus.CANCELED,
                    flink.getLeaderGateway(TestingUtils.TESTING_DURATION()),
                    TestingUtils.TESTING_DURATION());
        }
        finally {
            if (flink != null) {
                flink.shutdown();
            }
        }
    }

    /**
     * Sends a {@link JobManagerMessages.CancelJob} request for the given job and blocks until
     * the answer arrives or the timeout expires.
     *
     * @param jobManager gateway the cancel request is sent to; must not be null
     * @param jobId      ID of the job to cancel; must not be null
     * @param timeout    timeout used both for the ask and for awaiting its result; must not be null
     * @throws Exception if the answer is a success for a different job ID, a
     *                   {@link JobManagerMessages.CancellationFailure} (its cause is attached),
     *                   or any other unexpected message; also propagates whatever awaiting
     *                   the response throws
     */
    public static void cancelJob(ActorGateway jobManager, JobID jobId, FiniteDuration timeout) throws Exception {
        Preconditions.checkNotNull(jobManager);
        Preconditions.checkNotNull(jobId);
        Preconditions.checkNotNull(timeout);

        Future<Object> ask = jobManager.ask(new JobManagerMessages.CancelJob(jobId), timeout);
        Object result = Await.result(ask, timeout);

        if (result instanceof JobManagerMessages.CancellationSuccess) {
            JobManagerMessages.CancellationSuccess success = (JobManagerMessages.CancellationSuccess) result;
            if (!success.jobID().equals(jobId)) {
                throw new Exception("JobManager responded for wrong job ID. Request: " + jobId + ", response: " + success.jobID() + ".");
            }
        } else if (result instanceof JobManagerMessages.CancellationFailure) {
            JobManagerMessages.CancellationFailure failure = (JobManagerMessages.CancellationFailure) result;
            throw new Exception("Failed to cancel job with ID " + failure.jobID() + ".", failure.cause());
        } else {
            throw new Exception("Unexpected response to cancel request: " + result);
        }
    }

    /**
     * Repeatedly asks the job manager for the status of the given job and returns as soon as
     * the reported status is {@link JobStatus#RUNNING}.
     *
     * <p>Note that {@code timeout} bounds each individual status request, not the overall
     * wait: as long as the job is reported in a state that is neither RUNNING nor globally
     * terminal, this method keeps polling.
     *
     * @param jobManager gateway the status requests are sent to; must not be null
     * @param jobId      ID of the job to wait for; must not be null
     * @param timeout    timeout used for each ask and for awaiting its result; must not be null
     * @throws Exception if the answer refers to a different job ID, the job reaches a globally
     *                   terminal state before running, the job is not found, or an unexpected
     *                   message is received; also propagates whatever awaiting the response
     *                   throws
     */
    public static void awaitRunning(ActorGateway jobManager, JobID jobId, FiniteDuration timeout) throws Exception {
        Preconditions.checkNotNull(jobManager);
        Preconditions.checkNotNull(jobId);
        Preconditions.checkNotNull(timeout);

        while (true) {
            Future<Object> ask = jobManager.ask(new JobManagerMessages.RequestJobStatus(jobId), timeout);
            Object result = Await.result(ask, timeout);

            if (result instanceof JobManagerMessages.CurrentJobStatus) {
                JobManagerMessages.CurrentJobStatus status = (JobManagerMessages.CurrentJobStatus) result;
                if (!status.jobID().equals(jobId)) {
                    throw new Exception("JobManager responded for wrong job ID. Request: " + jobId + ", response: " + status.jobID() + ".");
                }
                if (status.status() == JobStatus.RUNNING) {
                    return;
                }
                if (status.status().isGloballyTerminalState()) {
                    throw new Exception("JobStatus changed to " + status.status() + " while waiting for job to start running.");
                }
                // Neither running nor terminal yet: ask again.
            } else if (result instanceof JobManagerMessages.JobNotFound) {
                throw new Exception("Cannot find job with ID " + jobId + ".");
            } else {
                // This is a status request, not a cancel request; report it as such.
                throw new Exception("Unexpected response to job status request: " + result);
            }
        }
    }

    /**
     * Invokable that wraps all of its input gates into one {@link UnionInputGate} and reads
     * {@link IntValue} records from it, discarding them, until the reader returns null.
     */
    public static class AgnosticUnion
    extends AbstractInvokable {
        public void invoke() throws Exception {
            UnionInputGate union = new UnionInputGate(this.getEnvironment().getAllInputGates());
            RecordReader<IntValue> reader = new RecordReader<IntValue>(
                    union,
                    IntValue.class,
                    this.getEnvironment().getTaskManagerInfo().getTmpDirectories());

            // Drain the input; the records themselves are irrelevant to the test.
            while (reader.next() != null) {
            }
        }
    }

    /**
     * Invokable that emits an ever-increasing {@link IntValue} on its first writer until the
     * executing thread is found to be interrupted. The writer's buffers are cleared on every
     * exit path.
     */
    public static class InfiniteSource
    extends AbstractInvokable {
        public void invoke() throws Exception {
            RecordWriter<IntValue> writer = new RecordWriter<IntValue>(this.getEnvironment().getWriter(0));
            IntValue val = new IntValue();
            try {
                for (int i = 0; ; ++i) {
                    // Thread.interrupted() also clears the interrupt flag before returning.
                    if (Thread.interrupted()) {
                        return;
                    }
                    val.setValue(i);
                    writer.emit(val);
                }
            }
            finally {
                writer.clearBuffers();
            }
        }
    }
}

