package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.ManuallyTriggeredDirectExecutor;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointStatsTracker;
import org.apache.flink.runtime.checkpoint.PendingCheckpoint;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointIDCounter;
import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverStrategy;
import org.apache.flink.runtime.executiongraph.failover.RestartIndividualStrategy;
import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.SlotProvider;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.class */
public class IndividualRestartsConcurrencyTest extends TestLogger {

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest$IndividualFailoverWithCustomExecutor.class */
    private static class IndividualFailoverWithCustomExecutor implements FailoverStrategy.Factory {
        private final Executor executor;

        IndividualFailoverWithCustomExecutor(Executor executor) {
            this.executor = executor;
        }

        public FailoverStrategy create(ExecutionGraph executionGraph) {
            return new RestartIndividualStrategy(executionGraph, this.executor);
        }
    }

    @Test
    public void testCancelWhileInLocalFailover() throws Exception {
        JobID jobID = new JobID();
        ManuallyTriggeredDirectExecutor manuallyTriggeredDirectExecutor = new ManuallyTriggeredDirectExecutor();
        ExecutionGraph createSampleGraph = createSampleGraph(jobID, new IndividualFailoverWithCustomExecutor(manuallyTriggeredDirectExecutor), new SimpleSlotProvider(jobID, 2), 2);
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) createSampleGraph.getVerticesTopologically().iterator().next();
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = executionJobVertex.getTaskVertices()[1];
        createSampleGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        executionVertex.getCurrentExecutionAttempt().fail(new Exception("test failure"));
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        Assert.assertEquals(1L, manuallyTriggeredDirectExecutor.numQueuedRunnables());
        createSampleGraph.cancel();
        Assert.assertEquals(JobStatus.CANCELLING, createSampleGraph.getState());
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.CANCELING, executionVertex2.getCurrentExecutionAttempt().getState());
        manuallyTriggeredDirectExecutor.trigger();
        executionVertex2.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(JobStatus.CANCELED, createSampleGraph.getState());
        Assert.assertTrue(executionVertex.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertTrue(executionVertex2.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertEquals(2L, r0.getNumberOfAvailableSlots());
    }

    @Test
    public void testGlobalFailureConcurrentToLocalFailover() throws Exception {
        JobID jobID = new JobID();
        ManuallyTriggeredDirectExecutor manuallyTriggeredDirectExecutor = new ManuallyTriggeredDirectExecutor();
        ExecutionGraph createSampleGraph = createSampleGraph(jobID, new IndividualFailoverWithCustomExecutor(manuallyTriggeredDirectExecutor), new SimpleSlotProvider(jobID, 2), 2);
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) createSampleGraph.getVerticesTopologically().iterator().next();
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = executionJobVertex.getTaskVertices()[1];
        createSampleGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        executionVertex.getCurrentExecutionAttempt().fail(new Exception("test failure"));
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        Assert.assertEquals(1L, manuallyTriggeredDirectExecutor.numQueuedRunnables());
        createSampleGraph.failGlobal(new Exception("test exception"));
        Assert.assertEquals(JobStatus.FAILING, createSampleGraph.getState());
        Assert.assertEquals(ExecutionState.FAILED, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.CANCELING, executionVertex2.getCurrentExecutionAttempt().getState());
        manuallyTriggeredDirectExecutor.trigger();
        executionVertex2.getCurrentExecutionAttempt().cancelingComplete();
        Assert.assertEquals(JobStatus.FAILED, createSampleGraph.getState());
        Assert.assertTrue(executionVertex.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertTrue(executionVertex2.getCurrentExecutionAttempt().getState().isTerminal());
        Assert.assertEquals(2L, r0.getNumberOfAvailableSlots());
    }

    @Test
    public void testGlobalRecoveryConcurrentToLocalRecovery() throws Exception {
        JobID jobID = new JobID();
        ManuallyTriggeredDirectExecutor manuallyTriggeredDirectExecutor = new ManuallyTriggeredDirectExecutor();
        ExecutionGraph createSampleGraph = createSampleGraph(jobID, new IndividualFailoverWithCustomExecutor(manuallyTriggeredDirectExecutor), new FixedDelayRestartStrategy(1, 0L), new SimpleSlotProvider(jobID, 2), 2);
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) createSampleGraph.getVerticesTopologically().iterator().next();
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = executionJobVertex.getTaskVertices()[1];
        createSampleGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        executionVertex2.getCurrentExecutionAttempt().fail(new Exception("test failure"));
        Assert.assertEquals(ExecutionState.FAILED, executionVertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        Assert.assertEquals(1L, manuallyTriggeredDirectExecutor.numQueuedRunnables());
        createSampleGraph.failGlobal(new Exception("test exception"));
        Assert.assertEquals(JobStatus.FAILING, createSampleGraph.getState());
        Assert.assertEquals(ExecutionState.FAILED, executionVertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.CANCELING, executionVertex.getCurrentExecutionAttempt().getState());
        executionVertex.getCurrentExecutionAttempt().cancelingComplete();
        ExecutionGraphTestUtils.waitUntilJobStatus(createSampleGraph, JobStatus.RUNNING, 1000L);
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000L);
        ExecutionGraphTestUtils.waitUntilExecutionState(executionVertex2.getCurrentExecutionAttempt(), ExecutionState.DEPLOYING, 1000L);
        executionVertex.getCurrentExecutionAttempt().switchToRunning();
        executionVertex2.getCurrentExecutionAttempt().switchToRunning();
        Assert.assertEquals(ExecutionState.RUNNING, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.RUNNING, executionVertex2.getCurrentExecutionAttempt().getState());
        manuallyTriggeredDirectExecutor.trigger();
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        Assert.assertEquals(ExecutionState.RUNNING, executionVertex.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(ExecutionState.RUNNING, executionVertex2.getCurrentExecutionAttempt().getState());
        Assert.assertEquals(1L, executionVertex.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex2.getCurrentExecutionAttempt().getAttemptNumber());
        Assert.assertEquals(1L, executionVertex.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals(1L, executionVertex2.getCopyOfPriorExecutionsList().size());
        Assert.assertEquals(0L, r0.getNumberOfAvailableSlots());
    }

    @Test
    public void testLocalFailureFailsPendingCheckpoints() throws Exception {
        JobID jobID = new JobID();
        TaskManagerGateway taskManagerGateway = (TaskManagerGateway) Mockito.mock(TaskManagerGateway.class);
        Mockito.when(taskManagerGateway.submitTask((TaskDeploymentDescriptor) Matchers.any(TaskDeploymentDescriptor.class), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        Mockito.when(taskManagerGateway.cancelTask((ExecutionAttemptID) Matchers.any(ExecutionAttemptID.class), (Time) Matchers.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
        SimpleSlotProvider simpleSlotProvider = new SimpleSlotProvider(jobID, 2, taskManagerGateway);
        ScheduledExecutorService defaultExecutor = TestingUtils.defaultExecutor();
        CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = new CheckpointCoordinatorConfiguration(10L, 100000L, 1L, 3, ExternalizedCheckpointSettings.none(), true);
        ExecutionGraph createSampleGraph = createSampleGraph(jobID, new IndividualFailoverWithCustomExecutor(defaultExecutor), simpleSlotProvider, 2);
        ArrayList arrayList = new ArrayList(createSampleGraph.getAllVertices().values());
        StandaloneCheckpointIDCounter standaloneCheckpointIDCounter = new StandaloneCheckpointIDCounter();
        createSampleGraph.enableCheckpointing(checkpointCoordinatorConfiguration.getCheckpointInterval(), checkpointCoordinatorConfiguration.getCheckpointTimeout(), checkpointCoordinatorConfiguration.getMinPauseBetweenCheckpoints(), checkpointCoordinatorConfiguration.getMaxConcurrentCheckpoints(), checkpointCoordinatorConfiguration.getExternalizedCheckpointSettings(), arrayList, arrayList, arrayList, Collections.emptyList(), standaloneCheckpointIDCounter, new StandaloneCompletedCheckpointStore(1), "", new MemoryStateBackend(), new CheckpointStatsTracker(1, arrayList, checkpointCoordinatorConfiguration, new UnregisteredTaskMetricsGroup()));
        CheckpointCoordinator checkpointCoordinator = createSampleGraph.getCheckpointCoordinator();
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) createSampleGraph.getVerticesTopologically().iterator().next();
        ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];
        ExecutionVertex executionVertex2 = executionJobVertex.getTaskVertices()[1];
        createSampleGraph.scheduleForExecution();
        Assert.assertEquals(JobStatus.RUNNING, createSampleGraph.getState());
        ((TaskManagerGateway) Mockito.verify(taskManagerGateway, Mockito.timeout(5000L).times(2))).submitTask((TaskDeploymentDescriptor) Matchers.any(TaskDeploymentDescriptor.class), (Time) Matchers.any(Time.class));
        Iterator it = createSampleGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().switchToRunning();
        }
        ((TaskManagerGateway) Mockito.verify(taskManagerGateway, Mockito.timeout(5000L).times(3))).triggerCheckpoint((ExecutionAttemptID) Matchers.eq(executionVertex.getCurrentExecutionAttempt().getAttemptId()), (JobID) Matchers.any(JobID.class), Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
        ((TaskManagerGateway) Mockito.verify(taskManagerGateway, Mockito.timeout(5000L).times(3))).triggerCheckpoint((ExecutionAttemptID) Matchers.eq(executionVertex2.getCurrentExecutionAttempt().getAttemptId()), (JobID) Matchers.any(JobID.class), Matchers.anyLong(), Matchers.anyLong(), (CheckpointOptions) Matchers.any(CheckpointOptions.class));
        Assert.assertEquals(3L, checkpointCoordinator.getNumberOfPendingCheckpoints());
        long last = standaloneCheckpointIDCounter.getLast();
        checkpointCoordinator.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(createSampleGraph.getJobID(), executionVertex.getCurrentExecutionAttempt().getAttemptId(), last));
        HashMap hashMap = new HashMap(3);
        for (PendingCheckpoint pendingCheckpoint : checkpointCoordinator.getPendingCheckpoints().values()) {
            Assert.assertFalse(pendingCheckpoint.isDiscarded());
            hashMap.put(Long.valueOf(pendingCheckpoint.getCheckpointId()), pendingCheckpoint);
        }
        executionVertex.getCurrentExecutionAttempt().fail(new Exception("test failure"));
        for (PendingCheckpoint pendingCheckpoint2 : hashMap.values()) {
            if (pendingCheckpoint2.getCheckpointId() == last) {
                Assert.assertFalse(pendingCheckpoint2.isDiscarded());
            } else {
                Assert.assertTrue(pendingCheckpoint2.isDiscarded());
            }
        }
    }

    private ExecutionGraph createSampleGraph(JobID jobID, FailoverStrategy.Factory factory, SlotProvider slotProvider, int i) throws Exception {
        return createSampleGraph(jobID, factory, new NoRestartStrategy(), slotProvider, i);
    }

    private ExecutionGraph createSampleGraph(JobID jobID, FailoverStrategy.Factory factory, RestartStrategy restartStrategy, SlotProvider slotProvider, int i) throws Exception {
        ExecutionGraph executionGraph = new ExecutionGraph(new DummyJobInformation(jobID, "test job"), TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), Time.seconds(10L), restartStrategy, factory, slotProvider);
        JobVertex jobVertex = new JobVertex("test vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(i);
        executionGraph.attachJobGraph(new JobGraph(jobID, "testjob", new JobVertex[]{jobVertex}).getVerticesSortedTopologicallyFromSources());
        return executionGraph;
    }
}
