/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import java.io.DataOutput;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.TaskIdentifier;
import org.apache.tez.runtime.api.VertexIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;
import org.roaringbitmap.RoaringBitmap;

public class TestShuffleVertexManager {
    TezVertexID vertexId = TezVertexID.fromString((String)"vertex_1436907267600_195589_1_00");
    int taskId = 0;
    List<TaskAttemptIdentifier> emptyCompletions = null;

    @Test(timeout=5000L)
    public void testShuffleVertexManagerAutoParallelism() throws Exception {
        int i;
        Configuration conf = new Configuration();
        conf.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", true);
        conf.setLong("tez.shuffle-vertex-manager.desired-task-input-size", 1000L);
        ShuffleVertexManager manager = null;
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String mockSrcVertexId1 = "Vertex1";
        EdgeProperty eProp1 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockSrcVertexId2 = "Vertex2";
        EdgeProperty eProp2 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockSrcVertexId3 = "Vertex3";
        EdgeProperty eProp3 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockManagedVertexId = "Vertex4";
        mockInputVertices.put(mockSrcVertexId1, eProp1);
        mockInputVertices.put(mockSrcVertexId2, eProp2);
        mockInputVertices.put(mockSrcVertexId3, eProp3);
        final VertexManagerPluginContext mockContext = (VertexManagerPluginContext)Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)"Vertex4");
        Mockito.when((Object)mockContext.getVertexNumTasks("Vertex4")).thenReturn((Object)4);
        ShuffleVertexManager.ShuffleVertexManagerConfigBuilder configurer = ShuffleVertexManager.createConfigBuilder(null);
        VertexManagerPluginDescriptor pluginDesc = configurer.setAutoReduceParallelism(true).setDesiredTaskInputSize(1000L).setMinTaskParallelism(10).setSlowStartMaxSrcCompletionFraction(0.5f).build();
        Mockito.when((Object)mockContext.getUserPayload()).thenReturn((Object)pluginDesc.getUserPayload());
        manager = (ShuffleVertexManager)ReflectionUtils.createClazzInstance((String)pluginDesc.getClassName(), (Class[])new Class[]{VertexManagerPluginContext.class}, (Object[])new Object[]{mockContext});
        manager.initialize();
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).vertexReconfigurationPlanned();
        Assert.assertTrue((manager.enableAutoParallelism ? 1 : 0) != 0);
        Assert.assertTrue((manager.desiredTaskInputDataSize == 1000L ? 1 : 0) != 0);
        Assert.assertTrue((manager.minTaskParallelism == 10 ? 1 : 0) != 0);
        Assert.assertTrue((manager.slowStartMinSrcCompletionFraction == 0.25f ? 1 : 0) != 0);
        Assert.assertTrue((manager.slowStartMaxSrcCompletionFraction == 0.5f ? 1 : 0) != 0);
        configurer = ShuffleVertexManager.createConfigBuilder(null);
        pluginDesc = configurer.setAutoReduceParallelism(false).build();
        Mockito.when((Object)mockContext.getUserPayload()).thenReturn((Object)pluginDesc.getUserPayload());
        manager = (ShuffleVertexManager)ReflectionUtils.createClazzInstance((String)pluginDesc.getClassName(), (Class[])new Class[]{VertexManagerPluginContext.class}, (Object[])new Object[]{mockContext});
        manager.initialize();
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).vertexReconfigurationPlanned();
        Assert.assertTrue((!manager.enableAutoParallelism ? 1 : 0) != 0);
        Assert.assertTrue((manager.desiredTaskInputDataSize == 0x6400000L ? 1 : 0) != 0);
        Assert.assertTrue((manager.minTaskParallelism == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.slowStartMinSrcCompletionFraction == 0.25f ? 1 : 0) != 0);
        Assert.assertTrue((manager.slowStartMaxSrcCompletionFraction == 0.75f ? 1 : 0) != 0);
        final HashSet scheduledTasks = new HashSet();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                scheduledTasks.clear();
                List tasks = (List)args[0];
                for (VertexManagerPluginContext.ScheduleTaskRequest task : tasks) {
                    scheduledTasks.add(task.getTaskIndex());
                }
                return null;
            }
        }).when((Object)mockContext)).scheduleTasks(Mockito.anyList());
        final HashMap newEdgeManagers = new HashMap();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) throws Exception {
                final int numTasks = (Integer)invocation.getArguments()[0];
                Mockito.when((Object)mockContext.getVertexNumTasks("Vertex4")).thenReturn((Object)numTasks);
                newEdgeManagers.clear();
                for (Map.Entry entry : ((Map)invocation.getArguments()[2]).entrySet()) {
                    EdgeManagerPluginDescriptor pluginDesc = ((EdgeProperty)entry.getValue()).getEdgeManagerDescriptor();
                    final UserPayload userPayload = pluginDesc.getUserPayload();
                    EdgeManagerPluginContext emContext = new EdgeManagerPluginContext(){

                        public UserPayload getUserPayload() {
                            return userPayload == null ? null : userPayload;
                        }

                        public String getSourceVertexName() {
                            return null;
                        }

                        public String getDestinationVertexName() {
                            return null;
                        }

                        public int getSourceVertexNumTasks() {
                            return 2;
                        }

                        public int getDestinationVertexNumTasks() {
                            return numTasks;
                        }
                    };
                    EdgeManagerPlugin edgeManager = (EdgeManagerPlugin)ReflectionUtils.createClazzInstance((String)pluginDesc.getClassName(), (Class[])new Class[]{EdgeManagerPluginContext.class}, (Object[])new Object[]{emContext});
                    edgeManager.initialize();
                    newEdgeManagers.put(entry.getKey(), edgeManager);
                }
                return null;
            }
        }).when((Object)mockContext)).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        manager = this.createManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn((Object)1);
        manager.onVertexStarted(this.emptyCompletions);
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)2))).vertexReconfigurationPlanned();
        Assert.assertTrue((manager.bipartiteSources == 2 ? 1 : 0) != 0);
        Assert.assertFalse((boolean)manager.pendingTasks.isEmpty());
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).reconfigureVertex(Mockito.eq((int)1), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).doneReconfiguringVertex();
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        scheduledTasks.clear();
        Mockito.when((Object)mockContext.getVertexNumTasks("Vertex4")).thenReturn((Object)4);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)3))).vertexReconfigurationPlanned();
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).doneReconfiguringVertex();
        Assert.assertTrue((scheduledTasks.size() == 0 ? 1 : 0) != 0);
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.bipartiteSources == 2 ? 1 : 0) != 0);
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)2))).reconfigureVertex(Mockito.eq((int)1), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)2))).doneReconfiguringVertex();
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks("Vertex4")).thenReturn((Object)4);
        VertexManagerEvent vmEvent = this.getVertexManagerEvent(null, 5000L, mockSrcVertexId1);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)4))).vertexReconfigurationPlanned();
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.pendingTasks.size() == 4 ? 1 : 0) != 0);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)2))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)2))).doneReconfiguringVertex();
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)2))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)3))).doneReconfiguringVertex();
        Assert.assertEquals((long)0L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)4L, (long)scheduledTasks.size());
        Assert.assertEquals((long)2L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)5000L, (long)manager.completedSourceTasksOutputSize);
        scheduledTasks.clear();
        vmEvent = this.getVertexManagerEvent(null, 1L, "Vertex");
        manager = this.createManager(conf, mockContext, Float.valueOf(0.01f), Float.valueOf(0.75f));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        TezTaskAttemptID taId1 = TezTaskAttemptID.fromString((String)"attempt_1436907267600_195589_1_00_000000_0");
        vmEvent.setProducerAttemptIdentifier((TaskAttemptIdentifier)new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals((long)0L, (long)manager.numVertexManagerEventsReceived);
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals((long)1L, (long)manager.numVertexManagerEventsReceived);
        scheduledTasks.clear();
        long[] sizes = new long[]{0L, 1000000L, 1010000000L, 50000000L};
        vmEvent = this.getVertexManagerEvent(sizes, 1L, "Vertex");
        manager = this.createManager(conf, mockContext, Float.valueOf(0.01f), Float.valueOf(0.75f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        taId1 = TezTaskAttemptID.fromString((String)"attempt_1436907267600_195589_1_00_000000_0");
        vmEvent.setProducerAttemptIdentifier((TaskAttemptIdentifier)new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals((long)1L, (long)manager.numVertexManagerEventsReceived);
        Assert.assertEquals((long)4L, (long)manager.stats.length);
        Assert.assertEquals((long)0L, (long)manager.stats[0]);
        Assert.assertEquals((long)1L, (long)manager.stats[1]);
        Assert.assertEquals((long)100L, (long)manager.stats[2]);
        Assert.assertEquals((long)10L, (long)manager.stats[3]);
        TezTaskAttemptID taId2 = TezTaskAttemptID.fromString((String)"attempt_1436907267600_195589_1_00_000000_1");
        vmEvent.setProducerAttemptIdentifier((TaskAttemptIdentifier)new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals((long)1L, (long)manager.numVertexManagerEventsReceived);
        Assert.assertEquals((long)4L, (long)manager.stats.length);
        Assert.assertEquals((long)0L, (long)manager.stats[0]);
        Assert.assertEquals((long)1L, (long)manager.stats[1]);
        Assert.assertEquals((long)100L, (long)manager.stats[2]);
        Assert.assertEquals((long)10L, (long)manager.stats[3]);
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, Float.valueOf(0.01f), Float.valueOf(0.75f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)4L, (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        vmEvent = this.getVertexManagerEvent(null, 1L, mockSrcVertexId1);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertTrue((!manager.determineParallelismAndApply(0.0f) ? 1 : 0) != 0);
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        Assert.assertEquals((long)1L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)1L, (long)manager.numVertexManagerEventsReceived);
        Assert.assertEquals((long)1L, (long)manager.completedSourceTasksOutputSize);
        vmEvent = this.getVertexManagerEvent(null, 1L, mockSrcVertexId2);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((!manager.determineParallelismAndApply(0.25f) ? 1 : 0) != 0);
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        Assert.assertEquals((long)2L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)2L, (long)manager.numVertexManagerEventsReceived);
        Assert.assertEquals((long)2L, (long)manager.completedSourceTasksOutputSize);
        vmEvent = this.getVertexManagerEvent(null, 1200L, mockSrcVertexId2);
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertTrue((boolean)manager.determineParallelismAndApply(0.25f));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)3))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)1))).reconfigureVertex(Mockito.eq((int)2), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals((long)1L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)1L, (long)scheduledTasks.size());
        Assert.assertEquals((long)2L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)3L, (long)manager.numVertexManagerEventsReceived);
        Assert.assertEquals((long)1202L, (long)manager.completedSourceTasksOutputSize);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)20);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)20);
        Mockito.when((Object)mockContext.getVertexNumTasks("Vertex4")).thenReturn((Object)40);
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, Float.valueOf(0.0f), Float.valueOf(0.2f));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)3))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertEquals((long)40L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)40L, (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        for (i = 0; i < 8; ++i) {
            manager.onVertexManagerEventReceived(this.getVertexManagerEvent(null, 100L, mockSrcVertexId1));
            manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, i));
            ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)3))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        }
        for (i = 0; i < 3; ++i) {
            manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, i));
            ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)3))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        }
        Mockito.when((Object)mockContext.getVertexNumTasks("Vertex4")).thenReturn((Object)4);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 8));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)4))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)2))).reconfigureVertex(Mockito.eq((int)2), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks("Vertex4")).thenReturn((Object)4);
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, Float.valueOf(0.5f), Float.valueOf(0.5f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)4L, (long)manager.totalNumBipartiteSourceTasks);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 0));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)4L, (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        vmEvent = this.getVertexManagerEvent(null, 500L, mockSrcVertexId1);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        Assert.assertEquals((long)1L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)1L, (long)manager.numVertexManagerEventsReceived);
        Assert.assertEquals((long)500L, (long)manager.completedSourceTasksOutputSize);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertEquals((long)4L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        Assert.assertEquals((long)1L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)500L, (long)manager.completedSourceTasksOutputSize);
        vmEvent = this.getVertexManagerEvent(null, 500L, mockSrcVertexId2);
        manager.onVertexManagerEventReceived(vmEvent);
        Mockito.when((Object)mockContext.getVertexNumTasks("Vertex4")).thenReturn((Object)2);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)5))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)3))).reconfigureVertex(Mockito.eq((int)2), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals((long)2L, (long)newEdgeManagers.size());
        Assert.assertEquals((long)0L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)2L, (long)scheduledTasks.size());
        Assert.assertTrue((boolean)scheduledTasks.contains(new Integer(0)));
        Assert.assertTrue((boolean)scheduledTasks.contains(new Integer(1)));
        Assert.assertEquals((long)2L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals((long)2L, (long)manager.numVertexManagerEventsReceived);
        Assert.assertEquals((long)1000L, (long)manager.completedSourceTasksOutputSize);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext, (VerificationMode)Mockito.times((int)5))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals((long)2L, (long)newEdgeManagers.size());
        EdgeManagerPlugin edgeManager = (EdgeManagerPlugin)newEdgeManagers.values().iterator().next();
        HashMap targets = Maps.newHashMap();
        DataMovementEvent dmEvent = DataMovementEvent.create((int)1, (ByteBuffer)ByteBuffer.wrap(new byte[0]));
        Assert.assertEquals((long)4L, (long)edgeManager.getNumSourceTaskPhysicalOutputs(0));
        Assert.assertEquals((long)4L, (long)edgeManager.getNumDestinationTaskPhysicalInputs(0));
        edgeManager.routeDataMovementEventToDestination(dmEvent, 1, dmEvent.getSourceIndex(), (Map)targets);
        Assert.assertEquals((long)1L, (long)targets.size());
        Map.Entry e = targets.entrySet().iterator().next();
        Assert.assertEquals((long)0L, (long)((Integer)e.getKey()).intValue());
        Assert.assertEquals((long)1L, (long)((List)e.getValue()).size());
        Assert.assertEquals((long)3L, (long)((Integer)((List)e.getValue()).get(0)).intValue());
        targets.clear();
        dmEvent = DataMovementEvent.create((int)2, (ByteBuffer)ByteBuffer.wrap(new byte[0]));
        edgeManager.routeDataMovementEventToDestination(dmEvent, 0, dmEvent.getSourceIndex(), (Map)targets);
        Assert.assertEquals((long)1L, (long)targets.size());
        e = targets.entrySet().iterator().next();
        Assert.assertEquals((long)1L, (long)((Integer)e.getKey()).intValue());
        Assert.assertEquals((long)1L, (long)((List)e.getValue()).size());
        Assert.assertEquals((long)0L, (long)((Integer)((List)e.getValue()).get(0)).intValue());
        targets.clear();
        edgeManager.routeInputSourceTaskFailedEventToDestination(2, (Map)targets);
        Assert.assertEquals((long)2L, (long)targets.size());
        for (Map.Entry entry : targets.entrySet()) {
            Assert.assertTrue(((Integer)entry.getKey() == 0 || (Integer)entry.getKey() == 1 ? 1 : 0) != 0);
            Assert.assertEquals((long)2L, (long)((List)entry.getValue()).size());
            Assert.assertEquals((long)4L, (long)((Integer)((List)entry.getValue()).get(0)).intValue());
            Assert.assertEquals((long)5L, (long)((Integer)((List)entry.getValue()).get(1)).intValue());
        }
    }

    @Test(timeout=5000L)
    public void testShuffleVertexManagerSlowStart() {
        Configuration conf = new Configuration();
        ShuffleVertexManager manager = null;
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String mockSrcVertexId1 = "Vertex1";
        EdgeProperty eProp1 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockSrcVertexId2 = "Vertex2";
        EdgeProperty eProp2 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockSrcVertexId3 = "Vertex3";
        EdgeProperty eProp3 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockManagedVertexId = "Vertex4";
        VertexManagerPluginContext mockContext = (VertexManagerPluginContext)Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when((Object)mockContext.getVertexStatistics((String)Mockito.any(String.class))).thenReturn(Mockito.mock(VertexStatistics.class));
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)mockManagedVertexId);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn((Object)1);
        mockInputVertices.put(mockSrcVertexId3, eProp3);
        try {
            manager = this.createManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
            manager.onVertexStarted(this.emptyCompletions);
            Assert.assertFalse((boolean)true);
        }
        catch (TezUncheckedException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Atleast 1 bipartite source should exist"));
        }
        mockInputVertices.put(mockSrcVertexId1, eProp1);
        mockInputVertices.put(mockSrcVertexId2, eProp2);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.bipartiteSources == 2 ? 1 : 0) != 0);
        final HashSet scheduledTasks = new HashSet();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                scheduledTasks.clear();
                List tasks = (List)args[0];
                for (VertexManagerPluginContext.ScheduleTaskRequest task : tasks) {
                    scheduledTasks.add(task.getTaskIndex());
                }
                return null;
            }
        }).when((Object)mockContext)).scheduleTasks(Mockito.anyList());
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)0);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)2);
        try {
            manager = this.createManager(conf, mockContext, Float.valueOf(-0.1f), Float.valueOf(0.0f));
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinSrcCompletionFraction"));
        }
        try {
            manager = this.createManager(conf, mockContext, Float.valueOf(0.0f), Float.valueOf(95.0f));
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinSrcCompletionFraction"));
        }
        try {
            manager = this.createManager(conf, mockContext, Float.valueOf(0.5f), Float.valueOf(0.3f));
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinSrcCompletionFraction"));
        }
        int numTasks = 20;
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)numTasks);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)numTasks);
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, Float.valueOf(0.8f), null);
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)(numTasks * 2), (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        float completedTasksThreshold = 0.8f * (float)numTasks;
        block8: for (String mockSrcVertex : new String[]{mockSrcVertexId1, mockSrcVertexId2}) {
            for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
                manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertex, i + 1));
                if ((float)(i + 2) >= completedTasksThreshold) continue block8;
            }
        }
        Assert.assertEquals((long)manager.totalTasksToSchedule, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals((long)0L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)manager.totalTasksToSchedule, (long)scheduledTasks.size());
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)2);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.0f), Float.valueOf(0.0f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.totalTasksToSchedule == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(0.25f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 1 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, Float.valueOf(1.0f), Float.valueOf(1.0f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 1 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 3 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, Float.valueOf(1.0f), Float.valueOf(1.0f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId3, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 1 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 3 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)4);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)4);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(0.75f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 8 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 6 ? 1 : 0) != 0);
        scheduledTasks.clear();
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 3));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 7 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(1.0f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 8 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
        Assert.assertTrue((manager.pendingTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 6 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId1, 3));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(mockSrcVertexId2, 3));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 8 ? 1 : 0) != 0);
    }

    @Test(timeout=5000L)
    public void test_Tez1649_with_scatter_gather_edges() throws IOException {
        Configuration conf = new Configuration();
        conf.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", true);
        conf.setLong("tez.shuffle-vertex-manager.desired-task-input-size", 1000L);
        ShuffleVertexManager manager = null;
        HashMap<String, EdgeProperty> mockInputVertices_R2 = new HashMap<String, EdgeProperty>();
        String r1 = "R1";
        EdgeProperty eProp1 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String m2 = "M2";
        EdgeProperty eProp2 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String m3 = "M3";
        EdgeProperty eProp3 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockManagedVertexId_R2 = "R2";
        mockInputVertices_R2.put(r1, eProp1);
        mockInputVertices_R2.put(m2, eProp2);
        mockInputVertices_R2.put(m3, eProp3);
        VertexManagerPluginContext mockContext_R2 = (VertexManagerPluginContext)Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when((Object)mockContext_R2.getInputVertexEdgeProperties()).thenReturn(mockInputVertices_R2);
        Mockito.when((Object)mockContext_R2.getVertexName()).thenReturn((Object)"R2");
        Mockito.when((Object)mockContext_R2.getVertexNumTasks("R2")).thenReturn((Object)3);
        Mockito.when((Object)mockContext_R2.getVertexNumTasks(r1)).thenReturn((Object)3);
        Mockito.when((Object)mockContext_R2.getVertexNumTasks(m2)).thenReturn((Object)3);
        Mockito.when((Object)mockContext_R2.getVertexNumTasks(m3)).thenReturn((Object)3);
        VertexManagerEvent vmEvent = this.getVertexManagerEvent(null, 50L, r1);
        manager = this.createManager(conf, mockContext_R2, Float.valueOf(0.001f), Float.valueOf(0.001f));
        final HashSet scheduledTasks = new HashSet();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                scheduledTasks.clear();
                List tasks = (List)args[0];
                for (VertexManagerPluginContext.ScheduleTaskRequest task : tasks) {
                    scheduledTasks.add(task.getTaskIndex());
                }
                return null;
            }
        }).when((Object)mockContext_R2)).scheduleTasks(Mockito.anyList());
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.bipartiteSources == 3 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)6L, (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 6 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m3, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m3, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m3, 2));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 6 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m2, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m2, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 6 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 6 ? 1 : 0) != 0);
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext_R2, (VerificationMode)Mockito.times((int)0))).reconfigureVertex(Mockito.anyInt(), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.when((Object)mockContext_R2.getVertexNumTasks("R2")).thenReturn((Object)1);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 9 ? 1 : 0) != 0);
        ((VertexManagerPluginContext)Mockito.verify((Object)mockContext_R2, (VerificationMode)Mockito.times((int)1))).reconfigureVertex(Mockito.eq((int)1), (VertexLocationHint)Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        scheduledTasks.clear();
        Mockito.when((Object)mockContext_R2.getInputVertexEdgeProperties()).thenReturn(mockInputVertices_R2);
        Mockito.when((Object)mockContext_R2.getVertexName()).thenReturn((Object)"R2");
        Mockito.when((Object)mockContext_R2.getVertexNumTasks("R2")).thenReturn((Object)3);
        Mockito.when((Object)mockContext_R2.getVertexNumTasks(r1)).thenReturn((Object)0);
        Mockito.when((Object)mockContext_R2.getVertexNumTasks(m2)).thenReturn((Object)0);
        Mockito.when((Object)mockContext_R2.getVertexNumTasks(m3)).thenReturn((Object)3);
        manager = this.createManager(conf, mockContext_R2, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Assert.assertEquals((long)3L, (long)manager.totalNumBipartiteSourceTasks);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m3, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
    }

    VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName) throws IOException {
        ByteBuffer payload = null;
        if (sizes != null) {
            RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput((long[])sizes);
            DataOutputBuffer dout = new DataOutputBuffer();
            partitionStats.serialize((DataOutput)dout);
            ByteString partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString((byte[])dout.getData());
            payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder().setOutputSize(totalSize).setPartitionStats(partitionStatsBytes).build().toByteString().asReadOnlyByteBuffer();
        } else {
            payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder().setOutputSize(totalSize).build().toByteString().asReadOnlyByteBuffer();
        }
        TaskAttemptIdentifierImpl taId = new TaskAttemptIdentifierImpl("dag", vertexName, TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)this.vertexId, (int)this.taskId++), (int)0));
        VertexManagerEvent vmEvent = VertexManagerEvent.create((String)vertexName, (ByteBuffer)payload);
        vmEvent.setProducerAttemptIdentifier((TaskAttemptIdentifier)taId);
        return vmEvent;
    }

    @Test(timeout=5000L)
    public void testSchedulingWithPartitionStats() throws IOException {
        Configuration conf = new Configuration();
        conf.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", true);
        conf.setLong("tez.shuffle-vertex-manager.desired-task-input-size", 1000L);
        ShuffleVertexManager manager = null;
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String r1 = "R1";
        EdgeProperty eProp1 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String m2 = "M2";
        EdgeProperty eProp2 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String m3 = "M3";
        EdgeProperty eProp3 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockManagedVertexId = "R2";
        mockInputVertices.put(r1, eProp1);
        mockInputVertices.put(m2, eProp2);
        mockInputVertices.put(m3, eProp3);
        VertexManagerPluginContext mockContext = (VertexManagerPluginContext)Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)"R2");
        Mockito.when((Object)mockContext.getVertexNumTasks("R2")).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(r1)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m2)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m3)).thenReturn((Object)3);
        final LinkedList scheduledTasks = Lists.newLinkedList();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                scheduledTasks.clear();
                List tasks = (List)args[0];
                for (VertexManagerPluginContext.ScheduleTaskRequest task : tasks) {
                    scheduledTasks.add(task.getTaskIndex());
                }
                return null;
            }
        }).when((Object)mockContext)).scheduleTasks(Mockito.anyList());
        manager = this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.bipartiteSources == 1 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)3L, (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 3 ? 1 : 0) != 0);
        long[] sizes = new long[]{100000000L, 0L, 5000000000L};
        VertexManagerEvent vmEvent = this.getVertexManagerEvent(sizes, 1060000000L, r1);
        manager.onVertexManagerEventReceived(vmEvent);
        sizes = new long[]{0L, 0L, 0L};
        vmEvent = this.getVertexManagerEvent(sizes, 1060000000L, r1);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 3 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m3, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue(((Integer)scheduledTasks.get(0) == 2 ? 1 : 0) != 0);
        Assert.assertTrue(((Integer)scheduledTasks.get(1) == 0 ? 1 : 0) != 0);
        Assert.assertTrue(((Integer)scheduledTasks.get(2) == 1 ? 1 : 0) != 0);
    }

    @Test(timeout=5000L)
    public void test_Tez1649_with_mixed_edges() {
        Configuration conf = new Configuration();
        conf.setBoolean("tez.shuffle-vertex-manager.enable.auto-parallel", true);
        conf.setLong("tez.shuffle-vertex-manager.desired-task-input-size", 1000L);
        ShuffleVertexManager manager = null;
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String r1 = "R1";
        EdgeProperty eProp1 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String m2 = "M2";
        EdgeProperty eProp2 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String m3 = "M3";
        EdgeProperty eProp3 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockManagedVertexId = "R2";
        mockInputVertices.put(r1, eProp1);
        mockInputVertices.put(m2, eProp2);
        mockInputVertices.put(m3, eProp3);
        VertexManagerPluginContext mockContext = (VertexManagerPluginContext)Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)"R2");
        Mockito.when((Object)mockContext.getVertexNumTasks("R2")).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(r1)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m2)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m3)).thenReturn((Object)3);
        final HashSet scheduledTasks = new HashSet();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new Answer(){

            public Object answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                scheduledTasks.clear();
                List tasks = (List)args[0];
                for (VertexManagerPluginContext.ScheduleTaskRequest task : tasks) {
                    scheduledTasks.add(task.getTaskIndex());
                }
                return null;
            }
        }).when((Object)mockContext)).scheduleTasks(Mockito.anyList());
        manager = this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.bipartiteSources == 1 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)3L, (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 3 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(m2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 3 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)"R2");
        Mockito.when((Object)mockContext.getVertexNumTasks("R2")).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(r1)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m2)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m3)).thenReturn((Object)3);
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 3 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 2));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)"R2");
        Mockito.when((Object)mockContext.getVertexNumTasks("R2")).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(r1)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m2)).thenReturn((Object)0);
        Mockito.when((Object)mockContext.getVertexNumTasks(m3)).thenReturn((Object)3);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)3L, (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 0 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)"R2");
        Mockito.when((Object)mockContext.getVertexNumTasks("R2")).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(r1)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(m2)).thenReturn((Object)0);
        Mockito.when((Object)mockContext.getVertexNumTasks(m3)).thenReturn((Object)0);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(TestShuffleVertexManager.createTaskAttemptIdentifier(r1, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
    }

    public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName, int tId) {
        VertexIdentifier mockVertex = (VertexIdentifier)Mockito.mock(VertexIdentifier.class);
        Mockito.when((Object)mockVertex.getName()).thenReturn((Object)vName);
        TaskIdentifier mockTask = (TaskIdentifier)Mockito.mock(TaskIdentifier.class);
        Mockito.when((Object)mockTask.getIdentifier()).thenReturn((Object)tId);
        Mockito.when((Object)mockTask.getVertexIdentifier()).thenReturn((Object)mockVertex);
        TaskAttemptIdentifier mockAttempt = (TaskAttemptIdentifier)Mockito.mock(TaskAttemptIdentifier.class);
        Mockito.when((Object)mockAttempt.getIdentifier()).thenReturn((Object)0);
        Mockito.when((Object)mockAttempt.getTaskIdentifier()).thenReturn((Object)mockTask);
        return mockAttempt;
    }

    private ShuffleVertexManager createManager(Configuration conf, VertexManagerPluginContext context, Float min, Float max) {
        UserPayload payload;
        if (min != null) {
            conf.setFloat("tez.shuffle-vertex-manager.min-src-fraction", min.floatValue());
        } else {
            conf.unset("tez.shuffle-vertex-manager.min-src-fraction");
        }
        if (max != null) {
            conf.setFloat("tez.shuffle-vertex-manager.max-src-fraction", max.floatValue());
        } else {
            conf.unset("tez.shuffle-vertex-manager.max-src-fraction");
        }
        try {
            payload = TezUtils.createUserPayloadFromConf((Configuration)conf);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        Mockito.when((Object)context.getUserPayload()).thenReturn((Object)payload);
        ShuffleVertexManager manager = new ShuffleVertexManager(context);
        manager.initialize();
        return manager;
    }
}

