/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.stats;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.flink.api.common.JobID;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.SubtaskState;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStats;
import org.apache.flink.runtime.checkpoint.stats.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.stats.JobCheckpointStats;
import org.apache.flink.runtime.checkpoint.stats.OperatorCheckpointStats;
import org.apache.flink.runtime.checkpoint.stats.SimpleCheckpointStatsTracker;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/**
 * Tests for {@link SimpleCheckpointStatsTracker}.
 *
 * <p>The tests feed randomly generated {@link CompletedCheckpoint}s into a tracker and compare the
 * job-level and operator-level statistics it reports against values recomputed directly from the
 * generated checkpoints.
 */
public class SimpleCheckpointStatsTrackerTest {

    /** Source of randomness for the generated checkpoints. Unseeded, so the data differs per run. */
    private static final Random RAND = new Random();

    /** History size handed to every tracker that records checkpoints in these tests. */
    private static final int HISTORY_SIZE = 10;

    /** A tracker that has not seen any checkpoint must report neither job nor operator stats. */
    @Test
    public void testNoCompletedCheckpointYet() throws Exception {
        SimpleCheckpointStatsTracker tracker = new SimpleCheckpointStatsTracker(
                0, Collections.<ExecutionJobVertex>emptyList(), new UnregisteredMetricsGroup());

        Assert.assertFalse(tracker.getJobStats().isDefined());
        Assert.assertFalse(tracker.getOperatorStats(new JobVertexID()).isDefined());
    }

    /**
     * Reports 16 random checkpoints one by one and verifies job and operator stats after each
     * report. There are more checkpoints than {@link #HISTORY_SIZE}, so the history has to be
     * truncated along the way.
     */
    @Test
    public void testRandomStats() throws Exception {
        CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
        List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
        SimpleCheckpointStatsTracker tracker = createTracker(tasksToWaitFor);

        for (int i = 0; i < checkpoints.length; i++) {
            CompletedCheckpoint latest = checkpoints[i];
            tracker.onCompletedCheckpoint(latest);

            // Job stats cover everything reported so far; operator stats only the latest checkpoint.
            verifyJobStats(tracker, HISTORY_SIZE, Arrays.copyOfRange(checkpoints, 0, i + 1));
            verifySubtaskStats(tracker, tasksToWaitFor, latest);
        }
    }

    /** Asking for the stats of an operator ID the tracker does not know must yield an empty result. */
    @Test
    public void testIllegalOperatorId() throws Exception {
        CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
        List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
        SimpleCheckpointStatsTracker tracker = createTracker(tasksToWaitFor);

        for (CompletedCheckpoint checkpoint : checkpoints) {
            tracker.onCompletedCheckpoint(checkpoint);
        }

        Assert.assertTrue(tracker.getJobStats().isDefined());
        // A freshly created ID cannot belong to any of the generated operators.
        Assert.assertTrue(tracker.getOperatorStats(new JobVertexID()).isEmpty());
    }

    /**
     * Reports two checkpoints in reverse ID order. The history is expected to end up ordered like
     * the original array, and the operator stats are expected to keep describing the checkpoint
     * with the higher ID even after the older one arrives late.
     */
    @Test
    public void testCompletedCheckpointReordering() throws Exception {
        CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
        List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
        SimpleCheckpointStatsTracker tracker = createTracker(tasksToWaitFor);

        // Newer checkpoint first.
        tracker.onCompletedCheckpoint(checkpoints[1]);
        verifyJobStats(tracker, HISTORY_SIZE, new CompletedCheckpoint[] {checkpoints[1]});
        verifySubtaskStats(tracker, tasksToWaitFor, checkpoints[1]);

        // Older checkpoint arrives late.
        tracker.onCompletedCheckpoint(checkpoints[0]);
        verifyJobStats(tracker, HISTORY_SIZE, checkpoints);
        verifySubtaskStats(tracker, tasksToWaitFor, checkpoints[1]);
    }

    /**
     * Verifies via reflection on the tracker's {@code operatorStatsCache} field that an operator
     * stats lookup is cached and that the cache is emptied when the next checkpoint completes.
     */
    @Test
    public void testOperatorStateCachedClearedOnNewCheckpoint() throws Exception {
        CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
        List<ExecutionJobVertex> tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
        SimpleCheckpointStatsTracker tracker = createTracker(tasksToWaitFor);

        tracker.onCompletedCheckpoint(checkpoints[0]);

        // Any operator of the reported checkpoint will do.
        Iterator<?> operatorIds = checkpoints[0].getTaskStates().keySet().iterator();
        Assert.assertTrue("generated checkpoint has no task states", operatorIds.hasNext());
        JobVertexID operatorId = (JobVertexID) operatorIds.next();
        Assert.assertNotNull(operatorId);

        // The returned option itself is never null, so check that it actually carries stats.
        // The cache assertion below expects this lookup to have populated the cache.
        Assert.assertTrue(tracker.getOperatorStats(operatorId).isDefined());

        Field cacheField = SimpleCheckpointStatsTracker.class.getDeclaredField("operatorStatsCache");
        cacheField.setAccessible(true);
        Map<?, ?> cache = (Map<?, ?>) cacheField.get(tracker);
        Assert.assertTrue(cache.containsKey(operatorId));

        tracker.onCompletedCheckpoint(checkpoints[1]);
        Assert.assertTrue(cache.isEmpty());
    }

    /** Creates a tracker with {@link #HISTORY_SIZE} history entries and an unregistered metric group. */
    private static SimpleCheckpointStatsTracker createTracker(List<ExecutionJobVertex> tasksToWaitFor) {
        return new SimpleCheckpointStatsTracker(HISTORY_SIZE, tasksToWaitFor, new UnregisteredMetricsGroup());
    }

    /**
     * Verifies the job-level stats reported by the tracker.
     *
     * @param tracker tracker under test
     * @param historySize maximum number of history entries the tracker was configured with
     * @param checkpoints all checkpoints reported so far, in the order the history is expected to
     *     list them; must not be empty
     */
    private static void verifyJobStats(CheckpointStatsTracker tracker, int historySize, CompletedCheckpoint[] checkpoints) {
        // The averages below divide by the number of checkpoints.
        Assert.assertTrue("at least one checkpoint is required", checkpoints.length > 0);

        Assert.assertTrue(tracker.getJobStats().isDefined());
        JobCheckpointStats jobStats = (JobCheckpointStats) tracker.getJobStats().get();

        // The history holds the most recent checkpoints, capped at the configured size.
        List<?> history = jobStats.getRecentHistory();
        Assert.assertEquals(Math.min(historySize, checkpoints.length), history.size());

        // Walk both sequences from their tails so a truncated history lines up with the newest
        // checkpoints.
        for (int fromEnd = 0; fromEnd < history.size(); fromEnd++) {
            CompletedCheckpoint checkpoint = checkpoints[checkpoints.length - 1 - fromEnd];
            CheckpointStats expectedStats = new CheckpointStats(
                    checkpoint.getCheckpointID(),
                    checkpoint.getTimestamp(),
                    checkpoint.getDuration(),
                    checkpoint.getStateSize());
            Assert.assertEquals(expectedStats, history.get(history.size() - 1 - fromEnd));
        }

        // Recompute the aggregates over all checkpoints, not just those still in the history.
        long minDuration = Long.MAX_VALUE;
        long maxDuration = Long.MIN_VALUE;
        long totalDuration = 0L;
        long minStateSize = Long.MAX_VALUE;
        long maxStateSize = Long.MIN_VALUE;
        long totalStateSize = 0L;
        for (CompletedCheckpoint checkpoint : checkpoints) {
            long duration = checkpoint.getDuration();
            minDuration = Math.min(minDuration, duration);
            maxDuration = Math.max(maxDuration, duration);
            totalDuration += duration;

            long stateSize = checkpoint.getStateSize();
            minStateSize = Math.min(minStateSize, stateSize);
            maxStateSize = Math.max(maxStateSize, stateSize);
            totalStateSize += stateSize;
        }
        long count = checkpoints.length;

        Assert.assertEquals(count, jobStats.getCount());
        Assert.assertEquals(minDuration, jobStats.getMinDuration());
        Assert.assertEquals(maxDuration, jobStats.getMaxDuration());
        Assert.assertEquals(totalDuration / count, jobStats.getAverageDuration());
        Assert.assertEquals(minStateSize, jobStats.getMinStateSize());
        Assert.assertEquals(maxStateSize, jobStats.getMaxStateSize());
        Assert.assertEquals(totalStateSize / count, jobStats.getAverageStateSize());
    }

    /**
     * Verifies that the tracker reports, for every given vertex, operator stats matching the
     * per-subtask durations and state sizes stored in the given checkpoint.
     *
     * @param tracker tracker under test
     * @param tasksToWaitFor vertices whose operator stats are checked
     * @param checkpoint checkpoint the operator stats are expected to describe
     */
    private static void verifySubtaskStats(CheckpointStatsTracker tracker, List<ExecutionJobVertex> tasksToWaitFor, CompletedCheckpoint checkpoint) {
        for (ExecutionJobVertex vertex : tasksToWaitFor) {
            JobVertexID operatorId = vertex.getJobVertexId();
            int parallelism = vertex.getParallelism();

            TaskState taskState = checkpoint.getTaskState(operatorId);
            Assert.assertNotNull(taskState);

            OperatorCheckpointStats actualStats = (OperatorCheckpointStats) tracker.getOperatorStats(operatorId).get();

            // Aggregate duration and state size are passed as fixed placeholders and are not
            // derived from the subtasks.
            // NOTE(review): this can only pass if OperatorCheckpointStats equality ignores these
            // two values - confirm against that class.
            long operatorDuration = Long.MIN_VALUE;
            long operatorStateSize = 0L;

            // One row per subtask: [duration, state size].
            long[][] expectedSubTaskStats = new long[parallelism][2];
            for (int subtask = 0; subtask < parallelism; subtask++) {
                SubtaskState subtaskState = taskState.getState(subtask);
                expectedSubTaskStats[subtask][0] = subtaskState.getDuration();
                expectedSubTaskStats[subtask][1] = subtaskState.getStateSize();
            }

            OperatorCheckpointStats expectedStats = new OperatorCheckpointStats(
                    checkpoint.getCheckpointID(),
                    checkpoint.getTimestamp(),
                    operatorDuration,
                    operatorStateSize,
                    expectedSubTaskStats);
            Assert.assertEquals(expectedStats, actualStats);
        }
    }

    /**
     * Generates checkpoints with consecutive IDs starting at 0 for one random job topology
     * (4 to 32 operators, each with a parallelism of 4 to 16) that is shared by all of them.
     *
     * <p>Every subtask gets a random duration and a random state size of at least
     * {@link Integer#MAX_VALUE}. The upper bound of the state size is capped so that summing the
     * sizes of all subtasks of all generated checkpoints stays within the range of {@code long};
     * for the checkpoint counts used in this class that cap is still far above the int range.
     *
     * @param numCheckpoints number of checkpoints to generate
     * @return the generated checkpoints, ordered by ascending checkpoint ID
     * @throws IOException declared for the creation of the serialized (null) subtask state
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // raw SerializedValue: it only wraps null here
    private static CompletedCheckpoint[] generateRandomCheckpoints(int numCheckpoints) throws IOException {
        JobID jobId = new JobID();

        int minNumOperators = 4;
        int maxNumOperators = 32;
        int minParallelism = 4;
        int maxParallelism = 16;

        // Upper bound on the number of subtask state sizes that can ever be added up.
        long maxSummands = (long) maxNumOperators * maxParallelism * Math.max(1, numCheckpoints);
        long minStateSize = Integer.MAX_VALUE;
        // Keep any sum of maxSummands sizes below Long.MAX_VALUE so the aggregates cannot wrap.
        long maxStateSize = Math.max(minStateSize, Long.MAX_VALUE / maxSummands);

        CompletedCheckpoint[] checkpoints = new CompletedCheckpoint[numCheckpoints];

        // Topology shared by all checkpoints.
        int numOperators = RAND.nextInt(maxNumOperators - minNumOperators + 1) + minNumOperators;
        JobVertexID[] operatorIds = new JobVertexID[numOperators];
        int[] operatorParallelism = new int[numOperators];
        for (int i = 0; i < numOperators; i++) {
            operatorIds[i] = new JobVertexID();
            operatorParallelism[i] = RAND.nextInt(maxParallelism - minParallelism + 1) + minParallelism;
        }

        for (int checkpointId = 0; checkpointId < numCheckpoints; checkpointId++) {
            long triggerTimestamp = System.currentTimeMillis();
            // Per-checkpoint cap on subtask durations, 0..128 inclusive.
            int maxDuration = RAND.nextInt(129);

            Map<JobVertexID, TaskState> taskGroupStates = new HashMap<JobVertexID, TaskState>();

            // Longest subtask duration seen in this checkpoint.
            int completionDuration = 0;

            for (int operatorIndex = 0; operatorIndex < numOperators; operatorIndex++) {
                JobVertexID operatorId = operatorIds[operatorIndex];
                int parallelism = operatorParallelism[operatorIndex];

                TaskState taskState = new TaskState(operatorId, parallelism);
                taskGroupStates.put(operatorId, taskState);

                for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
                    int duration = RAND.nextInt(maxDuration + 1);
                    completionDuration = Math.max(completionDuration, duration);

                    long stateSize = minStateSize + (long) (RAND.nextDouble() * (double) (maxStateSize - minStateSize));
                    SubtaskState subtaskState = new SubtaskState(new SerializedValue(null), stateSize, (long) duration);
                    taskState.putState(subtaskIndex, subtaskState);
                }
            }

            // The checkpoint completes no earlier than its slowest subtask, plus a little jitter.
            long completionTimestamp = triggerTimestamp + (long) completionDuration + (long) RAND.nextInt(10);
            checkpoints[checkpointId] = new CompletedCheckpoint(
                    jobId, (long) checkpointId, triggerTimestamp, completionTimestamp, taskGroupStates);
        }

        return checkpoints;
    }

    /**
     * Creates one mocked {@link ExecutionJobVertex} per task state of the given checkpoint. Each
     * mock reports the operator ID and parallelism of its task state.
     *
     * @param checkpoint checkpoint whose task states define the vertices
     * @return mocked vertices, one per task state
     */
    private List<ExecutionJobVertex> createTasksToWaitFor(CompletedCheckpoint checkpoint) {
        List<ExecutionJobVertex> jobVertices = new ArrayList<ExecutionJobVertex>(checkpoint.getTaskStates().size());

        for (Map.Entry<?, ?> entry : checkpoint.getTaskStates().entrySet()) {
            JobVertexID operatorId = (JobVertexID) entry.getKey();
            int parallelism = ((TaskState) entry.getValue()).getParallelism();

            ExecutionJobVertex vertex = Mockito.mock(ExecutionJobVertex.class);
            Mockito.when(vertex.getJobVertexId()).thenReturn(operatorId);
            Mockito.when(vertex.getParallelism()).thenReturn(parallelism);
            jobVertices.add(vertex);
        }

        return jobVertices;
    }
}

