package org.apache.flink.runtime.executiongraph;

import java.util.function.Predicate;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.util.FlinkException;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest.class */
public class ExecutionGraphCoLocationRestartTest extends SchedulerTestBase {
    private static final int NUM_TASKS = 31;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphCoLocationRestartTest$OneTimeDirectRestartStrategy.class */
    private static final class OneTimeDirectRestartStrategy implements RestartStrategy {
        private boolean hasRestarted;

        private OneTimeDirectRestartStrategy() {
            this.hasRestarted = false;
        }

        public boolean canRestart() {
            return !this.hasRestarted;
        }

        public void restart(RestartCallback restartCallback, ScheduledExecutor scheduledExecutor) {
            this.hasRestarted = true;
            restartCallback.triggerFullRecovery();
        }
    }

    @Test
    public void testConstraintsAfterRestart() throws Exception {
        this.testingSlotProvider.addTaskManager(NUM_TASKS);
        JobVertex createNoOpVertex = ExecutionGraphTestUtils.createNoOpVertex(NUM_TASKS);
        JobVertex createNoOpVertex2 = ExecutionGraphTestUtils.createNoOpVertex(NUM_TASKS);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        createNoOpVertex.setSlotSharingGroup(slotSharingGroup);
        createNoOpVertex2.setSlotSharingGroup(slotSharingGroup);
        createNoOpVertex.setStrictlyCoLocatedWith(createNoOpVertex2);
        ExecutionGraph createSimpleTestGraph = ExecutionGraphTestUtils.createSimpleTestGraph(new JobID(), this.testingSlotProvider, new OneTimeDirectRestartStrategy(), createNoOpVertex, createNoOpVertex2);
        createSimpleTestGraph.setQueuedSchedulingAllowed(true);
        Assert.assertEquals(JobStatus.CREATED, createSimpleTestGraph.getState());
        createSimpleTestGraph.scheduleForExecution();
        Predicate<Execution> isInExecutionState = ExecutionGraphTestUtils.isInExecutionState(ExecutionState.DEPLOYING);
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(createSimpleTestGraph, isInExecutionState, 5000L);
        Assert.assertEquals(JobStatus.RUNNING, createSimpleTestGraph.getState());
        validateConstraints(createSimpleTestGraph);
        ExecutionGraphTestUtils.failExecutionGraph(createSimpleTestGraph, new FlinkException("Test exception"));
        ExecutionGraphTestUtils.waitUntilJobStatus(createSimpleTestGraph, JobStatus.RUNNING, 5000L);
        ExecutionGraphTestUtils.waitForAllExecutionsPredicate(createSimpleTestGraph, isInExecutionState, 5000L);
        validateConstraints(createSimpleTestGraph);
        ExecutionGraphTestUtils.finishAllVertices(createSimpleTestGraph);
        Assert.assertThat(createSimpleTestGraph.getState(), Matchers.is(JobStatus.FINISHED));
    }

    private void validateConstraints(ExecutionGraph executionGraph) {
        ExecutionJobVertex[] executionJobVertexArr = (ExecutionJobVertex[]) executionGraph.getAllVertices().values().toArray(new ExecutionJobVertex[2]);
        for (int i = 0; i < NUM_TASKS; i++) {
            CoLocationConstraint locationConstraint = executionJobVertexArr[0].getTaskVertices()[i].getLocationConstraint();
            CoLocationConstraint locationConstraint2 = executionJobVertexArr[1].getTaskVertices()[i].getLocationConstraint();
            Assert.assertThat(Boolean.valueOf(locationConstraint.isAssigned()), Matchers.is(true));
            Assert.assertThat(locationConstraint.getLocation(), Matchers.equalTo(locationConstraint2.getLocation()));
        }
    }
}
