package org.apache.flink.runtime.checkpoint.stats;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.StateForTask;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/stats/SimpleCheckpointStatsTrackerTest.class */
/**
 * Tests for {@link SimpleCheckpointStatsTracker}.
 *
 * <p>Most tests feed randomly generated {@link CompletedCheckpoint}s into a tracker and then
 * recompute the expected job-level and per-operator statistics independently to compare them
 * with what the tracker reports.
 */
public class SimpleCheckpointStatsTrackerTest {

    /**
     * Source of randomness for the generated checkpoints. It is not seeded, so a failing run
     * cannot be replayed with the same data.
     */
    private static final Random RAND = new Random();

    /** History size handed to the tracker in every test that records checkpoints. */
    private static final int HISTORY_SIZE = 10;

    /** Inclusive bounds for the number of operators in a generated job. */
    private static final int MIN_NUM_OPERATORS = 4;
    private static final int MAX_NUM_OPERATORS = 32;

    /** Inclusive bounds for the parallelism of each generated operator. */
    private static final int MIN_PARALLELISM = 4;
    private static final int MAX_PARALLELISM = 16;

    /** Inclusive upper bound for the per-checkpoint maximum subtask duration. */
    private static final int MAX_CHECKPOINT_DURATION = 128;

    /** Exclusive upper bound for the random delay added to the completion timestamp. */
    private static final int COMPLETION_DELAY_BOUND = 10;

    /**
     * Bounds for the generated per-subtask state size. Every size is at least
     * {@code Integer.MAX_VALUE}, so no sum of state sizes fits into an {@code int}.
     */
    private static final long MIN_STATE_SIZE = Integer.MAX_VALUE;
    private static final long MAX_STATE_SIZE = Long.MAX_VALUE;

    /** A tracker that has not seen any checkpoint reports neither job nor operator stats. */
    @Test
    public void testNoCompletedCheckpointYet() throws Exception {
        SimpleCheckpointStatsTracker tracker =
                new SimpleCheckpointStatsTracker(0, new ExecutionVertex[0]);

        Assert.assertFalse(tracker.getJobStats().isDefined());
        Assert.assertFalse(tracker.getOperatorStats(new JobVertexID()).isDefined());
    }

    /**
     * Reports 16 random checkpoints one by one and, after each of them, verifies the job stats
     * against all checkpoints reported so far and the operator stats against the latest one.
     */
    @Test
    public void testRandomStats() throws Exception {
        CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
        ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
        SimpleCheckpointStatsTracker tracker =
                new SimpleCheckpointStatsTracker(HISTORY_SIZE, tasksToWaitFor);

        for (int i = 0; i < checkpoints.length; i++) {
            tracker.onCompletedCheckpoint(checkpoints[i]);

            verifyJobStats(tracker, HISTORY_SIZE, Arrays.copyOfRange(checkpoints, 0, i + 1));
            verifySubtaskStats(tracker, tasksToWaitFor, checkpoints[i]);
        }
    }

    /** Asking for the stats of an operator ID the tracker was not set up with yields nothing. */
    @Test
    public void testIllegalOperatorId() throws Exception {
        CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(16);
        SimpleCheckpointStatsTracker tracker =
                new SimpleCheckpointStatsTracker(HISTORY_SIZE, createTasksToWaitFor(checkpoints[0]));

        for (CompletedCheckpoint checkpoint : checkpoints) {
            tracker.onCompletedCheckpoint(checkpoint);
        }

        Assert.assertTrue(tracker.getJobStats().isDefined());
        // A freshly created ID cannot belong to any of the generated operators.
        Assert.assertTrue(tracker.getOperatorStats(new JobVertexID()).isEmpty());
    }

    /**
     * Reports the second generated checkpoint before the first one. The job stats must then
     * cover both checkpoints in generation order, while the operator stats are still expected
     * to be those of the second (newer) checkpoint.
     */
    @Test
    public void testCompletedCheckpointReordering() throws Exception {
        CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
        ExecutionVertex[] tasksToWaitFor = createTasksToWaitFor(checkpoints[0]);
        SimpleCheckpointStatsTracker tracker =
                new SimpleCheckpointStatsTracker(HISTORY_SIZE, tasksToWaitFor);

        // Newer checkpoint arrives first.
        tracker.onCompletedCheckpoint(checkpoints[1]);
        verifyJobStats(tracker, HISTORY_SIZE, new CompletedCheckpoint[] {checkpoints[1]});
        verifySubtaskStats(tracker, tasksToWaitFor, checkpoints[1]);

        // Older checkpoint arrives late.
        tracker.onCompletedCheckpoint(checkpoints[0]);
        verifyJobStats(tracker, HISTORY_SIZE, checkpoints);
        verifySubtaskStats(tracker, tasksToWaitFor, checkpoints[1]);
    }

    /**
     * Verifies that looking up operator stats leaves an entry in the tracker's private
     * {@code operatorStatsCache} map (read via reflection) and that the very same map instance
     * is empty after another checkpoint has been reported.
     */
    @Test
    public void testOperatorStateCachedClearedOnNewCheckpoint() throws Exception {
        CompletedCheckpoint[] checkpoints = generateRandomCheckpoints(2);
        SimpleCheckpointStatsTracker tracker =
                new SimpleCheckpointStatsTracker(HISTORY_SIZE, createTasksToWaitFor(checkpoints[0]));

        tracker.onCompletedCheckpoint(checkpoints[0]);

        JobVertexID operatorId = checkpoints[0].getStates().get(0).getOperatorId();
        // The returned option object itself is never null, so check that it holds a value.
        Assert.assertTrue(tracker.getOperatorStats(operatorId).isDefined());

        Field cacheField = SimpleCheckpointStatsTracker.class.getDeclaredField("operatorStatsCache");
        cacheField.setAccessible(true);
        // Grab the reference before the next checkpoint so the same instance is inspected twice.
        Map<?, ?> cache = (Map<?, ?>) cacheField.get(tracker);
        Assert.assertTrue(cache.containsKey(operatorId));

        tracker.onCompletedCheckpoint(checkpoints[1]);
        Assert.assertTrue(cache.isEmpty());
    }

    /**
     * Recomputes the job-level statistics from {@code checkpoints} and compares them with the
     * tracker's job stats.
     *
     * @param tracker tracker under test; must already report job stats
     * @param historySize history size the tracker was created with
     * @param checkpoints all checkpoints reported so far, oldest first; must not be empty
     */
    private static void verifyJobStats(
            CheckpointStatsTracker tracker, int historySize, CompletedCheckpoint[] checkpoints) {

        Assert.assertTrue(tracker.getJobStats().isDefined());
        JobCheckpointStats jobStats = (JobCheckpointStats) tracker.getJobStats().get();

        // The history holds every reported checkpoint until the configured size is reached.
        List<CheckpointStats> history = jobStats.getRecentHistory();
        Assert.assertEquals(Math.min(historySize, checkpoints.length), history.size());

        // Walk both sequences from their newest entry backwards.
        for (int offset = 0; offset < history.size(); offset++) {
            CheckpointStats actual = history.get(history.size() - 1 - offset);
            CompletedCheckpoint checkpoint = checkpoints[checkpoints.length - 1 - offset];

            CheckpointStats expected = new CheckpointStats(
                    checkpoint.getCheckpointID(),
                    checkpoint.getTimestamp(),
                    checkpoint.getDuration(),
                    totalStateSize(checkpoint));

            Assert.assertEquals(expected, actual);
        }

        long count = checkpoints.length;

        long minDuration = Long.MAX_VALUE;
        long maxDuration = Long.MIN_VALUE;
        long durationSum = 0;

        long minStateSize = Long.MAX_VALUE;
        long maxStateSize = Long.MIN_VALUE;
        long stateSizeSum = 0;

        for (CompletedCheckpoint checkpoint : checkpoints) {
            long duration = checkpoint.getDuration();
            minDuration = Math.min(minDuration, duration);
            maxDuration = Math.max(maxDuration, duration);
            durationSum += duration;

            long stateSize = totalStateSize(checkpoint);
            minStateSize = Math.min(minStateSize, stateSize);
            maxStateSize = Math.max(maxStateSize, stateSize);
            stateSizeSum += stateSize;
        }

        Assert.assertEquals(count, jobStats.getCount());
        Assert.assertEquals(minDuration, jobStats.getMinDuration());
        Assert.assertEquals(maxDuration, jobStats.getMaxDuration());
        Assert.assertEquals(durationSum / count, jobStats.getAverageDuration());
        Assert.assertEquals(minStateSize, jobStats.getMinStateSize());
        Assert.assertEquals(maxStateSize, jobStats.getMaxStateSize());
        Assert.assertEquals(stateSizeSum / count, jobStats.getAverageStateSize());
    }

    /** Sums the state sizes of all task states contained in {@code checkpoint}. */
    private static long totalStateSize(CompletedCheckpoint checkpoint) {
        long total = 0;
        for (StateForTask state : checkpoint.getStates()) {
            total += state.getStateSize();
        }
        return total;
    }

    /**
     * Compares the tracker's operator stats of every vertex in {@code tasksToWaitFor} with the
     * per-subtask durations and state sizes found in {@code checkpoint}.
     *
     * @param tracker tracker under test; must report stats for every given vertex
     * @param tasksToWaitFor vertices providing operator ID and parallelism
     * @param checkpoint checkpoint whose task states are the expected values
     */
    private static void verifySubtaskStats(
            CheckpointStatsTracker tracker,
            ExecutionVertex[] tasksToWaitFor,
            CompletedCheckpoint checkpoint) {

        for (ExecutionVertex vertex : tasksToWaitFor) {
            JobVertexID operatorId = vertex.getJobvertexId();
            int parallelism = vertex.getTotalNumberOfParallelSubtasks();

            OperatorCheckpointStats actual =
                    (OperatorCheckpointStats) tracker.getOperatorStats(operatorId).get();

            // Column 0 is the duration, column 1 the state size; -1 marks a subtask for which
            // the checkpoint holds no state.
            long[][] expectedSubtaskStats = new long[parallelism][2];
            for (long[] row : expectedSubtaskStats) {
                Arrays.fill(row, -1L);
            }

            for (StateForTask state : checkpoint.getStates()) {
                int subtask = state.getSubtask();
                boolean withinParallelism = subtask >= 0 && subtask < parallelism;
                if (withinParallelism && state.getOperatorId().equals(operatorId)) {
                    expectedSubtaskStats[subtask][0] = state.getDuration();
                    expectedSubtaskStats[subtask][1] = state.getStateSize();
                }
            }

            // NOTE(review): the operator-level duration and state size passed here are fixed
            // placeholders; presumably OperatorCheckpointStats#equals does not compare them.
            OperatorCheckpointStats expected = new OperatorCheckpointStats(
                    checkpoint.getCheckpointID(),
                    checkpoint.getTimestamp(),
                    Long.MIN_VALUE,
                    0L,
                    expectedSubtaskStats);

            Assert.assertEquals(expected, actual);
        }
    }

    /**
     * Generates {@code numCheckpoints} completed checkpoints for one random job.
     *
     * <p>The job has a random number of operators with random parallelism; every checkpoint
     * contains one task state per subtask of every operator. Checkpoint IDs are the array
     * indices.
     *
     * @param numCheckpoints number of checkpoints to generate
     * @return the generated checkpoints, ordered by checkpoint ID
     * @throws IOException declared because the task state objects are created from serialized
     *         values
     */
    private static CompletedCheckpoint[] generateRandomCheckpoints(int numCheckpoints)
            throws IOException {

        JobID jobId = new JobID();
        CompletedCheckpoint[] checkpoints = new CompletedCheckpoint[numCheckpoints];

        int numOperators =
                RAND.nextInt(MAX_NUM_OPERATORS - MIN_NUM_OPERATORS + 1) + MIN_NUM_OPERATORS;

        // The same set of operators is used for all checkpoints.
        JobVertexID[] operatorIds = new JobVertexID[numOperators];
        int[] operatorParallelism = new int[numOperators];
        for (int operator = 0; operator < numOperators; operator++) {
            operatorIds[operator] = new JobVertexID();
            operatorParallelism[operator] =
                    RAND.nextInt(MAX_PARALLELISM - MIN_PARALLELISM + 1) + MIN_PARALLELISM;
        }

        for (int checkpointId = 0; checkpointId < numCheckpoints; checkpointId++) {
            long triggerTimestamp = System.currentTimeMillis();
            int maxDuration = RAND.nextInt(MAX_CHECKPOINT_DURATION + 1);

            // Kept as ArrayList (not List), matching the original declaration that is handed
            // to the CompletedCheckpoint constructor.
            ArrayList<StateForTask> states = new ArrayList<StateForTask>();

            // The longest subtask duration determines the time to completion.
            int completionDuration = 0;

            for (int operator = 0; operator < numOperators; operator++) {
                JobVertexID operatorId = operatorIds[operator];
                int parallelism = operatorParallelism[operator];

                for (int subtask = 0; subtask < parallelism; subtask++) {
                    int duration = RAND.nextInt(maxDuration + 1);
                    completionDuration = Math.max(completionDuration, duration);

                    long stateSize = MIN_STATE_SIZE
                            + (long) (RAND.nextDouble() * (MAX_STATE_SIZE - MIN_STATE_SIZE));

                    // The serialized state is a null placeholder; the verification helpers in
                    // this class only read size, duration, operator ID and subtask index.
                    states.add(new StateForTask(
                            new SerializedValue((Object) null),
                            stateSize,
                            operatorId,
                            subtask,
                            duration));
                }
            }

            // Add a small random delay on top of the longest subtask duration.
            long completionTimestamp =
                    triggerTimestamp + completionDuration + RAND.nextInt(COMPLETION_DELAY_BOUND);

            checkpoints[checkpointId] = new CompletedCheckpoint(
                    jobId, checkpointId, triggerTimestamp, completionTimestamp, states);
        }

        return checkpoints;
    }

    /**
     * Creates one mocked {@link ExecutionVertex} per operator that appears in
     * {@code checkpoint}.
     *
     * <p>Each mock returns the operator's ID and, as its parallelism, the highest subtask index
     * found for that operator plus one.
     *
     * @param checkpoint checkpoint whose task states describe the operators
     * @return the mocked vertices, one per distinct operator ID
     */
    private static ExecutionVertex[] createTasksToWaitFor(CompletedCheckpoint checkpoint) {
        Map<JobVertexID, Integer> parallelismByOperator = new HashMap<JobVertexID, Integer>();

        for (StateForTask state : checkpoint.getStates()) {
            JobVertexID operatorId = state.getOperatorId();
            int requiredParallelism = state.getSubtask() + 1;

            Integer known = parallelismByOperator.get(operatorId);
            if (known == null || known < requiredParallelism) {
                parallelismByOperator.put(operatorId, requiredParallelism);
            }
        }

        ExecutionVertex[] vertices = new ExecutionVertex[parallelismByOperator.size()];
        int index = 0;
        for (Map.Entry<JobVertexID, Integer> entry : parallelismByOperator.entrySet()) {
            ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
            Mockito.when(vertex.getJobvertexId()).thenReturn(entry.getKey());
            Mockito.when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(entry.getValue());
            vertices[index++] = vertex;
        }

        return vertices;
    }
}
