package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.class */
public class ExecutionGraphMetricsTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest$TestingRestartStrategy.class */
    static class TestingRestartStrategy implements RestartStrategy {
        private boolean restartable = true;
        private ExecutionGraph executionGraph = null;

        TestingRestartStrategy() {
        }

        public boolean canRestart() {
            return this.restartable;
        }

        public void restart(ExecutionGraph executionGraph) {
            this.executionGraph = executionGraph;
        }

        public void setRestartable(boolean z) {
            this.restartable = z;
        }

        public void restartExecutionGraph() {
            this.executionGraph.restart();
        }
    }

    @Test
    public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException {
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        try {
            JobVertex jobVertex = new JobVertex("TestVertex");
            jobVertex.setParallelism(1);
            jobVertex.setInvokableClass(NoOpInvokable.class);
            JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{jobVertex});
            Configuration configuration = new Configuration();
            Time seconds = Time.seconds(10L);
            Scheduler scheduler = (Scheduler) Mockito.mock(Scheduler.class);
            ResourceID generate = ResourceID.generate();
            TaskManagerLocation taskManagerLocation = (TaskManagerLocation) Mockito.mock(TaskManagerLocation.class);
            Mockito.when(taskManagerLocation.getResourceID()).thenReturn(generate);
            Mockito.when(taskManagerLocation.getHostname()).thenReturn("localhost");
            TaskManagerGateway taskManagerGateway = (TaskManagerGateway) Mockito.mock(TaskManagerGateway.class);
            Instance instance = (Instance) Mockito.mock(Instance.class);
            Mockito.when(instance.getTaskManagerLocation()).thenReturn(taskManagerLocation);
            Mockito.when(instance.getTaskManagerID()).thenReturn(generate);
            Mockito.when(instance.getTaskManagerGateway()).thenReturn(taskManagerGateway);
            Slot slot = (Slot) Mockito.mock(Slot.class);
            AllocatedSlot allocatedSlot = (AllocatedSlot) Mockito.mock(AllocatedSlot.class);
            Mockito.when(allocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
            SimpleSlot simpleSlot = (SimpleSlot) Mockito.mock(SimpleSlot.class);
            Mockito.when(Boolean.valueOf(simpleSlot.isAlive())).thenReturn(true);
            Mockito.when(simpleSlot.getTaskManagerLocation()).thenReturn(taskManagerLocation);
            Mockito.when(simpleSlot.getTaskManagerID()).thenReturn(generate);
            Mockito.when(simpleSlot.getTaskManagerGateway()).thenReturn(taskManagerGateway);
            Mockito.when(Boolean.valueOf(simpleSlot.setExecutedVertex((Execution) Matchers.any(Execution.class)))).thenReturn(true);
            Mockito.when(simpleSlot.getRoot()).thenReturn(slot);
            Mockito.when(simpleSlot.getAllocatedSlot()).thenReturn(allocatedSlot);
            FlinkCompletableFuture flinkCompletableFuture = new FlinkCompletableFuture();
            flinkCompletableFuture.complete(simpleSlot);
            Mockito.when(scheduler.allocateSlot((ScheduledUnit) Mockito.any(ScheduledUnit.class), Mockito.anyBoolean())).thenReturn(flinkCompletableFuture);
            Mockito.when(Integer.valueOf(slot.getSlotNumber())).thenReturn(0);
            Mockito.when(taskManagerGateway.submitTask((TaskDeploymentDescriptor) Mockito.any(TaskDeploymentDescriptor.class), (Time) Mockito.any(Time.class))).thenReturn(FlinkCompletableFuture.completed(Acknowledge.get()));
            TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
            ExecutionGraph executionGraph = new ExecutionGraph(newSingleThreadScheduledExecutor, newSingleThreadScheduledExecutor, jobGraph.getJobID(), jobGraph.getName(), configuration, new SerializedValue((Object) null), seconds, testingRestartStrategy, scheduler);
            RestartTimeGauge restartTimeGauge = new RestartTimeGauge(executionGraph);
            Assert.assertEquals(0L, restartTimeGauge.getValue().longValue());
            executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
            executionGraph.scheduleForExecution();
            Assert.assertEquals(0L, restartTimeGauge.getValue().longValue());
            ArrayList arrayList = new ArrayList();
            Iterator it = executionGraph.getAllExecutionVertices().iterator();
            while (it.hasNext()) {
                arrayList.add(((ExecutionVertex) it.next()).getCurrentExecutionAttempt().getAttemptId());
            }
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), (ExecutionAttemptID) it2.next(), ExecutionState.RUNNING));
            }
            Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
            Assert.assertEquals(0L, restartTimeGauge.getValue().longValue());
            Iterator it3 = arrayList.iterator();
            while (it3.hasNext()) {
                executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), (ExecutionAttemptID) it3.next(), ExecutionState.FAILED, new Exception()));
            }
            Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
            long statusTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
            Thread.sleep(50L);
            long longValue = restartTimeGauge.getValue().longValue();
            for (int i = 0; i < 10; i++) {
                long longValue2 = restartTimeGauge.getValue().longValue();
                Assert.assertTrue(longValue2 >= longValue);
                longValue = longValue2;
            }
            Assert.assertTrue(longValue > 0);
            testingRestartStrategy.restartExecutionGraph();
            arrayList.clear();
            Iterator it4 = executionGraph.getAllExecutionVertices().iterator();
            while (it4.hasNext()) {
                arrayList.add(((ExecutionVertex) it4.next()).getCurrentExecutionAttempt().getAttemptId());
            }
            Iterator it5 = arrayList.iterator();
            while (it5.hasNext()) {
                executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), (ExecutionAttemptID) it5.next(), ExecutionState.RUNNING));
            }
            Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
            Assert.assertTrue(statusTimestamp != 0);
            long longValue3 = restartTimeGauge.getValue().longValue();
            for (int i2 = 0; i2 < 10; i2++) {
                long longValue4 = restartTimeGauge.getValue().longValue();
                Assert.assertTrue(longValue4 == longValue3);
                longValue3 = longValue4;
            }
            Iterator it6 = arrayList.iterator();
            while (it6.hasNext()) {
                executionGraph.updateState(new TaskExecutionState(jobGraph.getJobID(), (ExecutionAttemptID) it6.next(), ExecutionState.FAILED, new Exception()));
            }
            Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
            Assert.assertTrue(statusTimestamp != executionGraph.getStatusTimestamp(JobStatus.RESTARTING));
            Thread.sleep(50L);
            long longValue5 = restartTimeGauge.getValue().longValue();
            for (int i3 = 0; i3 < 10; i3++) {
                long longValue6 = restartTimeGauge.getValue().longValue();
                Assert.assertTrue(longValue6 >= longValue5);
                longValue5 = longValue6;
            }
            Assert.assertTrue(longValue5 > 0);
            executionGraph.failGlobal(new SuppressRestartsException(new Exception()));
            Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
            long longValue7 = restartTimeGauge.getValue().longValue();
            for (int i4 = 0; i4 < 10; i4++) {
                long longValue8 = restartTimeGauge.getValue().longValue();
                Assert.assertTrue(longValue8 == longValue7);
                longValue7 = longValue8;
            }
        } finally {
            newSingleThreadScheduledExecutor.shutdownNow();
        }
    }
}
