/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.library.vertexmanager.FairShuffleVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManagerBase;
import org.apache.tez.dag.library.vertexmanager.TestShuffleVertexManager;
import org.apache.tez.dag.library.vertexmanager.TestShuffleVertexManagerUtils;
import org.apache.tez.dag.records.TaskAttemptIdentifierImpl;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

@RunWith(value=Parameterized.class)
public class TestShuffleVertexManagerBase
extends TestShuffleVertexManagerUtils {
    // Completion list handed to onVertexStarted(...) by the tests in this class; it is
    // deliberately left null here.
    List<TaskAttemptIdentifier> emptyCompletions = null;
    // Vertex manager implementation under test for the current parameterized run;
    // assigned by the constructor from the value supplied by data().
    Class<? extends ShuffleVertexManagerBase> shuffleVertexManagerClass;

    /**
     * Supplies the constructor arguments for the parameterized runner.
     *
     * @return one single-element argument array per vertex manager class to test:
     *         {@link ShuffleVertexManager} and {@link FairShuffleVertexManager}
     */
    @Parameterized.Parameters(name="test[{0}]")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][]{
            {ShuffleVertexManager.class},
            {FairShuffleVertexManager.class}
        });
    }

    /**
     * Creates a test instance bound to one vertex manager implementation.
     *
     * @param shuffleVertexManagerClass the {@link ShuffleVertexManagerBase} subclass
     *        that this run of the test class exercises
     */
    public TestShuffleVertexManagerBase(
            Class<? extends ShuffleVertexManagerBase> shuffleVertexManagerClass) {
        this.shuffleVertexManagerClass = shuffleVertexManagerClass;
    }

    /**
     * Exercises the case where two of the three source vertices have zero tasks and
     * {@code onVertexStarted} is delivered before the sources report
     * {@link VertexState#CONFIGURED}.
     *
     * <p>The context is built with source task counts 0, 0 and 1 and a managed vertex
     * of 4 tasks. The test expects tasks to stay pending until all three sources are
     * configured, after which the vertex is reconfigured once (first argument 1),
     * reconfiguration is marked done, and exactly one task is scheduled.
     */
    @Test(timeout=5000L)
    public void testZeroSourceTasksWithVertexStartedFirst() {
        Configuration conf = new Configuration();
        String mockSrcVertexId1 = "Vertex1";
        String mockSrcVertexId2 = "Vertex2";
        String mockSrcVertexId3 = "Vertex3";
        String mockManagedVertexId = "Vertex4";
        List<Integer> scheduledTasks = Lists.newLinkedList();

        VertexManagerPluginContext mockContext = createVertexManagerContext(
            mockSrcVertexId1, 0, mockSrcVertexId2, 0, mockSrcVertexId3, 1,
            mockManagedVertexId, 4, scheduledTasks, null);
        ShuffleVertexManagerBase manager = createManager(conf, mockContext, 0.1f, 0.1f);

        // Start the vertex before any source vertex has been configured.
        manager.onVertexStarted(emptyCompletions);
        Mockito.verify(mockContext, Mockito.times(1)).vertexReconfigurationPlanned();
        Assert.assertEquals(2, manager.bipartiteSources);
        // Nothing may be scheduled while the sources are still unconfigured.
        Assert.assertFalse(manager.pendingTasks.isEmpty());

        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));

        // Once every source is configured, everything is expected to be scheduled.
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(
            Mockito.eq(1), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(1)).doneReconfiguringVertex();
        Assert.assertEquals(1, scheduledTasks.size());
        scheduledTasks.clear();
    }

    /**
     * Mirror of the "vertex started first" scenario with the event order reversed: the
     * three source vertices report {@link VertexState#CONFIGURED} before
     * {@code onVertexStarted} is delivered.
     *
     * <p>The context is built with source task counts 0, 0 and 1 and a managed vertex
     * of 4 tasks. The test expects no reconfiguration completion and no scheduled
     * tasks until the vertex is started; afterwards the vertex is reconfigured once
     * (first argument 1) and exactly one task is scheduled.
     */
    @Test(timeout=5000L)
    public void testZeroSourceTasksWithVertexStateUpdatedFirst() {
        Configuration conf = new Configuration();
        String mockSrcVertexId1 = "Vertex1";
        String mockSrcVertexId2 = "Vertex2";
        String mockSrcVertexId3 = "Vertex3";
        String mockManagedVertexId = "Vertex4";
        List<Integer> scheduledTasks = Lists.newLinkedList();

        VertexManagerPluginContext mockContext = createVertexManagerContext(
            mockSrcVertexId1, 0, mockSrcVertexId2, 0, mockSrcVertexId3, 1,
            mockManagedVertexId, 4, scheduledTasks, null);
        ShuffleVertexManagerBase manager = createManager(conf, mockContext, 0.1f, 0.1f);
        // Creating the manager alone is expected to announce a planned reconfiguration.
        Mockito.verify(mockContext, Mockito.times(1)).vertexReconfigurationPlanned();

        // Configure all sources before the managed vertex has started.
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Mockito.verify(mockContext, Mockito.times(0)).doneReconfiguringVertex();
        Assert.assertEquals(0, scheduledTasks.size());

        // Starting the vertex is what triggers reconfiguration and scheduling.
        manager.onVertexStarted(emptyCompletions);
        Assert.assertEquals(2, manager.bipartiteSources);
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(
            Mockito.eq(1), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(1)).doneReconfiguringVertex();
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Assert.assertEquals(1, scheduledTasks.size());
    }

    /**
     * Checks that a {@link VertexManagerEvent} received before {@code onVertexStarted}
     * is not counted immediately but is counted once the vertex has started.
     *
     * @throws IOException declared because building the vertex manager event may throw it
     */
    @Test(timeout=5000L)
    public void testVMEventFirst() throws IOException {
        Configuration conf = new Configuration();
        String mockSrcVertexId1 = "Vertex1";
        String mockSrcVertexId2 = "Vertex2";
        String mockSrcVertexId3 = "Vertex3";
        String mockManagedVertexId = "Vertex4";
        List<Integer> scheduledTasks = Lists.newLinkedList();

        VertexManagerPluginContext mockContext = createVertexManagerContext(
            mockSrcVertexId1, 2, mockSrcVertexId2, 2, mockSrcVertexId3, 2,
            mockManagedVertexId, 4, scheduledTasks, null);
        VertexManagerEvent vmEvent = getVertexManagerEvent(null, 1L, "Vertex");

        ShuffleVertexManagerBase manager = createManager(conf, mockContext, 0.01f, 0.75f);
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        TezTaskAttemptID taId1 =
            TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
        vmEvent.setProducerAttemptIdentifier(
            new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1));

        // Deliver the event before the vertex start: it must not be counted yet.
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals(0, manager.numVertexManagerEventsReceived);

        // After the start the earlier event is expected to have been processed.
        manager.onVertexStarted(emptyCompletions);
        Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
    }

    /**
     * Checks the per-partition statistics the manager exposes through
     * {@code getCurrentlyKnownStatsAtIndex} after receiving a vertex manager event
     * carrying partition sizes of 0, 1 MB, 964 MB and 48 MB.
     *
     * <p>The scenario runs twice, differing only in the boolean passed as the last
     * argument of {@code getVertexManagerEvent}: with {@code false} the expected stats
     * are 0, 1, 100, 10; with {@code true} they are 0, 1, 964, 48.
     * NOTE(review): the flag presumably switches between coarse and detailed
     * partition stats reporting - confirm against the helper's definition.
     *
     * <p>In both runs a second event whose attempt id differs only in its trailing
     * attempt number must leave the event count at 1 and the stats unchanged.
     *
     * @throws IOException declared because building the vertex manager event may throw it
     */
    @Test(timeout=5000L)
    public void testPartitionStats() throws IOException {
        Configuration conf = new Configuration();
        String mockSrcVertexId1 = "Vertex1";
        String mockSrcVertexId2 = "Vertex2";
        String mockSrcVertexId3 = "Vertex3";
        String mockManagedVertexId = "Vertex4";
        List<Integer> scheduledTasks = Lists.newLinkedList();

        VertexManagerPluginContext mockContext = createVertexManagerContext(
            mockSrcVertexId1, 2, mockSrcVertexId2, 2, mockSrcVertexId3, 2,
            mockManagedVertexId, 4, scheduledTasks, null);

        long MB = 1024L * 1024L;
        long[] sizes = new long[]{0L, MB, 964L * MB, 48L * MB};

        // ---- Run 1: event created with the flag set to false ----
        VertexManagerEvent vmEvent = getVertexManagerEvent(sizes, 0L, "Vertex", false);
        ShuffleVertexManagerBase manager = createManager(conf, mockContext, 0.01f, 0.75f);
        manager.onVertexStarted(emptyCompletions);
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        TezTaskAttemptID taId1 =
            TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
        vmEvent.setProducerAttemptIdentifier(
            new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(0, manager.getCurrentlyKnownStatsAtIndex(0));
        Assert.assertEquals(1, manager.getCurrentlyKnownStatsAtIndex(1));
        Assert.assertEquals(100, manager.getCurrentlyKnownStatsAtIndex(2));
        Assert.assertEquals(10, manager.getCurrentlyKnownStatsAtIndex(3));

        // Same event re-sent under attempt number 1: count and stats must not change.
        TezTaskAttemptID taId2 =
            TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1");
        vmEvent.setProducerAttemptIdentifier(
            new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(0, manager.getCurrentlyKnownStatsAtIndex(0));
        Assert.assertEquals(1, manager.getCurrentlyKnownStatsAtIndex(1));
        Assert.assertEquals(100, manager.getCurrentlyKnownStatsAtIndex(2));
        Assert.assertEquals(10, manager.getCurrentlyKnownStatsAtIndex(3));

        // ---- Run 2: fresh manager, event created with the flag set to true ----
        vmEvent = getVertexManagerEvent(sizes, 0L, "Vertex", true);
        manager = createManager(conf, mockContext, 0.01f, 0.75f);
        manager.onVertexStarted(emptyCompletions);
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        taId1 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_0");
        vmEvent.setProducerAttemptIdentifier(
            new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId1));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(0, manager.getCurrentlyKnownStatsAtIndex(0));
        Assert.assertEquals(1, manager.getCurrentlyKnownStatsAtIndex(1));
        Assert.assertEquals(964, manager.getCurrentlyKnownStatsAtIndex(2));
        Assert.assertEquals(48, manager.getCurrentlyKnownStatsAtIndex(3));

        // Same event re-sent under attempt number 1: count and stats must not change.
        taId2 = TezTaskAttemptID.fromString("attempt_1436907267600_195589_1_00_000000_1");
        vmEvent.setProducerAttemptIdentifier(
            new TaskAttemptIdentifierImpl("dag", mockSrcVertexId1, taId2));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(0, manager.getCurrentlyKnownStatsAtIndex(0));
        Assert.assertEquals(1, manager.getCurrentlyKnownStatsAtIndex(1));
        Assert.assertEquals(964, manager.getCurrentlyKnownStatsAtIndex(2));
        Assert.assertEquals(48, manager.getCurrentlyKnownStatsAtIndex(3));
    }

    /**
     * Regression-style scenario (the name presumably refers to issue TEZ-978 - confirm)
     * checking when {@code determineParallelismAndApply} reconfigures the vertex.
     *
     * <p>Part 1 uses sources of 2 tasks each and a managed vertex of 4 tasks: two small
     * (1 byte) events do not trigger reconfiguration, a 160 MB event does, and the
     * vertex is reconfigured exactly once with first argument 2.
     *
     * <p>Part 2 re-stubs the context to 20 tasks per source and 40 managed tasks and
     * checks that reconfiguration happens exactly once more, again with first
     * argument 2, and only after the final source task completion in the sequence.
     *
     * @throws IOException declared because building the vertex manager events may throw it
     */
    @Test(timeout=5000L)
    public void testTez978() throws IOException {
        Configuration conf = new Configuration();
        String mockSrcVertexId1 = "Vertex1";
        String mockSrcVertexId2 = "Vertex2";
        String mockSrcVertexId3 = "Vertex3";
        String mockManagedVertexId = "Vertex4";
        List<Integer> scheduledTasks = Lists.newLinkedList();

        VertexManagerPluginContext mockContext = createVertexManagerContext(
            mockSrcVertexId1, 2, mockSrcVertexId2, 2, mockSrcVertexId3, 2,
            mockManagedVertexId, 4, scheduledTasks, null);

        // ---- Part 1: small sources, explicit determineParallelismAndApply calls ----
        ShuffleVertexManagerBase manager = createManager(conf, mockContext, 0.01f, 0.75f);
        manager.onVertexStarted(emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        // First tiny event: parallelism must not be determined yet.
        VertexManagerEvent vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId1);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertFalse(manager.determineParallelismAndApply(0.0f));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(1L, manager.completedSourceTasksOutputSize);

        // Second tiny event: still no decision.
        vmEvent = getVertexManagerEvent(null, 1L, mockSrcVertexId2);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertFalse(manager.determineParallelismAndApply(0.25f));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(2L, manager.completedSourceTasksOutputSize);

        // A 160 MB event is enough for the manager to decide and reconfigure.
        vmEvent = getVertexManagerEvent(null, 160L * MB, mockSrcVertexId2);
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertTrue(manager.determineParallelismAndApply(0.25f));
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(
            Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(
            Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        // Completion reported again for the same task id: completed count stays at 2.
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(2, scheduledTasks.size());
        Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(3, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(160L * MB + 2L, manager.completedSourceTasksOutputSize);

        // ---- Part 2: larger vertices, reconfiguration driven by task completions ----
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(20);
        Mockito.when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(20);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(40);
        scheduledTasks.clear();

        manager = createManager(conf, mockContext, 0.0f, 0.2f);
        // The mock still carries the single reconfigureVertex call from part 1.
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(
            Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        manager.onVertexStarted(emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertEquals(40, manager.pendingTasks.size());
        Assert.assertEquals(40, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        // Eight completions with 10 MB events each must not reconfigure yet.
        for (int i = 0; i < 8; ++i) {
            manager.onVertexManagerEventReceived(
                getVertexManagerEvent(null, 10L * MB, mockSrcVertexId1));
            manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, i));
            Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(
                Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        }
        // Three completions without accompanying events must not reconfigure either.
        for (int i = 0; i < 3; ++i) {
            manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, i));
            Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(
                Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        }
        // One more completion is expected to trigger the second reconfiguration.
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 8));
        Mockito.verify(mockContext, Mockito.times(2)).reconfigureVertex(
            Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(2)).reconfigureVertex(
            Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
    }

    /**
     * Drives the manager through a sequence of source task completions and vertex
     * manager events and checks that the managed vertex is reconfigured exactly once
     * (first argument 2), that two replacement edge managers are captured, and that
     * one of those edge managers routes events to the expected target indices.
     *
     * <p>Along the way it asserts that a completion from {@code Vertex3} and a
     * repeated completion for the same task id do not change the completed-task count.
     *
     * @throws Exception if any edge manager routing call fails
     */
    @Test(timeout=5000L)
    public void testAutoParallelism() throws Exception {
        Configuration conf = new Configuration();
        String mockSrcVertexId1 = "Vertex1";
        String mockSrcVertexId2 = "Vertex2";
        String mockSrcVertexId3 = "Vertex3";
        String mockManagedVertexId = "Vertex4";
        List<Integer> scheduledTasks = Lists.newLinkedList();
        HashMap<String, EdgeManagerPlugin> newEdgeManagers = new HashMap<String, EdgeManagerPlugin>();

        VertexManagerPluginContext mockContext = createVertexManagerContext(
            mockSrcVertexId1, 2, mockSrcVertexId2, 2, mockSrcVertexId3, 2,
            mockManagedVertexId, 4, scheduledTasks, newEdgeManagers);

        ShuffleVertexManagerBase manager = createManager(conf, mockContext, 0.5f, 0.5f);
        manager.onVertexStarted(emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);

        // A completion from Vertex3 must not count towards the bipartite totals.
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId3, 0));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        // First 50 MB event plus completion from Vertex1: nothing scheduled yet.
        VertexManagerEvent vmEvent = getVertexManagerEvent(null, 50L * MB, mockSrcVertexId1);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(50L * MB, manager.completedSourceTasksOutputSize);

        // The same completion reported again must be ignored.
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(1, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(50L * MB, manager.completedSourceTasksOutputSize);

        // Second 50 MB event plus completion from Vertex2 triggers reconfiguration.
        vmEvent = getVertexManagerEvent(null, 50L * MB, mockSrcVertexId2);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(
            Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(
            Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals(2, newEdgeManagers.size());
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(2, scheduledTasks.size());
        Assert.assertTrue(scheduledTasks.contains(Integer.valueOf(0)));
        Assert.assertTrue(scheduledTasks.contains(Integer.valueOf(1)));
        Assert.assertEquals(2, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(100L * MB, manager.completedSourceTasksOutputSize);

        // A further completion must not reconfigure the vertex a second time.
        manager.onSourceTaskCompleted(createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Mockito.verify(mockContext, Mockito.times(1)).reconfigureVertex(
            Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals(2, newEdgeManagers.size());

        // Inspect the routing of one of the captured edge managers.
        EdgeManagerPluginOnDemand edgeManager =
            (EdgeManagerPluginOnDemand) newEdgeManagers.values().iterator().next();
        Assert.assertEquals(4, edgeManager.getNumSourceTaskPhysicalOutputs(0));
        Assert.assertEquals(4, edgeManager.getNumDestinationTaskPhysicalInputs(0));

        EdgeManagerPluginOnDemand.EventRouteMetadata routeMetadata =
            edgeManager.routeDataMovementEventToDestination(1, 1, 0);
        Assert.assertEquals(1, routeMetadata.getNumEvents());
        Assert.assertEquals(3, routeMetadata.getTargetIndices()[0]);

        routeMetadata = edgeManager.routeDataMovementEventToDestination(0, 2, 1);
        Assert.assertEquals(1, routeMetadata.getNumEvents());
        Assert.assertEquals(0, routeMetadata.getTargetIndices()[0]);

        // Source-task-failed events are expected to yield the same two targets for
        // both destination tasks queried here.
        routeMetadata = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 0);
        Assert.assertEquals(2, routeMetadata.getNumEvents());
        Assert.assertEquals(2, routeMetadata.getTargetIndices()[0]);
        Assert.assertEquals(3, routeMetadata.getTargetIndices()[1]);

        routeMetadata = edgeManager.routeInputSourceTaskFailedEventToDestination(1, 1);
        Assert.assertEquals(2, routeMetadata.getNumEvents());
        Assert.assertEquals(2, routeMetadata.getTargetIndices()[0]);
        Assert.assertEquals(3, routeMetadata.getTargetIndices()[1]);
    }

    @Test(timeout=5000L)
    public void testShuffleVertexManagerSlowStart() {
        Configuration conf = new Configuration();
        ShuffleVertexManagerBase manager = null;
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        String mockSrcVertexId1 = "Vertex1";
        EdgeProperty eProp1 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockSrcVertexId2 = "Vertex2";
        EdgeProperty eProp2 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockSrcVertexId3 = "Vertex3";
        EdgeProperty eProp3 = EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out"), (InputDescriptor)InputDescriptor.create((String)"in"));
        String mockManagedVertexId = "Vertex4";
        VertexManagerPluginContext mockContext = (VertexManagerPluginContext)Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when((Object)mockContext.getVertexStatistics((String)Mockito.any(String.class))).thenReturn(Mockito.mock(VertexStatistics.class));
        Mockito.when((Object)mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when((Object)mockContext.getVertexName()).thenReturn((Object)mockManagedVertexId);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn((Object)3);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId3)).thenReturn((Object)1);
        mockInputVertices.put(mockSrcVertexId3, eProp3);
        try {
            manager = this.createManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
            manager.onVertexStarted(this.emptyCompletions);
            Assert.assertFalse((boolean)true);
        }
        catch (TezUncheckedException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("At least 1 bipartite source should exist"));
        }
        mockInputVertices.put(mockSrcVertexId1, eProp1);
        mockInputVertices.put(mockSrcVertexId2, eProp2);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.1f), Float.valueOf(0.1f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.bipartiteSources == 2 ? 1 : 0) != 0);
        LinkedList scheduledTasks = Lists.newLinkedList();
        ((VertexManagerPluginContext)Mockito.doAnswer((Answer)new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(scheduledTasks)).when((Object)mockContext)).scheduleTasks(Mockito.anyList());
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)0);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)2);
        try {
            manager = this.createManager(conf, mockContext, Float.valueOf(-0.1f), Float.valueOf(0.0f));
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinFraction"));
        }
        try {
            manager = this.createManager(conf, mockContext, Float.valueOf(0.0f), Float.valueOf(95.0f));
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinFraction"));
        }
        try {
            manager = this.createManager(conf, mockContext, Float.valueOf(0.5f), Float.valueOf(0.3f));
            Assert.assertTrue((boolean)false);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Invalid values for slowStartMinFraction"));
        }
        int numTasks = 20;
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)numTasks);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)numTasks);
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, Float.valueOf(0.8f), null);
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)(numTasks * 2), (long)manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals((long)0L, (long)manager.numBipartiteSourceTasksCompleted);
        float completedTasksThreshold = 0.8f * (float)numTasks;
        block8: for (String mockSrcVertex : new String[]{mockSrcVertexId1, mockSrcVertexId2}) {
            for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
                manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertex, i + 1));
                if ((float)(i + 2) >= completedTasksThreshold) continue block8;
            }
        }
        Assert.assertEquals((long)manager.totalTasksToSchedule, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertEquals((long)3L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)0L, (long)scheduledTasks.size());
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertEquals((long)0L, (long)manager.pendingTasks.size());
        Assert.assertEquals((long)manager.totalTasksToSchedule, (long)scheduledTasks.size());
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)2);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)2);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.0f), Float.valueOf(0.0f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertTrue((manager.totalTasksToSchedule == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(0.25f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId3, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 1 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, Float.valueOf(1.0f), Float.valueOf(1.0f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId3, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 1 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 3 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, Float.valueOf(1.0f), Float.valueOf(1.0f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId3, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 4 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 0 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 1 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 3 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertTrue((boolean)manager.pendingTasks.isEmpty());
        Assert.assertTrue((scheduledTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn((Object)4);
        Mockito.when((Object)mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn((Object)4);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(0.75f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 8 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 6 ? 1 : 0) != 0);
        scheduledTasks.clear();
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 3));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 7 ? 1 : 0) != 0);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(1.0f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 3 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 8 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 2 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
        Assert.assertTrue((manager.pendingTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 6 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 3));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 3));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 8 ? 1 : 0) != 0);
        scheduledTasks.clear();
        Mockito.when((Object)mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn((Object)1);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.25f), Float.valueOf(0.75f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId3, VertexState.CONFIGURED));
        Assert.assertTrue((manager.pendingTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.totalNumBipartiteSourceTasks == 8 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 1));
        Assert.assertTrue((manager.pendingTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 2 ? 1 : 0) != 0);
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 0));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 1 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 4 ? 1 : 0) != 0);
        scheduledTasks.clear();
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 2));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId2, 2));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 6 ? 1 : 0) != 0);
        scheduledTasks.clear();
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(mockSrcVertexId1, 3));
        Assert.assertTrue((manager.pendingTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((scheduledTasks.size() == 0 ? 1 : 0) != 0);
        Assert.assertTrue((manager.numBipartiteSourceTasksCompleted == 7 ? 1 : 0) != 0);
    }

    /**
     * Covers a managed vertex ("R2") fed by three SCATTER_GATHER sources
     * ("R1", "M2", "M3") where one of the sources reports CONFIGURED late.
     *
     * <p>Scenario 1: M2 and M3 report CONFIGURED, a vertex manager event and several
     * source task completions arrive, yet all three tasks stay pending and
     * {@code reconfigureVertex} is never invoked. Once R1 reports CONFIGURED,
     * {@code reconfigureVertex} has been called exactly once with parallelism 1,
     * nothing is pending and one task has been scheduled.
     *
     * <p>Scenario 2: R1 and M2 are stubbed with zero tasks; after all sources are
     * CONFIGURED a single completion from M3 leaves nothing pending and three
     * tasks scheduled.
     *
     * @throws IOException declared by the original test; presumably propagated from
     *         the event-building helper (not visible here)
     */
    @Test(timeout=5000L)
    public void test_Tez1649_with_scatter_gather_edges() throws IOException {
        Configuration conf = new Configuration();
        String r1 = "R1";
        String m2 = "M2";
        String m3 = "M3";
        String mockManagedVertexId_R2 = "R2";

        // Three distinct but identically configured scatter-gather edges.
        EdgeProperty eProp1 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        EdgeProperty eProp2 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        EdgeProperty eProp3 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));

        HashMap<String, EdgeProperty> mockInputVertices_R2 = new HashMap<String, EdgeProperty>();
        mockInputVertices_R2.put(r1, eProp1);
        mockInputVertices_R2.put(m2, eProp2);
        mockInputVertices_R2.put(m3, eProp3);

        VertexManagerPluginContext mockContext_R2 = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(mockContext_R2.getInputVertexEdgeProperties()).thenReturn(mockInputVertices_R2);
        Mockito.when(mockContext_R2.getVertexName()).thenReturn(mockManagedVertexId_R2);
        Mockito.when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(3);
        Mockito.when(mockContext_R2.getVertexNumTasks(r1)).thenReturn(3);
        Mockito.when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(3);
        Mockito.when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);

        VertexManagerEvent vmEvent = this.getVertexManagerEvent(null, 50L, r1);
        ShuffleVertexManagerBase manager =
                this.createManager(conf, mockContext_R2, Float.valueOf(0.001f), Float.valueOf(0.001f));

        // Record every scheduleTasks() request issued against the mock context.
        LinkedList scheduledTasks = Lists.newLinkedList();
        Mockito.doAnswer(new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(scheduledTasks))
                .when(mockContext_R2).scheduleTasks(Mockito.anyList());

        // ---- Scenario 1: R1 is configured only after the other sources made progress.
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals(3, manager.bipartiteSources);

        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onVertexManagerEventReceived(vmEvent);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        // All of M3 completes.
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(m3, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(m3, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(m3, 2));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks);

        // Two of the three M2 tasks complete.
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(m2, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(m2, 1));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks);

        // A completion from the not-yet-configured R1 must not trigger anything either.
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(r1, 0));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(6, manager.totalNumBipartiteSourceTasks);
        Mockito.verify(mockContext_R2, Mockito.times(0))
                .reconfigureVertex(Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());

        // From here on the context reports a single task for R2; R1 finally becomes CONFIGURED.
        Mockito.when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(1);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Assert.assertEquals(9, manager.totalNumBipartiteSourceTasks);
        Mockito.verify(mockContext_R2, Mockito.times(1))
                .reconfigureVertex(Mockito.eq(1), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(1, scheduledTasks.size());

        // ---- Scenario 2: two of the three sources have zero tasks.
        scheduledTasks.clear();
        Mockito.when(mockContext_R2.getInputVertexEdgeProperties()).thenReturn(mockInputVertices_R2);
        Mockito.when(mockContext_R2.getVertexName()).thenReturn(mockManagedVertexId_R2);
        Mockito.when(mockContext_R2.getVertexNumTasks(mockManagedVertexId_R2)).thenReturn(3);
        Mockito.when(mockContext_R2.getVertexNumTasks(r1)).thenReturn(0);
        Mockito.when(mockContext_R2.getVertexNumTasks(m2)).thenReturn(0);
        Mockito.when(mockContext_R2.getVertexNumTasks(m3)).thenReturn(3);

        manager = this.createManager(conf, mockContext_R2, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);

        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(m3, 0));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());
    }

    /**
     * Covers a managed vertex ("R2") with one SCATTER_GATHER source ("R1") and two
     * BROADCAST sources ("M2", "M3").
     *
     * <p>Four scenarios are run against the same mock context, each with a fresh
     * manager:
     * <ol>
     *   <li>R1 and M2 configured plus some completions: tasks stay pending until M3
     *       is CONFIGURED, then all three are scheduled.</li>
     *   <li>All of R1 completes before M2 and M3 are CONFIGURED: same outcome.</li>
     *   <li>M2 is stubbed with zero tasks: tasks stay pending until M3 and M2 are
     *       CONFIGURED.</li>
     *   <li>M2 and M3 are both stubbed with zero tasks: after every source is
     *       CONFIGURED and one R1 task completes, all three tasks are scheduled.</li>
     * </ol>
     *
     * <p>The order of stubbing relative to manager creation is part of each scenario
     * and must not be rearranged.
     */
    @Test(timeout=5000L)
    public void test_Tez1649_with_mixed_edges() {
        Configuration conf = new Configuration();
        String r1 = "R1";
        String m2 = "M2";
        String m3 = "M3";
        String mockManagedVertexId = "R2";

        EdgeProperty eProp1 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        EdgeProperty eProp2 = EdgeProperty.create(
                EdgeProperty.DataMovementType.BROADCAST,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        EdgeProperty eProp3 = EdgeProperty.create(
                EdgeProperty.DataMovementType.BROADCAST,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));

        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        mockInputVertices.put(r1, eProp1);
        mockInputVertices.put(m2, eProp2);
        mockInputVertices.put(m3, eProp3);

        VertexManagerPluginContext mockContext = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m2)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m3)).thenReturn(3);

        // Record every scheduleTasks() request issued against the mock context.
        LinkedList scheduledTasks = Lists.newLinkedList();
        Mockito.doAnswer(new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(scheduledTasks))
                .when(mockContext).scheduleTasks(Mockito.anyList());

        // ---- Scenario 1: M3 (broadcast) is the last source to be configured.
        ShuffleVertexManagerBase manager =
                this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Assert.assertEquals(1, manager.bipartiteSources);

        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(r1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(r1, 1));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);

        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(m2, 0));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);

        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());

        // ---- Scenario 2: R1 finishes completely before the broadcast sources are configured.
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m2)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);

        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(r1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(r1, 1));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(r1, 2));
        Assert.assertEquals(3, manager.pendingTasks.size());

        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());

        // ---- Scenario 3: M2 has zero tasks.
        scheduledTasks.clear();
        // NOTE: this manager is started and then replaced below before it receives any
        // further calls; it is retained so the mock sees the same interactions as before.
        manager = this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m2)).thenReturn(0);
        Mockito.when(mockContext.getVertexNumTasks(m3)).thenReturn(3);
        manager = this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);

        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(r1, 0));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(r1, 1));
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());

        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());

        // ---- Scenario 4: both broadcast sources are stubbed with zero tasks
        // (the stubs change only after the manager has been started).
        scheduledTasks.clear();
        manager = this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));
        manager.onVertexStarted(this.emptyCompletions);
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(r1)).thenReturn(3);
        Mockito.when(mockContext.getVertexNumTasks(m2)).thenReturn(0);
        Mockito.when(mockContext.getVertexNumTasks(m3)).thenReturn(0);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m3, VertexState.CONFIGURED));
        manager.onVertexStateUpdated(new VertexStateUpdate(m2, VertexState.CONFIGURED));
        manager.onSourceTaskCompleted(TestShuffleVertexManagerBase.createTaskAttemptIdentifier(r1, 0));
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(3, scheduledTasks.size());
    }

    /**
     * Runs a manager for a vertex ("R2") that the context reports as having zero
     * tasks, with a single SCATTER_GATHER source ("R1").
     *
     * <p>After the vertex is started and R1 reports CONFIGURED, the test checks that
     * one bipartite source is tracked, that no source tasks are counted, pending or
     * scheduled, and that {@code doneReconfiguringVertex()} was invoked on the context.
     *
     * @throws IOException declared by the original test signature
     */
    @Test
    public void testZeroTasksSendsConfigured() throws IOException {
        Configuration conf = new Configuration();
        String r1 = "R1";
        String mockManagedVertexId = "R2";

        EdgeProperty eProp1 = EdgeProperty.create(
                EdgeProperty.DataMovementType.SCATTER_GATHER,
                EdgeProperty.DataSourceType.PERSISTED,
                EdgeProperty.SchedulingType.SEQUENTIAL,
                OutputDescriptor.create("out"),
                InputDescriptor.create("in"));
        HashMap<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        mockInputVertices.put(r1, eProp1);

        // The managed vertex itself is stubbed with no tasks at all.
        VertexManagerPluginContext mockContext = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
        Mockito.when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(0);

        ShuffleVertexManagerBase manager =
                this.createManager(conf, mockContext, Float.valueOf(0.001f), Float.valueOf(0.001f));

        // Record every scheduleTasks() request issued against the mock context.
        LinkedList scheduledTasks = Lists.newLinkedList();
        Mockito.doAnswer(new TestShuffleVertexManagerUtils.ScheduledTasksAnswer(scheduledTasks))
                .when(mockContext).scheduleTasks(Mockito.anyList());

        manager.onVertexStarted(this.emptyCompletions);
        manager.onVertexStateUpdated(new VertexStateUpdate(r1, VertexState.CONFIGURED));

        Assert.assertEquals(1, manager.bipartiteSources);
        Assert.assertEquals(0, manager.numBipartiteSourceTasksCompleted);
        Assert.assertEquals(0, manager.totalNumBipartiteSourceTasks);
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Mockito.verify(mockContext).doneReconfiguringVertex();
    }

    /**
     * Verifies that a source task completion handed to {@code onVertexStarted}
     * is counted by the manager.
     *
     * <p>The context is built for three source vertices ("Vertex1".."Vertex3",
     * 2 tasks each) feeding managed vertex "Vertex4" (4 tasks). The completed
     * bipartite source task counter is expected to be 0 before the vertex starts
     * and 1 after it starts with a single completion for task 0 of "Vertex1".
     *
     * @throws IOException declared by the test signature
     */
    @Test(timeout=5000L)
    public void testTezDrainCompletionsOnVertexStart() throws IOException {
        final String firstSource = "Vertex1";
        final String secondSource = "Vertex2";
        final String thirdSource = "Vertex3";
        final String managedVertex = "Vertex4";
        final int tasksPerSource = 2;
        final int managedTasks = 4;

        Configuration conf = new Configuration();
        // NOTE(review): raw type kept as in the original; parameterize once the
        // createVertexManagerContext signature is confirmed.
        LinkedList scheduled = Lists.newLinkedList();
        VertexManagerPluginContext context = this.createVertexManagerContext(
            firstSource, tasksPerSource,
            secondSource, tasksPerSource,
            thirdSource, tasksPerSource,
            managedVertex, managedTasks,
            scheduled, null);
        ShuffleVertexManagerBase manager = this.createManager(conf, context, 0.01f, 0.75f);

        long completedBeforeStart = manager.numBipartiteSourceTasksCompleted;
        Assert.assertEquals(0L, completedBeforeStart);

        // Start the vertex with one already-completed attempt from the first source.
        TaskAttemptIdentifier finishedAttempt =
            TestShuffleVertexManager.createTaskAttemptIdentifier(firstSource, 0);
        manager.onVertexStarted(Collections.singletonList(finishedAttempt));

        long completedAfterStart = manager.numBipartiteSourceTasksCompleted;
        Assert.assertEquals(1L, completedAfterStart);
    }

    /**
     * Creates a vertex manager of the class this parameterized test instance was
     * constructed with, by delegating to the static {@code createManager} overload.
     *
     * <p>The two fixed arguments ({@code true} and {@code null}) are passed through
     * unchanged; their meaning is defined by the static overload, which is not
     * visible here (presumably an auto-parallelism flag and a desired input size --
     * confirm against the helper).
     *
     * @param conf    configuration handed to the manager
     * @param context plugin context (a mock in these tests) the manager operates on
     * @param min     forwarded as-is to the static overload
     * @param max     forwarded as-is to the static overload
     * @return the manager returned by the static overload
     */
    private ShuffleVertexManagerBase createManager(Configuration conf, VertexManagerPluginContext context, Float min, Float max) {
        Class<? extends ShuffleVertexManagerBase> managerClass = this.shuffleVertexManagerClass;
        return TestShuffleVertexManagerBase.createManager(
            managerClass, conf, context, true, null, min, max);
    }
}

