package org.apache.flink.runtime.checkpoint;

import java.util.List;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.OperatorStreamStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

@PrepareForTest({PendingCheckpoint.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.class */
public class CheckpointCoordinatorFailureTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest$FailingCompletedCheckpointStore.class */
    private static final class FailingCompletedCheckpointStore implements CompletedCheckpointStore {
        private FailingCompletedCheckpointStore() {
        }

        public void recover() throws Exception {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public void addCheckpoint(CompletedCheckpoint completedCheckpoint) throws Exception {
            throw new Exception("The failing completed checkpoint store failed again... :-(");
        }

        public CompletedCheckpoint getLatestCheckpoint(boolean z) throws Exception {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public void shutdown(JobStatus jobStatus) throws Exception {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
            throw new UnsupportedOperationException("Not implemented.");
        }

        public int getNumberOfRetainedCheckpoints() {
            return -1;
        }

        public int getMaxNumberOfRetainedCheckpoints() {
            return 1;
        }

        public boolean requiresExternalizedCheckpoints() {
            return false;
        }
    }

    @Test
    public void testFailingCompletedCheckpointStoreAdd() throws Exception {
        JobID jobID = new JobID();
        ExecutionAttemptID executionAttemptID = new ExecutionAttemptID();
        ExecutionVertex mockExecutionVertex = CheckpointCoordinatorTest.mockExecutionVertex(executionAttemptID);
        CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinator(jobID, new CheckpointCoordinatorConfiguration(600000L, 600000L, 0L, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, 0), new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex}, new ExecutionVertex[]{mockExecutionVertex}, new StandaloneCheckpointIDCounter(), new FailingCompletedCheckpointStore(), new MemoryStateBackend(), Executors.directExecutor(), SharedStateRegistry.DEFAULT_FACTORY, new CheckpointFailureManager(0, NoOpFailJobCall.INSTANCE));
        checkpointCoordinator.triggerCheckpoint(1L, false);
        Assert.assertEquals(1L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) checkpointCoordinator.getPendingCheckpoints().values().iterator().next();
        Assert.assertFalse(pendingCheckpoint.isDiscarded());
        long longValue = ((Long) checkpointCoordinator.getPendingCheckpoints().keySet().iterator().next()).longValue();
        OperatorSubtaskState operatorSubtaskState = (OperatorSubtaskState) Mockito.spy(new OperatorSubtaskState((OperatorStateHandle) Mockito.mock(OperatorStreamStateHandle.class), (OperatorStateHandle) Mockito.mock(OperatorStreamStateHandle.class), (KeyedStateHandle) Mockito.mock(KeyedStateHandle.class), (KeyedStateHandle) Mockito.mock(KeyedStateHandle.class)));
        TaskStateSnapshot taskStateSnapshot = (TaskStateSnapshot) Mockito.spy(new TaskStateSnapshot());
        taskStateSnapshot.putSubtaskStateByOperatorID(new OperatorID(), operatorSubtaskState);
        Mockito.when(taskStateSnapshot.getSubtaskStateByOperatorID(OperatorID.fromJobVertexID(mockExecutionVertex.getJobvertexId()))).thenReturn(operatorSubtaskState);
        try {
            checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionAttemptID, longValue, new CheckpointMetrics(), taskStateSnapshot), "Unknown location");
            Assert.fail("Expected a checkpoint exception because the completed checkpoint store could not store the completed checkpoint.");
        } catch (CheckpointException e) {
        }
        Assert.assertTrue(pendingCheckpoint.isDiscarded());
        ((OperatorSubtaskState) Mockito.verify(operatorSubtaskState)).discardState();
        ((OperatorStateHandle) Mockito.verify(operatorSubtaskState.getManagedOperatorState().iterator().next())).discardState();
        ((OperatorStateHandle) Mockito.verify(operatorSubtaskState.getRawOperatorState().iterator().next())).discardState();
        ((KeyedStateHandle) Mockito.verify(operatorSubtaskState.getManagedKeyedState().iterator().next())).discardState();
        ((KeyedStateHandle) Mockito.verify(operatorSubtaskState.getRawKeyedState().iterator().next())).discardState();
    }
}
