package org.apache.flink.runtime.checkpoint;

import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.mock.Whitebox;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.NeverCompleteFuture;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.class */
/**
 * Test that checks how {@link CheckpointCoordinator#abortPendingCheckpoints} interacts with the
 * coordinator's periodic trigger: after all pending checkpoints are aborted, the periodic trigger
 * must be reported as available again and no checkpoint may remain pending.
 */
public class FailoverStrategyCheckpointCoordinatorTest extends TestLogger {
    /** Executor the test drives by hand via {@code triggerAll()} and {@code triggerScheduledTasks()}. */
    private ManuallyTriggeredScheduledExecutor manualThreadExecutor;

    /**
     * Timer replacement that is injected into the coordinator under test.
     *
     * <p>Periodic work ({@link #scheduleAtFixedRate}) and direct execution ({@link #execute}) are
     * forwarded to the wrapped {@link ScheduledExecutor}; one-shot runnables passed to
     * {@link #schedule(Runnable, long, TimeUnit)} are dropped. The remaining scheduling variants
     * are unsupported.
     */
    public static class ManualCheckpointTimer extends ScheduledThreadPoolExecutor {
        /** Delegate that receives the forwarded tasks. */
        private final ScheduledExecutor scheduledExecutor;

        /** Initial delay used for fixed-rate tasks instead of the delay requested by the caller. */
        private long manualDelay = 0L;

        /**
         * Creates a timer with a core pool size of zero that forwards to the given executor.
         *
         * @param scheduledExecutor delegate that receives periodic and directly executed tasks
         */
        ManualCheckpointTimer(ScheduledExecutor scheduledExecutor) {
            super(0);
            this.scheduledExecutor = scheduledExecutor;
        }

        /**
         * Overrides the initial delay applied by {@link #scheduleAtFixedRate}.
         *
         * @param j the initial delay to hand to the delegate
         */
        void setManualDelay(long j) {
            manualDelay = j;
        }

        /**
         * Ignores the runnable entirely and hands back a {@link NeverCompleteFuture} built from
         * the requested delay; nothing is enqueued or run.
         *
         * @param runnable ignored
         * @param j requested delay, only passed on to the returned future
         * @param timeUnit ignored
         * @return a new {@link NeverCompleteFuture}
         */
        @Override
        public ScheduledFuture<?> schedule(Runnable runnable, long j, TimeUnit timeUnit) {
            // NOTE(review): presumably this swallows one-shot tasks such as checkpoint timeouts
            // so that pending checkpoints are never cancelled by the timer - confirm against
            // CheckpointCoordinator.
            return new NeverCompleteFuture(j);
        }

        /**
         * Not supported by this timer.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public <V> ScheduledFuture<V> schedule(Callable<V> callable, long j, TimeUnit timeUnit) {
            throw new UnsupportedOperationException();
        }

        /**
         * Forwards the periodic task to the delegate's {@code scheduleWithFixedDelay}, replacing
         * the requested initial delay with the configured manual delay.
         *
         * @param runnable task to run periodically
         * @param j requested initial delay; ignored in favour of the manual delay
         * @param j2 period, passed to the delegate unchanged
         * @param timeUnit unit of the delays, passed to the delegate unchanged
         * @return the future returned by the delegate
         */
        @Override
        public ScheduledFuture<?> scheduleAtFixedRate(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
            final long initialDelay = manualDelay;
            return scheduledExecutor.scheduleWithFixedDelay(runnable, initialDelay, j2, timeUnit);
        }

        /**
         * Not supported by this timer.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable, long j, long j2, TimeUnit timeUnit) {
            throw new UnsupportedOperationException();
        }

        /**
         * Hands the runnable straight to the delegate executor.
         *
         * @param runnable task to forward
         */
        @Override
        public void execute(Runnable runnable) {
            scheduledExecutor.execute(runnable);
        }
    }

    /** Creates a fresh manually triggered executor before every test. */
    @Before
    public void setUp() {
        manualThreadExecutor = new ManuallyTriggeredScheduledExecutor();
    }

    /**
     * Triggers checkpoints up to and beyond a randomly chosen limit, then aborts all pending
     * checkpoints and verifies that the periodic trigger is available again with nothing pending.
     */
    @Test
    public void testAbortPendingCheckpointsWithTriggerValidation() {
        // Random limit in [1, 10]; the assertions below treat it as the cap on pending checkpoints.
        final int checkpointLimit = 1 + ThreadLocalRandom.current().nextInt(10);
        final ExecutionVertex vertex = mockExecutionVertex();
        final CheckpointCoordinator coordinator = createCoordinator(vertex, checkpointLimit);

        // Report the vertex's current attempt as RUNNING.
        mockExecutionRunning(vertex);

        // Swap the coordinator's "timer" field for the manual timer, with a zero initial delay
        // for the periodic task.
        final ManualCheckpointTimer manualTimer = new ManualCheckpointTimer(manualThreadExecutor);
        manualTimer.setManualDelay(0L);
        Whitebox.setInternalState(coordinator, "timer", manualTimer);

        coordinator.startCheckpointScheduler();
        Assert.assertTrue(coordinator.isCurrentPeriodicTriggerAvailable());

        // Run everything queued on the manual executor; exactly one checkpoint is expected.
        manualThreadExecutor.triggerAll();
        manualThreadExecutor.triggerScheduledTasks();
        Assert.assertEquals(1L, coordinator.getNumberOfPendingCheckpoints());

        // Fill up to the limit; each trigger adds one pending checkpoint and keeps the
        // periodic trigger available.
        int expectedPending = 1;
        while (expectedPending < checkpointLimit) {
            coordinator.triggerCheckpoint(System.currentTimeMillis(), false);
            expectedPending++;
            Assert.assertEquals(expectedPending, coordinator.getNumberOfPendingCheckpoints());
            Assert.assertTrue(coordinator.isCurrentPeriodicTriggerAvailable());
        }

        // One trigger past the limit: the pending count stays at the limit and the periodic
        // trigger is no longer available.
        coordinator.triggerCheckpoint(System.currentTimeMillis(), false);
        Assert.assertFalse(coordinator.isCurrentPeriodicTriggerAvailable());
        Assert.assertEquals(checkpointLimit, coordinator.getNumberOfPendingCheckpoints());

        // Aborting everything must bring the periodic trigger back and clear all pending checkpoints.
        final CheckpointException failoverAbort =
                new CheckpointException(CheckpointFailureReason.JOB_FAILOVER_REGION);
        coordinator.abortPendingCheckpoints(failoverAbort);
        Assert.assertTrue(coordinator.isCurrentPeriodicTriggerAvailable());
        Assert.assertEquals(0L, coordinator.getNumberOfPendingCheckpoints());
    }

    /**
     * Builds the coordinator under test with a single vertex used for all three vertex arrays,
     * standalone id counter and completed-checkpoint store, a memory state backend, a direct
     * executor and a mocked failure manager.
     *
     * @param vertex the only vertex known to the coordinator
     * @param checkpointLimit value passed as the fourth configuration argument
     * @return the newly created coordinator
     */
    private CheckpointCoordinator createCoordinator(ExecutionVertex vertex, int checkpointLimit) {
        final CheckpointCoordinatorConfiguration configuration =
                new CheckpointCoordinatorConfiguration(
                        Integer.MAX_VALUE,
                        Integer.MAX_VALUE,
                        0L,
                        checkpointLimit,
                        CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
                        true,
                        false,
                        0);
        return new CheckpointCoordinator(
                new JobID(),
                configuration,
                new ExecutionVertex[] {vertex},
                new ExecutionVertex[] {vertex},
                new ExecutionVertex[] {vertex},
                new StandaloneCheckpointIDCounter(),
                new StandaloneCompletedCheckpointStore(1),
                new MemoryStateBackend(),
                Executors.directExecutor(),
                SharedStateRegistry.DEFAULT_FACTORY,
                Mockito.mock(CheckpointFailureManager.class));
    }

    /**
     * Creates a mocked vertex whose current execution attempt is a mocked {@link Execution}
     * carrying a freshly generated attempt id.
     *
     * @return the mocked vertex
     */
    private ExecutionVertex mockExecutionVertex() {
        final Execution attempt = Mockito.mock(Execution.class);
        final ExecutionAttemptID attemptId = new ExecutionAttemptID();
        PowerMockito.when(attempt.getAttemptId()).thenReturn(attemptId);

        final ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
        PowerMockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(attempt);
        return vertex;
    }

    /**
     * Stubs the vertex's current execution attempt so that it reports {@link ExecutionState#RUNNING}.
     *
     * @param executionVertex mocked vertex whose current attempt is itself a mock
     */
    private void mockExecutionRunning(ExecutionVertex executionVertex) {
        final Execution currentAttempt = executionVertex.getCurrentExecutionAttempt();
        PowerMockito.when(currentAttempt.getState()).thenReturn(ExecutionState.RUNNING);
    }
}
