package org.apache.flink.runtime.executiongraph;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.SimpleSlot;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.DirectScheduledExecutorService;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.class */
public class ExecutionGraphDeploymentTest {

    /* loaded from: input_file:org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest$FailingFinalizeJobVertex.class */
    public static class FailingFinalizeJobVertex extends JobVertex {
        public FailingFinalizeJobVertex(String str, JobVertexID jobVertexID) {
            super(str, jobVertexID);
        }

        public void finalizeOnMaster(ClassLoader classLoader) throws Exception {
            throw new Exception();
        }
    }

    /**
     * Deploys subtask 3 of vertex "v2" and checks the {@link TaskDeploymentDescriptor} that the
     * test gateway captured.
     *
     * <p>The job has four vertices of parallelism 10: v1 feeds v2, and v2 feeds both v3 and v4
     * (all-to-all, pipelined). The descriptor for v2 is therefore expected to carry two produced
     * partitions and one input gate.
     */
    @Test
    public void testBuildDeploymentDescriptor() {
        try {
            final JobID jobId = new JobID();

            final JobVertexID jid1 = new JobVertexID();
            final JobVertexID jid2 = new JobVertexID();
            final JobVertexID jid3 = new JobVertexID();
            final JobVertexID jid4 = new JobVertexID();

            final JobVertex v1 = new JobVertex("v1", jid1);
            final JobVertex v2 = new JobVertex("v2", jid2);
            final JobVertex v3 = new JobVertex("v3", jid3);
            final JobVertex v4 = new JobVertex("v4", jid4);

            for (JobVertex vertex : Arrays.asList(v1, v2, v3, v4)) {
                vertex.setParallelism(10);
                vertex.setInvokableClass(BatchTask.class);
            }

            v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            v3.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);
            v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

            final ExecutionGraph graph = new ExecutionGraph(
                    TestingUtils.defaultExecutor(),
                    TestingUtils.defaultExecutor(),
                    jobId,
                    "some job",
                    new Configuration(),
                    new SerializedValue<>(new ExecutionConfig()),
                    AkkaUtils.getDefaultTimeout(),
                    new NoRestartStrategy(),
                    new Scheduler(TestingUtils.defaultExecutionContext()));
            graph.attachJobGraph(Arrays.asList(v1, v2, v3, v4));

            final ExecutionVertex vertex = graph.getAllVertices().get(jid2).getTaskVertices()[3];

            // The gateway records the last descriptor it was asked to deploy.
            final ExecutionGraphTestUtils.SimpleActorGateway gateway =
                    new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext());
            final SimpleSlot slot =
                    ExecutionGraphTestUtils.getInstance(new ActorTaskManagerGateway(gateway)).allocateSimpleSlot(jobId);

            Assert.assertEquals(ExecutionState.CREATED, vertex.getExecutionState());
            vertex.deployToSlot(slot);
            Assert.assertEquals(ExecutionState.DEPLOYING, vertex.getExecutionState());

            final TaskDeploymentDescriptor descriptor = gateway.lastTDD;
            Assert.assertNotNull(descriptor);

            final ClassLoader classLoader = getClass().getClassLoader();
            final JobInformation jobInformation =
                    descriptor.getSerializedJobInformation().deserializeValue(classLoader);
            final TaskInformation taskInformation =
                    descriptor.getSerializedTaskInformation().deserializeValue(classLoader);

            Assert.assertEquals(jobId, jobInformation.getJobId());
            Assert.assertEquals(jid2, taskInformation.getJobVertexId());
            Assert.assertEquals(3L, descriptor.getSubtaskIndex());
            Assert.assertEquals(10L, taskInformation.getNumberOfSubtasks());
            Assert.assertEquals(BatchTask.class.getName(), taskInformation.getInvokableClassName());
            Assert.assertEquals("v2", taskInformation.getTaskName());

            final Collection<ResultPartitionDeploymentDescriptor> producedPartitions = descriptor.getProducedPartitions();
            final Collection<InputGateDeploymentDescriptor> inputGates = descriptor.getInputGates();

            Assert.assertEquals(2L, producedPartitions.size());
            Assert.assertEquals(1L, inputGates.size());

            // Sizes are pinned above, so iterating everything checks exactly the two partitions and one gate.
            for (ResultPartitionDeploymentDescriptor partition : producedPartitions) {
                Assert.assertEquals(10L, partition.getNumberOfSubpartitions());
            }
            for (InputGateDeploymentDescriptor inputGate : inputGates) {
                Assert.assertEquals(10L, inputGate.getInputChannelDeploymentDescriptors().length);
            }
        } catch (Exception e) {
            // Keep the original exception as the cause so the test report shows the full stack trace.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Checks that the registered-executions map returned by {@code setupExecution} is empty once
     * every execution in it has been marked finished.
     */
    @Test
    public void testRegistrationOfExecutionsFinishing() {
        try {
            final JobVertex v1 = new JobVertex("v1", new JobVertexID());
            final JobVertex v2 = new JobVertex("v2", new JobVertexID());

            // Hold on to the map itself: it is what the final assertion inspects.
            final Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7650, v2, 2350).f1;

            for (Execution execution : executions.values()) {
                execution.markFinished();
            }

            Assert.assertEquals(0L, executions.size());
        } catch (Exception e) {
            // Keep the original exception as the cause so the test report shows the full stack trace.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Checks that the registered-executions map returned by {@code setupExecution} is empty once
     * every execution in it has been marked failed (with a {@code null} cause).
     */
    @Test
    public void testRegistrationOfExecutionsFailing() {
        try {
            final JobVertex v1 = new JobVertex("v1", new JobVertexID());
            final JobVertex v2 = new JobVertex("v2", new JobVertexID());

            // Hold on to the map itself: it is what the final assertion inspects.
            final Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6).f1;

            for (Execution execution : executions.values()) {
                execution.markFailed((Throwable) null);
            }

            Assert.assertEquals(0L, executions.size());
        } catch (Exception e) {
            // Keep the original exception as the cause so the test report shows the full stack trace.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Checks that the registered-executions map returned by {@code setupExecution} is empty once
     * {@code fail(null)} has been called on every execution in it.
     */
    @Test
    public void testRegistrationOfExecutionsFailedExternally() {
        try {
            final JobVertex v1 = new JobVertex("v1", new JobVertexID());
            final JobVertex v2 = new JobVertex("v2", new JobVertexID());

            // Hold on to the map itself: it is what the final assertion inspects.
            final Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 7, v2, 6).f1;

            for (Execution execution : executions.values()) {
                execution.fail((Throwable) null);
            }

            Assert.assertEquals(0L, executions.size());
        } catch (Exception e) {
            // Keep the original exception as the cause so the test report shows the full stack trace.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Sends task state updates carrying an accumulator snapshot and I/O metrics through
     * {@code ExecutionGraph.updateState} and checks that both end up on the execution.
     *
     * <p>Covered target states: {@code CANCELED} first, then {@code FAILED}. Each round picks the
     * first execution from the registered-executions map.
     */
    @Test
    public void testAccumulatorsAndMetricsForwarding() throws Exception {
        final JobVertex v1 = new JobVertex("v1", new JobVertexID());
        final JobVertex v2 = new JobVertex("v2", new JobVertexID());

        final Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> graphAndExecutions =
                setupExecution(v1, 1, v2, 1);
        final ExecutionGraph graph = graphAndExecutions.f0;
        final Map<ExecutionAttemptID, Execution> executions = graphAndExecutions.f1;

        // Round 1: report CANCELED with accumulator value 4.
        final Execution canceledExecution = executions.values().iterator().next();
        final IOMetrics canceledMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0d, 0.0d, 0.0d, 0.0d, 0.0d);
        final Map<String, Accumulator<?, ?>> canceledAccumulators = new HashMap<>();
        canceledAccumulators.put("acc", new IntCounter(4));

        graph.updateState(
                new TaskExecutionState(
                        graph.getJobID(),
                        canceledExecution.getAttemptId(),
                        ExecutionState.CANCELED,
                        (Throwable) null,
                        new AccumulatorSnapshot(graph.getJobID(), canceledExecution.getAttemptId(), canceledAccumulators),
                        canceledMetrics));

        Assert.assertEquals(canceledMetrics, canceledExecution.getIOMetrics());
        Assert.assertNotNull(canceledExecution.getUserAccumulators());
        Assert.assertEquals(4, canceledExecution.getUserAccumulators().get("acc").getLocalValue());

        // Round 2: report FAILED with accumulator value 8.
        final Execution failedExecution = executions.values().iterator().next();
        final IOMetrics failedMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0d, 0.0d, 0.0d, 0.0d, 0.0d);
        final Map<String, Accumulator<?, ?>> failedAccumulators = new HashMap<>();
        failedAccumulators.put("acc", new IntCounter(8));

        graph.updateState(
                new TaskExecutionState(
                        graph.getJobID(),
                        failedExecution.getAttemptId(),
                        ExecutionState.FAILED,
                        (Throwable) null,
                        new AccumulatorSnapshot(graph.getJobID(), failedExecution.getAttemptId(), failedAccumulators),
                        failedMetrics));

        Assert.assertEquals(failedMetrics, failedExecution.getIOMetrics());
        Assert.assertNotNull(failedExecution.getUserAccumulators());
        Assert.assertEquals(8, failedExecution.getUserAccumulators().get("acc").getLocalValue());
    }

    /**
     * Checks that accumulators and I/O metrics handed directly to an {@link Execution} can be read
     * back from it.
     *
     * <p>They are supplied once via {@code cancelingComplete} and once via {@code markFailed}.
     */
    @Test
    public void testAccumulatorsAndMetricsStorage() throws Exception {
        final JobVertex v1 = new JobVertex("v1", new JobVertexID());
        final JobVertex v2 = new JobVertex("v2", new JobVertexID());
        final Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 1, v2, 1).f1;

        final IOMetrics ioMetrics = new IOMetrics(0, 0, 0, 0, 0, 0.0d, 0.0d, 0.0d, 0.0d, 0.0d);
        final Map<String, Accumulator<?, ?>> accumulators = Collections.emptyMap();

        // Cancellation path.
        final Execution canceledExecution = executions.values().iterator().next();
        canceledExecution.cancel();
        canceledExecution.cancelingComplete(accumulators, ioMetrics);

        Assert.assertEquals(ioMetrics, canceledExecution.getIOMetrics());
        Assert.assertEquals(accumulators, canceledExecution.getUserAccumulators());

        // Failure path.
        final Execution failedExecution = executions.values().iterator().next();
        failedExecution.markFailed(new Throwable(), accumulators, ioMetrics);

        Assert.assertEquals(ioMetrics, failedExecution.getIOMetrics());
        Assert.assertEquals(accumulators, failedExecution.getUserAccumulators());
    }

    /**
     * Checks that the registered-executions map returned by {@code setupExecution} is empty once
     * every execution in it has been canceled and its cancellation completed.
     */
    @Test
    public void testRegistrationOfExecutionsCanceled() {
        try {
            final JobVertex v1 = new JobVertex("v1", new JobVertexID());
            final JobVertex v2 = new JobVertex("v2", new JobVertexID());

            // Hold on to the map itself: it is what the final assertion inspects.
            final Map<ExecutionAttemptID, Execution> executions = setupExecution(v1, 19, v2, 37).f1;

            for (Execution execution : executions.values()) {
                execution.cancel();
                execution.cancelingComplete();
            }

            Assert.assertEquals(0L, executions.size());
        } catch (Exception e) {
            // Keep the original exception as the cause so the test report shows the full stack trace.
            throw new AssertionError("Unexpected exception: " + e, e);
        }
    }

    /**
     * Runs a source/sink job (blocking, pointwise connection) against a scheduler with a single
     * registered instance and queued scheduling disabled. After the source reports
     * {@code RUNNING} and then {@code FINISHED}, the job is expected to be {@code FAILED}.
     *
     * <p>NOTE(review): presumably the failure comes from the sink not obtaining a slot; confirm
     * against the scheduler implementation.
     */
    @Test
    public void testNoResourceAvailableFailure() throws Exception {
        final int numInstances = 1;
        final JobID jobId = new JobID();

        final JobVertex source = new JobVertex("source");
        final JobVertex sink = new JobVertex("sink");
        source.setParallelism(1);
        sink.setParallelism(1);
        source.setInvokableClass(BatchTask.class);
        sink.setInvokableClass(BatchTask.class);
        sink.connectNewDataSetAsInput(source, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);

        final Scheduler scheduler = new Scheduler(TestingUtils.directExecutionContext());
        for (int instance = 0; instance < numInstances; instance++) {
            final ExecutionGraphTestUtils.SimpleActorGateway gateway =
                    new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(ExecutionGraphTestUtils.getInstance(new ActorTaskManagerGateway(gateway)));
        }

        final ExecutionGraph graph = new ExecutionGraph(
                new DirectScheduledExecutorService(),
                TestingUtils.defaultExecutor(),
                jobId,
                "failing test job",
                new Configuration(),
                new SerializedValue<>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy(),
                scheduler);
        graph.setQueuedSchedulingAllowed(false);
        graph.attachJobGraph(Arrays.asList(source, sink));

        Assert.assertEquals(1, scheduler.getNumberOfAvailableSlots());

        graph.scheduleForExecution();

        // Drive the source's current attempt through RUNNING to FINISHED.
        final ExecutionAttemptID sourceAttempt =
                graph.getJobVertex(source.getID()).getTaskVertices()[0].getCurrentExecutionAttempt().getAttemptId();
        graph.updateState(new TaskExecutionState(jobId, sourceAttempt, ExecutionState.RUNNING));
        graph.updateState(new TaskExecutionState(jobId, sourceAttempt, ExecutionState.FINISHED, (Throwable) null));

        Assert.assertEquals(JobStatus.FAILED, graph.getState());
    }

    /**
     * Checks that a graph built from an empty {@link Configuration} has a checkpoint store whose
     * retained-checkpoint limit equals the default of {@code CoreOptions.MAX_RETAINED_CHECKPOINTS}.
     */
    @Test
    public void testSettingDefaultMaxNumberOfCheckpointsToRetain() throws Exception {
        final int expected = CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue();

        final ExecutionGraph graph = createExecutionGraph(new Configuration());
        final int actual = graph.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints();

        Assert.assertEquals(expected, actual);
    }

    /**
     * Checks that a value set for {@code CoreOptions.MAX_RETAINED_CHECKPOINTS} in the
     * configuration is reported by the checkpoint store of the built graph.
     */
    @Test
    public void testSettingMaxNumberOfCheckpointsToRetain() throws Exception {
        final int maxNumberOfCheckpointsToRetain = 10;

        // Keep a reference to the configuration so the customized instance is what gets passed on.
        final Configuration configuration = new Configuration();
        configuration.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, maxNumberOfCheckpointsToRetain);

        final ExecutionGraph graph = createExecutionGraph(configuration);

        Assert.assertEquals(
                maxNumberOfCheckpointsToRetain,
                graph.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    /**
     * Builds an {@link ExecutionGraph} from the two given (unconnected) vertices, registers one
     * instance per requested subtask with a fresh scheduler, and schedules the graph.
     *
     * <p>Asserts that the scheduler initially reports {@code i + i2} available slots and that the
     * same number of executions is registered after scheduling.
     *
     * @param jobVertex first vertex; its parallelism and invokable class are set here
     * @param i parallelism for {@code jobVertex}
     * @param jobVertex2 second vertex; its parallelism and invokable class are set here
     * @param i2 parallelism for {@code jobVertex2}
     * @return the scheduled graph together with its registered-executions map
     * @throws Exception if graph construction or scheduling fails
     */
    private Tuple2<ExecutionGraph, Map<ExecutionAttemptID, Execution>> setupExecution(JobVertex jobVertex, int i, JobVertex jobVertex2, int i2) throws Exception {
        final JobID jobId = new JobID();
        final int totalSubtasks = i + i2;

        jobVertex.setParallelism(i);
        jobVertex2.setParallelism(i2);
        jobVertex.setInvokableClass(BatchTask.class);
        jobVertex2.setInvokableClass(BatchTask.class);

        final Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
        for (int registered = 0; registered < totalSubtasks; registered++) {
            final ExecutionGraphTestUtils.SimpleActorGateway gateway =
                    new ExecutionGraphTestUtils.SimpleActorGateway(TestingUtils.directExecutionContext());
            scheduler.newInstanceAvailable(ExecutionGraphTestUtils.getInstance(new ActorTaskManagerGateway(gateway)));
        }

        final ExecutionGraph graph = new ExecutionGraph(
                new DirectScheduledExecutorService(),
                TestingUtils.defaultExecutor(),
                jobId,
                "some job",
                new Configuration(),
                new SerializedValue<>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy(),
                scheduler);
        graph.setQueuedSchedulingAllowed(false);
        graph.attachJobGraph(Arrays.asList(jobVertex, jobVertex2));

        Assert.assertEquals(totalSubtasks, scheduler.getNumberOfAvailableSlots());

        graph.scheduleForExecution();

        final Map<ExecutionAttemptID, Execution> executions = graph.getRegisteredExecutions();
        Assert.assertEquals(totalSubtasks, executions.size());

        return new Tuple2<>(graph, executions);
    }

    /**
     * Checks that a negative value for {@code CoreOptions.MAX_RETAINED_CHECKPOINTS} is not taken
     * over by the checkpoint store, which is expected to report the option's default instead.
     */
    @Test
    public void testSettingIllegalMaxNumberOfCheckpointsToRetain() throws Exception {
        final int illegalValue = -10;
        final int defaultValue = CoreOptions.MAX_RETAINED_CHECKPOINTS.defaultValue().intValue();

        final Configuration config = new Configuration();
        config.setInteger(CoreOptions.MAX_RETAINED_CHECKPOINTS, illegalValue);

        final ExecutionGraph graph = createExecutionGraph(config);

        Assert.assertNotEquals(
                (long) illegalValue,
                graph.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
        Assert.assertEquals(
                defaultValue,
                graph.getCheckpointCoordinator().getCheckpointStore().getMaxNumberOfRetainedCheckpoints());
    }

    /**
     * Builds an {@link ExecutionGraph} for an empty job named "test" with checkpointing settings
     * attached, using {@code ExecutionGraphBuilder.buildGraph}.
     *
     * <p>The same executor from {@code TestingUtils.defaultExecutor()} is passed for both
     * executor arguments.
     *
     * @param configuration configuration forwarded to the graph builder
     * @return the built execution graph
     * @throws Exception if building the graph fails
     */
    private ExecutionGraph createExecutionGraph(Configuration configuration) throws Exception {
        final ScheduledExecutorService executor = TestingUtils.defaultExecutor();

        final JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings(
                Collections.emptyList(),
                Collections.emptyList(),
                Collections.emptyList(),
                100L,
                600000L,
                0L,
                1,
                ExternalizedCheckpointSettings.none(),
                (StateBackend) null,
                false);

        final JobGraph jobGraph = new JobGraph(new JobID(), "test");
        jobGraph.setSnapshotSettings(checkpointingSettings);

        return ExecutionGraphBuilder.buildGraph(
                (ExecutionGraph) null,
                jobGraph,
                configuration,
                executor,
                executor,
                new ProgrammedSlotProvider(1),
                getClass().getClassLoader(),
                new StandaloneCheckpointRecoveryFactory(),
                Time.seconds(10L),
                new NoRestartStrategy(),
                new UnregisteredMetricsGroup(),
                1,
                LoggerFactory.getLogger(getClass()));
    }
}
