package org.apache.flink.runtime.checkpoint.savepoint;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.checkpoint.CheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.TaskState;
import org.apache.flink.runtime.checkpoint.stats.DisabledCheckpointStatsTracker;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete;
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.Promise;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest.class */
public class SavepointCoordinatorTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/checkpoint/savepoint/SavepointCoordinatorTest$MockCheckpointIdCounter.class */
    private static class MockCheckpointIdCounter implements CheckpointIDCounter {
        private boolean started;
        private long count;
        private long lastReturnedCount;

        private MockCheckpointIdCounter() {
        }

        public void start() throws Exception {
            this.started = true;
        }

        public void shutdown() throws Exception {
            this.started = false;
        }

        public void suspend() throws Exception {
            this.started = false;
        }

        /*  JADX ERROR: Failed to decode insn: 0x000D: MOVE_MULTI, method: org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinatorTest.MockCheckpointIdCounter.getAndIncrement():long
            java.lang.ArrayIndexOutOfBoundsException: arraycopy: source index -1 out of bounds for object array[8]
            	at java.base/java.lang.System.arraycopy(Native Method)
            	at jadx.plugins.input.java.data.code.StackState.insert(StackState.java:49)
            	at jadx.plugins.input.java.data.code.CodeDecodeState.insert(CodeDecodeState.java:118)
            	at jadx.plugins.input.java.data.code.JavaInsnsRegister.dup2x1(JavaInsnsRegister.java:313)
            	at jadx.plugins.input.java.data.code.JavaInsnData.decode(JavaInsnData.java:46)
            	at jadx.core.dex.instructions.InsnDecoder.lambda$process$0(InsnDecoder.java:54)
            	at jadx.plugins.input.java.data.code.JavaCodeReader.visitInstructions(JavaCodeReader.java:81)
            	at jadx.core.dex.instructions.InsnDecoder.process(InsnDecoder.java:50)
            	at jadx.core.dex.nodes.MethodNode.load(MethodNode.java:156)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:443)
            	at jadx.core.dex.nodes.ClassNode.load(ClassNode.java:449)
            	at jadx.core.ProcessClass.process(ProcessClass.java:70)
            	at jadx.core.ProcessClass.generateCode(ProcessClass.java:118)
            	at jadx.core.dex.nodes.ClassNode.generateClassCode(ClassNode.java:400)
            	at jadx.core.dex.nodes.ClassNode.decompile(ClassNode.java:388)
            	at jadx.core.dex.nodes.ClassNode.getCode(ClassNode.java:338)
            */
        public long getAndIncrement() throws java.lang.Exception {
            /*
                r8 = this;
                r0 = r8
                r1 = r8
                long r1 = r1.count
                r0.lastReturnedCount = r1
                r0 = r8
                r1 = r0
                long r1 = r1.count
                // decode failed: arraycopy: source index -1 out of bounds for object array[8]
                r2 = 1
                long r1 = r1 + r2
                r0.count = r1
                return r-1
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.runtime.checkpoint.savepoint.SavepointCoordinatorTest.MockCheckpointIdCounter.getAndIncrement():long");
        }

        public void setCount(long j) {
            this.count = j;
        }

        long getLastReturnedCount() {
            return this.lastReturnedCount;
        }

        public boolean isStarted() {
            return this.started;
        }
    }

    @Test
    public void testSimpleTriggerSavepoint() throws Exception {
        JobID jobID = new JobID();
        ExecutionVertex[] executionVertexArr = {mockExecutionVertex(jobID), mockExecutionVertex(jobID)};
        MockCheckpointIdCounter mockCheckpointIdCounter = new MockCheckpointIdCounter();
        HeapSavepointStore heapSavepointStore = new HeapSavepointStore();
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, executionVertexArr, executionVertexArr, executionVertexArr, mockCheckpointIdCounter, heapSavepointStore);
        Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(1272635L);
        Assert.assertFalse(triggerSavepoint.isCompleted());
        long lastReturnedCount = mockCheckpointIdCounter.getLastReturnedCount();
        Assert.assertEquals(0L, lastReturnedCount);
        for (ExecutionVertex executionVertex : executionVertexArr) {
            verifyTriggerCheckpoint(executionVertex, lastReturnedCount, 1272635L);
        }
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) createSavepointCoordinator.getPendingCheckpoints().get(Long.valueOf(lastReturnedCount));
        verifyPendingCheckpoint(pendingCheckpoint, jobID, lastReturnedCount, 1272635L, 0, 2, 0, false, false);
        for (ExecutionVertex executionVertex2 : executionVertexArr) {
            createSavepointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionVertex2.getCurrentExecutionAttempt().getAttemptId(), lastReturnedCount, createSerializedStateHandle(executionVertex2), 0L));
        }
        Assert.assertTrue(pendingCheckpoint.isDiscarded());
        Assert.assertEquals(0L, createSavepointCoordinator.getSuccessfulCheckpoints().size());
        for (ExecutionVertex executionVertex3 : executionVertexArr) {
            verifyNotifyCheckpointComplete(executionVertex3, lastReturnedCount, 1272635L);
        }
        Assert.assertTrue(triggerSavepoint.isCompleted());
        verifySavepoint(heapSavepointStore.loadSavepoint((String) Await.result(triggerSavepoint, FiniteDuration.Zero())), lastReturnedCount, executionVertexArr);
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testTriggerAndDeclineCheckpointSimple() throws Exception {
        JobID jobID = new JobID();
        ExecutionVertex[] executionVertexArr = {mockExecutionVertex(jobID), mockExecutionVertex(jobID)};
        MockCheckpointIdCounter mockCheckpointIdCounter = new MockCheckpointIdCounter();
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, executionVertexArr, executionVertexArr, executionVertexArr, mockCheckpointIdCounter, new HeapSavepointStore());
        Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(1272635L);
        Assert.assertFalse(triggerSavepoint.isCompleted());
        long lastReturnedCount = mockCheckpointIdCounter.getLastReturnedCount();
        Assert.assertEquals(0L, lastReturnedCount);
        for (ExecutionVertex executionVertex : executionVertexArr) {
            verifyTriggerCheckpoint(executionVertex, lastReturnedCount, 1272635L);
        }
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) createSavepointCoordinator.getPendingCheckpoints().get(Long.valueOf(lastReturnedCount));
        verifyPendingCheckpoint(pendingCheckpoint, jobID, lastReturnedCount, 1272635L, 0, 2, 0, false, false);
        createSavepointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionVertexArr[0].getCurrentExecutionAttempt().getAttemptId(), lastReturnedCount, createSerializedStateHandle(executionVertexArr[0]), 0L));
        createSavepointCoordinator.receiveDeclineMessage(new DeclineCheckpoint(jobID, executionVertexArr[1].getCurrentExecutionAttempt().getAttemptId(), lastReturnedCount, 0L));
        Assert.assertTrue(pendingCheckpoint.isDiscarded());
        Assert.assertEquals(0L, createSavepointCoordinator.getSuccessfulCheckpoints().size());
        Assert.assertTrue(triggerSavepoint.isCompleted());
        try {
            Await.result(triggerSavepoint.failed(), FiniteDuration.Zero());
            Assert.fail("Did not throw expected exception");
        } catch (Throwable th) {
        }
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testSimpleRollbackSavepoint() throws Exception {
        JobID jobID = new JobID();
        ExecutionJobVertex[] executionJobVertexArr = {mockExecutionJobVertex(jobID, new JobVertexID(), 4), mockExecutionJobVertex(jobID, new JobVertexID(), 4)};
        ExecutionVertex[] taskVertices = executionJobVertexArr[0].getTaskVertices();
        ExecutionVertex[] executionVertexArr = new ExecutionVertex[8];
        int i = 0;
        for (ExecutionJobVertex executionJobVertex : executionJobVertexArr) {
            for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
                int i2 = i;
                i++;
                executionVertexArr[i2] = executionVertex;
            }
        }
        MockCheckpointIdCounter mockCheckpointIdCounter = new MockCheckpointIdCounter();
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, taskVertices, executionVertexArr, new ExecutionVertex[0], mockCheckpointIdCounter, new HeapSavepointStore());
        Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(1231273123L);
        for (ExecutionVertex executionVertex2 : executionVertexArr) {
            createSavepointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionVertex2.getCurrentExecutionAttempt().getAttemptId(), 0L, createSerializedStateHandle(executionVertex2), 0L));
        }
        String str = (String) Await.result(triggerSavepoint, FiniteDuration.Zero());
        Assert.assertNotNull(str);
        createSavepointCoordinator.restoreSavepoint(createExecutionJobVertexMap(executionJobVertexArr), str);
        for (ExecutionVertex executionVertex3 : executionVertexArr) {
            ((Execution) Mockito.verify(executionVertex3.getCurrentExecutionAttempt(), Mockito.times(1))).setInitialState((SerializedValue) Matchers.any(SerializedValue.class), (Map) Matchers.any(Map.class));
        }
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        Assert.assertTrue(mockCheckpointIdCounter.isStarted());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testRollbackParallelismMismatch() throws Exception {
        JobID jobID = new JobID();
        ExecutionJobVertex[] executionJobVertexArr = new ExecutionJobVertex[2];
        executionJobVertexArr[0] = mockExecutionJobVertex(jobID, new JobVertexID(), 4);
        executionJobVertexArr[1] = mockExecutionJobVertex(jobID, new JobVertexID(), 4);
        ExecutionVertex[] taskVertices = executionJobVertexArr[0].getTaskVertices();
        ExecutionVertex[] executionVertexArr = new ExecutionVertex[8];
        int i = 0;
        for (ExecutionJobVertex executionJobVertex : executionJobVertexArr) {
            for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
                int i2 = i;
                i++;
                executionVertexArr[i2] = executionVertex;
            }
        }
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, taskVertices, executionVertexArr, new ExecutionVertex[0], new MockCheckpointIdCounter(), new HeapSavepointStore());
        Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(1231273123L);
        for (ExecutionVertex executionVertex2 : executionVertexArr) {
            createSavepointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionVertex2.getCurrentExecutionAttempt().getAttemptId(), 0L, createSerializedStateHandle(executionVertex2), 0L));
        }
        String str = (String) Await.result(triggerSavepoint, FiniteDuration.Zero());
        Assert.assertNotNull(str);
        for (int i3 = 0; i3 < executionJobVertexArr.length; i3++) {
            executionJobVertexArr[i3] = mockExecutionJobVertex(jobID, executionJobVertexArr[i3].getJobVertexId(), 2);
        }
        try {
            createSavepointCoordinator.restoreSavepoint(createExecutionJobVertexMap(executionJobVertexArr), str);
            Assert.fail("Did not throw expected Exception after rollback with parallelism mismatch.");
        } catch (Exception e) {
        }
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testRollbackStateStoreFailure() throws Exception {
        JobID jobID = new JobID();
        ExecutionJobVertex mockExecutionJobVertex = mockExecutionJobVertex(jobID, new JobVertexID(), 4);
        HeapSavepointStore heapSavepointStore = (HeapSavepointStore) Mockito.spy(new HeapSavepointStore());
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, mockExecutionJobVertex.getTaskVertices(), mockExecutionJobVertex.getTaskVertices(), new ExecutionVertex[0], new MockCheckpointIdCounter(), heapSavepointStore);
        Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(1231273123L);
        for (ExecutionVertex executionVertex : mockExecutionJobVertex.getTaskVertices()) {
            createSavepointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionVertex.getCurrentExecutionAttempt().getAttemptId(), 0L, createSerializedStateHandle(executionVertex), 0L));
        }
        String str = (String) Await.result(triggerSavepoint, FiniteDuration.Zero());
        Assert.assertNotNull(str);
        ((HeapSavepointStore) Mockito.doThrow(new RuntimeException("TestException")).when(heapSavepointStore)).loadSavepoint(Matchers.anyString());
        try {
            createSavepointCoordinator.restoreSavepoint(createExecutionJobVertexMap(mockExecutionJobVertex), str);
            Assert.fail("Did not throw expected Exception after rollback with savepoint store failure.");
        } catch (Exception e) {
        }
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testRollbackSetsCheckpointID() throws Exception {
        SavepointV0 savepointV0 = new SavepointV0(12312312L, Collections.emptyList());
        CheckpointIDCounter checkpointIDCounter = (CheckpointIDCounter) Mockito.mock(CheckpointIDCounter.class);
        SavepointStore savepointStore = (SavepointStore) Mockito.mock(SavepointStore.class);
        Mockito.when(savepointStore.loadSavepoint(Matchers.anyString())).thenReturn(savepointV0);
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(new JobID(), 60000L, new ExecutionVertex[0], new ExecutionVertex[0], new ExecutionVertex[0], checkpointIDCounter, savepointStore);
        createSavepointCoordinator.restoreSavepoint(createExecutionJobVertexMap(new ExecutionJobVertex[0]), "any");
        ((CheckpointIDCounter) Mockito.verify(checkpointIDCounter)).setCount(Matchers.eq(12312313L));
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testAbortSavepointIfTriggerTasksNotExecuted() throws Exception {
        JobID jobID = new JobID();
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, new ExecutionVertex[]{(ExecutionVertex) Mockito.mock(ExecutionVertex.class), (ExecutionVertex) Mockito.mock(ExecutionVertex.class)}, new ExecutionVertex[]{mockExecutionVertex(jobID), mockExecutionVertex(jobID)}, new ExecutionVertex[0], new MockCheckpointIdCounter(), new HeapSavepointStore());
        Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(1238123L);
        Assert.assertTrue(triggerSavepoint.isCompleted());
        try {
            Await.result(triggerSavepoint, FiniteDuration.Zero());
            Assert.fail("Did not throw expected Exception after shutdown");
        } catch (Exception e) {
        }
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testAbortSavepointIfTriggerTasksAreFinished() throws Exception {
        JobID jobID = new JobID();
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, new ExecutionVertex[]{mockExecutionVertex(jobID), mockExecutionVertex(jobID, ExecutionState.FINISHED)}, new ExecutionVertex[]{mockExecutionVertex(jobID), mockExecutionVertex(jobID)}, new ExecutionVertex[0], new MockCheckpointIdCounter(), new HeapSavepointStore());
        Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(1238123L);
        Assert.assertTrue(triggerSavepoint.isCompleted());
        try {
            Await.result(triggerSavepoint, FiniteDuration.Zero());
            Assert.fail("Did not throw expected Exception after shutdown");
        } catch (Exception e) {
        }
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testAbortSavepointIfAckTasksAreNotExecuted() throws Exception {
        JobID jobID = new JobID();
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, new ExecutionVertex[]{mockExecutionVertex(jobID), mockExecutionVertex(jobID)}, new ExecutionVertex[]{(ExecutionVertex) Mockito.mock(ExecutionVertex.class), (ExecutionVertex) Mockito.mock(ExecutionVertex.class)}, new ExecutionVertex[0], new MockCheckpointIdCounter(), new HeapSavepointStore());
        Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(1238123L);
        Assert.assertTrue(triggerSavepoint.isCompleted());
        try {
            Await.result(triggerSavepoint, FiniteDuration.Zero());
            Assert.fail("Did not throw expected Exception after shutdown");
        } catch (Exception e) {
        }
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testAbortOnCheckpointTimeout() throws Exception {
        JobID jobID = new JobID();
        ExecutionVertex[] executionVertexArr = {mockExecutionVertex(jobID), mockExecutionVertex(jobID)};
        ExecutionVertex mockExecutionVertex = mockExecutionVertex(jobID);
        MockCheckpointIdCounter mockCheckpointIdCounter = new MockCheckpointIdCounter();
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 1000L, executionVertexArr, executionVertexArr, new ExecutionVertex[]{mockExecutionVertex}, mockCheckpointIdCounter, new HeapSavepointStore());
        Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(12731273L);
        Assert.assertFalse(triggerSavepoint.isCompleted());
        PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) createSavepointCoordinator.getPendingCheckpoints().get(Long.valueOf(mockCheckpointIdCounter.getLastReturnedCount()));
        Assert.assertNotNull("Checkpoint not pending (test race)", pendingCheckpoint);
        Assert.assertFalse("Checkpoint already discarded (test race)", pendingCheckpoint.isDiscarded());
        Deadline fromNow = FiniteDuration.apply(60L, "s").fromNow();
        while (fromNow.hasTimeLeft() && !pendingCheckpoint.isDiscarded() && createSavepointCoordinator.getNumberOfPendingCheckpoints() > 0) {
            Thread.sleep(250L);
        }
        Assert.assertTrue("Savepoint not discarded within timeout", pendingCheckpoint.isDiscarded());
        Assert.assertEquals(0L, createSavepointCoordinator.getNumberOfPendingCheckpoints());
        Assert.assertEquals(0L, createSavepointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());
        ((ExecutionVertex) Mockito.verify(mockExecutionVertex, Mockito.times(0))).sendMessageToCurrentExecution((Serializable) Matchers.any(NotifyCheckpointComplete.class), (ExecutionAttemptID) Matchers.any(ExecutionAttemptID.class));
        Assert.assertTrue(triggerSavepoint.isCompleted());
        try {
            Await.result(triggerSavepoint, FiniteDuration.Zero());
            Assert.fail("Did not throw expected Exception after timeout");
        } catch (Exception e) {
        }
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testAbortSavepointsOnShutdown() throws Exception {
        JobID jobID = new JobID();
        ExecutionVertex[] executionVertexArr = {mockExecutionVertex(jobID), mockExecutionVertex(jobID)};
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, executionVertexArr, executionVertexArr, executionVertexArr, new MockCheckpointIdCounter(), new HeapSavepointStore());
        ArrayList<Future> arrayList = new ArrayList();
        arrayList.add(createSavepointCoordinator.triggerSavepoint(12731273L));
        arrayList.add(createSavepointCoordinator.triggerSavepoint(12731396L));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertFalse(((Future) it.next()).isCompleted());
        }
        createSavepointCoordinator.shutdown();
        for (Future future : arrayList) {
            Assert.assertTrue(future.isCompleted());
            try {
                Await.result(future, FiniteDuration.Zero());
                Assert.fail("Did not throw expected Exception after shutdown");
            } catch (Exception e) {
            }
        }
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
    }

    @Test
    public void testAbortSavepointOnStateStoreFailure() throws Exception {
        JobID jobID = new JobID();
        ExecutionJobVertex mockExecutionJobVertex = mockExecutionJobVertex(jobID, new JobVertexID(), 4);
        HeapSavepointStore heapSavepointStore = (HeapSavepointStore) Mockito.spy(new HeapSavepointStore());
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, mockExecutionJobVertex.getTaskVertices(), mockExecutionJobVertex.getTaskVertices(), new ExecutionVertex[0], new MockCheckpointIdCounter(), heapSavepointStore);
        ((HeapSavepointStore) Mockito.doThrow(new RuntimeException("TestException")).when(heapSavepointStore)).storeSavepoint((Savepoint) Matchers.any(Savepoint.class));
        Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(1231273123L);
        for (ExecutionVertex executionVertex : mockExecutionJobVertex.getTaskVertices()) {
            createSavepointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionVertex.getCurrentExecutionAttempt().getAttemptId(), 0L, createSerializedStateHandle(executionVertex), 0L));
        }
        try {
            Await.result(triggerSavepoint, FiniteDuration.Zero());
            Assert.fail("Did not throw expected Exception after rollback with savepoint store failure.");
        } catch (Exception e) {
        }
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testAbortSavepointIfSubsumed() throws Exception {
        JobID jobID = new JobID();
        long[] jArr = {1272635, 1272645};
        ExecutionVertex[] executionVertexArr = {mockExecutionVertex(jobID), mockExecutionVertex(jobID)};
        MockCheckpointIdCounter mockCheckpointIdCounter = new MockCheckpointIdCounter();
        HeapSavepointStore heapSavepointStore = new HeapSavepointStore();
        SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, executionVertexArr, executionVertexArr, executionVertexArr, mockCheckpointIdCounter, heapSavepointStore);
        ArrayList arrayList = new ArrayList();
        arrayList.add(createSavepointCoordinator.triggerSavepoint(jArr[0]));
        arrayList.add(createSavepointCoordinator.triggerSavepoint(jArr[1]));
        long[] jArr2 = {mockCheckpointIdCounter.getLastReturnedCount(), mockCheckpointIdCounter.getLastReturnedCount()};
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            Assert.assertFalse(((Future) it.next()).isCompleted());
        }
        for (ExecutionVertex executionVertex : executionVertexArr) {
            verifyTriggerCheckpoint(executionVertex, jArr2[0], jArr[0]);
            verifyTriggerCheckpoint(executionVertex, jArr2[1], jArr[1]);
        }
        PendingCheckpoint[] pendingCheckpointArr = {(PendingCheckpoint) createSavepointCoordinator.getPendingCheckpoints().get(Long.valueOf(jArr2[0])), (PendingCheckpoint) createSavepointCoordinator.getPendingCheckpoints().get(Long.valueOf(jArr2[1]))};
        verifyPendingCheckpoint(pendingCheckpointArr[0], jobID, jArr2[0], jArr[0], 0, 2, 0, false, false);
        verifyPendingCheckpoint(pendingCheckpointArr[1], jobID, jArr2[1], jArr[1], 0, 2, 0, false, false);
        for (ExecutionVertex executionVertex2 : executionVertexArr) {
            createSavepointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionVertex2.getCurrentExecutionAttempt().getAttemptId(), jArr2[1], createSerializedStateHandle(executionVertex2), 0L));
        }
        createSavepointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionVertexArr[0].getCurrentExecutionAttempt().getAttemptId(), jArr2[0], createSerializedStateHandle(executionVertexArr[0]), 0L));
        Assert.assertTrue(pendingCheckpointArr[0].isDiscarded());
        Assert.assertTrue(pendingCheckpointArr[1].isDiscarded());
        Assert.assertEquals(0L, createSavepointCoordinator.getSuccessfulCheckpoints().size());
        for (ExecutionVertex executionVertex3 : executionVertexArr) {
            verifyNotifyCheckpointComplete(executionVertex3, jArr2[1], jArr[1]);
        }
        Savepoint[] savepointArr = new Savepoint[2];
        String[] strArr = new String[2];
        Assert.assertTrue(((Future) arrayList.get(0)).isCompleted());
        try {
            strArr[0] = (String) Await.result((Awaitable) arrayList.get(0), FiniteDuration.Zero());
            Assert.fail("Did not throw expected exception");
        } catch (Exception e) {
        }
        Assert.assertTrue(((Future) arrayList.get(1)).isCompleted());
        strArr[1] = (String) Await.result((Awaitable) arrayList.get(1), FiniteDuration.Zero());
        savepointArr[1] = heapSavepointStore.loadSavepoint(strArr[1]);
        verifySavepoint(savepointArr[1], jArr2[1], executionVertexArr);
        Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
        createSavepointCoordinator.shutdown();
    }

    @Test
    public void testShutdownDoesNotCleanUpCompletedCheckpointsWithFileSystemStore() throws Exception {
        JobID jobID = new JobID();
        ExecutionVertex[] executionVertexArr = {mockExecutionVertex(jobID), mockExecutionVertex(jobID)};
        MockCheckpointIdCounter mockCheckpointIdCounter = new MockCheckpointIdCounter();
        File createTempDirectory = CommonTestUtils.createTempDirectory();
        try {
            FsSavepointStore fsSavepointStore = new FsSavepointStore(createTempDirectory.toURI().toString(), "sp-");
            SavepointCoordinator createSavepointCoordinator = createSavepointCoordinator(jobID, 60000L, executionVertexArr, executionVertexArr, executionVertexArr, mockCheckpointIdCounter, fsSavepointStore);
            Future triggerSavepoint = createSavepointCoordinator.triggerSavepoint(1272635L);
            Assert.assertFalse(triggerSavepoint.isCompleted());
            long lastReturnedCount = mockCheckpointIdCounter.getLastReturnedCount();
            Assert.assertEquals(0L, lastReturnedCount);
            for (ExecutionVertex executionVertex : executionVertexArr) {
                verifyTriggerCheckpoint(executionVertex, lastReturnedCount, 1272635L);
            }
            PendingCheckpoint pendingCheckpoint = (PendingCheckpoint) createSavepointCoordinator.getPendingCheckpoints().get(Long.valueOf(lastReturnedCount));
            verifyPendingCheckpoint(pendingCheckpoint, jobID, lastReturnedCount, 1272635L, 0, 2, 0, false, false);
            for (ExecutionVertex executionVertex2 : executionVertexArr) {
                createSavepointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jobID, executionVertex2.getCurrentExecutionAttempt().getAttemptId(), lastReturnedCount, createSerializedStateHandle(executionVertex2), 0L));
            }
            Assert.assertTrue(pendingCheckpoint.isDiscarded());
            Assert.assertEquals(0L, createSavepointCoordinator.getSuccessfulCheckpoints().size());
            for (ExecutionVertex executionVertex3 : executionVertexArr) {
                verifyNotifyCheckpointComplete(executionVertex3, lastReturnedCount, 1272635L);
            }
            Assert.assertTrue(triggerSavepoint.isCompleted());
            String str = (String) Await.result(triggerSavepoint, FiniteDuration.Zero());
            Assert.assertEquals(0L, getSavepointPromises(createSavepointCoordinator).size());
            createSavepointCoordinator.shutdown();
            verifySavepoint(fsSavepointStore.loadSavepoint(str), lastReturnedCount, executionVertexArr);
            FileUtils.deleteDirectory(createTempDirectory);
        } catch (Throwable th) {
            FileUtils.deleteDirectory(createTempDirectory);
            throw th;
        }
    }

    private static SavepointCoordinator createSavepointCoordinator(JobID jobID, long j, ExecutionVertex[] executionVertexArr, ExecutionVertex[] executionVertexArr2, ExecutionVertex[] executionVertexArr3, CheckpointIDCounter checkpointIDCounter, SavepointStore savepointStore) throws Exception {
        return new SavepointCoordinator(jobID, j, j, 42, executionVertexArr, executionVertexArr2, executionVertexArr3, Thread.currentThread().getContextClassLoader(), checkpointIDCounter, savepointStore, new DisabledCheckpointStatsTracker());
    }

    private static Map<JobVertexID, ExecutionJobVertex> createExecutionJobVertexMap(ExecutionJobVertex... executionJobVertexArr) {
        HashMap hashMap = new HashMap();
        for (ExecutionJobVertex executionJobVertex : executionJobVertexArr) {
            hashMap.put(executionJobVertex.getJobVertexId(), executionJobVertex);
        }
        return hashMap;
    }

    private static SerializedValue<StateHandle<?>> createSerializedStateHandle(ExecutionVertex executionVertex) throws IOException {
        return new SerializedValue<>(new LocalStateHandle(executionVertex.getCurrentExecutionAttempt().getAttemptId()));
    }

    private Map<Long, Promise<String>> getSavepointPromises(SavepointCoordinator savepointCoordinator) throws NoSuchFieldException, IllegalAccessException {
        Field declaredField = SavepointCoordinator.class.getDeclaredField("savepointPromises");
        declaredField.setAccessible(true);
        return (Map) declaredField.get(savepointCoordinator);
    }

    private static void verifyTriggerCheckpoint(ExecutionVertex executionVertex, long j, long j2) {
        ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
        ((ExecutionVertex) Mockito.verify(executionVertex)).sendMessageToCurrentExecution((Serializable) Matchers.eq(new TriggerCheckpoint(executionVertex.getJobId(), attemptId, j, j2)), (ExecutionAttemptID) Matchers.eq(attemptId));
    }

    private static void verifyNotifyCheckpointComplete(ExecutionVertex executionVertex, long j, long j2) {
        ExecutionAttemptID attemptId = executionVertex.getCurrentExecutionAttempt().getAttemptId();
        ((ExecutionVertex) Mockito.verify(executionVertex)).sendMessageToCurrentExecution((Serializable) Matchers.eq(new NotifyCheckpointComplete(executionVertex.getJobId(), attemptId, j, j2)), (ExecutionAttemptID) Matchers.eq(attemptId));
    }

    private static void verifyPendingCheckpoint(PendingCheckpoint pendingCheckpoint, JobID jobID, long j, long j2, int i, int i2, int i3, boolean z, boolean z2) {
        Assert.assertNotNull(pendingCheckpoint);
        Assert.assertEquals(jobID, pendingCheckpoint.getJobId());
        Assert.assertEquals(j, pendingCheckpoint.getCheckpointId());
        Assert.assertEquals(j2, pendingCheckpoint.getCheckpointTimestamp());
        Assert.assertEquals(i, pendingCheckpoint.getNumberOfAcknowledgedTasks());
        Assert.assertEquals(i2, pendingCheckpoint.getNumberOfNonAcknowledgedTasks());
        int i4 = 0;
        Iterator it = pendingCheckpoint.getTaskStates().values().iterator();
        while (it.hasNext()) {
            i4 += ((TaskState) it.next()).getNumberCollectedStates();
        }
        Assert.assertEquals(i3, i4);
        Assert.assertEquals(Boolean.valueOf(z), Boolean.valueOf(pendingCheckpoint.isDiscarded()));
        Assert.assertEquals(Boolean.valueOf(z2), Boolean.valueOf(pendingCheckpoint.isFullyAcknowledged()));
    }

    private static void verifySavepoint(Savepoint savepoint, long j, ExecutionVertex[] executionVertexArr) throws Exception {
        Assert.assertEquals(j, savepoint.getCheckpointId());
        for (TaskState taskState : savepoint.getTaskStates()) {
            JobVertexID jobVertexID = taskState.getJobVertexID();
            ExecutionVertex executionVertex = null;
            int length = executionVertexArr.length;
            int i = 0;
            while (true) {
                if (i >= length) {
                    break;
                }
                ExecutionVertex executionVertex2 = executionVertexArr[i];
                if (executionVertex2.getJobvertexId().equals(jobVertexID)) {
                    executionVertex = executionVertex2;
                    break;
                }
                i++;
            }
            if (executionVertex == null) {
                Assert.fail("Did not find matching vertex");
            } else {
                Assert.assertEquals(executionVertex.getCurrentExecutionAttempt().getAttemptId(), (ExecutionAttemptID) ((StateHandle) taskState.getState(executionVertex.getParallelSubtaskIndex()).getState().deserializeValue(Thread.currentThread().getContextClassLoader())).getState(Thread.currentThread().getContextClassLoader()));
            }
        }
    }

    private static ExecutionJobVertex mockExecutionJobVertex(JobID jobID, JobVertexID jobVertexID, int i) {
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) Mockito.mock(ExecutionJobVertex.class);
        Mockito.when(executionJobVertex.getJobId()).thenReturn(jobID);
        Mockito.when(executionJobVertex.getJobVertexId()).thenReturn(jobVertexID);
        Mockito.when(Integer.valueOf(executionJobVertex.getParallelism())).thenReturn(Integer.valueOf(i));
        ExecutionVertex[] executionVertexArr = new ExecutionVertex[i];
        for (int i2 = 0; i2 < executionVertexArr.length; i2++) {
            executionVertexArr[i2] = mockExecutionVertex(jobID, jobVertexID, i2, i, ExecutionState.RUNNING);
        }
        Mockito.when(executionJobVertex.getTaskVertices()).thenReturn(executionVertexArr);
        return executionJobVertex;
    }

    private static ExecutionVertex mockExecutionVertex(JobID jobID) {
        return mockExecutionVertex(jobID, ExecutionState.RUNNING);
    }

    private static ExecutionVertex mockExecutionVertex(JobID jobID, ExecutionState executionState) {
        return mockExecutionVertex(jobID, new JobVertexID(), 0, 1, executionState);
    }

    private static ExecutionVertex mockExecutionVertex(JobID jobID, JobVertexID jobVertexID, int i, int i2, ExecutionState executionState) {
        Execution execution = (Execution) Mockito.mock(Execution.class);
        Mockito.when(execution.getAttemptId()).thenReturn(new ExecutionAttemptID());
        Mockito.when(execution.getState()).thenReturn(executionState);
        ExecutionVertex executionVertex = (ExecutionVertex) Mockito.mock(ExecutionVertex.class);
        Mockito.when(executionVertex.getJobId()).thenReturn(jobID);
        Mockito.when(executionVertex.getJobvertexId()).thenReturn(jobVertexID);
        Mockito.when(Integer.valueOf(executionVertex.getParallelSubtaskIndex())).thenReturn(Integer.valueOf(i));
        Mockito.when(executionVertex.getCurrentExecutionAttempt()).thenReturn(execution);
        Mockito.when(Integer.valueOf(executionVertex.getTotalNumberOfParallelSubtasks())).thenReturn(Integer.valueOf(i2));
        return executionVertex;
    }
}
