/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.core.io.InputSplitSource;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ExecutionEdge;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.SerializedValue;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.concurrent.ExecutionContext;

public class ExecutionGraphConstructionTest {
    /**
     * Builds the five-vertex test topology (all connections all-to-all), attaches all
     * vertices to a fresh {@link ExecutionGraph} in a single call and verifies the result
     * via {@code verifyTestGraph}.
     *
     * @throws Exception if the execution graph cannot be created or the job graph cannot be
     *     attached; the exception propagates so the test report keeps the full stack trace
     */
    @Test
    public void testCreateSimpleGraphBipartite() throws Exception {
        final JobID jobId = new JobID();
        final String jobName = "Test Job Sample Name";
        final Configuration cfg = new Configuration();

        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");
        JobVertex v4 = new JobVertex("vertex4");
        JobVertex v5 = new JobVertex("vertex5");

        v1.setParallelism(5);
        v2.setParallelism(7);
        v3.setParallelism(2);
        v4.setParallelism(11);
        v5.setParallelism(4);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
        v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
        v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
        v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);

        ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));

        ExecutionGraph eg = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                jobId,
                jobName,
                cfg,
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy());

        // No try/catch: a JobException fails the test on its own and keeps its stack trace.
        eg.attachJobGraph(ordered);

        verifyTestGraph(eg, jobId, v1, v2, v3, v4, v5);
    }

    /**
     * Attaches the test topology in two steps. The first step attaches v1..v3 together with
     * the result data sets of v2 and v3; the second step attaches v4 and v5, which connect to
     * those already-attached data sets by {@link IntermediateDataSet} reference. The final
     * graph is checked via {@code verifyTestGraph}.
     *
     * @throws Exception if the execution graph cannot be created or either attach step fails;
     *     the exception propagates so the test report keeps the full stack trace
     */
    @Test
    public void testAttachViaDataSets() throws Exception {
        final JobID jobId = new JobID();
        final String jobName = "Test Job Sample Name";
        final Configuration cfg = new Configuration();

        // first part of the graph
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");

        v1.setParallelism(5);
        v2.setParallelism(7);
        v3.setParallelism(2);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);

        // result data sets that the second part will consume
        IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);

        ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));

        ExecutionGraph eg = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                jobId,
                jobName,
                cfg,
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy());

        // No try/catch: a JobException fails the test on its own and keeps its stack trace.
        eg.attachJobGraph(ordered);

        // second part of the graph, attached afterwards
        JobVertex v4 = new JobVertex("vertex4");
        JobVertex v5 = new JobVertex("vertex5");
        v4.setParallelism(11);
        v5.setParallelism(4);

        v4.connectDataSetAsInput(v2result, DistributionPattern.ALL_TO_ALL);
        v4.connectDataSetAsInput(v3result_1, DistributionPattern.ALL_TO_ALL);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
        v5.connectDataSetAsInput(v3result_2, DistributionPattern.ALL_TO_ALL);

        ArrayList<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, v5));

        eg.attachJobGraph(ordered2);

        verifyTestGraph(eg, jobId, v1, v2, v3, v4, v5);
    }

    /**
     * Attaches the test topology in two steps, like {@code testAttachViaDataSets}, except that
     * the second-step vertices v4 and v5 reference the already-attached result data sets only
     * by their IDs ({@code connectIdInput}). The final graph is checked via
     * {@code verifyTestGraph}.
     *
     * @throws Exception if the execution graph cannot be created or either attach step fails;
     *     the exception propagates so the test report keeps the full stack trace
     */
    @Test
    public void testAttachViaIds() throws Exception {
        final JobID jobId = new JobID();
        final String jobName = "Test Job Sample Name";
        final Configuration cfg = new Configuration();

        // first part of the graph
        JobVertex v1 = new JobVertex("vertex1");
        JobVertex v2 = new JobVertex("vertex2");
        JobVertex v3 = new JobVertex("vertex3");

        v1.setParallelism(5);
        v2.setParallelism(7);
        v3.setParallelism(2);

        v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);

        // result data sets whose IDs the second part will reference
        IntermediateDataSet v2result = v2.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet v3result_1 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
        IntermediateDataSet v3result_2 = v3.createAndAddResultDataSet(ResultPartitionType.PIPELINED);

        ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));

        ExecutionGraph eg = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                jobId,
                jobName,
                cfg,
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy());

        // No try/catch: a JobException fails the test on its own and keeps its stack trace.
        eg.attachJobGraph(ordered);

        // second part of the graph, attached afterwards and wired up by data set ID
        JobVertex v4 = new JobVertex("vertex4");
        JobVertex v5 = new JobVertex("vertex5");
        v4.setParallelism(11);
        v5.setParallelism(4);

        v4.connectIdInput(v2result.getId(), DistributionPattern.ALL_TO_ALL);
        v4.connectIdInput(v3result_1.getId(), DistributionPattern.ALL_TO_ALL);
        v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
        v5.connectIdInput(v3result_2.getId(), DistributionPattern.ALL_TO_ALL);

        ArrayList<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v4, v5));

        eg.attachJobGraph(ordered2);

        verifyTestGraph(eg, jobId, v1, v2, v3, v4, v5);
    }

    /**
     * Verifies the execution graph built from the five-vertex test topology used by the
     * tests in this class: {@code v1 -> v2}, {@code (v2, v3) -> v4} and {@code (v4, v3) -> v5},
     * with every connection all-to-all.
     *
     * <p>For each job vertex this checks that an {@link ExecutionJobVertex} exists, that its
     * produced data sets mirror those of the job vertex, that its subtasks carry the right
     * IDs, indices and input counts, and that each input is wired to every partition of the
     * producing vertex. Expected partition-number sums are derived from the producer's
     * parallelism instead of being hard-coded.
     *
     * @param eg the execution graph to verify
     * @param jobId the job ID every execution (job) vertex is expected to report
     * @param v1 source vertex, consumed by {@code v2}
     * @param v2 vertex consuming {@code v1}, consumed by {@code v4}
     * @param v3 source vertex with two produced data sets, consumed by {@code v4} and {@code v5}
     * @param v4 vertex consuming {@code v2} (input 0) and {@code v3} (input 1)
     * @param v5 sink vertex consuming {@code v4} (input 0) and {@code v3} (input 1)
     */
    private void verifyTestGraph(ExecutionGraph eg, JobID jobId, JobVertex v1, JobVertex v2, JobVertex v3, JobVertex v4, JobVertex v5) {
        Map<?, ExecutionJobVertex> vertices = eg.getAllVertices();

        // v1: no inputs, one produced data set
        ExecutionJobVertex e1 = vertices.get(v1.getID());
        Assert.assertNotNull(e1);
        Assert.assertEquals(v1.getParallelism(), e1.getParallelism());
        Assert.assertEquals(v1.getID(), e1.getJobVertexId());
        Assert.assertEquals(jobId, e1.getJobId());
        Assert.assertEquals(v1, e1.getJobVertex());
        verifyProducedDataSets(e1, v1, 1);
        verifySubtasks(e1, v1, jobId, 0);
        for (ExecutionVertex ev : e1.getTaskVertices()) {
            // every subtask must have recorded a timestamp for the CREATED state
            Assert.assertTrue(ev.getStateTimestamp(ExecutionState.CREATED) > 0L);
        }

        // v2: one input (v1), one produced data set
        ExecutionJobVertex e2 = vertices.get(v2.getID());
        Assert.assertNotNull(e2);
        verifyProducedDataSets(e2, v2, 1);
        verifySubtasks(e2, v2, jobId, 1);
        verifyAllToAllInput(e2, 0, v1);

        // v3: no inputs, two produced data sets
        ExecutionJobVertex e3 = vertices.get(v3.getID());
        Assert.assertNotNull(e3);
        verifyProducedDataSets(e3, v3, 2);
        verifySubtasks(e3, v3, jobId, 0);

        // v4: two inputs (v2, v3), one produced data set
        ExecutionJobVertex e4 = vertices.get(v4.getID());
        Assert.assertNotNull(e4);
        verifyProducedDataSets(e4, v4, 1);
        verifySubtasks(e4, v4, jobId, 2);
        verifyAllToAllInput(e4, 0, v2);
        verifyAllToAllInput(e4, 1, v3);

        // v5: two inputs (v4, v3), no produced data sets
        ExecutionJobVertex e5 = vertices.get(v5.getID());
        Assert.assertNotNull(e5);
        verifyProducedDataSets(e5, v5, 0);
        verifySubtasks(e5, v5, jobId, 2);
        verifyAllToAllInput(e5, 0, v4);
        verifyAllToAllInput(e5, 1, v3);
    }

    /**
     * Checks that {@code ejv} exposes exactly {@code expectedCount} produced data sets, each
     * with the same ID as the corresponding data set of {@code vertex} and with one partition
     * per parallel subtask of {@code vertex}.
     */
    private static void verifyProducedDataSets(ExecutionJobVertex ejv, JobVertex vertex, int expectedCount) {
        Assert.assertEquals(expectedCount, ejv.getProducedDataSets().length);
        for (int i = 0; i < expectedCount; i++) {
            Assert.assertEquals(vertex.getProducedDataSets().get(i).getId(), ejv.getProducedDataSets()[i].getId());
            Assert.assertEquals(vertex.getParallelism(), ejv.getProducedDataSets()[i].getPartitions().length);
        }
    }

    /**
     * Checks that {@code ejv} has one subtask per degree of parallelism of {@code vertex} and
     * that each subtask reports the expected job ID, job vertex ID, total subtask count,
     * consecutive subtask index (starting at 0) and number of inputs.
     */
    private static void verifySubtasks(ExecutionJobVertex ejv, JobVertex vertex, JobID jobId, int expectedInputs) {
        ExecutionVertex[] subtasks = ejv.getTaskVertices();
        Assert.assertEquals(vertex.getParallelism(), subtasks.length);
        for (int i = 0; i < subtasks.length; i++) {
            ExecutionVertex ev = subtasks[i];
            Assert.assertEquals(jobId, ev.getJobId());
            Assert.assertEquals(vertex.getID(), ev.getJobvertexId());
            Assert.assertEquals(vertex.getParallelism(), ev.getTotalNumberOfParallelSubtasks());
            Assert.assertEquals(i, ev.getParallelSubtaskIndex());
            Assert.assertEquals(expectedInputs, ev.getNumberOfInputs());
        }
    }

    /**
     * Checks that input {@code inputNumber} of every subtask of {@code consumer} has one edge
     * per parallel subtask of {@code producer}, that each edge reports that input number, and
     * that the source partition numbers add up to {@code 0 + 1 + ... + (p - 1)}, where
     * {@code p} is the producer's parallelism.
     */
    private static void verifyAllToAllInput(ExecutionJobVertex consumer, int inputNumber, JobVertex producer) {
        final int producerParallelism = producer.getParallelism();
        // Sum of the partition numbers 0..p-1; equals the former constants 10, 21, 1 and 55
        // for the parallelism values 5, 7, 2 and 11 used by the callers.
        final int expectedPartitionSum = producerParallelism * (producerParallelism - 1) / 2;

        for (ExecutionVertex ev : consumer.getTaskVertices()) {
            ExecutionEdge[] inputs = ev.getInputEdges(inputNumber);
            Assert.assertEquals(producerParallelism, inputs.length);

            int sumOfPartitions = 0;
            for (ExecutionEdge inEdge : inputs) {
                Assert.assertEquals(inputNumber, inEdge.getInputNum());
                sumOfPartitions += inEdge.getSource().getPartitionNumber();
            }
            Assert.assertEquals(expectedPartitionSum, sumOfPartitions);
        }
    }

    /**
     * Verifies that attaching a vertex whose input refers to an intermediate data set ID that
     * no attached vertex produces is rejected with a {@link JobException}.
     *
     * @throws Exception if the execution graph cannot be created or the initial, valid attach
     *     step fails; the exception propagates so the test report keeps the full stack trace
     */
    @Test
    public void testCannotConnectMissingId() throws Exception {
        final JobID jobId = new JobID();
        final String jobName = "Test Job Sample Name";
        final Configuration cfg = new Configuration();

        // valid first part of the graph
        JobVertex v1 = new JobVertex("vertex1");
        v1.setParallelism(7);

        ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1));

        ExecutionGraph eg = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                jobId,
                jobName,
                cfg,
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy());

        // Must succeed; a JobException here fails the test and keeps its stack trace.
        eg.attachJobGraph(ordered);

        // second vertex consumes a freshly generated data set ID that nothing produces
        JobVertex v2 = new JobVertex("vertex2");
        v2.connectIdInput(new IntermediateDataSetID(), DistributionPattern.ALL_TO_ALL);

        ArrayList<JobVertex> ordered2 = new ArrayList<JobVertex>(Arrays.asList(v2));

        try {
            eg.attachJobGraph(ordered2);
            Assert.fail("Attached wrong jobgraph");
        }
        catch (JobException expected) {
            // expected: the input of v2 cannot be resolved
        }
    }

    /**
     * Attaching must be rejected with a {@link JobException} when the vertex list names a
     * consumer before its producer: here "vertex5" is listed ahead of "vertex4", whose data
     * set it consumes.
     */
    @Test
    public void testCannotConnectWrongOrder() throws Exception {
        final JobID jobId = new JobID();
        final String jobName = "Test Job Sample Name";
        final Configuration jobConfig = new Configuration();

        final JobVertex first = new JobVertex("vertex1");
        final JobVertex second = new JobVertex("vertex2");
        final JobVertex third = new JobVertex("vertex3");
        final JobVertex fourth = new JobVertex("vertex4");
        final JobVertex fifth = new JobVertex("vertex5");

        first.setParallelism(5);
        second.setParallelism(7);
        third.setParallelism(2);
        fourth.setParallelism(11);
        fifth.setParallelism(4);

        second.connectNewDataSetAsInput(first, DistributionPattern.ALL_TO_ALL);
        fourth.connectNewDataSetAsInput(second, DistributionPattern.ALL_TO_ALL);
        fourth.connectNewDataSetAsInput(third, DistributionPattern.ALL_TO_ALL);
        fifth.connectNewDataSetAsInput(fourth, DistributionPattern.ALL_TO_ALL);
        fifth.connectNewDataSetAsInput(third, DistributionPattern.ALL_TO_ALL);

        // Deliberately out of order: the fifth vertex precedes the fourth.
        final ArrayList<JobVertex> misordered = new ArrayList<JobVertex>();
        misordered.add(first);
        misordered.add(second);
        misordered.add(third);
        misordered.add(fifth);
        misordered.add(fourth);

        final ExecutionGraph graph = new ExecutionGraph(
                TestingUtils.defaultExecutionContext(),
                jobId,
                jobName,
                jobConfig,
                new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                AkkaUtils.getDefaultTimeout(),
                new NoRestartStrategy());

        boolean attached;
        try {
            graph.attachJobGraph(misordered);
            attached = true;
        }
        catch (JobException expected) {
            // expected outcome of the test
            attached = false;
        }

        if (attached) {
            Assert.fail("Attached wrong jobgraph");
        }
    }

    /**
     * Verifies that the split assigner obtained from a vertex's {@link InputSplitSource} is
     * exposed by the corresponding {@link ExecutionJobVertex} after the job graph is attached.
     * Two mocked sources, set on v3 and v5, each return their own mocked assigner.
     *
     * <p>Any unexpected exception fails the test with the original exception kept as cause.
     */
    @Test
    public void testSetupInputSplits() {
        try {
            final InputSplit[] emptySplits = new InputSplit[0];

            InputSplitAssigner assigner1 = Mockito.mock(InputSplitAssigner.class);
            InputSplitAssigner assigner2 = Mockito.mock(InputSplitAssigner.class);

            @SuppressWarnings("unchecked")
            InputSplitSource<InputSplit> source1 = Mockito.mock(InputSplitSource.class);
            @SuppressWarnings("unchecked")
            InputSplitSource<InputSplit> source2 = Mockito.mock(InputSplitSource.class);

            Mockito.when(source1.createInputSplits(Matchers.anyInt())).thenReturn(emptySplits);
            Mockito.when(source2.createInputSplits(Matchers.anyInt())).thenReturn(emptySplits);
            Mockito.when(source1.getInputSplitAssigner(emptySplits)).thenReturn(assigner1);
            Mockito.when(source2.getInputSplitAssigner(emptySplits)).thenReturn(assigner2);

            final JobID jobId = new JobID();
            final String jobName = "Test Job Sample Name";
            final Configuration cfg = new Configuration();

            JobVertex v1 = new JobVertex("vertex1");
            JobVertex v2 = new JobVertex("vertex2");
            JobVertex v3 = new JobVertex("vertex3");
            JobVertex v4 = new JobVertex("vertex4");
            JobVertex v5 = new JobVertex("vertex5");

            v1.setParallelism(5);
            v2.setParallelism(7);
            v3.setParallelism(2);
            v4.setParallelism(11);
            v5.setParallelism(4);

            v2.connectNewDataSetAsInput(v1, DistributionPattern.ALL_TO_ALL);
            v4.connectNewDataSetAsInput(v2, DistributionPattern.ALL_TO_ALL);
            v4.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);
            v5.connectNewDataSetAsInput(v4, DistributionPattern.ALL_TO_ALL);
            v5.connectNewDataSetAsInput(v3, DistributionPattern.ALL_TO_ALL);

            v3.setInputSplitSource(source1);
            v5.setInputSplitSource(source2);

            ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3, v4, v5));

            ExecutionGraph eg = new ExecutionGraph(
                    TestingUtils.defaultExecutionContext(),
                    jobId,
                    jobName,
                    cfg,
                    new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                    AkkaUtils.getDefaultTimeout(),
                    new NoRestartStrategy());

            // A JobException is handled by the outer catch like any other failure.
            eg.attachJobGraph(ordered);

            Assert.assertEquals(assigner1, eg.getAllVertices().get(v3.getID()).getSplitAssigner());
            Assert.assertEquals(assigner2, eg.getAllVertices().get(v5.getID()).getSplitAssigner());
        }
        catch (Exception e) {
            // Fail the test, but keep the original exception (and its stack trace) as the cause
            // instead of printing it and reporting only the message.
            AssertionError failure = new AssertionError(e.getMessage());
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Verifies that attaching a job graph in which a single intermediate data set is consumed
     * by two vertices (v2 and v3 both read the result of v1) is rejected with a
     * {@link RuntimeException}.
     *
     * <p>Any other exception fails the test with the original exception kept as cause.
     */
    @Test
    public void testMoreThanOneConsumerForIntermediateResult() {
        try {
            final JobID jobId = new JobID();
            final String jobName = "Test Job Sample Name";
            final Configuration cfg = new Configuration();

            JobVertex v1 = new JobVertex("vertex1");
            JobVertex v2 = new JobVertex("vertex2");
            JobVertex v3 = new JobVertex("vertex3");

            v1.setParallelism(5);
            v2.setParallelism(7);
            v3.setParallelism(2);

            // one data set, two consumers
            IntermediateDataSet result = v1.createAndAddResultDataSet(ResultPartitionType.PIPELINED);
            v2.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);
            v3.connectDataSetAsInput(result, DistributionPattern.ALL_TO_ALL);

            ArrayList<JobVertex> ordered = new ArrayList<JobVertex>(Arrays.asList(v1, v2, v3));

            ExecutionGraph eg = new ExecutionGraph(
                    TestingUtils.defaultExecutionContext(),
                    jobId,
                    jobName,
                    cfg,
                    new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                    AkkaUtils.getDefaultTimeout(),
                    new NoRestartStrategy());

            try {
                eg.attachJobGraph(ordered);
                // fail() throws an AssertionError, which the RuntimeException catch below
                // does not intercept.
                Assert.fail("Should not be possible");
            }
            catch (RuntimeException expected) {
                // expected: attaching a data set with two consumers is rejected
            }
        }
        catch (Exception e) {
            // Fail the test, but keep the original exception (and its stack trace) as the cause
            // instead of printing it and reporting only the message.
            AssertionError failure = new AssertionError(e.getMessage());
            failure.initCause(e);
            throw failure;
        }
    }

    /**
     * Verifies the co-location constraints created when a job graph is attached.
     *
     * <ul>
     *   <li>v1 (parallelism 6) and v2 (parallelism 4) are co-located with each other: subtasks
     *       with the same index must share one constraint instance, the surplus subtasks of v1
     *       must still have a constraint, and all constraints of v1 must be distinct.</li>
     *   <li>v3..v7 (parallelism 3 each) are co-located through a chain of pairwise
     *       declarations: for every subtask index all five vertices must share one constraint
     *       instance, distinct per index.</li>
     *   <li>v8 declares no co-location and must have no constraint.</li>
     * </ul>
     *
     * <p>Any unexpected exception fails the test with the original exception kept as cause.
     */
    @Test
    public void testCoLocationConstraintCreation() {
        try {
            final JobID jobId = new JobID();
            final String jobName = "Co-Location Constraint Sample Job";
            final Configuration cfg = new Configuration();

            // simple group of two, co-located with each other
            JobVertex v1 = new JobVertex("vertex1");
            JobVertex v2 = new JobVertex("vertex2");

            v1.setParallelism(6);
            v2.setParallelism(4);

            SlotSharingGroup sl1 = new SlotSharingGroup();
            v1.setSlotSharingGroup(sl1);
            v2.setSlotSharingGroup(sl1);
            v2.setStrictlyCoLocatedWith(v1);
            v1.setStrictlyCoLocatedWith(v2);

            // group of five, co-located via several pairwise declarations
            JobVertex v3 = new JobVertex("vertex3");
            JobVertex v4 = new JobVertex("vertex4");
            JobVertex v5 = new JobVertex("vertex5");
            JobVertex v6 = new JobVertex("vertex6");
            JobVertex v7 = new JobVertex("vertex7");

            v3.setParallelism(3);
            v4.setParallelism(3);
            v5.setParallelism(3);
            v6.setParallelism(3);
            v7.setParallelism(3);

            SlotSharingGroup sl2 = new SlotSharingGroup();
            v3.setSlotSharingGroup(sl2);
            v4.setSlotSharingGroup(sl2);
            v5.setSlotSharingGroup(sl2);
            v6.setSlotSharingGroup(sl2);
            v7.setSlotSharingGroup(sl2);

            v4.setStrictlyCoLocatedWith(v3);
            v5.setStrictlyCoLocatedWith(v4);
            v6.setStrictlyCoLocatedWith(v3);
            v3.setStrictlyCoLocatedWith(v7);

            // isolated vertex without any co-location declaration
            JobVertex v8 = new JobVertex("vertex8");
            v8.setParallelism(2);

            JobGraph jg = new JobGraph(jobId, jobName, new JobVertex[]{v1, v2, v3, v4, v5, v6, v7, v8});

            ExecutionGraph eg = new ExecutionGraph(
                    TestingUtils.defaultExecutionContext(),
                    jobId,
                    jobName,
                    cfg,
                    new SerializedValue<ExecutionConfig>(new ExecutionConfig()),
                    AkkaUtils.getDefaultTimeout(),
                    new NoRestartStrategy());

            eg.attachJobGraph(jg.getVerticesSortedTopologicallyFromSources());

            // ---- first group: v1 / v2 ----
            ExecutionVertex[] v1s = eg.getJobVertex(v1.getID()).getTaskVertices();
            ExecutionVertex[] v2s = eg.getJobVertex(v2.getID()).getTaskVertices();

            HashSet<CoLocationConstraint> all = new HashSet<CoLocationConstraint>();

            // indices present in both vertices share the same constraint instance
            for (int i = 0; i < v2.getParallelism(); i++) {
                Assert.assertNotNull(v1s[i].getLocationConstraint());
                Assert.assertNotNull(v2s[i].getLocationConstraint());
                Assert.assertSame(v1s[i].getLocationConstraint(), v2s[i].getLocationConstraint());
                all.add(v1s[i].getLocationConstraint());
            }

            // surplus subtasks of v1 have no partner in v2 but still carry a constraint
            for (int i = v2.getParallelism(); i < v1.getParallelism(); i++) {
                Assert.assertNotNull(v1s[i].getLocationConstraint());
                all.add(v1s[i].getLocationConstraint());
            }

            Assert.assertEquals("not all co location constraints are distinct", v1.getParallelism(), all.size());

            // ---- second group: v3 .. v7 ----
            ExecutionVertex[] v3s = eg.getJobVertex(v3.getID()).getTaskVertices();
            ExecutionVertex[] v4s = eg.getJobVertex(v4.getID()).getTaskVertices();
            ExecutionVertex[] v5s = eg.getJobVertex(v5.getID()).getTaskVertices();
            ExecutionVertex[] v6s = eg.getJobVertex(v6.getID()).getTaskVertices();
            ExecutionVertex[] v7s = eg.getJobVertex(v7.getID()).getTaskVertices();

            HashSet<CoLocationConstraint> all2 = new HashSet<CoLocationConstraint>();

            for (int i = 0; i < v3.getParallelism(); i++) {
                Assert.assertNotNull(v3s[i].getLocationConstraint());
                Assert.assertSame(v3s[i].getLocationConstraint(), v4s[i].getLocationConstraint());
                Assert.assertSame(v4s[i].getLocationConstraint(), v5s[i].getLocationConstraint());
                Assert.assertSame(v5s[i].getLocationConstraint(), v6s[i].getLocationConstraint());
                Assert.assertSame(v6s[i].getLocationConstraint(), v7s[i].getLocationConstraint());
                all2.add(v3s[i].getLocationConstraint());
            }

            Assert.assertEquals("not all co location constraints are distinct", v3.getParallelism(), all2.size());

            // ---- isolated vertex: v8 ----
            ExecutionVertex[] v8s = eg.getJobVertex(v8.getID()).getTaskVertices();
            for (int i = 0; i < v8.getParallelism(); i++) {
                Assert.assertNull(v8s[i].getLocationConstraint());
            }
        }
        catch (Exception e) {
            // Fail the test, but keep the original exception (and its stack trace) as the cause
            // instead of printing it and reporting only the message.
            AssertionError failure = new AssertionError(e.getMessage());
            failure.initCause(e);
            throw failure;
        }
    }
}

