/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.StackTraceSampleMessages;
import org.apache.flink.runtime.webmonitor.StackTraceSample;
import org.apache.flink.runtime.webmonitor.StackTraceSampleCoordinator;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import scala.util.Try;

/**
 * Tests for {@link StackTraceSampleCoordinator}.
 *
 * <p>Every test works against mocked {@link ExecutionVertex} instances and a
 * coordinator that is re-created before each test on top of one shared local
 * actor system. All samples are triggered right after the coordinator is
 * created, and the tests address them with sample ID {@code 0}.
 */
public class StackTraceSampleCoordinatorTest {
    /** Actor system shared by all tests of this class. */
    private static ActorSystem system;

    /** Coordinator under test; replaced before every test in {@link #init()}. */
    private StackTraceSampleCoordinator coord;

    /** Creates the shared local actor system once for the whole class. */
    @BeforeClass
    public static void setUp() throws Exception {
        Configuration config = new Configuration();
        system = AkkaUtils.createLocalActorSystem(config);
    }

    /** Shuts the shared actor system down, if it was created. */
    @AfterClass
    public static void tearDown() throws Exception {
        if (system == null) {
            return;
        }
        system.shutdown();
    }

    /** Gives every test a fresh coordinator constructed with a timeout argument of 60000. */
    @Before
    public void init() throws Exception {
        this.coord = new StackTraceSampleCoordinator(system, 60000);
    }

    /**
     * Triggers a sample for four running tasks, checks that a trigger message
     * was sent to each of them, then reports stack traces task by task and
     * verifies that the future completes only once the last task has reported.
     */
    @Test
    public void testTriggerStackTraceSample() throws Exception {
        ExecutionVertex[] vertices = new ExecutionVertex[4];
        for (int i = 0; i < vertices.length; i++) {
            vertices[i] = this.mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true);
        }

        int numSamples = 1;
        FiniteDuration delayBetweenSamples = new FiniteDuration(100L, TimeUnit.MILLISECONDS);
        int maxStackTraceDepth = 0;

        Future<StackTraceSample> sampleFuture =
                this.coord.triggerStackTraceSample(vertices, numSamples, delayBetweenSamples, maxStackTraceDepth);

        // Each vertex must have been asked to send the trigger message for sample 0.
        for (ExecutionVertex vertex : vertices) {
            ExecutionAttemptID expectedExecutionId = vertex.getCurrentExecutionAttempt().getAttemptId();
            StackTraceSampleMessages.TriggerStackTraceSample expectedMsg =
                    new StackTraceSampleMessages.TriggerStackTraceSample(
                            0, expectedExecutionId, numSamples, delayBetweenSamples, maxStackTraceDepth);

            Mockito.verify(vertex).sendMessageToCurrentExecution(
                    Matchers.eq(expectedMsg),
                    Matchers.eq(expectedExecutionId),
                    Matchers.any(AkkaActorGateway.class));
        }

        // Nothing has been collected yet.
        Assert.assertFalse(sampleFuture.isCompleted());

        // The same stack trace array is reported three times per task.
        StackTraceElement[] stackTraceSample = Thread.currentThread().getStackTrace();
        List<StackTraceElement[]> traces = new ArrayList<StackTraceElement[]>();
        for (int i = 0; i < 3; i++) {
            traces.add(stackTraceSample);
        }

        // Report per task; only the final report may complete the future.
        int lastIndex = vertices.length - 1;
        for (int i = 0; i < vertices.length; i++) {
            ExecutionAttemptID executionId = vertices[i].getCurrentExecutionAttempt().getAttemptId();
            this.coord.collectStackTraces(0, executionId, traces);

            if (i == lastIndex) {
                Assert.assertTrue(sampleFuture.isCompleted());
            } else {
                Assert.assertFalse(sampleFuture.isCompleted());
            }
        }

        // Inspect the completed sample.
        StackTraceSample sample = sampleFuture.value().get().get();
        Assert.assertEquals(0, sample.getSampleId());
        Assert.assertTrue(sample.getEndTime() >= sample.getStartTime());

        Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces();
        for (ExecutionVertex vertex : vertices) {
            ExecutionAttemptID executionId = vertex.getCurrentExecutionAttempt().getAttemptId();
            List<StackTraceElement[]> sampleTraces = tracesByTask.get(executionId);

            Assert.assertNotNull("Task not found", sampleTraces);
            Assert.assertTrue(traces.equals(sampleTraces));
        }

        // The finished sample must no longer be pending.
        Assert.assertEquals(0, this.coord.getNumberOfPendingSamples());

        // A late report for the finished sample must not throw.
        ExecutionAttemptID firstExecutionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
        this.coord.collectStackTraces(0, firstExecutionId, traces);
    }

    /**
     * Triggers a sample where one of the two tasks is in state
     * {@code DEPLOYING} and expects the future to be completed right away
     * with an {@link IllegalStateException}.
     */
    @Test
    public void testTriggerStackTraceSampleNotRunningTasks() throws Exception {
        ExecutionVertex[] vertices = new ExecutionVertex[]{
            this.mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            this.mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true)
        };

        Future<StackTraceSample> sampleFuture = this.triggerDefaultSample(vertices);

        Assert.assertTrue(sampleFuture.isCompleted());
        Assert.assertTrue(sampleFuture.failed().isCompleted());

        Throwable failure = sampleFuture.failed().value().get().get();
        Assert.assertTrue(failure instanceof IllegalStateException);
    }

    /**
     * Triggers a sample where the second task's mock reports an unsuccessful
     * message send and expects the future to be completed right away with a
     * failure whose cause is a {@link RuntimeException}.
     */
    @Test
    public void testTriggerStackTraceSampleResetRunningTasks() throws Exception {
        ExecutionVertex[] vertices = new ExecutionVertex[]{
            this.mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            this.mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false)
        };

        Future<StackTraceSample> sampleFuture = this.triggerDefaultSample(vertices);

        Assert.assertTrue(sampleFuture.isCompleted());
        Assert.assertTrue(sampleFuture.failed().isCompleted());

        Throwable failure = sampleFuture.failed().value().get().get();
        Assert.assertTrue(failure.getCause() instanceof RuntimeException);
    }

    /**
     * Uses a coordinator constructed with a timeout argument of 100, never
     * reports any stack traces, and expects the future to fail with a cause
     * whose message contains {@code "Time out"}. A report arriving afterwards
     * must not throw.
     */
    @Test
    public void testTriggerStackTraceSampleTimeout() throws Exception {
        int timeout = 100;
        this.coord = new StackTraceSampleCoordinator(system, timeout);

        ExecutionVertex[] vertices = this.singleRunningVertex();
        Future<StackTraceSample> sampleFuture = this.triggerDefaultSample(vertices);

        // Initial wait, then poll up to ten times, sleeping after each unsuccessful check.
        Thread.sleep(timeout * 2);

        boolean success = false;
        int polls = 0;
        while (!success && polls < 10) {
            success = sampleFuture.isCompleted();
            if (!success) {
                Thread.sleep(timeout);
            }
            polls++;
        }

        Assert.assertTrue("Sample did not time out", success);

        Throwable cause = sampleFuture.failed().value().get().get();
        String causeMessage = cause.getCause().getMessage();
        Assert.assertTrue(causeMessage.contains("Time out"));

        // Late report for the failed sample.
        ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
        this.coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
    }

    /** Reports stack traces for sample 0 although no sample was triggered; this must not throw. */
    @Test
    public void testCollectStackTraceForUnknownSample() throws Exception {
        ExecutionAttemptID unknownExecutionId = new ExecutionAttemptID();
        this.coord.collectStackTraces(0, unknownExecutionId, new ArrayList<StackTraceElement[]>());
    }

    /**
     * Cancels a pending sample and expects its future to be completed and
     * the number of pending samples to drop to zero.
     */
    @Test
    public void testCancelStackTraceSample() throws Exception {
        ExecutionVertex[] vertices = this.singleRunningVertex();
        Future<StackTraceSample> sampleFuture = this.triggerDefaultSample(vertices);
        Assert.assertFalse(sampleFuture.isCompleted());

        this.coord.cancelStackTraceSample(0, null);

        Assert.assertTrue(sampleFuture.isCompleted());
        Assert.assertEquals(0, this.coord.getNumberOfPendingSamples());
    }

    /** Reports stack traces for a sample that was cancelled before; this must not throw. */
    @Test
    public void testCollectStackTraceForCanceledSample() throws Exception {
        ExecutionVertex[] vertices = this.singleRunningVertex();
        Future<StackTraceSample> sampleFuture = this.triggerDefaultSample(vertices);
        Assert.assertFalse(sampleFuture.isCompleted());

        this.coord.cancelStackTraceSample(0, null);
        Assert.assertTrue(sampleFuture.isCompleted());

        ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
        this.coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
    }

    /**
     * Reports stack traces after the pending sample has been cancelled; this
     * must not throw.
     *
     * <p>NOTE(review): this runs exactly the same steps as
     * {@link #testCollectStackTraceForCanceledSample()}.
     */
    @Test
    public void testCollectForDiscardedPendingSample() throws Exception {
        ExecutionVertex[] vertices = this.singleRunningVertex();
        Future<StackTraceSample> sampleFuture = this.triggerDefaultSample(vertices);
        Assert.assertFalse(sampleFuture.isCompleted());

        this.coord.cancelStackTraceSample(0, null);
        Assert.assertTrue(sampleFuture.isCompleted());

        ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
        this.coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
    }

    /**
     * Reports stack traces for a pending sample using an execution attempt ID
     * that does not belong to the sampled task and expects an
     * {@link IllegalArgumentException}.
     */
    @Test(expected=IllegalArgumentException.class)
    public void testCollectStackTraceForUnknownTask() throws Exception {
        ExecutionVertex[] vertices = this.singleRunningVertex();
        this.triggerDefaultSample(vertices);

        ExecutionAttemptID unknownExecutionId = new ExecutionAttemptID();
        this.coord.collectStackTraces(0, unknownExecutionId, new ArrayList<StackTraceElement[]>());
    }

    /**
     * Shuts the coordinator down while two samples are pending and expects
     * both futures to be completed. A sample triggered after the shut down
     * must come back as an already failed future.
     */
    @Test
    public void testShutDown() throws Exception {
        ExecutionVertex[] vertices = this.singleRunningVertex();

        List<Future<StackTraceSample>> sampleFutures = new ArrayList<Future<StackTraceSample>>();
        sampleFutures.add(this.triggerDefaultSample(vertices));
        sampleFutures.add(this.triggerDefaultSample(vertices));

        for (Future<StackTraceSample> pending : sampleFutures) {
            Assert.assertFalse(pending.isCompleted());
        }

        this.coord.shutDown();

        for (Future<StackTraceSample> pending : sampleFutures) {
            Assert.assertTrue(pending.isCompleted());
        }

        // Triggering after the shut down.
        Future<StackTraceSample> lateFuture = this.triggerDefaultSample(vertices);
        Assert.assertTrue(lateFuture.isCompleted());
        Assert.assertTrue(lateFuture.failed().isCompleted());
    }

    /** Returns a one-element array holding a mocked running vertex whose message sends succeed. */
    private ExecutionVertex[] singleRunningVertex() {
        return new ExecutionVertex[]{
            this.mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
        };
    }

    /**
     * Triggers a sample on the current coordinator with the arguments shared
     * by most tests: one sample, a 100 ms delay between samples and a max
     * stack trace depth argument of 0.
     */
    private Future<StackTraceSample> triggerDefaultSample(ExecutionVertex[] vertices) throws Exception {
        FiniteDuration delayBetweenSamples = new FiniteDuration(100L, TimeUnit.MILLISECONDS);
        return this.coord.triggerStackTraceSample(vertices, 1, delayBetweenSamples, 0);
    }

    /**
     * Builds a mocked {@link ExecutionVertex} backed by a mocked {@link Execution}.
     *
     * @param executionId attempt ID returned by the mocked execution
     * @param state execution state returned by the mocked execution
     * @param sendSuccess value the mocked vertex returns from
     *        {@code sendMessageToCurrentExecution} for any arguments
     * @return the mocked vertex
     */
    private ExecutionVertex mockExecutionVertex(ExecutionAttemptID executionId, ExecutionState state, boolean sendSuccess) {
        Execution attempt = Mockito.mock(Execution.class);
        Mockito.when(attempt.getAttemptId()).thenReturn(executionId);
        Mockito.when(attempt.getState()).thenReturn(state);

        ExecutionVertex mockedVertex = Mockito.mock(ExecutionVertex.class);
        Mockito.when(mockedVertex.getJobvertexId()).thenReturn(new JobVertexID());
        Mockito.when(mockedVertex.getCurrentExecutionAttempt()).thenReturn(attempt);
        Mockito.when(mockedVertex.sendMessageToCurrentExecution(
                        Matchers.any(Serializable.class),
                        Matchers.any(ExecutionAttemptID.class),
                        Matchers.any(AkkaActorGateway.class)))
                .thenReturn(sendSuccess);

        return mockedVertex;
    }
}

