package org.apache.flink.runtime.checkpoint;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.ArgumentMatchers;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/PendingCheckpointTest.class */
public class PendingCheckpointTest {
    /**
     * Tasks handed (as a copy) to every {@link PendingCheckpoint} created by this test.
     * Populated once by the static initializer of this class.
     */
    private static final Map<ExecutionAttemptID, ExecutionVertex> ACK_TASKS = new HashMap<ExecutionAttemptID, ExecutionVertex>();

    /** Key under which the mocked execution vertex is registered in {@link #ACK_TASKS}. */
    private static final ExecutionAttemptID ATTEMPT_ID = new ExecutionAttemptID();

    /** Provides a new folder for the storage location of each pending checkpoint built by the tests. */
    @Rule
    public final TemporaryFolder tmpFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/PendingCheckpointTest$QueueExecutor.class */
    private static final class QueueExecutor implements Executor {
        private final Queue<Runnable> queue;

        private QueueExecutor() {
            this.queue = new ArrayDeque(4);
        }

        @Override // java.util.concurrent.Executor
        public void execute(Runnable runnable) {
            this.queue.add(runnable);
        }

        public void runQueuedCommands() {
            Iterator<Runnable> it = this.queue.iterator();
            while (it.hasNext()) {
                it.next().run();
            }
        }
    }

    /**
     * Checks {@code canBeSubsumed()} for two property sets that differ only in the first
     * constructor flag: with the flag set, subsuming is refused and {@code abortSubsumed()}
     * must throw an {@link IllegalStateException}; with the flag cleared, subsuming is allowed.
     */
    @Test
    public void testCanBeSubsumed() throws Exception {
        // NOTE(review): the first constructor argument is presumably the "forced" flag — confirm against CheckpointProperties.
        CheckpointProperties flagSet = new CheckpointProperties(true, CheckpointType.SAVEPOINT, false, false, false, false, false);
        PendingCheckpoint notSubsumable = createPendingCheckpoint(flagSet);
        Assert.assertFalse(notSubsumable.canBeSubsumed());

        boolean rejected = false;
        try {
            notSubsumable.abortSubsumed();
        } catch (IllegalStateException expected) {
            // This is the outcome the test is looking for.
            rejected = true;
        }
        if (!rejected) {
            Assert.fail("Did not throw expected Exception");
        }

        CheckpointProperties flagCleared = new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);
        PendingCheckpoint subsumable = createPendingCheckpoint(flagCleared);
        Assert.assertTrue(subsumable.canBeSubsumed());
    }

    /**
     * Verifies the completion future: it is not done directly after creation and is done
     * after each kind of abort (declined, expired, subsumed) as well as after a fully
     * acknowledged checkpoint has been finalized. Finalizing a checkpoint that has not been
     * acknowledged must throw an {@link IllegalStateException}.
     */
    @Test
    public void testCompletionFuture() throws Exception {
        CheckpointProperties props = new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);

        // Abort: declined
        PendingCheckpoint declined = createPendingCheckpoint(props);
        CompletableFuture<?> declinedFuture = declined.getCompletionFuture();
        Assert.assertFalse(declinedFuture.isDone());
        declined.abortDeclined();
        Assert.assertTrue(declinedFuture.isDone());

        // Abort: expired
        PendingCheckpoint expired = createPendingCheckpoint(props);
        CompletableFuture<?> expiredFuture = expired.getCompletionFuture();
        Assert.assertFalse(expiredFuture.isDone());
        expired.abortExpired();
        Assert.assertTrue(expiredFuture.isDone());

        // Abort: subsumed
        PendingCheckpoint subsumed = createPendingCheckpoint(props);
        CompletableFuture<?> subsumedFuture = subsumed.getCompletionFuture();
        Assert.assertFalse(subsumedFuture.isDone());
        subsumed.abortSubsumed();
        Assert.assertTrue(subsumedFuture.isDone());

        // Acknowledge the only registered task, then finalize
        PendingCheckpoint acknowledged = createPendingCheckpoint(props);
        CompletableFuture<?> acknowledgedFuture = acknowledged.getCompletionFuture();
        Assert.assertFalse(acknowledgedFuture.isDone());
        acknowledged.acknowledgeTask(ATTEMPT_ID, (TaskStateSnapshot) null, new CheckpointMetrics());
        Assert.assertTrue(acknowledged.isFullyAcknowledged());
        acknowledged.finalizeCheckpoint();
        Assert.assertTrue(acknowledgedFuture.isDone());

        // Finalize without any acknowledgement
        PendingCheckpoint unacknowledged = createPendingCheckpoint(props);
        Assert.assertFalse(unacknowledged.getCompletionFuture().isDone());
        try {
            unacknowledged.finalizeCheckpoint();
            Assert.fail("Did not throw expected Exception");
        } catch (IllegalStateException expected) {
            // Expected: finalizing was attempted before the task acknowledged.
        }
    }

    /**
     * Verifies that every abort path (declined, error, expired, subsumed) discards the
     * operator state held by the pending checkpoint exactly once. The discard is submitted
     * to the executor, so the queued commands are run explicitly before each verification.
     */
    @Test
    public void testAbortDiscardsState() throws Exception {
        CheckpointProperties props = new CheckpointProperties(false, CheckpointType.SAVEPOINT, false, false, false, false, false);
        QueueExecutor executor = new QueueExecutor();

        OperatorState state = Mockito.mock(OperatorState.class);
        Mockito.doNothing().when(state).registerSharedStates(ArgumentMatchers.any(SharedStateRegistry.class));

        // Abort: declined
        PendingCheckpoint declined = createPendingCheckpoint(props, executor);
        setTaskState(declined, state);
        declined.abortDeclined();
        executor.runQueuedCommands();
        Mockito.verify(state, Mockito.times(1)).discardState();

        // The mock is shared between scenarios; reset it so each verification counts from zero.
        Mockito.reset(state);

        // Abort: error
        PendingCheckpoint failed = createPendingCheckpoint(props, executor);
        setTaskState(failed, state);
        failed.abortError(new Exception("Expected Test Exception"));
        executor.runQueuedCommands();
        Mockito.verify(state, Mockito.times(1)).discardState();

        Mockito.reset(state);

        // Abort: expired
        PendingCheckpoint expired = createPendingCheckpoint(props, executor);
        setTaskState(expired, state);
        expired.abortExpired();
        executor.runQueuedCommands();
        Mockito.verify(state, Mockito.times(1)).discardState();

        Mockito.reset(state);

        // Abort: subsumed
        PendingCheckpoint subsumed = createPendingCheckpoint(props, executor);
        setTaskState(subsumed, state);
        subsumed.abortSubsumed();
        executor.runQueuedCommands();
        Mockito.verify(state, Mockito.times(1)).discardState();
    }

    /**
     * Verifies that the stats callback set on a pending checkpoint is notified exactly once:
     * about the subtask stats on acknowledgement, about completion on finalization, and
     * about failure for each abort path (subsumed, declined, error, expired).
     */
    @Test
    public void testPendingCheckpointStatsCallbacks() throws Exception {
        // Acknowledge + finalize
        PendingCheckpointStats completedStats = Mockito.mock(PendingCheckpointStats.class);
        PendingCheckpoint completed = createPendingCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
        completed.setStatsCallback(completedStats);
        completed.acknowledgeTask(ATTEMPT_ID, (TaskStateSnapshot) null, new CheckpointMetrics());
        Mockito.verify(completedStats, Mockito.times(1)).reportSubtaskStats(ArgumentMatchers.nullable(JobVertexID.class), ArgumentMatchers.any(SubtaskStateStats.class));
        completed.finalizeCheckpoint();
        Mockito.verify(completedStats, Mockito.times(1)).reportCompletedCheckpoint(ArgumentMatchers.any(String.class));

        // Abort: subsumed
        PendingCheckpointStats subsumedStats = Mockito.mock(PendingCheckpointStats.class);
        PendingCheckpoint subsumed = createPendingCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
        subsumed.setStatsCallback(subsumedStats);
        subsumed.abortSubsumed();
        Mockito.verify(subsumedStats, Mockito.times(1)).reportFailedCheckpoint(ArgumentMatchers.anyLong(), (Throwable) ArgumentMatchers.any(Exception.class));

        // Abort: declined
        PendingCheckpointStats declinedStats = Mockito.mock(PendingCheckpointStats.class);
        PendingCheckpoint declined = createPendingCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
        declined.setStatsCallback(declinedStats);
        declined.abortDeclined();
        Mockito.verify(declinedStats, Mockito.times(1)).reportFailedCheckpoint(ArgumentMatchers.anyLong(), (Throwable) ArgumentMatchers.any(Exception.class));

        // Abort: error
        PendingCheckpointStats errorStats = Mockito.mock(PendingCheckpointStats.class);
        PendingCheckpoint failed = createPendingCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
        failed.setStatsCallback(errorStats);
        failed.abortError(new Exception("Expected test error"));
        Mockito.verify(errorStats, Mockito.times(1)).reportFailedCheckpoint(ArgumentMatchers.anyLong(), (Throwable) ArgumentMatchers.any(Exception.class));

        // Abort: expired
        PendingCheckpointStats expiredStats = Mockito.mock(PendingCheckpointStats.class);
        PendingCheckpoint expired = createPendingCheckpoint(CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION));
        expired.setStatsCallback(expiredStats);
        expired.abortExpired();
        Mockito.verify(expiredStats, Mockito.times(1)).reportFailedCheckpoint(ArgumentMatchers.anyLong(), (Throwable) ArgumentMatchers.any(Exception.class));
    }

    /**
     * Acknowledging the task with a {@code null} state snapshot must leave the operator
     * states of the pending checkpoint empty.
     */
    @Test
    public void testNullSubtaskStateLeadsToStatelessTask() throws Exception {
        CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        PendingCheckpoint pending = createPendingCheckpoint(props);
        CheckpointMetrics metrics = Mockito.mock(CheckpointMetrics.class);

        pending.acknowledgeTask(ATTEMPT_ID, (TaskStateSnapshot) null, metrics);

        Assert.assertTrue(pending.getOperatorStates().isEmpty());
    }

    /**
     * Acknowledging the task with a non-null (mocked) state snapshot must leave the operator
     * states of the pending checkpoint non-empty.
     */
    @Test
    public void testNonNullSubtaskStateLeadsToStatefulTask() throws Exception {
        CheckpointProperties props = CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION);
        PendingCheckpoint pending = createPendingCheckpoint(props);
        TaskStateSnapshot snapshot = Mockito.mock(TaskStateSnapshot.class);
        CheckpointMetrics metrics = Mockito.mock(CheckpointMetrics.class);

        pending.acknowledgeTask(ATTEMPT_ID, snapshot, metrics);

        Assert.assertFalse(pending.getOperatorStates().isEmpty());
    }

    /**
     * Verifies the canceller handle contract: a checkpoint that is already discarded rejects
     * a new handle, while a live checkpoint accepts it and cancels it (without interruption)
     * when the checkpoint is aborted.
     */
    @Test
    public void testSetCanceller() throws Exception {
        CheckpointProperties props = new CheckpointProperties(false, CheckpointType.CHECKPOINT, true, true, true, true, true);

        // Already aborted: the handle must be refused.
        PendingCheckpoint aborted = createPendingCheckpoint(props);
        aborted.abortDeclined();
        Assert.assertTrue(aborted.isDiscarded());
        Assert.assertFalse(aborted.setCancellerHandle(Mockito.mock(ScheduledFuture.class)));

        // Still pending: the handle is accepted and cancelled by the later abort.
        PendingCheckpoint live = createPendingCheckpoint(props);
        ScheduledFuture<?> canceller = Mockito.mock(ScheduledFuture.class);
        Assert.assertTrue(live.setCancellerHandle(canceller));
        live.abortDeclined();
        Mockito.verify(canceller).cancel(false);
    }

    /**
     * Creates a pending checkpoint whose executor is {@code Executors.directExecutor()}.
     *
     * @param checkpointProperties properties passed on to the pending checkpoint
     * @return the newly created pending checkpoint
     * @throws IOException if the two-argument overload fails to create its temporary folder
     */
    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties checkpointProperties) throws IOException {
        Executor direct = Executors.directExecutor();
        return createPendingCheckpoint(checkpointProperties, direct);
    }

    /**
     * Creates a pending checkpoint (checkpoint id 0, timestamp 1) for a new job id that
     * expects the tasks in {@code ACK_TASKS} and writes to a storage location inside a
     * freshly created temporary folder.
     *
     * @param checkpointProperties properties passed on to the pending checkpoint
     * @param executor executor handed to the pending checkpoint
     * @return the newly created pending checkpoint
     * @throws IOException if the temporary folder cannot be created
     */
    private PendingCheckpoint createPendingCheckpoint(CheckpointProperties checkpointProperties, Executor executor) throws IOException {
        // The same directory is used for all three path arguments of the storage location.
        Path checkpointDir = new Path(this.tmpFolder.newFolder().toURI());
        FsCheckpointStorageLocation location = new FsCheckpointStorageLocation(
                LocalFileSystem.getSharedInstance(),
                checkpointDir,
                checkpointDir,
                checkpointDir,
                CheckpointStorageLocationReference.getDefault(),
                1024);

        // Each pending checkpoint gets its own copy so the shared map is never mutated.
        Map<ExecutionAttemptID, ExecutionVertex> tasksToAck = new HashMap<ExecutionAttemptID, ExecutionVertex>(ACK_TASKS);

        return new PendingCheckpoint(new JobID(), 0L, 1L, tasksToAck, checkpointProperties, location, executor);
    }

    /**
     * Injects an operator state into a pending checkpoint by reflectively reading its
     * {@code operatorStates} map and adding the state under a newly generated operator id.
     *
     * @param pendingCheckpoint checkpoint whose map is modified
     * @param operatorState state to register
     * @throws NoSuchFieldException if {@code PendingCheckpoint} has no field named {@code operatorStates}
     * @throws IllegalAccessException if the field cannot be read
     */
    static void setTaskState(PendingCheckpoint pendingCheckpoint, OperatorState operatorState) throws NoSuchFieldException, IllegalAccessException {
        Field statesField = PendingCheckpoint.class.getDeclaredField("operatorStates");
        statesField.setAccessible(true);

        @SuppressWarnings("unchecked")
        Map<OperatorID, OperatorState> states = (Map<OperatorID, OperatorState>) statesField.get(pendingCheckpoint);
        states.put(new OperatorID(), operatorState);
    }

    // Registers one mocked execution vertex under ATTEMPT_ID. The vertex reports a max
    // parallelism of 128 and a single parallel subtask, and its job vertex exposes
    // exactly one operator id.
    static {
        ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class);
        PowerMockito.when(jobVertex.getOperatorIDs()).thenReturn(Collections.singletonList(new OperatorID()));

        ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
        PowerMockito.when(vertex.getMaxParallelism()).thenReturn(128);
        PowerMockito.when(vertex.getTotalNumberOfParallelSubtasks()).thenReturn(1);
        PowerMockito.when(vertex.getJobVertex()).thenReturn(jobVertex);

        ACK_TASKS.put(ATTEMPT_ID, vertex);
    }
}
