package org.apache.flink.runtime.webmonitor;

import akka.actor.ActorSystem;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.StackTraceSampleMessages;
import org.apache.flink.runtime.messages.StackTraceSampleResponse;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinatorTest.class */
/**
 * Unit tests for {@link StackTraceSampleCoordinator}.
 *
 * <p>All tests share one local actor system that is created once for the class, while every test
 * gets a fresh coordinator from {@link #init()}. Tasks are represented by Mockito mocks of
 * {@link ExecutionVertex} and {@link Execution}; no real tasks are executed.
 *
 * <p>Several tests address the triggered sample by the literal ID {@code 0}. This relies on a
 * fresh coordinator handing out {@code 0} as its first sample ID, which
 * {@link #testTriggerStackTraceSample()} asserts explicitly.
 */
public class StackTraceSampleCoordinatorTest {
    /** Actor system whose dispatcher is handed to every coordinator under test. */
    private static ActorSystem system;

    /** Coordinator under test; re-created before each test method. */
    private StackTraceSampleCoordinator coord;

    /** Creates the local actor system shared by all tests of this class. */
    @BeforeClass
    public static void setUp() throws Exception {
        system = AkkaUtils.createLocalActorSystem(new Configuration());
    }

    /** Shuts the shared actor system down, if it was created. */
    @AfterClass
    public static void tearDown() throws Exception {
        if (system != null) {
            system.shutdown();
        }
    }

    /** Creates a fresh coordinator on the shared dispatcher, constructed with the value 60000. */
    @Before
    public void init() throws Exception {
        this.coord = new StackTraceSampleCoordinator(system.dispatcher(), 60000L);
    }

    /**
     * Triggers a sample for four running tasks and checks the complete happy path: every task
     * receives the sample request, the result future completes only after the last task has
     * reported its stack traces, the resulting sample contains the reported traces for each task,
     * and no pending sample remains afterwards.
     */
    @Test
    public void testTriggerStackTraceSample() throws Exception {
        ExecutionVertex[] vertices = {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
        };
        int numSamples = 1;
        Time delayBetweenSamples = Time.milliseconds(100L);
        int maxStackTraceDepth = 0;

        Future<StackTraceSample> sampleFuture =
                this.coord.triggerStackTraceSample(vertices, numSamples, delayBetweenSamples, maxStackTraceDepth);

        // Each task must have been asked for the sample with exactly the triggered parameters.
        for (ExecutionVertex vertex : vertices) {
            Mockito.verify(vertex.getCurrentExecutionAttempt()).requestStackTraceSample(
                    Matchers.eq(0),
                    Matchers.eq(numSamples),
                    Matchers.eq(delayBetweenSamples),
                    Matchers.eq(maxStackTraceDepth),
                    Matchers.any(Time.class));
        }
        Assert.assertFalse(sampleFuture.isDone());

        // Any stack trace works as payload; the same three-element list is reported for each task.
        StackTraceElement[] stackTrace = Thread.currentThread().getStackTrace();
        List<StackTraceElement[]> reportedTraces = new ArrayList<>();
        reportedTraces.add(stackTrace);
        reportedTraces.add(stackTrace);
        reportedTraces.add(stackTrace);

        // The future must flip to done exactly when the last task has reported.
        int lastIndex = vertices.length - 1;
        for (int i = 0; i <= lastIndex; i++) {
            this.coord.collectStackTraces(0, attemptIdOf(vertices[i]), reportedTraces);
            if (i == lastIndex) {
                Assert.assertTrue(sampleFuture.isDone());
            } else {
                Assert.assertFalse(sampleFuture.isDone());
            }
        }

        StackTraceSample sample = sampleFuture.get();
        Assert.assertEquals(0L, sample.getSampleId());
        Assert.assertTrue(sample.getEndTime() >= sample.getStartTime());

        Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces();
        for (ExecutionVertex vertex : vertices) {
            List<StackTraceElement[]> collected = tracesByTask.get(attemptIdOf(vertex));
            Assert.assertNotNull("Task not found", collected);
            Assert.assertTrue(reportedTraces.equals(collected));
        }
        Assert.assertEquals(0L, this.coord.getNumberOfPendingSamples());

        // Reporting again for the already completed sample must not throw.
        this.coord.collectStackTraces(0, attemptIdOf(vertices[0]), reportedTraces);
    }

    /**
     * Triggers a sample where one of the two tasks is in state DEPLOYING and expects the returned
     * future to be completed immediately with an {@link IllegalStateException} as cause.
     */
    @Test
    public void testTriggerStackTraceSampleNotRunningTasks() throws Exception {
        ExecutionVertex[] vertices = {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.DEPLOYING, true)
        };

        Future<StackTraceSample> sampleFuture =
                this.coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0);

        Assert.assertTrue(sampleFuture.isDone());
        try {
            sampleFuture.get();
            Assert.fail("Expected exception.");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof IllegalStateException);
        }
    }

    /**
     * Triggers a sample where the request to one of the two running tasks fails (its mocked
     * request future is completed exceptionally) and expects the sample future to fail with a
     * {@link RuntimeException} as cause.
     */
    @Test(timeout = 1000)
    public void testTriggerStackTraceSampleResetRunningTasks() throws Exception {
        ExecutionVertex[] vertices = {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, false)
        };

        Future<StackTraceSample> sampleFuture =
                this.coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0);
        try {
            sampleFuture.get();
            Assert.fail("Expected exception.");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof RuntimeException);
        }
    }

    /**
     * Uses a task whose request future is failed with a {@link TimeoutException} after 100 ms and
     * expects the sample future to complete exceptionally with "Timeout" in the nested cause's
     * message. Reporting stack traces for that sample afterwards must not throw.
     */
    @Test(timeout = 1000)
    public void testTriggerStackTraceSampleTimeout() throws Exception {
        int timeoutMillis = 100;
        int maxPolls = 10;
        // NOTE(review): the initial sleep plus all polls can add up to 1200 ms, which exceeds the
        // JUnit timeout of 1000 ms above, so the "did not time out" assertion may be unreachable.
        this.coord = new StackTraceSampleCoordinator(system.dispatcher(), timeoutMillis);
        ScheduledExecutorService timeoutExecutor = new ScheduledThreadPoolExecutor(1);
        try {
            ExecutionVertex[] vertices = {
                mockExecutionVertexWithTimeout(
                        new ExecutionAttemptID(), ExecutionState.RUNNING, timeoutExecutor, timeoutMillis)
            };
            Future<StackTraceSample> sampleFuture =
                    this.coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0);

            // Give the scheduled failure time to fire, then poll for completion.
            Thread.sleep(timeoutMillis * 2);
            boolean completed = false;
            for (int poll = 0; poll < maxPolls; poll++) {
                if (sampleFuture.isDone()) {
                    completed = true;
                    break;
                }
                Thread.sleep(timeoutMillis);
            }
            Assert.assertTrue("Sample did not time out", completed);

            try {
                sampleFuture.get();
                Assert.fail("Expected exception.");
            } catch (ExecutionException e) {
                Assert.assertTrue(e.getCause().getCause().getMessage().contains("Timeout"));
            }

            // A late report for the failed sample must not throw.
            this.coord.collectStackTraces(0, attemptIdOf(vertices[0]), new ArrayList<StackTraceElement[]>());
        } finally {
            // Always stop the helper executor, whether the test passed or failed.
            timeoutExecutor.shutdownNow();
        }
    }

    /** Reports stack traces although no sample was triggered; this must not throw. */
    @Test
    public void testCollectStackTraceForUnknownSample() throws Exception {
        this.coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
    }

    /**
     * Cancels a pending sample and expects its future to be done and the coordinator to report
     * zero pending samples.
     */
    @Test
    public void testCancelStackTraceSample() throws Exception {
        ExecutionVertex[] vertices = {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
        };
        Future<StackTraceSample> sampleFuture =
                this.coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0);
        Assert.assertFalse(sampleFuture.isDone());

        this.coord.cancelStackTraceSample(0, (Throwable) null);

        Assert.assertTrue(sampleFuture.isDone());
        Assert.assertEquals(0L, this.coord.getNumberOfPendingSamples());
    }

    /** Reports stack traces for a sample that was cancelled before; this must not throw. */
    @Test
    public void testCollectStackTraceForCanceledSample() throws Exception {
        triggerCancelAndCollect();
    }

    /**
     * Performs the same steps as {@link #testCollectStackTraceForCanceledSample()}: trigger,
     * cancel, then report stack traces for the cancelled sample, which must not throw.
     */
    @Test
    public void testCollectForDiscardedPendingSample() throws Exception {
        triggerCancelAndCollect();
    }

    /**
     * Reports stack traces for a pending sample under an attempt ID that is not part of that
     * sample and expects an {@link IllegalArgumentException}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testCollectStackTraceForUnknownTask() throws Exception {
        ExecutionVertex[] vertices = {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
        };
        this.coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0);
        this.coord.collectStackTraces(0, new ExecutionAttemptID(), new ArrayList<StackTraceElement[]>());
    }

    /**
     * Shuts the coordinator down while two samples are pending and expects both futures to be
     * done afterwards. A sample triggered after shut down must be done immediately and fail with
     * an {@link ExecutionException} on {@code get()}.
     */
    @Test
    public void testShutDown() throws Exception {
        ExecutionVertex[] vertices = {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
        };
        List<Future<StackTraceSample>> pendingSamples = new ArrayList<>();
        pendingSamples.add(this.coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0));
        pendingSamples.add(this.coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0));
        for (Future<StackTraceSample> pending : pendingSamples) {
            Assert.assertFalse(pending.isDone());
        }

        this.coord.shutDown();

        for (Future<StackTraceSample> pending : pendingSamples) {
            Assert.assertTrue(pending.isDone());
        }

        Future<StackTraceSample> lateSample =
                this.coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0);
        Assert.assertTrue(lateSample.isDone());
        try {
            lateSample.get();
            Assert.fail("Expected exception.");
        } catch (ExecutionException expected) {
            // Expected: the test only requires that get() fails; the cause is not inspected.
        }
    }

    /**
     * Shared body of the two "collect after cancel" tests: triggers a sample for one running
     * task, cancels it, asserts that the future is done and then reports an empty trace list for
     * the cancelled sample.
     */
    private void triggerCancelAndCollect() {
        ExecutionVertex[] vertices = {
            mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
        };
        Future<StackTraceSample> sampleFuture =
                this.coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0);
        Assert.assertFalse(sampleFuture.isDone());

        this.coord.cancelStackTraceSample(0, (Throwable) null);
        Assert.assertTrue(sampleFuture.isDone());

        this.coord.collectStackTraces(0, attemptIdOf(vertices[0]), new ArrayList<StackTraceElement[]>());
    }

    /** Returns the attempt ID of the given vertex' current execution attempt. */
    private static ExecutionAttemptID attemptIdOf(ExecutionVertex vertex) {
        return vertex.getCurrentExecutionAttempt().getAttemptId();
    }

    /**
     * Creates a mocked vertex whose current execution attempt answers every stack trace sample
     * request with an already completed future.
     *
     * @param executionAttemptID attempt ID reported by the mocked execution
     * @param executionState state reported by the mocked execution
     * @param sendSuccess {@code true} to answer requests with a successfully completed future
     *     holding a mocked response, {@code false} to answer with a future that failed with
     *     {@code Exception("Send failed")}
     * @return the mocked vertex
     */
    private ExecutionVertex mockExecutionVertex(
            ExecutionAttemptID executionAttemptID, ExecutionState executionState, boolean sendSuccess) {
        Future<StackTraceSampleResponse> response;
        if (sendSuccess) {
            response = FlinkCompletableFuture.completed(Mockito.mock(StackTraceSampleResponse.class));
        } else {
            response = FlinkCompletableFuture.<StackTraceSampleResponse>completedExceptionally(
                    new Exception("Send failed"));
        }
        return mockVertexFor(mockExecution(executionAttemptID, executionState, response));
    }

    /**
     * Creates a mocked vertex whose current execution attempt answers every stack trace sample
     * request with a future that is failed with {@code TimeoutException("Timeout")} after the
     * given delay.
     *
     * @param executionAttemptID attempt ID reported by the mocked execution
     * @param executionState state reported by the mocked execution
     * @param scheduledExecutorService executor used to schedule the delayed failure
     * @param timeout delay in milliseconds before the request future is failed
     * @return the mocked vertex
     */
    private ExecutionVertex mockExecutionVertexWithTimeout(
            ExecutionAttemptID executionAttemptID,
            ExecutionState executionState,
            ScheduledExecutorService scheduledExecutorService,
            int timeout) {
        final FlinkCompletableFuture<StackTraceSampleResponse> response = new FlinkCompletableFuture<>();
        Execution execution = mockExecution(executionAttemptID, executionState, response);

        // The countdown starts when the mock is created, not when the request is made.
        scheduledExecutorService.schedule(new Runnable() {
            @Override
            public void run() {
                response.completeExceptionally(new TimeoutException("Timeout"));
            }
        }, timeout, TimeUnit.MILLISECONDS);

        return mockVertexFor(execution);
    }

    /**
     * Creates a mocked execution that reports the given attempt ID and state and returns the
     * given future for any stack trace sample request.
     */
    private static Execution mockExecution(
            ExecutionAttemptID attemptId, ExecutionState state, Future<StackTraceSampleResponse> response) {
        Execution execution = Mockito.mock(Execution.class);
        Mockito.when(execution.getAttemptId()).thenReturn(attemptId);
        Mockito.when(execution.getState()).thenReturn(state);
        Mockito.when(execution.requestStackTraceSample(
                        Matchers.anyInt(),
                        Matchers.anyInt(),
                        Matchers.any(Time.class),
                        Matchers.anyInt(),
                        Matchers.any(Time.class)))
                .thenReturn(response);
        return execution;
    }

    /**
     * Creates a mocked vertex with a fresh job vertex ID whose current execution attempt is the
     * given execution.
     */
    private static ExecutionVertex mockVertexFor(Execution execution) {
        ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
        Mockito.when(vertex.getJobvertexId()).thenReturn(new JobVertexID());
        Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(execution);
        return vertex;
    }
}
