package org.apache.flink.runtime.executiongraph;

import akka.actor.Status;
import java.lang.reflect.Field;
import java.net.InetAddress;
import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.PermanentBlobService;
import org.apache.flink.runtime.blob.VoidBlobWriter;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.failover.FailoverRegion;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
import org.apache.flink.runtime.instance.BaseTestingActorGateway;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.instance.SimpleSlotContext;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.SlotOwner;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.TaskMessages;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.class */
public class ExecutionGraphTestUtils {
    private static final Logger TEST_LOGGER;
    static final Predicate<Execution> hasResourceAssigned;
    public static final String ERROR_MESSAGE = "test_failure_error_message";
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils$SimpleActorGateway.class */
    public static class SimpleActorGateway extends BaseTestingActorGateway {
        public SimpleActorGateway(ExecutionContext executionContext) {
            super(executionContext);
        }

        @Override // org.apache.flink.runtime.instance.BaseTestingActorGateway
        public Object handleMessage(Object obj) {
            if (obj instanceof TaskMessages.SubmitTask) {
                return Acknowledge.get();
            }
            if (obj instanceof TaskMessages.CancelTask) {
                return Acknowledge.get();
            }
            if (obj instanceof TaskMessages.FailIntermediateResultPartitions) {
                return new Object();
            }
            return null;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils$SimpleActorGatewayWithTDD.class */
    public static class SimpleActorGatewayWithTDD extends SimpleActorGateway {
        public TaskDeploymentDescriptor lastTDD;
        private final PermanentBlobService blobCache;

        public SimpleActorGatewayWithTDD(ExecutionContext executionContext, PermanentBlobService permanentBlobService) {
            super(executionContext);
            this.blobCache = permanentBlobService;
        }

        @Override // org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.SimpleActorGateway, org.apache.flink.runtime.instance.BaseTestingActorGateway
        public Object handleMessage(Object obj) {
            if (!(obj instanceof TaskMessages.SubmitTask)) {
                return super.handleMessage(obj);
            }
            this.lastTDD = ((TaskMessages.SubmitTask) obj).tasks();
            try {
                this.lastTDD.loadBigData(this.blobCache);
                return Acknowledge.get();
            } catch (Exception e) {
                e.printStackTrace();
                return new Status.Failure(e);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils$SimpleFailingActorGateway.class */
    public static class SimpleFailingActorGateway extends BaseTestingActorGateway {
        public SimpleFailingActorGateway(ExecutionContext executionContext) {
            super(executionContext);
        }

        @Override // org.apache.flink.runtime.instance.BaseTestingActorGateway
        public Object handleMessage(Object obj) throws Exception {
            if (obj instanceof TaskMessages.SubmitTask) {
                throw new Exception(ExecutionGraphTestUtils.ERROR_MESSAGE);
            }
            if (!(obj instanceof TaskMessages.CancelTask)) {
                return null;
            }
            return Acknowledge.get();
        }
    }

    public static void waitUntilJobStatus(ExecutionGraph executionGraph, JobStatus jobStatus, long j) throws TimeoutException {
        Preconditions.checkNotNull(executionGraph);
        Preconditions.checkNotNull(jobStatus);
        Preconditions.checkArgument(j >= 0);
        long nanoTime = j == 0 ? Long.MAX_VALUE : System.nanoTime() + (j * 1000000);
        while (executionGraph.getState() != jobStatus && System.nanoTime() < nanoTime) {
            try {
                Thread.sleep(2L);
            } catch (InterruptedException e) {
            }
        }
        if (System.nanoTime() >= nanoTime) {
            throw new TimeoutException(String.format("The job did not reach status %s in time. Current status is %s.", jobStatus, executionGraph.getState()));
        }
    }

    public static void waitUntilExecutionState(Execution execution, ExecutionState executionState, long j) throws TimeoutException {
        Preconditions.checkNotNull(execution);
        Preconditions.checkNotNull(executionState);
        Preconditions.checkArgument(j >= 0);
        long nanoTime = j == 0 ? Long.MAX_VALUE : System.nanoTime() + (j * 1000000);
        while (execution.getState() != executionState && System.nanoTime() < nanoTime) {
            try {
                Thread.sleep(2L);
            } catch (InterruptedException e) {
            }
        }
        if (System.nanoTime() >= nanoTime) {
            throw new TimeoutException(String.format("The execution did not reach state %s in time. Current state is %s.", executionState, execution.getState()));
        }
    }

    public static void waitUntilExecutionVertexState(ExecutionVertex executionVertex, ExecutionState executionState, long j) throws TimeoutException {
        Execution currentExecutionAttempt;
        Preconditions.checkNotNull(executionVertex);
        Preconditions.checkNotNull(executionState);
        Preconditions.checkArgument(j >= 0);
        long nanoTime = j == 0 ? Long.MAX_VALUE : System.nanoTime() + (j * 1000000);
        do {
            currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
            if (currentExecutionAttempt != null && (currentExecutionAttempt.getState() == executionState || System.nanoTime() >= nanoTime)) {
                return;
            } else {
                try {
                    Thread.sleep(2L);
                } catch (InterruptedException e) {
                }
            }
        } while (System.nanoTime() < nanoTime);
        if (currentExecutionAttempt == null) {
            throw new TimeoutException("Cannot get current execution attempt of " + executionVertex + '.');
        }
        throw new TimeoutException(String.format("The execution vertex did not reach state %s in time. Current state is %s.", executionState, currentExecutionAttempt.getState()));
    }

    public static void waitForAllExecutionsPredicate(ExecutionGraph executionGraph, Predicate<Execution> predicate, long j) throws TimeoutException {
        boolean z;
        Iterable allExecutionVertices = executionGraph.getAllExecutionVertices();
        Deadline fromNow = Deadline.fromNow(Duration.ofMillis(j));
        do {
            z = true;
            Iterator it = allExecutionVertices.iterator();
            while (it.hasNext()) {
                Execution currentExecutionAttempt = ((ExecutionVertex) it.next()).getCurrentExecutionAttempt();
                if (currentExecutionAttempt == null || !predicate.test(currentExecutionAttempt)) {
                    z = false;
                    break;
                }
            }
            if (!z) {
                try {
                    Thread.sleep(2L);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            if (z) {
                break;
            }
        } while (fromNow.hasTimeLeft());
        if (!z) {
            throw new TimeoutException("Not all executions fulfilled the predicate in time.");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static Predicate<Execution> isInExecutionState(ExecutionState executionState) {
        return execution -> {
            return execution.getState() == executionState;
        };
    }

    public static void waitUntilFailoverRegionState(FailoverRegion failoverRegion, JobStatus jobStatus, long j) throws TimeoutException {
        Preconditions.checkNotNull(failoverRegion);
        Preconditions.checkNotNull(jobStatus);
        Preconditions.checkArgument(j >= 0);
        long nanoTime = j == 0 ? Long.MAX_VALUE : System.nanoTime() + (j * 1000000);
        while (failoverRegion.getState() != jobStatus && System.nanoTime() < nanoTime) {
            try {
                Thread.sleep(2L);
            } catch (InterruptedException e) {
            }
        }
        if (System.nanoTime() >= nanoTime) {
            throw new TimeoutException();
        }
    }

    public static void failExecutionGraph(ExecutionGraph executionGraph, Exception exc) {
        ((ExecutionVertex) executionGraph.getAllExecutionVertices().iterator().next()).fail(exc);
        Assert.assertEquals(JobStatus.FAILING, executionGraph.getState());
        Iterator it = executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().cancelingComplete();
        }
    }

    public static void switchAllVerticesToRunning(ExecutionGraph executionGraph) {
        Iterator it = executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().switchToRunning();
        }
    }

    public static void completeCancellingForAllVertices(ExecutionGraph executionGraph) {
        Iterator it = executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().cancelingComplete();
        }
    }

    public static void finishAllVertices(ExecutionGraph executionGraph) {
        Iterator it = executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().markFinished();
        }
    }

    public static void switchToRunning(ExecutionGraph executionGraph) {
        Iterator it = executionGraph.getAllExecutionVertices().iterator();
        while (it.hasNext()) {
            ExecutionState state = ((ExecutionVertex) it.next()).getCurrentExecutionAttempt().getState();
            if (!$assertionsDisabled && state != ExecutionState.DEPLOYING) {
                throw new AssertionError("Expected executionState to be DEPLOYING, was: " + state);
            }
        }
        Iterator it2 = executionGraph.getAllExecutionVertices().iterator();
        while (it2.hasNext()) {
            ((ExecutionVertex) it2.next()).getCurrentExecutionAttempt().switchToRunning();
        }
    }

    public static void setVertexState(ExecutionVertex executionVertex, ExecutionState executionState) {
        try {
            Execution currentExecutionAttempt = executionVertex.getCurrentExecutionAttempt();
            Field declaredField = Execution.class.getDeclaredField("state");
            declaredField.setAccessible(true);
            declaredField.set(currentExecutionAttempt, executionState);
        } catch (Exception e) {
            throw new RuntimeException("Modifying the state failed", e);
        }
    }

    public static void setVertexResource(ExecutionVertex executionVertex, SimpleSlot simpleSlot) {
        if (!executionVertex.getCurrentExecutionAttempt().tryAssignResource(simpleSlot)) {
            throw new RuntimeException("Could not assign resource.");
        }
    }

    public static SimpleSlot createMockSimpleSlot(TaskManagerGateway taskManagerGateway) {
        return new SimpleSlot(new SimpleSlotContext(new AllocationID(), new TaskManagerLocation(ResourceID.generate(), InetAddress.getLoopbackAddress(), 6572), 0, taskManagerGateway), (SlotOwner) Mockito.mock(SlotOwner.class), 0);
    }

    public static ExecutionGraph createSimpleTestGraph() throws Exception {
        return createSimpleTestGraph(new NoRestartStrategy());
    }

    public static ExecutionGraph createSimpleTestGraph(RestartStrategy restartStrategy) throws Exception {
        return createSimpleTestGraph(new JobID(), new SimpleAckingTaskManagerGateway(), restartStrategy, createNoOpVertex(10));
    }

    public static ExecutionGraph createSimpleTestGraph(JobID jobID, JobVertex... jobVertexArr) throws Exception {
        return createSimpleTestGraph(jobID, (TaskManagerGateway) new SimpleAckingTaskManagerGateway(), (RestartStrategy) new NoRestartStrategy(), jobVertexArr);
    }

    public static ExecutionGraph createSimpleTestGraph(JobID jobID, TaskManagerGateway taskManagerGateway, RestartStrategy restartStrategy, JobVertex... jobVertexArr) throws Exception {
        int i = 0;
        for (JobVertex jobVertex : jobVertexArr) {
            i += jobVertex.getParallelism();
        }
        return createSimpleTestGraph(jobID, new SimpleSlotProvider(jobID, i, taskManagerGateway), restartStrategy, jobVertexArr);
    }

    public static ExecutionGraph createSimpleTestGraph(JobID jobID, SlotProvider slotProvider, RestartStrategy restartStrategy, JobVertex... jobVertexArr) throws Exception {
        return createExecutionGraph(jobID, slotProvider, restartStrategy, TestingUtils.defaultExecutor(), jobVertexArr);
    }

    public static ExecutionGraph createExecutionGraph(JobID jobID, SlotProvider slotProvider, RestartStrategy restartStrategy, ScheduledExecutorService scheduledExecutorService, JobVertex... jobVertexArr) throws Exception {
        return createExecutionGraph(jobID, slotProvider, restartStrategy, scheduledExecutorService, Time.seconds(10L), jobVertexArr);
    }

    public static ExecutionGraph createExecutionGraph(JobID jobID, SlotProvider slotProvider, RestartStrategy restartStrategy, ScheduledExecutorService scheduledExecutorService, Time time, JobVertex... jobVertexArr) throws Exception {
        Preconditions.checkNotNull(jobID);
        Preconditions.checkNotNull(restartStrategy);
        Preconditions.checkNotNull(jobVertexArr);
        Preconditions.checkNotNull(time);
        return ExecutionGraphBuilder.buildGraph((ExecutionGraph) null, new JobGraph(jobID, "test job", jobVertexArr), new Configuration(), scheduledExecutorService, scheduledExecutorService, slotProvider, ExecutionGraphTestUtils.class.getClassLoader(), new StandaloneCheckpointRecoveryFactory(), time, restartStrategy, new UnregisteredMetricsGroup(), 1, VoidBlobWriter.getInstance(), time, TEST_LOGGER);
    }

    public static JobVertex createNoOpVertex(int i) {
        JobVertex jobVertex = new JobVertex("vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setParallelism(i);
        return jobVertex;
    }

    public static Instance getInstance(TaskManagerGateway taskManagerGateway) throws Exception {
        return getInstance(taskManagerGateway, 1);
    }

    public static Instance getInstance(TaskManagerGateway taskManagerGateway, int i) throws Exception {
        ResourceID generate = ResourceID.generate();
        return new Instance(taskManagerGateway, new TaskManagerLocation(generate, InetAddress.getByName("127.0.0.1"), 10001), new InstanceID(), new HardwareDescription(4, 2147483648L, 1073741824L, 536870912L), i);
    }

    public static JobVertex createJobVertex(String str, int i, Class<NoOpInvokable> cls) {
        JobVertex jobVertex = new JobVertex(str);
        jobVertex.setInvokableClass(cls);
        jobVertex.setParallelism(i);
        return jobVertex;
    }

    public static ExecutionJobVertex getExecutionVertex(JobVertexID jobVertexID, ScheduledExecutorService scheduledExecutorService) throws Exception {
        JobVertex jobVertex = new JobVertex("TestVertex", jobVertexID);
        jobVertex.setInvokableClass(((AbstractInvokable) Mockito.mock(AbstractInvokable.class)).getClass());
        return (ExecutionJobVertex) Mockito.spy(new ExecutionJobVertex(new ExecutionGraph(scheduledExecutorService, scheduledExecutorService, new JobID(), "test job", new Configuration(), new SerializedValue(new ExecutionConfig()), AkkaUtils.getDefaultTimeout(), new NoRestartStrategy(), new Scheduler(ExecutionContext$.MODULE$.fromExecutor(scheduledExecutorService))), jobVertex, 1, AkkaUtils.getDefaultTimeout()));
    }

    public static ExecutionJobVertex getExecutionVertex(JobVertexID jobVertexID) throws Exception {
        return getExecutionVertex(jobVertexID, TestingUtils.defaultExecutor());
    }

    public static void verifyGeneratedExecutionJobVertex(ExecutionGraph executionGraph, JobVertex jobVertex, @Nullable List<JobVertex> list, @Nullable List<JobVertex> list2) {
        ExecutionJobVertex executionJobVertex = (ExecutionJobVertex) executionGraph.getAllVertices().get(jobVertex.getID());
        Assert.assertNotNull(executionJobVertex);
        Assert.assertEquals(jobVertex.getParallelism(), executionJobVertex.getParallelism());
        Assert.assertEquals(executionGraph.getJobID(), executionJobVertex.getJobId());
        Assert.assertEquals(jobVertex.getID(), executionJobVertex.getJobVertexId());
        Assert.assertEquals(jobVertex, executionJobVertex.getJobVertex());
        if (list2 == null) {
            Assert.assertEquals(0L, executionJobVertex.getProducedDataSets().length);
        } else {
            Assert.assertEquals(list2.size(), executionJobVertex.getProducedDataSets().length);
            for (int i = 0; i < list2.size(); i++) {
                Assert.assertEquals(((IntermediateDataSet) jobVertex.getProducedDataSets().get(i)).getId(), executionJobVertex.getProducedDataSets()[i].getId());
                Assert.assertEquals(jobVertex.getParallelism(), executionJobVertex.getProducedDataSets()[0].getPartitions().length);
            }
        }
        Assert.assertEquals(jobVertex.getParallelism(), executionJobVertex.getTaskVertices().length);
        int i2 = 0;
        for (ExecutionVertex executionVertex : executionJobVertex.getTaskVertices()) {
            Assert.assertEquals(executionGraph.getJobID(), executionVertex.getJobId());
            Assert.assertEquals(jobVertex.getID(), executionVertex.getJobvertexId());
            Assert.assertEquals(jobVertex.getParallelism(), executionVertex.getTotalNumberOfParallelSubtasks());
            Assert.assertEquals(i2, executionVertex.getParallelSubtaskIndex());
            if (list == null) {
                Assert.assertEquals(0L, executionVertex.getNumberOfInputs());
            } else {
                Assert.assertEquals(list.size(), executionVertex.getNumberOfInputs());
                for (int i3 = 0; i3 < list.size(); i3++) {
                    ExecutionEdge[] inputEdges = executionVertex.getInputEdges(i3);
                    Assert.assertEquals(list.get(i3).getParallelism(), inputEdges.length);
                    int i4 = 0;
                    for (ExecutionEdge executionEdge : inputEdges) {
                        Assert.assertEquals(i3, executionEdge.getInputNum());
                        Assert.assertEquals(i4, executionEdge.getSource().getPartitionNumber());
                        i4++;
                    }
                }
            }
            i2++;
        }
    }

    static {
        $assertionsDisabled = !ExecutionGraphTestUtils.class.desiredAssertionStatus();
        TEST_LOGGER = LoggerFactory.getLogger(ExecutionGraphTestUtils.class);
        hasResourceAssigned = execution -> {
            return execution.getAssignedResource() != null;
        };
    }
}
