/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.webmonitor.StackTraceSampleCoordinator;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class StackTraceSampleCoordinatorITCase
extends TestLogger {
    private static ActorSystem testActorSystem;

    @BeforeClass
    public static void setup() {
        testActorSystem = AkkaUtils.createLocalActorSystem((Configuration)new Configuration());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem((ActorSystem)testActorSystem);
    }

    @Test
    public void testTaskClearedWhileSampling() throws Exception {
        new JavaTestKit(testActorSystem){
            {
                final FiniteDuration deadline = new FiniteDuration(60L, TimeUnit.SECONDS);
                final JobGraph jobGraph = new JobGraph(new JobVertex[0]);
                boolean parallelism = true;
                final JobVertex task = new JobVertex("Task");
                task.setInvokableClass(Tasks.BlockingNoOpInvokable.class);
                task.setParallelism(1);
                jobGraph.addVertex(task);
                ActorGateway jobManger = null;
                ActorGateway taskManager = null;
                try {
                    jobManger = TestingUtils.createJobManager((ActorSystem)testActorSystem, (Configuration)new Configuration());
                    Configuration config = new Configuration();
                    config.setInteger("taskmanager.numberOfTaskSlots", 1);
                    taskManager = TestingUtils.createTaskManager((ActorSystem)testActorSystem, (ActorGateway)jobManger, (Configuration)config, (boolean)true, (boolean)true);
                    final ActorGateway jm = jobManger;
                    new JavaTestKit.Within(deadline){

                        /*
                         * WARNING - Removed try catching itself - possible behaviour change.
                         */
                        protected void run() {
                            try {
                                AkkaActorGateway testActor = new AkkaActorGateway(this.getTestActor(), null);
                                int maxAttempts = 10;
                                int sleepTime = 100;
                                int i = 0;
                                while (i < maxAttempts) {
                                    JobClient.submitJobDetached((ActorGateway)jm, (JobGraph)jobGraph, (FiniteDuration)deadline, (ClassLoader)ClassLoader.getSystemClassLoader());
                                    jm.tell((Object)new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), (ActorGateway)testActor);
                                    this.expectMsgEquals(new TestingJobManagerMessages.AllVerticesRunning(jobGraph.getJobID()));
                                    jm.tell((Object)new TestingJobManagerMessages.RequestExecutionGraph(jobGraph.getJobID()), (ActorGateway)testActor);
                                    TestingJobManagerMessages.ExecutionGraphFound executionGraphResponse = (TestingJobManagerMessages.ExecutionGraphFound)this.expectMsgClass(TestingJobManagerMessages.ExecutionGraphFound.class);
                                    ExecutionGraph executionGraph = executionGraphResponse.executionGraph();
                                    ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
                                    StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(testActorSystem, 60000);
                                    Future sampleFuture = coordinator.triggerStackTraceSample(vertex.getTaskVertices(), Integer.MAX_VALUE, new FiniteDuration(10L, TimeUnit.MILLISECONDS), 0);
                                    Thread.sleep(sleepTime);
                                    Future removeFuture = jm.ask((Object)new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), this.remaining());
                                    jm.tell((Object)new JobManagerMessages.CancelJob(jobGraph.getJobID()));
                                    try {
                                        Await.result((Awaitable)sampleFuture, (Duration)this.remaining());
                                        break;
                                    }
                                    catch (Throwable throwable) {
                                    }
                                    finally {
                                        Await.ready((Awaitable)removeFuture, (Duration)this.remaining());
                                    }
                                    ++i;
                                    sleepTime *= 2;
                                }
                            }
                            catch (Exception e) {
                                e.printStackTrace();
                                Assert.fail((String)e.getMessage());
                            }
                        }
                    };
                }
                catch (Throwable throwable) {
                    TestingUtils.stopActor(jobManger);
                    TestingUtils.stopActor(taskManager);
                    throw throwable;
                }
                TestingUtils.stopActor((ActorGateway)jobManger);
                TestingUtils.stopActor((ActorGateway)taskManager);
            }
        };
    }
}

