/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceConnectionInfo;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.Slot;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.Messages;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
import scala.concurrent.Future$;
import scala.concurrent.duration.FiniteDuration;

/**
 * Tests the {@code restartingTime} gauge that is expected to be visible through the metric
 * reporter once an {@link ExecutionGraph} has been constructed with a metric group.
 *
 * <p>The graph is driven through RUNNING, RESTARTING, RUNNING, RESTARTING and finally FAILED,
 * using a mocked {@link Scheduler} and a restart strategy that only restarts when the test asks
 * it to. At each stage the test samples the gauge and checks whether it is zero, non-decreasing,
 * or constant.
 */
public class ExecutionGraphMetricsTest extends TestLogger {

    /** Pause (in milliseconds) given to the graph while it sits in RESTARTING before sampling. */
    private static final long RESTARTING_PAUSE_MILLIS = 50L;

    /** Number of consecutive gauge reads compared against each other per check. */
    private static final int GAUGE_SAMPLES = 10;

    /**
     * Walks a single-vertex job through two failure/restart cycles and a final failure, asserting
     * the behaviour of the {@code restartingTime} gauge at every stage.
     *
     * @throws JobException declared by the execution graph calls used here
     * @throws IOException declared by the execution graph setup used here
     * @throws InterruptedException if the thread is interrupted while pausing in RESTARTING
     */
    @Test
    public void testExecutionGraphRestartTimeMetric() throws JobException, IOException, InterruptedException {
        JobVertex jobVertex = new JobVertex("TestVertex");
        jobVertex.setParallelism(1);
        jobVertex.setInvokableClass(Tasks.NoOpInvokable.class);
        JobGraph jobGraph = new JobGraph("Test Job", new JobVertex[]{jobVertex});

        // Register the in-memory reporter by class name so the gauge can be looked up later.
        Configuration config = new Configuration();
        config.setString("metrics.reporters", "test");
        config.setString("metrics.reporter.test.class", TestingReporter.class.getName());
        MetricRegistry metricRegistry = new MetricRegistry(config);
        Assert.assertEquals("exactly one reporter should be configured", 1, metricRegistry.getReporters().size());
        MetricReporter reporter = (MetricReporter) metricRegistry.getReporters().get(0);
        Assert.assertTrue("configured reporter should be the TestingReporter", reporter instanceof TestingReporter);
        TestingReporter testingReporter = (TestingReporter) reporter;
        JobManagerMetricGroup metricGroup = new JobManagerMetricGroup(metricRegistry, "localhost");

        Scheduler scheduler = createMockedScheduler();
        TestingRestartStrategy testingRestartStrategy = new TestingRestartStrategy();
        Configuration jobConfig = new Configuration();
        FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);

        // The pool backs the graph's execution context; it is shut down in the finally block so
        // its worker threads do not outlive the test.
        ForkJoinPool executor = new ForkJoinPool();
        try {
            ExecutionContext executionContext = ExecutionContext$.MODULE$.fromExecutor(executor);
            // The raw SerializedValue and the un-witnessed emptyList() arguments are kept exactly
            // as in the original call site.
            ExecutionGraph executionGraph = new ExecutionGraph(
                executionContext,
                jobGraph.getJobID(),
                jobGraph.getName(),
                jobConfig,
                new SerializedValue(null),
                timeout,
                testingRestartStrategy,
                Collections.emptyList(),
                Collections.emptyList(),
                getClass().getClassLoader(),
                metricGroup);

            Metric metric = testingReporter.getMetric("restartingTime");
            Assert.assertNotNull("restartingTime metric should have been reported", metric);
            Assert.assertTrue("restartingTime should be a Gauge", metric instanceof Gauge);
            Gauge<?> restartingTime = (Gauge<?>) metric;

            // Freshly created graph: nothing has restarted yet.
            Assert.assertEquals(0L, readRestartingTime(restartingTime));

            executionGraph.attachJobGraph(jobGraph.getVerticesSortedTopologicallyFromSources());
            executionGraph.scheduleForExecution(scheduler);
            Assert.assertEquals(0L, readRestartingTime(restartingTime));

            switchAllCurrentExecutionsToRunning(executionGraph, jobGraph);
            Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
            Assert.assertEquals(0L, readRestartingTime(restartingTime));

            // First failure: the graph waits in RESTARTING until the strategy is triggered.
            failAllCurrentExecutions(executionGraph, jobGraph);
            Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
            long firstRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
            Thread.sleep(RESTARTING_PAUSE_MILLIS);
            assertRestartingTimeGrowsAndIsPositive(restartingTime);

            // Trigger the restart; the new attempts get fresh ids, which the helper re-reads.
            testingRestartStrategy.restartExecutionGraph();
            switchAllCurrentExecutionsToRunning(executionGraph, jobGraph);
            Assert.assertEquals(JobStatus.RUNNING, executionGraph.getState());
            Assert.assertTrue("first RESTARTING timestamp should be set", firstRestartingTimestamp != 0L);
            assertRestartingTimeIsConstant(restartingTime);

            // Second failure: RESTARTING is entered again with a different timestamp.
            failAllCurrentExecutions(executionGraph, jobGraph);
            Assert.assertEquals(JobStatus.RESTARTING, executionGraph.getState());
            long secondRestartingTimestamp = executionGraph.getStatusTimestamp(JobStatus.RESTARTING);
            Assert.assertTrue(
                "second RESTARTING timestamp should differ from the first",
                firstRestartingTimestamp != secondRestartingTimestamp);
            Thread.sleep(RESTARTING_PAUSE_MILLIS);
            assertRestartingTimeGrowsAndIsPositive(restartingTime);

            // Terminal failure: the gauge must stop changing.
            executionGraph.fail(new Exception());
            Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
            assertRestartingTimeIsConstant(restartingTime);
        } finally {
            executor.shutdown();
        }
    }

    /**
     * Builds a mocked scheduler that hands out one always-alive mocked slot on "localhost" whose
     * task manager gateway acknowledges every request.
     */
    private static Scheduler createMockedScheduler() {
        Scheduler scheduler = Mockito.mock(Scheduler.class);
        SimpleSlot simpleSlot = Mockito.mock(SimpleSlot.class);
        Instance instance = Mockito.mock(Instance.class);
        InstanceConnectionInfo instanceConnectionInfo = Mockito.mock(InstanceConnectionInfo.class);
        Slot rootSlot = Mockito.mock(Slot.class);
        ActorGateway actorGateway = Mockito.mock(ActorGateway.class);

        Mockito.when(simpleSlot.isAlive()).thenReturn(true);
        Mockito.when(simpleSlot.getInstance()).thenReturn(instance);
        Mockito.when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
        Mockito.when(simpleSlot.getRoot()).thenReturn(rootSlot);

        Mockito.when(scheduler.scheduleImmediately(Matchers.any(ScheduledUnit.class))).thenReturn(simpleSlot);

        Mockito.when(instance.getInstanceConnectionInfo()).thenReturn(instanceConnectionInfo);
        Mockito.when(instance.getActorGateway()).thenReturn(actorGateway);
        Mockito.when(instanceConnectionInfo.getHostname()).thenReturn("localhost");
        Mockito.when(rootSlot.getSlotNumber()).thenReturn(0);

        Mockito.when(actorGateway.ask(Matchers.any(Object.class), Matchers.any(FiniteDuration.class)))
            .thenReturn(Future$.MODULE$.<Object>successful(Messages.getAcknowledge()));
        return scheduler;
    }

    /** Snapshots the attempt ids of the current execution attempt of every vertex in the graph. */
    private static ArrayList<ExecutionAttemptID> currentAttemptIds(ExecutionGraph executionGraph) {
        ArrayList<ExecutionAttemptID> attemptIds = new ArrayList<ExecutionAttemptID>();
        for (ExecutionVertex vertex : executionGraph.getAllExecutionVertices()) {
            attemptIds.add(vertex.getCurrentExecutionAttempt().getAttemptId());
        }
        return attemptIds;
    }

    /** Reports every current execution attempt of the graph as RUNNING. */
    private static void switchAllCurrentExecutionsToRunning(ExecutionGraph executionGraph, JobGraph jobGraph) {
        for (ExecutionAttemptID attemptId : currentAttemptIds(executionGraph)) {
            executionGraph.updateState(
                new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.RUNNING));
        }
    }

    /** Reports every current execution attempt of the graph as FAILED with a plain exception. */
    private static void failAllCurrentExecutions(ExecutionGraph executionGraph, JobGraph jobGraph) {
        for (ExecutionAttemptID attemptId : currentAttemptIds(executionGraph)) {
            executionGraph.updateState(
                new TaskExecutionState(jobGraph.getJobID(), attemptId, ExecutionState.FAILED, new Exception()));
        }
    }

    /** Reads the gauge once; the test expects its value to be a {@code Long}. */
    private static long readRestartingTime(Gauge<?> restartingTime) {
        return (Long) restartingTime.getValue();
    }

    /** Asserts that consecutive gauge reads never decrease and that the last read is positive. */
    private static void assertRestartingTimeGrowsAndIsPositive(Gauge<?> restartingTime) {
        long previous = readRestartingTime(restartingTime);
        for (int sample = 0; sample < GAUGE_SAMPLES; ++sample) {
            long current = readRestartingTime(restartingTime);
            Assert.assertTrue(
                "restartingTime decreased from " + previous + " to " + current,
                current >= previous);
            previous = current;
        }
        Assert.assertTrue("restartingTime should be positive but was " + previous, previous > 0L);
    }

    /** Asserts that consecutive gauge reads all return the same value. */
    private static void assertRestartingTimeIsConstant(Gauge<?> restartingTime) {
        long previous = readRestartingTime(restartingTime);
        for (int sample = 0; sample < GAUGE_SAMPLES; ++sample) {
            long current = readRestartingTime(restartingTime);
            Assert.assertEquals("restartingTime changed although it should be constant", previous, current);
            previous = current;
        }
    }

    /**
     * Restart strategy that never restarts on its own: it only remembers the graph handed to
     * {@link #restart(ExecutionGraph)} so the test can trigger the restart explicitly through
     * {@link #restartExecutionGraph()}.
     */
    static class TestingRestartStrategy
    implements RestartStrategy {
        private boolean restartable = true;
        private ExecutionGraph executionGraph = null;

        TestingRestartStrategy() {
        }

        @Override
        public boolean canRestart() {
            return this.restartable;
        }

        /** Records the graph instead of restarting it. */
        @Override
        public void restart(ExecutionGraph executionGraph) {
            this.executionGraph = executionGraph;
        }

        public void setRestartable(boolean restartable) {
            this.restartable = restartable;
        }

        /**
         * Restarts the graph recorded by the last {@link #restart(ExecutionGraph)} call.
         *
         * @throws IllegalStateException if no graph has been recorded yet
         */
        public void restartExecutionGraph() {
            if (this.executionGraph == null) {
                throw new IllegalStateException("restart(ExecutionGraph) has not been called yet");
            }
            this.executionGraph.restart();
        }
    }

    /**
     * Reporter that keeps every added metric in a map keyed by metric name so the test can look
     * metrics up. It is public with an implicit no-argument constructor because the test
     * configures it by class name.
     */
    public static class TestingReporter
    implements MetricReporter {
        private final Map<String, Metric> metrics = new HashMap<String, Metric>();

        @Override
        public void open(MetricConfig config) {
        }

        @Override
        public void close() {
        }

        @Override
        public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
            this.metrics.put(metricName, metric);
        }

        @Override
        public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
            this.metrics.remove(metricName);
        }

        /** Returns the metric last added under {@code metricName}, or {@code null} if none is stored. */
        Metric getMetric(String metricName) {
            return this.metrics.get(metricName);
        }
    }
}

