package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.class */
/**
 * Tests around deploying the vertices of an {@link ExecutionGraph} and around the
 * bookkeeping of its registered executions.
 *
 * <p>The tests cover the contents of the {@link TaskDeploymentDescriptor} handed to the task
 * gateway on deployment, and check that executions are removed from the graph's
 * registered-executions map once they finish, fail, or are canceled.
 *
 * <p>Test methods declare {@code throws Exception} instead of catching and calling
 * {@code Assert.fail(e.getMessage())}, so that an unexpected exception reaches the test runner
 * with its type and full stack trace.
 */
public class ExecutionGraphDeploymentTest {

    /**
     * A {@link JobVertex} whose {@code finalizeOnMaster} always throws.
     *
     * <p>NOTE(review): presumably {@code finalizeOnMaster} overrides the master-side finalization
     * hook of {@link JobVertex}; confirm against {@code JobVertex} and add {@code @Override}.
     */
    public static class FailingFinalizeJobVertex extends JobVertex {

        /**
         * @param name name passed on to the {@link JobVertex} constructor
         */
        public FailingFinalizeJobVertex(String name) {
            super(name);
        }

        /**
         * @param name name passed on to the {@link JobVertex} constructor
         * @param id vertex id passed on to the {@link JobVertex} constructor
         */
        public FailingFinalizeJobVertex(String name, JobVertexID id) {
            super(name, id);
        }

        /**
         * Always fails.
         *
         * @param classLoader ignored
         * @throws Exception unconditionally
         */
        public void finalizeOnMaster(ClassLoader classLoader) throws Exception {
            throw new Exception();
        }
    }

    /**
     * Builds a four-vertex graph (v1 to v2, v2 to v3, v2 to v4; all edges ALL_TO_ALL, all
     * vertices with the same parallelism), deploys one subtask of v2 to a slot, and verifies
     * the {@link TaskDeploymentDescriptor} that the gateway recorded.
     *
     * @throws Exception if graph construction, deployment, or deserialization fails
     */
    @Test
    public void testBuildDeploymentDescriptor() throws Exception {
        final int parallelism = 10;
        final int subtaskIndex = 3;

        final JobID jobID = new JobID();
        final JobVertexID vertexId1 = new JobVertexID();
        final JobVertexID vertexId2 = new JobVertexID();
        final JobVertexID vertexId3 = new JobVertexID();
        final JobVertexID vertexId4 = new JobVertexID();

        final JobVertex v1 = new JobVertex("v1", vertexId1);
        final JobVertex v2 = new JobVertex("v2", vertexId2);
        final JobVertex v3 = new JobVertex("v3", vertexId3);
        final JobVertex v4 = new JobVertex("v4", vertexId4);

        v1.setParallelism(parallelism);
        v2.setParallelism(parallelism);
        v3.setParallelism(parallelism);
        v4.setParallelism(parallelism);

        v1.setInvokableClass(BatchTask.class);
        v2.setInvokableClass(BatchTask.class);
        v3.setInvokableClass(BatchTask.class);
        v4.setInvokableClass(BatchTask.class);

        // v2 consumes v1; v3 and v4 both consume v2, so v2 has one input and two outputs.
        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
        v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);

        final ExecutionGraph executionGraph = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                TestingUtils.defaultExecutionContext(),
                jobID,
                "some job",
                new Configuration(),
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy());
        executionGraph.attachJobGraph(Arrays.asList(v1, v2, v3, v4));

        final ExecutionVertex vertexUnderTest =
                ((ExecutionJobVertex) executionGraph.getAllVertices().get(vertexId2)).getTaskVertices()[subtaskIndex];

        final ExecutionGraphTestUtils.SimpleActorGateway gateway =
                new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext());
        final SimpleSlot slot = ExecutionGraphTestUtils.getInstance(gateway).allocateSimpleSlot(jobID);

        Assert.assertEquals(ExecutionState.CREATED, vertexUnderTest.getExecutionState());
        vertexUnderTest.deployToSlot(slot);
        Assert.assertEquals(ExecutionState.DEPLOYING, vertexUnderTest.getExecutionState());

        // The gateway keeps the last descriptor it was handed in its lastTDD field.
        final TaskDeploymentDescriptor descriptor = gateway.lastTDD;
        Assert.assertNotNull(descriptor);

        final ClassLoader classLoader = getClass().getClassLoader();
        final JobInformation jobInformation =
                (JobInformation) descriptor.getSerializedJobInformation().deserializeValue(classLoader);
        final TaskInformation taskInformation =
                (TaskInformation) descriptor.getSerializedTaskInformation().deserializeValue(classLoader);

        Assert.assertEquals(jobID, jobInformation.getJobId());
        Assert.assertEquals(vertexId2, taskInformation.getJobVertexId());
        Assert.assertEquals(subtaskIndex, descriptor.getSubtaskIndex());
        Assert.assertEquals(parallelism, taskInformation.getParallelism());
        Assert.assertEquals(BatchTask.class.getName(), taskInformation.getInvokableClassName());
        Assert.assertEquals("v2", taskInformation.getTaskName());

        // Wildcards avoid raw types without depending on the exact generic signature of the getters.
        final Collection<?> producedPartitions = descriptor.getProducedPartitions();
        final Collection<?> inputGates = descriptor.getInputGates();

        Assert.assertEquals(2, producedPartitions.size());
        Assert.assertEquals(1, inputGates.size());

        final Iterator<?> partitionIterator = producedPartitions.iterator();
        final Iterator<?> gateIterator = inputGates.iterator();

        Assert.assertEquals(parallelism,
                ((ResultPartitionDeploymentDescriptor) partitionIterator.next()).getNumberOfSubpartitions());
        Assert.assertEquals(parallelism,
                ((ResultPartitionDeploymentDescriptor) partitionIterator.next()).getNumberOfSubpartitions());
        Assert.assertEquals(parallelism,
                ((InputGateDeploymentDescriptor) gateIterator.next()).getInputChannelDeploymentDescriptors().length);
    }

    /**
     * Marks every registered execution as finished and expects the registered-executions map
     * to be empty afterwards.
     *
     * @throws Exception if setting up or scheduling the graph fails
     */
    @Test
    public void testRegistrationOfExecutionsFinishing() throws Exception {
        final Map<ExecutionAttemptID, Execution> executions = setupExecution(
                new JobVertex("v1", new JobVertexID()), 7650,
                new JobVertex("v2", new JobVertexID()), 2350);

        for (Execution execution : executions.values()) {
            execution.markFinished();
        }

        Assert.assertEquals(0, executions.size());
    }

    /**
     * Marks every registered execution as failed and expects the registered-executions map
     * to be empty afterwards.
     *
     * @throws Exception if setting up or scheduling the graph fails
     */
    @Test
    public void testRegistrationOfExecutionsFailing() throws Exception {
        final Map<ExecutionAttemptID, Execution> executions = setupExecution(
                new JobVertex("v1", new JobVertexID()), 7,
                new JobVertex("v2", new JobVertexID()), 6);

        for (Execution execution : executions.values()) {
            execution.markFailed((Throwable) null);
        }

        Assert.assertEquals(0, executions.size());
    }

    /**
     * Calls {@code fail} on every registered execution and expects the registered-executions
     * map to be empty afterwards.
     *
     * @throws Exception if setting up or scheduling the graph fails
     */
    @Test
    public void testRegistrationOfExecutionsFailedExternally() throws Exception {
        final Map<ExecutionAttemptID, Execution> executions = setupExecution(
                new JobVertex("v1", new JobVertexID()), 7,
                new JobVertex("v2", new JobVertexID()), 6);

        for (Execution execution : executions.values()) {
            execution.fail((Throwable) null);
        }

        Assert.assertEquals(0, executions.size());
    }

    /**
     * Cancels every registered execution, completes the cancellation, and expects the
     * registered-executions map to be empty afterwards.
     *
     * @throws Exception if setting up or scheduling the graph fails
     */
    @Test
    public void testRegistrationOfExecutionsCanceled() throws Exception {
        final Map<ExecutionAttemptID, Execution> executions = setupExecution(
                new JobVertex("v1", new JobVertexID()), 19,
                new JobVertex("v2", new JobVertexID()), 37);

        for (Execution execution : executions.values()) {
            execution.cancel();
            execution.cancelingComplete();
        }

        Assert.assertEquals(0, executions.size());
    }

    /**
     * Runs a graph whose first vertex is a {@link FailingFinalizeJobVertex}. Executions are
     * finished in order of their vertex's simple name; the first
     * {@code failingVertexParallelism} are expected to end up FINISHED and all later ones
     * CANCELED. The registered-executions map must be empty at the end.
     *
     * @throws Exception if setting up or scheduling the graph fails
     */
    @Test
    public void testRegistrationOfExecutionsFailingFinalize() throws Exception {
        final int failingVertexParallelism = 6;
        final int otherVertexParallelism = 4;

        final Map<ExecutionAttemptID, Execution> executions = setupExecution(
                new FailingFinalizeJobVertex("v1", new JobVertexID()), failingVertexParallelism,
                new JobVertex("v2", new JobVertexID()), otherVertexParallelism);

        // Snapshot the executions so they can be processed in a deterministic order.
        final ArrayList<Execution> ordered = new ArrayList<Execution>(executions.values());
        // NOTE(review): presumably this puts the "v1" executions ahead of the "v2" ones, which
        // the count-based assertions below rely on; confirm the format of getSimpleName().
        Collections.sort(ordered, new Comparator<Execution>() {
            @Override
            public int compare(Execution left, Execution right) {
                return left.getVertex().getSimpleName().compareTo(right.getVertex().getSimpleName());
            }
        });

        int processed = 0;
        for (Execution execution : ordered) {
            processed++;
            execution.markFinished();

            final ExecutionState expected = processed <= failingVertexParallelism
                    ? ExecutionState.FINISHED
                    : ExecutionState.CANCELED;
            Assert.assertEquals(expected, execution.getState());
        }

        Assert.assertEquals(0, executions.size());
    }

    /**
     * Schedules a source/sink job (parallelism 1 each, POINTWISE and BLOCKING edge) on a
     * scheduler that has a single available slot and queued scheduling disabled, reports the
     * source attempt as RUNNING and then FINISHED, and expects the job to be FAILED.
     *
     * @throws Exception if graph construction or scheduling fails
     */
    @Test
    public void testNoResourceAvailableFailure() throws Exception {
        final int numSlots = 1;

        final JobID jobID = new JobID();
        final JobVertex source = new JobVertex("source");
        final JobVertex sink = new JobVertex("sink");

        source.setParallelism(1);
        sink.setParallelism(1);
        source.setInvokableClass(BatchTask.class);
        sink.setInvokableClass(BatchTask.class);
        sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        final ExecutionGraph executionGraph = new ExecutionGraph(
                TestingUtils.directExecutionContext(),
                TestingUtils.defaultExecutionContext(),
                jobID,
                "failing test job",
                new Configuration(),
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy());
        executionGraph.setQueuedSchedulingAllowed(false);
        executionGraph.attachJobGraph(Arrays.asList(source, sink));

        final Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
        for (int slot = 0; slot < numSlots; slot++) {
            scheduler.newInstanceAvailable(ExecutionGraphTestUtils.getInstance(
                    new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())));
        }
        Assert.assertEquals(numSlots, scheduler.getNumberOfAvailableSlots());

        executionGraph.scheduleForExecution(scheduler);

        final ExecutionAttemptID attemptId = executionGraph.getJobVertex(source.getID())
                .getTaskVertices()[0]
                .getCurrentExecutionAttempt()
                .getAttemptId();

        executionGraph.updateState(new TaskExecutionState(jobID, attemptId, ExecutionState.RUNNING));
        // The two accumulator maps stay raw: their type arguments need project types that this
        // file does not import.
        executionGraph.updateState(new TaskExecutionState(
                jobID,
                attemptId,
                ExecutionState.FINISHED,
                (Throwable) null,
                new AccumulatorSnapshot(jobID, attemptId, new HashMap(), new HashMap())));

        Assert.assertEquals(JobStatus.FAILED, executionGraph.getState());
    }

    /**
     * Builds an {@link ExecutionGraph} from the two given vertices (not connected by this
     * method), registers one instance per subtask with a fresh {@link Scheduler}, schedules
     * the graph, and returns the graph's registered executions.
     *
     * @param jobVertex first vertex; its parallelism and invokable class are set here
     * @param i parallelism for {@code jobVertex}
     * @param jobVertex2 second vertex; its parallelism and invokable class are set here
     * @param i2 parallelism for {@code jobVertex2}
     * @return the map returned by {@code ExecutionGraph#getRegisteredExecutions()}, asserted to
     *     hold {@code i + i2} entries at the time of return
     * @throws Exception if graph construction or scheduling fails
     */
    private Map<ExecutionAttemptID, Execution> setupExecution(JobVertex jobVertex, int i, JobVertex jobVertex2, int i2) throws Exception {
        final int totalSubtasks = i + i2;
        final JobID jobID = new JobID();

        jobVertex.setParallelism(i);
        jobVertex2.setParallelism(i2);
        jobVertex.setInvokableClass(BatchTask.class);
        jobVertex2.setInvokableClass(BatchTask.class);

        final ExecutionGraph executionGraph = new ExecutionGraph(
                TestingUtils.directExecutionContext(),
                TestingUtils.defaultExecutionContext(),
                jobID,
                "some job",
                new Configuration(),
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy());
        executionGraph.setQueuedSchedulingAllowed(false);
        executionGraph.attachJobGraph(Arrays.asList(jobVertex, jobVertex2));

        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        // One instance per subtask; the assertion below expects this to yield one slot each.
        for (int instance = 0; instance < totalSubtasks; instance++) {
            scheduler.newInstanceAvailable(ExecutionGraphTestUtils.getInstance(
                    new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext())));
        }
        Assert.assertEquals(totalSubtasks, scheduler.getNumberOfAvailableSlots());

        executionGraph.scheduleForExecution(scheduler);

        final Map<ExecutionAttemptID, Execution> registeredExecutions = executionGraph.getRegisteredExecutions();
        Assert.assertEquals(totalSubtasks, registeredExecutions.size());
        return registeredExecutions;
    }
}
