/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.library.vertexmanager;

import com.google.common.collect.Maps;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.RuntimeUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeManager;
import org.apache.tez.dag.api.EdgeManagerContext;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.verification.VerificationMode;

/**
 * Tests for {@link ShuffleVertexManager}.
 *
 * <p>Both tests drive a manager attached to a mocked {@link VertexManagerPluginContext}
 * whose managed vertex ("Vertex4") has two SCATTER_GATHER sources ("Vertex1", "Vertex2")
 * and one BROADCAST source ("Vertex3"). The tests feed source-task completions and
 * vertex-manager events into the manager and assert on its package-visible counters and
 * on the task ids handed to {@code scheduleVertexTasks}.
 */
public class TestShuffleVertexManager {
    private static final String SCATTER_GATHER_SOURCE_1 = "Vertex1";
    private static final String SCATTER_GATHER_SOURCE_2 = "Vertex2";
    private static final String BROADCAST_SOURCE = "Vertex3";
    private static final String MANAGED_VERTEX = "Vertex4";

    /**
     * Exercises the manager with auto-parallelism enabled and a desired task input size
     * of 1000: checks when {@code setVertexParallelism} is (not) invoked, which tasks get
     * scheduled, and how the edge managers installed by the re-parallelization route events.
     */
    @Test(timeout=5000L)
    public void testShuffleVertexManagerAutoParallelism() throws IOException {
        Configuration conf = new Configuration();
        conf.setBoolean("tez.am.shuffle-vertex-manager.enable.auto-parallel", true);
        conf.setLong("tez.am.shuffle-vertex-manager.desired-task-input-size", 1000L);

        Map<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        mockInputVertices.put(SCATTER_GATHER_SOURCE_1, createEdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER));
        mockInputVertices.put(SCATTER_GATHER_SOURCE_2, createEdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER));
        mockInputVertices.put(BROADCAST_SOURCE, createEdgeProperty(EdgeProperty.DataMovementType.BROADCAST));

        final VertexManagerPluginContext mockContext = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(MANAGED_VERTEX);
        Mockito.when(mockContext.getVertexNumTasks(MANAGED_VERTEX)).thenReturn(4);

        // Only the two SCATTER_GATHER sources are expected to be tracked as bipartite sources.
        ShuffleVertexManager manager = createManager(conf, mockContext, 0.1f, 0.1f);
        Assert.assertEquals(2, manager.bipartiteSources.size());
        Assert.assertTrue(manager.bipartiteSources.containsKey(SCATTER_GATHER_SOURCE_1));
        Assert.assertTrue(manager.bipartiteSources.containsKey(SCATTER_GATHER_SOURCE_2));

        final HashSet<Integer> scheduledTasks = captureScheduledTasks(mockContext);

        // When parallelism is set to 2, report 2 tasks for the managed vertex from then on
        // and instantiate the edge managers described by the call so they can be inspected.
        final Map<String, EdgeManager> newEdgeManagers = new HashMap<String, EdgeManager>();
        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) {
                Mockito.when(mockContext.getVertexNumTasks(MANAGED_VERTEX)).thenReturn(2);
                newEdgeManagers.clear();
                @SuppressWarnings("unchecked")
                Map<String, EdgeManagerDescriptor> descriptors =
                        (Map<String, EdgeManagerDescriptor>) invocation.getArguments()[2];
                for (Map.Entry<String, EdgeManagerDescriptor> entry : descriptors.entrySet()) {
                    EdgeManagerDescriptor descriptor = entry.getValue();
                    EdgeManager edgeManager = (EdgeManager) RuntimeUtils.createClazzInstance(descriptor.getClassName());
                    edgeManager.initialize(createEdgeManagerContext(descriptor.getUserPayload()));
                    newEdgeManagers.put(entry.getKey(), edgeManager);
                }
                return null;
            }
        }).when(mockContext).setVertexParallelism(Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap());

        // Scatter-gather sources report 0 tasks: everything is scheduled on vertex start.
        Mockito.when(mockContext.getVertexNumTasks(SCATTER_GATHER_SOURCE_1)).thenReturn(0);
        Mockito.when(mockContext.getVertexNumTasks(SCATTER_GATHER_SOURCE_2)).thenReturn(0);
        Mockito.when(mockContext.getVertexNumTasks(BROADCAST_SOURCE)).thenReturn(1);
        manager.onVertexStarted(null);
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Assert.assertEquals(4, scheduledTasks.size());
        scheduledTasks.clear();

        // Two tasks per scatter-gather source; the reported output size (5000) exceeds the
        // desired input size, and the test expects no parallelism change.
        Mockito.when(mockContext.getVertexNumTasks(SCATTER_GATHER_SOURCE_1)).thenReturn(2);
        Mockito.when(mockContext.getVertexNumTasks(SCATTER_GATHER_SOURCE_2)).thenReturn(2);
        VertexManagerEvent vmEvent = createVertexManagerEvent(5000L);
        manager = createManager(conf, mockContext, 0.1f, 0.1f);
        manager.onVertexStarted(null);
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.numSourceTasks);
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 0);
        Mockito.verify(mockContext, Mockito.times(0))
                .setVertexParallelism(Mockito.anyInt(), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(4, scheduledTasks.size());
        Assert.assertEquals(1, manager.numSourceTasksCompleted);
        Assert.assertEquals(5000L, manager.completedSourceTasksOutputSize);
        scheduledTasks.clear();

        // Small outputs (500 each) with a 0.5 completion fraction.
        vmEvent = createVertexManagerEvent(500L);
        manager = createManager(conf, mockContext, 0.5f, 0.5f);
        manager.onVertexStarted(null);
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.numSourceTasks);

        // A broadcast-source completion must not change any counter.
        manager.onSourceTaskCompleted(BROADCAST_SOURCE, 0);
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.numSourceTasks);
        Assert.assertEquals(0, manager.numSourceTasksCompleted);

        // First scatter-gather completion: nothing scheduled yet.
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 0);
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(1, manager.numSourceTasksCompleted);
        Assert.assertEquals(1, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);

        // Repeating the same completion must not be counted again.
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 0);
        Assert.assertEquals(4, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(1, manager.numSourceTasksCompleted);
        Assert.assertEquals(500L, manager.completedSourceTasksOutputSize);

        // Second distinct completion: parallelism is set to 2 and both tasks are scheduled.
        manager.onVertexManagerEventReceived(vmEvent);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 1);
        Mockito.verify(mockContext)
                .setVertexParallelism(Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals(2, newEdgeManagers.size());
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(2, scheduledTasks.size());
        Assert.assertTrue(scheduledTasks.contains(0));
        Assert.assertTrue(scheduledTasks.contains(1));
        Assert.assertEquals(2, manager.numSourceTasksCompleted);
        Assert.assertEquals(2, manager.numVertexManagerEventsReceived);
        Assert.assertEquals(1000L, manager.completedSourceTasksOutputSize);

        // A later completion must not trigger a second parallelism change
        // (verify() without a mode means exactly one invocation in total).
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_2, 0);
        Mockito.verify(mockContext)
                .setVertexParallelism(Mockito.eq(2), Mockito.any(VertexLocationHint.class), Mockito.anyMap());
        Assert.assertEquals(2, newEdgeManagers.size());

        // Routing behaviour of one of the installed edge managers.
        EdgeManager edgeManager = newEdgeManagers.values().iterator().next();
        Map<Integer, List<Integer>> targets = Maps.newHashMap();
        DataMovementEvent dmEvent = new DataMovementEvent(1, new byte[0]);
        edgeManager.routeDataMovementEventToDestination(dmEvent, 1, 2, targets);
        Assert.assertEquals(1, targets.size());
        Map.Entry<Integer, List<Integer>> e = targets.entrySet().iterator().next();
        Assert.assertEquals(3, e.getKey().intValue());
        Assert.assertEquals(1, e.getValue().size());
        Assert.assertEquals(0, e.getValue().get(0).intValue());
        targets.clear();

        dmEvent = new DataMovementEvent(2, new byte[0]);
        edgeManager.routeDataMovementEventToDestination(dmEvent, 0, 2, targets);
        Assert.assertEquals(1, targets.size());
        e = targets.entrySet().iterator().next();
        Assert.assertEquals(0, e.getKey().intValue());
        Assert.assertEquals(1, e.getValue().size());
        Assert.assertEquals(1, e.getValue().get(0).intValue());
        targets.clear();

        edgeManager.routeInputSourceTaskFailedEventToDestination(2, 2, targets);
        Assert.assertEquals(2, targets.size());
        for (Map.Entry<Integer, List<Integer>> entry : targets.entrySet()) {
            int routedKey = entry.getKey().intValue();
            Assert.assertTrue(routedKey == 4 || routedKey == 5);
            Assert.assertEquals(2, entry.getValue().size());
            Assert.assertEquals(0, entry.getValue().get(0).intValue());
            Assert.assertEquals(1, entry.getValue().get(1).intValue());
        }
    }

    /**
     * Exercises slow-start scheduling with the default configuration: validation of the
     * min/max source-completion fractions, and how many managed-vertex tasks are pending
     * or scheduled as scatter-gather source tasks complete.
     */
    @Test
    public void testShuffleVertexManagerSlowStart() {
        Configuration conf = new Configuration();
        Map<String, EdgeProperty> mockInputVertices = new HashMap<String, EdgeProperty>();
        VertexManagerPluginContext mockContext = Mockito.mock(VertexManagerPluginContext.class);
        Mockito.when(mockContext.getInputVertexEdgeProperties()).thenReturn(mockInputVertices);
        Mockito.when(mockContext.getVertexName()).thenReturn(MANAGED_VERTEX);
        Mockito.when(mockContext.getVertexNumTasks(MANAGED_VERTEX)).thenReturn(3);

        // With only a broadcast source, initialization must be rejected.
        mockInputVertices.put(BROADCAST_SOURCE, createEdgeProperty(EdgeProperty.DataMovementType.BROADCAST));
        try {
            createManager(conf, mockContext, 0.1f, 0.1f);
            Assert.fail("Expected TezUncheckedException when no bipartite source exists");
        }
        catch (TezUncheckedException e) {
            Assert.assertTrue(e.getMessage().contains("Atleast 1 bipartite source should exist"));
        }

        mockInputVertices.put(SCATTER_GATHER_SOURCE_1, createEdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER));
        mockInputVertices.put(SCATTER_GATHER_SOURCE_2, createEdgeProperty(EdgeProperty.DataMovementType.SCATTER_GATHER));
        ShuffleVertexManager manager = createManager(conf, mockContext, 0.1f, 0.1f);
        Assert.assertEquals(2, manager.bipartiteSources.size());
        Assert.assertTrue(manager.bipartiteSources.containsKey(SCATTER_GATHER_SOURCE_1));
        Assert.assertTrue(manager.bipartiteSources.containsKey(SCATTER_GATHER_SOURCE_2));

        HashSet<Integer> scheduledTasks = captureScheduledTasks(mockContext);

        // Sources report 0 tasks: all 3 managed tasks are scheduled on vertex start.
        Mockito.when(mockContext.getVertexNumTasks(SCATTER_GATHER_SOURCE_1)).thenReturn(0);
        Mockito.when(mockContext.getVertexNumTasks(SCATTER_GATHER_SOURCE_2)).thenReturn(0);
        manager.onVertexStarted(null);
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Assert.assertEquals(3, scheduledTasks.size());

        Mockito.when(mockContext.getVertexNumTasks(SCATTER_GATHER_SOURCE_1)).thenReturn(2);
        Mockito.when(mockContext.getVertexNumTasks(SCATTER_GATHER_SOURCE_2)).thenReturn(2);

        // Negative min fraction, and min greater than max, must both be rejected.
        assertInvalidSlowStartFractions(conf, mockContext, -0.1f, 0.0f);
        assertInvalidSlowStartFractions(conf, mockContext, 0.5f, 0.3f);

        // min == max == 0: everything is scheduled on vertex start.
        manager = createManager(conf, mockContext, 0.0f, 0.0f);
        manager.onVertexStarted(null);
        Assert.assertEquals(4, manager.numSourceTasks);
        Assert.assertEquals(3, manager.totalTasksToSchedule);
        Assert.assertEquals(0, manager.numSourceTasksCompleted);
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Assert.assertEquals(3, scheduledTasks.size());

        // min == max == 0.25: one scatter-gather completion (1 of 4) releases everything;
        // the broadcast completion before it is expected to be ignored.
        manager = createManager(conf, mockContext, 0.25f, 0.25f);
        manager.onVertexStarted(null);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.numSourceTasks);
        manager.onSourceTaskCompleted(BROADCAST_SOURCE, 0);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.numSourceTasks);
        Assert.assertEquals(0, manager.numSourceTasksCompleted);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 0);
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Assert.assertEquals(3, scheduledTasks.size());
        Assert.assertEquals(1, manager.numSourceTasksCompleted);

        // min == max == 1.0. The original suite runs this scenario twice in a row, each
        // time on a freshly created manager; both runs are kept to preserve coverage.
        assertSchedulesOnlyAfterAllSourceTasksComplete(conf, mockContext, scheduledTasks);
        assertSchedulesOnlyAfterAllSourceTasksComplete(conf, mockContext, scheduledTasks);

        // min 0.25, max 0.75: tasks are released gradually between the two fractions.
        manager = createManager(conf, mockContext, 0.25f, 0.75f);
        manager.onVertexStarted(null);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.numSourceTasks);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 0);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 1);
        Assert.assertEquals(2, manager.pendingTasks.size());
        Assert.assertEquals(1, scheduledTasks.size());
        Assert.assertEquals(2, manager.numSourceTasksCompleted);
        // A repeated completion of the same source task must change nothing.
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 1);
        Assert.assertEquals(2, manager.pendingTasks.size());
        Assert.assertEquals(1, scheduledTasks.size());
        Assert.assertEquals(2, manager.numSourceTasksCompleted);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_2, 0);
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(2, scheduledTasks.size());
        Assert.assertEquals(3, manager.numSourceTasksCompleted);
        // Nothing is left to schedule, so the last completion must not schedule anything.
        scheduledTasks.clear();
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_2, 1);
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(0, scheduledTasks.size());
        Assert.assertEquals(4, manager.numSourceTasksCompleted);

        // min 0.25, max 1.0: one task is released per step until all sources are done.
        manager = createManager(conf, mockContext, 0.25f, 1.0f);
        manager.onVertexStarted(null);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.numSourceTasks);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 0);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 1);
        Assert.assertEquals(2, manager.pendingTasks.size());
        Assert.assertEquals(1, scheduledTasks.size());
        Assert.assertEquals(2, manager.numSourceTasksCompleted);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_2, 0);
        Assert.assertEquals(1, manager.pendingTasks.size());
        Assert.assertEquals(1, scheduledTasks.size());
        Assert.assertEquals(3, manager.numSourceTasksCompleted);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_2, 1);
        Assert.assertEquals(0, manager.pendingTasks.size());
        Assert.assertEquals(1, scheduledTasks.size());
        Assert.assertEquals(4, manager.numSourceTasksCompleted);
    }

    /**
     * Runs the min == max == 1.0 scenario on a freshly created manager: no managed task
     * may be scheduled until all four scatter-gather source tasks have completed, and the
     * broadcast-source completion must not be counted.
     */
    private void assertSchedulesOnlyAfterAllSourceTasksComplete(Configuration conf, VertexManagerPluginContext context, HashSet<Integer> scheduledTasks) {
        ShuffleVertexManager manager = createManager(conf, context, 1.0f, 1.0f);
        manager.onVertexStarted(null);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.numSourceTasks);
        manager.onSourceTaskCompleted(BROADCAST_SOURCE, 0);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(4, manager.numSourceTasks);
        Assert.assertEquals(0, manager.numSourceTasksCompleted);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 0);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(1, manager.numSourceTasksCompleted);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_1, 1);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(2, manager.numSourceTasksCompleted);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_2, 0);
        Assert.assertEquals(3, manager.pendingTasks.size());
        Assert.assertEquals(3, manager.numSourceTasksCompleted);
        manager.onSourceTaskCompleted(SCATTER_GATHER_SOURCE_2, 1);
        Assert.assertTrue(manager.pendingTasks.isEmpty());
        Assert.assertEquals(3, scheduledTasks.size());
        Assert.assertEquals(4, manager.numSourceTasksCompleted);
    }

    /**
     * Asserts that creating a manager with the given min/max fractions is rejected with
     * an {@link IllegalArgumentException} naming the invalid setting.
     */
    private void assertInvalidSlowStartFractions(Configuration conf, VertexManagerPluginContext context, float min, float max) {
        try {
            createManager(conf, context, min, max);
            Assert.fail("Expected IllegalArgumentException for min=" + min + ", max=" + max);
        }
        catch (IllegalArgumentException e) {
            Assert.assertTrue(e.getMessage().contains("Invalid values for slowStartMinSrcCompletionFraction"));
        }
    }

    /**
     * Stubs {@code scheduleVertexTasks} on the mock so that each invocation replaces the
     * contents of the returned set with the task ids passed to that invocation.
     */
    private static HashSet<Integer> captureScheduledTasks(VertexManagerPluginContext context) {
        final HashSet<Integer> scheduledTasks = new HashSet<Integer>();
        Mockito.doAnswer(new Answer<Object>() {
            @Override
            public Object answer(InvocationOnMock invocation) {
                @SuppressWarnings("unchecked")
                List<Integer> tasks = (List<Integer>) invocation.getArguments()[0];
                scheduledTasks.clear();
                scheduledTasks.addAll(tasks);
                return null;
            }
        }).when(context).scheduleVertexTasks(Mockito.anyList());
        return scheduledTasks;
    }

    /** Builds a PERSISTED / SEQUENTIAL edge property with the given data movement type. */
    private static EdgeProperty createEdgeProperty(EdgeProperty.DataMovementType movementType) {
        return new EdgeProperty(movementType, EdgeProperty.DataSourceType.PERSISTED, EdgeProperty.SchedulingType.SEQUENTIAL, new OutputDescriptor("out"), new InputDescriptor("in"));
    }

    /** Builds a vertex-manager event whose payload reports the given output size. */
    private static VertexManagerEvent createVertexManagerEvent(long outputSize) {
        byte[] payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder().setOutputSize(outputSize).build().toByteArray();
        return new VertexManagerEvent("Vertex", payload);
    }

    /** Minimal edge-manager context exposing only the user payload; vertex names are null. */
    private static EdgeManagerContext createEdgeManagerContext(final byte[] userPayload) {
        return new EdgeManagerContext() {
            @Override
            public byte[] getUserPayload() {
                return userPayload;
            }

            @Override
            public String getSrcVertexName() {
                return null;
            }

            @Override
            public String getDestVertexName() {
                return null;
            }
        };
    }

    /**
     * Creates and initializes a {@link ShuffleVertexManager} against the given context.
     *
     * <p>The min/max source fractions are written into {@code conf} (which is mutated),
     * the configuration is serialized into a user payload, and the context is stubbed to
     * return that payload before {@code initialize} is called.
     *
     * @param conf configuration to update and serialize
     * @param context mocked plugin context handed to the manager
     * @param min value for "tez.am.shuffle-vertex-manager.min-src-fraction"
     * @param max value for "tez.am.shuffle-vertex-manager.max-src-fraction"
     * @return the initialized manager
     * @throws RuntimeException wrapping the {@link IOException} if the payload cannot be created
     */
    private ShuffleVertexManager createManager(Configuration conf, VertexManagerPluginContext context, float min, float max) {
        conf.setFloat("tez.am.shuffle-vertex-manager.min-src-fraction", min);
        conf.setFloat("tez.am.shuffle-vertex-manager.max-src-fraction", max);
        ShuffleVertexManager manager = new ShuffleVertexManager();
        byte[] payload;
        try {
            payload = TezUtils.createUserPayloadFromConf(conf);
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        Mockito.when(context.getUserPayload()).thenReturn(payload);
        manager.initialize(context);
        return manager;
    }
}

