/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.protobuf.ByteString;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.RootInputLeafOutput;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.VertexManagerPlugin;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.MockClock;
import org.apache.tez.dag.app.TaskAttemptEventInfo;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TestStateChangeNotifier;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexStateUpdateListener;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputFailed;
import org.apache.tez.dag.app.dag.event.VertexEventRootInputInitialized;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskAttemptCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventTermination;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException;
import org.apache.tez.dag.app.dag.impl.CallableEventDispatcher;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.Edge;
import org.apache.tez.dag.app.dag.impl.ImmediateStartVertexManager;
import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.app.dag.impl.VertexManager;
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.dag.utils.TaskSpecificLaunchCmdOption;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.InputSpecUpdate;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.GroupInputSpec;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
import org.apache.tez.test.EdgeManagerForTest;
import org.apache.tez.test.VertexManagerPluginForTest;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.internal.util.collections.Sets;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.roaringbitmap.RoaringBitmap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestVertexImpl {
    private static final Logger LOG = LoggerFactory.getLogger(TestVertexImpl.class);
    private ListeningExecutorService execService;
    private boolean useCustomInitializer = false;
    private InputInitializer customInitializer = null;
    private TezDAGID dagId;
    private ApplicationAttemptId appAttemptId;
    private DAGProtos.DAGPlan dagPlan;
    private DAGProtos.DAGPlan invalidDagPlan;
    private Map<String, VertexImpl> vertices;
    private Map<TezVertexID, VertexImpl> vertexIdMap;
    private DrainDispatcher dispatcher;
    private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
    private Clock clock = new SystemClock();
    private TaskHeartbeatHandler thh;
    private AppContext appContext;
    private VertexLocationHint vertexLocationHint = null;
    private Configuration conf;
    private Map<String, Edge> edges;
    private Map<String, DAGImpl.VertexGroupInfo> vertexGroups;
    private byte[] edgePayload = "EP".getBytes();
    private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
    private TaskEventDispatcher taskEventDispatcher;
    private VertexEventDispatcher vertexEventDispatcher;
    private DagEventDispatcher dagEventDispatcher;
    private HistoryEventHandler historyEventHandler;
    private TestStateChangeNotifier.StateChangeNotifierForTest updateTracker;
    private static TaskSpecificLaunchCmdOption taskSpecificLaunchCmdOption;

    private DAGProtos.DAGPlan createInvalidDAGPlan() {
        LOG.info("Setting up invalid dag plan");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testverteximplinvalid").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(0).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithCountingVM() {
        LOG.info("Setting up dag plan with coutning VertexManager");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("dagWithCountingVM").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x3.y3").build()).addInEdgeId("e1").addInEdgeId("e2").setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(InvocationCountingVertexManager.class.getName())).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v3")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithVMException(String initializerClassName, VertexManagerWithException.VMExceptionLocation exLocation) {
        LOG.info("Setting up dag plan with VertexManager which would throw exception");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(initializerClassName)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerWithException.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom((byte[])exLocation.name().getBytes())))).build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerWithException.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom((byte[])exLocation.name().getBytes())))).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithIIException() {
        LOG.info("Setting up dag plan with VertexManager which would throw exception");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("IrrelevantInitializerClassName")).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithNonExistInputInitializer() {
        LOG.info("Setting up dag plan with non exist inputinitializer");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("non-exist-input-initializer")).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithNonExistOutputCommitter() {
        LOG.info("Setting up dag plan with non exist output committer");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("non-exist-output-committer")).setName("output1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("OutputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithNonExistVertexManager() {
        LOG.info("Setting up dag plan with non-exist VertexManager");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("non-exist-vertexmanager")).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithMixedEdges() {
        LOG.info("Setting up mixed edge dag plan");
        org.apache.tez.dag.api.DAG dag = org.apache.tez.dag.api.DAG.create((String)"MixedEdges");
        org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create((String)"vertex1", (ProcessorDescriptor)ProcessorDescriptor.create((String)"v1.class"), (int)1, (Resource)Resource.newInstance((int)0, (int)0));
        org.apache.tez.dag.api.Vertex v2 = org.apache.tez.dag.api.Vertex.create((String)"vertex2", (ProcessorDescriptor)ProcessorDescriptor.create((String)"v2.class"), (int)1, (Resource)Resource.newInstance((int)0, (int)0));
        org.apache.tez.dag.api.Vertex v3 = org.apache.tez.dag.api.Vertex.create((String)"vertex3", (ProcessorDescriptor)ProcessorDescriptor.create((String)"v3.class"), (int)1, (Resource)Resource.newInstance((int)0, (int)0));
        org.apache.tez.dag.api.Vertex v4 = org.apache.tez.dag.api.Vertex.create((String)"vertex4", (ProcessorDescriptor)ProcessorDescriptor.create((String)"v4.class"), (int)1, (Resource)Resource.newInstance((int)0, (int)0));
        org.apache.tez.dag.api.Vertex v5 = org.apache.tez.dag.api.Vertex.create((String)"vertex5", (ProcessorDescriptor)ProcessorDescriptor.create((String)"v5.class"), (int)1, (Resource)Resource.newInstance((int)0, (int)0));
        org.apache.tez.dag.api.Vertex v6 = org.apache.tez.dag.api.Vertex.create((String)"vertex6", (ProcessorDescriptor)ProcessorDescriptor.create((String)"v6.class"), (int)1, (Resource)Resource.newInstance((int)0, (int)0));
        dag.addVertex(v1).addVertex(v2).addVertex(v3).addVertex(v4).addVertex(v5).addVertex(v6);
        dag.addEdge(org.apache.tez.dag.api.Edge.create((org.apache.tez.dag.api.Vertex)v1, (org.apache.tez.dag.api.Vertex)v2, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out.class"), (InputDescriptor)InputDescriptor.create((String)"out.class"))));
        dag.addEdge(org.apache.tez.dag.api.Edge.create((org.apache.tez.dag.api.Vertex)v1, (org.apache.tez.dag.api.Vertex)v3, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.BROADCAST, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out.class"), (InputDescriptor)InputDescriptor.create((String)"out.class"))));
        dag.addEdge(org.apache.tez.dag.api.Edge.create((org.apache.tez.dag.api.Vertex)v4, (org.apache.tez.dag.api.Vertex)v2, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out.class"), (InputDescriptor)InputDescriptor.create((String)"out.class"))));
        dag.addEdge(org.apache.tez.dag.api.Edge.create((org.apache.tez.dag.api.Vertex)v5, (org.apache.tez.dag.api.Vertex)v3, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.ONE_TO_ONE, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out.class"), (InputDescriptor)InputDescriptor.create((String)"out.class"))));
        dag.addEdge(org.apache.tez.dag.api.Edge.create((org.apache.tez.dag.api.Vertex)v4, (org.apache.tez.dag.api.Vertex)v6, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.SCATTER_GATHER, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out.class"), (InputDescriptor)InputDescriptor.create((String)"out.class"))));
        dag.addEdge(org.apache.tez.dag.api.Edge.create((org.apache.tez.dag.api.Vertex)v5, (org.apache.tez.dag.api.Vertex)v6, (EdgeProperty)EdgeProperty.create((EdgeProperty.DataMovementType)EdgeProperty.DataMovementType.ONE_TO_ONE, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"out.class"), (InputDescriptor)InputDescriptor.create((String)"out.class"))));
        return dag.createDag(this.conf, null, null, null, true);
    }

    private DAGProtos.DAGPlan createDAGPlanWithInitializer0Tasks(String initializerClassName) {
        LOG.info("Setting up dag plan with input initializer and 0 tasks");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("initializerWith0Tasks").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(initializerClassName)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addInEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutEdgeId("e1").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex1").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithMultipleInitializers(String initializerClassName) {
        LOG.info("Setting up dag plan with multiple input initializer");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testVertexWithMultipleInitializers").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(initializerClassName)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(initializerClassName)).setName("input2").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithInputInitializer(String initializerClassName) {
        LOG.info("Setting up dag plan with input initializer");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testVertexWithInitializer").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(initializerClassName)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(initializerClassName)).setName("input2").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(initializerClassName)).setName("input3").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x3.y3").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(RootInputSpecUpdaterVertexManager.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom((byte[])new byte[]{0})))).build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex4").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(initializerClassName)).setName("input4").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x3.y3").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(RootInputSpecUpdaterVertexManager.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom((byte[])new byte[]{1})))).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithRunningInitializer3() {
        LOG.info("Setting up dag plan with running input initializer3");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("DagWithInputInitializer3").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(1).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(1).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("IrrelevantInitializerClassName")).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(20).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").addInEdgeId("e2").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v3")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v2_v3")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithRunningInitializer4() {
        LOG.info("Setting up dag plan with running input initializer4");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("DagWithInputInitializer4").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(1).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(1).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addInEdgeId("e1").addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("IrrelevantInitializerClassName")).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(20).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e2").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v3")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v2_v3")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithRunningInitializer() {
        LOG.info("Setting up dag plan with running input initializer");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("DagWithInputInitializer2").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("IrrelevantInitializerClassName")).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(20).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanWithInputDistributor(String initializerClassName) {
        LOG.info("Setting up invalid dag plan with input distributor");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testVertexWithInitializer").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(initializerClassName)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x3.y3").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addInEdgeId("e1").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v5")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGPlanForOneToOneSplit(String initializerClassName, int numTasks, boolean addNullEdge) {
        DAGProtos.VertexPlan.Builder v1Builder = DAGProtos.VertexPlan.newBuilder();
        v1Builder.setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addOutEdgeId("e1").addOutEdgeId("e2");
        if (addNullEdge) {
            v1Builder.addOutEdgeId("e5");
        }
        if (initializerClassName != null) {
            numTasks = -1;
            v1Builder.addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(initializerClassName)).setName("input1").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("InputClazz").build()).build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build());
        } else {
            v1Builder.setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(numTasks).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build());
        }
        DAGProtos.VertexPlan v1Plan = v1Builder.build();
        DAGProtos.VertexPlan.Builder v4Builder = DAGProtos.VertexPlan.newBuilder();
        v4Builder.setName("vertex4").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(numTasks).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x4.y4").build()).addInEdgeId("e3").addInEdgeId("e4");
        if (addNullEdge) {
            v4Builder.addOutEdgeId("e6");
        }
        DAGProtos.VertexPlan v4Plan = v4Builder.build();
        LOG.info("Setting up one to one dag plan");
        DAGProtos.DAGPlan.Builder dagBuilder = DAGProtos.DAGPlan.newBuilder().setName("testVertexOneToOneSplit").addVertex(v1Plan).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(numTasks).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addInEdgeId("e1").addOutEdgeId("e3").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(numTasks).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x3.y3").build()).addInEdgeId("e2").addOutEdgeId("e4").build()).addVertex(v4Plan).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.ONE_TO_ONE).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v3")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.ONE_TO_ONE).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v2_v4")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex4").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.ONE_TO_ONE).setId("e3").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v3_v4")).setInputVertexName("vertex3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex4").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.ONE_TO_ONE).setId("e4").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL));
        if (addNullEdge) {
            dagBuilder.addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex5").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x4.y4").build()).addInEdgeId("e5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex6").setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x4.y4").build()).addInEdgeId("e6").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v1_v5")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setId("e5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("v4_v6")).setInputVertexName("vertex4").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setId("e6").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build());
        }
        DAGProtos.DAGPlan dag = dagBuilder.build();
        return dag;
    }

    private DAGProtos.DAGPlan createTestDAGPlan() {
        LOG.info("Setting up dag plan");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addInEdgeId("e1").addInEdgeId("e2").addOutEdgeId("e3").addOutEdgeId("e4").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex4").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host4").addRack("rack4").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x4.y4").build()).addInEdgeId("e3").addOutEdgeId("e5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex5").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host5").addRack("rack5").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x5.y5").build()).addInEdgeId("e4").addOutEdgeId("e6").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex6").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host6").addRack("rack6").build()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output").build()).setName("outputx").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(CountingOutputCommitter.class.getName()))).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x6.y6").build()).addInEdgeId("e5").addInEdgeId("e6").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v1")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v2")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i4_v3")).setInputVertexName("vertex3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o3_v4")).setOutputVertexName("vertex4").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e3").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i5_v3")).setInputVertexName("vertex3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o3_v5")).setOutputVertexName("vertex5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setEdgeManager(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(EdgeManagerForTest.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom((byte[])this.edgePayload))).build()).setId("e4").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i6_v4")).setInputVertexName("vertex4").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o4")).setOutputVertexName("vertex6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i6_v5")).setInputVertexName("vertex5").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o5")).setOutputVertexName("vertex6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e6").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createSamplerDAGPlan(boolean customEdge) {
        LOG.info("Setting up dag plan");
        DAGProtos.VertexPlan.Builder vCBuilder = DAGProtos.VertexPlan.newBuilder();
        vCBuilder.setName("C").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("C.class")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(customEdge ? -1 : 2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerPluginForTest.class.getName())).addInEdgeId("A_C").addInEdgeId("B_C");
        if (customEdge) {
            vCBuilder.setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerPluginForTest.class.getName()));
        }
        DAGProtos.VertexPlan vCPlan = vCBuilder.build();
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("TestSamplerDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("A").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).addOutEdgeId("A_B").addOutEdgeId("A_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("B").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("").build()).addInEdgeId("A_B").addOutEdgeId("B_C").build()).addVertex(vCPlan).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_B.class")).setInputVertexName("A").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_B.class")).setOutputVertexName("B").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("A_B").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C")).setInputVertexName("A").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C.class")).setOutputVertexName("C").setDataMovementType(customEdge ? DAGProtos.PlanEdgeDataMovementType.CUSTOM : DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("A_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setInputVertexName("B").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setOutputVertexName("C").setDataMovementType(customEdge ? DAGProtos.PlanEdgeDataMovementType.CUSTOM : DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("B_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createVertexGroupDAGPlan() {
        LOG.info("Setting up group dag plan");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("TestGroupDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("A").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).addOutEdgeId("A_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("B").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("").build()).addOutEdgeId("B_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("C").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("C.class")).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addInEdgeId("A_C").addInEdgeId("B_C").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C")).setInputVertexName("A").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C.class")).setOutputVertexName("C").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("A_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setInputVertexName("B").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setOutputVertexName("C").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("B_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addVertexGroups(DAGProtos.PlanVertexGroupInfo.newBuilder().setGroupName("Group").addGroupMembers("A").addGroupMembers("B").addEdgeMergedInputs(DAGProtos.PlanGroupInputEdgeInfo.newBuilder().setDestVertexName("C").setMergedInput(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("Group.class").build()).build())).build();
        return dag;
    }

    private DAGProtos.DAGPlan createDAGWithCustomVertexManager() {
        LOG.info("Setting up custom vertex manager dag plan");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("TestCustomVMDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("v1").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerPluginForTest.class.getName())).build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("v2").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerPluginForTest.class.getName())).addOutEdgeId("2_3").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("v3").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).setVertexManagerPlugin(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(VertexManagerPluginForTest.class.getName())).addInEdgeId("2_3").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("2_3")).setInputVertexName("v2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("2_3.class")).setOutputVertexName("v3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("2_3").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private DAGProtos.DAGPlan createSamplerDAGPlan2() {
        LOG.info("Setting up sampler 2 dag plan");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("TestSamplerDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("A").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("A.class").build()).addOutEdgeId("A_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("B").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("").build()).addOutEdgeId("B_C").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("C").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("C.class")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addInEdgeId("A_C").addInEdgeId("B_C").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C")).setInputVertexName("A").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("A_C.class")).setOutputVertexName("C").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("A_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setInputVertexName("B").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("B_C.class")).setOutputVertexName("C").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("B_C").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    private void setupVertices() {
        int vCnt = this.dagPlan.getVertexCount();
        LOG.info("Setting up vertices from dag plan, verticesCnt=" + vCnt);
        this.vertices = new HashMap<String, VertexImpl>();
        this.vertexIdMap = new HashMap<TezVertexID, VertexImpl>();
        Configuration dagConf = new Configuration(false);
        dagConf.set("abc", "foobar");
        for (int i = 0; i < vCnt; ++i) {
            DAGProtos.VertexPlan vPlan = this.dagPlan.getVertex(i);
            String vName = vPlan.getName();
            TezVertexID vertexId = TezVertexID.getInstance((TezDAGID)this.dagId, (int)(i + 1));
            VertexImpl v = null;
            VertexLocationHint locationHint = DagTypeConverters.convertFromDAGPlan((List)vPlan.getTaskLocationHintList());
            v = this.useCustomInitializer ? (this.customInitializer == null ? new VertexImplWithControlledInitializerManager(vertexId, vPlan, vPlan.getName(), this.conf, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.clock, this.thh, this.appContext, locationHint, this.dispatcher, this.updateTracker, dagConf) : new VertexImplWithRunningInputInitializer(vertexId, vPlan, vPlan.getName(), this.conf, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.clock, this.thh, this.appContext, locationHint, this.dispatcher, this.customInitializer, this.updateTracker, dagConf)) : new VertexImpl(vertexId, vPlan, vPlan.getName(), this.conf, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.clock, this.thh, true, this.appContext, locationHint, this.vertexGroups, taskSpecificLaunchCmdOption, (StateChangeNotifier)this.updateTracker, dagConf);
            this.vertices.put(vName, v);
            this.vertexIdMap.put(vertexId, v);
        }
    }

    private void parseVertexEdges() {
        LOG.info("Parsing edges from dag plan, edgeCount=" + this.dagPlan.getEdgeCount());
        int vCnt = this.dagPlan.getVertexCount();
        Map edgePlans = DagTypeConverters.createEdgePlanMapFromDAGPlan((List)this.dagPlan.getEdgeList());
        for (int i = 0; i < vCnt; ++i) {
            Edge edge;
            DAGProtos.EdgePlan edgePlan;
            DAGProtos.VertexPlan vertexPlan = this.dagPlan.getVertex(i);
            Vertex vertex = (Vertex)this.vertices.get(vertexPlan.getName());
            HashMap<Vertex, Edge> inVertices = new HashMap<Vertex, Edge>();
            HashMap<Vertex, Edge> outVertices = new HashMap<Vertex, Edge>();
            for (String inEdgeId : vertexPlan.getInEdgeIdList()) {
                edgePlan = (DAGProtos.EdgePlan)edgePlans.get(inEdgeId);
                Vertex inVertex = (Vertex)this.vertices.get(edgePlan.getInputVertexName());
                edge = this.edges.get(inEdgeId);
                edge.setSourceVertex(inVertex);
                edge.setDestinationVertex(vertex);
                inVertices.put(inVertex, edge);
            }
            for (String outEdgeId : vertexPlan.getOutEdgeIdList()) {
                edgePlan = (DAGProtos.EdgePlan)edgePlans.get(outEdgeId);
                Vertex outVertex = (Vertex)this.vertices.get(edgePlan.getOutputVertexName());
                edge = this.edges.get(outEdgeId);
                edge.setSourceVertex(vertex);
                edge.setDestinationVertex(outVertex);
                outVertices.put(outVertex, edge);
            }
            LOG.info("Setting input vertices for vertex " + vertex.getName() + ", inputVerticesCnt=" + inVertices.size());
            vertex.setInputVertices(inVertices);
            LOG.info("Setting output vertices for vertex " + vertex.getName() + ", outputVerticesCnt=" + outVertices.size());
            vertex.setOutputVertices(outVertices);
        }
    }

    public void setupPreDagCreation() {
        LOG.info("____________ RESETTING CURRENT DAG ____________");
        this.conf = new Configuration();
        this.conf.setBoolean("tez.am.container.reuse.enabled", false);
        this.appAttemptId = ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)100L, (int)1), (int)1);
        this.dagId = TezDAGID.getInstance((ApplicationId)this.appAttemptId.getApplicationId(), (int)1);
        taskSpecificLaunchCmdOption = (TaskSpecificLaunchCmdOption)Mockito.mock(TaskSpecificLaunchCmdOption.class);
        ((TaskSpecificLaunchCmdOption)Mockito.doReturn((Object)false).when((Object)taskSpecificLaunchCmdOption)).addTaskSpecificLaunchCmdOption((String)Matchers.any(String.class), Matchers.anyInt());
    }

    public void setupPostDagCreation() throws TezException {
        UserGroupInformation ugi;
        String dagName = "dag0";
        if (this.dispatcher != null) {
            this.dispatcher.stop();
        }
        this.dispatcher = new DrainDispatcher();
        this.appContext = (AppContext)Mockito.mock(AppContext.class);
        Mockito.when((Object)this.appContext.getHadoopShim()).thenReturn((Object)new DefaultHadoopShim());
        Mockito.when((Object)this.appContext.getContainerLauncherName(Matchers.anyInt())).thenReturn((Object)TezConstants.getTezYarnServicePluginName());
        this.thh = (TaskHeartbeatHandler)Mockito.mock(TaskHeartbeatHandler.class);
        this.historyEventHandler = (HistoryEventHandler)Mockito.mock(HistoryEventHandler.class);
        TaskSchedulerManager taskScheduler = (TaskSchedulerManager)Mockito.mock(TaskSchedulerManager.class);
        try {
            ugi = UserGroupInformation.getCurrentUser();
        }
        catch (IOException e) {
            throw new RuntimeException(e);
        }
        DAG dag = (DAG)Mockito.mock(DAG.class);
        ((DAG)Mockito.doReturn((Object)ugi).when((Object)dag)).getDagUGI();
        ((DAG)Mockito.doReturn((Object)dagName).when((Object)dag)).getName();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId).when((Object)this.appContext)).getApplicationAttemptId();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId.getApplicationId()).when((Object)this.appContext)).getApplicationID();
        ((AppContext)Mockito.doReturn((Object)dag).when((Object)this.appContext)).getCurrentDAG();
        this.execService = (ListeningExecutorService)Mockito.mock(ListeningExecutorService.class);
        final ListenableFuture mockFuture = (ListenableFuture)Mockito.mock(ListenableFuture.class);
        ((ListeningExecutorService)Mockito.doAnswer((Answer)new Answer(){

            public ListenableFuture<Void> answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                CallableEvent e = (CallableEvent)args[0];
                TestVertexImpl.this.dispatcher.getEventHandler().handle((Event)e);
                return mockFuture;
            }
        }).when((Object)this.execService)).submit((Callable)Matchers.any());
        MockClock clock = new MockClock();
        ((AppContext)Mockito.doReturn((Object)this.execService).when((Object)this.appContext)).getExecService();
        ((AppContext)Mockito.doReturn((Object)this.conf).when((Object)this.appContext)).getAMConf();
        ((DAG)Mockito.doReturn((Object)new Credentials()).when((Object)dag)).getCredentials();
        ((DAG)Mockito.doReturn((Object)DAGProtos.DAGPlan.getDefaultInstance()).when((Object)dag)).getJobPlan();
        ((AppContext)Mockito.doReturn((Object)this.dagId).when((Object)this.appContext)).getCurrentDAGID();
        ((DAG)Mockito.doReturn((Object)this.dagId).when((Object)dag)).getID();
        ((AppContext)Mockito.doReturn((Object)taskScheduler).when((Object)this.appContext)).getTaskScheduler();
        ((TaskSchedulerManager)Mockito.doReturn((Object)Resource.newInstance((int)102400, (int)60)).when((Object)taskScheduler)).getTotalResources(0);
        ((AppContext)Mockito.doReturn((Object)this.historyEventHandler).when((Object)this.appContext)).getHistoryHandler();
        ((AppContext)Mockito.doReturn((Object)this.dispatcher.getEventHandler()).when((Object)this.appContext)).getEventHandler();
        ((AppContext)Mockito.doReturn((Object)clock).when((Object)this.appContext)).getClock();
        this.vertexGroups = Maps.newHashMap();
        for (DAGProtos.PlanVertexGroupInfo groupInfo : this.dagPlan.getVertexGroupsList()) {
            this.vertexGroups.put(groupInfo.getGroupName(), new DAGImpl.VertexGroupInfo(groupInfo));
        }
        if (this.updateTracker != null) {
            this.updateTracker.stop();
        }
        this.updateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest(this.appContext.getCurrentDAG());
        this.setupVertices();
        Mockito.when((Object)dag.getVertex((TezVertexID)Matchers.any(TezVertexID.class))).thenAnswer((Answer)new Answer<Vertex>(){

            public Vertex answer(InvocationOnMock invocation) throws Throwable {
                Object[] args = invocation.getArguments();
                if (args.length != 1) {
                    return null;
                }
                TezVertexID vId = (TezVertexID)args[0];
                return (Vertex)TestVertexImpl.this.vertexIdMap.get(vId);
            }
        });
        Mockito.when((Object)dag.getVertex((String)Matchers.any(String.class))).thenAnswer((Answer)new Answer<Vertex>(){

            public Vertex answer(InvocationOnMock invocation) throws Throwable {
                Object[] args = invocation.getArguments();
                if (args.length != 1) {
                    return null;
                }
                String vId = (String)args[0];
                return (Vertex)TestVertexImpl.this.vertices.get(vId);
            }
        });
        this.edges = new HashMap<String, Edge>();
        for (DAGProtos.EdgePlan edgePlan : this.dagPlan.getEdgeList()) {
            EdgeProperty edgeProperty = DagTypeConverters.createEdgePropertyMapFromDAGPlan((DAGProtos.EdgePlan)edgePlan);
            this.edges.put(edgePlan.getId(), new Edge(edgeProperty, this.dispatcher.getEventHandler(), this.conf));
        }
        this.parseVertexEdges();
        for (Edge edge : this.edges.values()) {
            edge.initialize();
        }
        this.dispatcher.register(CallableEventType.class, (EventHandler)new CallableEventDispatcher());
        this.taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
        this.dispatcher.register(TaskAttemptEventType.class, (EventHandler)this.taskAttemptEventDispatcher);
        this.taskEventDispatcher = new TaskEventDispatcher();
        this.dispatcher.register(TaskEventType.class, (EventHandler)this.taskEventDispatcher);
        this.vertexEventDispatcher = new VertexEventDispatcher();
        this.dispatcher.register(VertexEventType.class, (EventHandler)this.vertexEventDispatcher);
        this.dagEventDispatcher = new DagEventDispatcher();
        this.dispatcher.register(DAGEventType.class, (EventHandler)this.dagEventDispatcher);
        this.dispatcher.init(this.conf);
        this.dispatcher.start();
    }

    @BeforeClass
    public static void beforeClass() {
        MockDNSToSwitchMapping.initializeMockRackResolver();
    }

    @Before
    public void setup() throws TezException {
        this.useCustomInitializer = false;
        this.customInitializer = null;
        this.setupPreDagCreation();
        this.dagPlan = this.createTestDAGPlan();
        this.invalidDagPlan = this.createInvalidDAGPlan();
        this.setupPostDagCreation();
    }

    @After
    public void teardown() {
        if (this.dispatcher.isInState(Service.STATE.STARTED)) {
            this.dispatcher.await();
            this.dispatcher.stop();
        }
        this.updateTracker.stop();
        this.execService.shutdownNow();
        this.dispatcher = null;
        this.vertexEventDispatcher = null;
        this.dagEventDispatcher = null;
        this.dagPlan = null;
        this.invalidDagPlan = null;
        this.vertices = null;
        this.edges = null;
        this.vertexIdMap = null;
    }

    private void initAllVertices(VertexState expectedState) {
        VertexImpl v;
        int i;
        for (i = 1; i <= this.vertices.size(); ++i) {
            v = this.vertices.get("vertex" + i);
            if (v.sourceVertices != null && !v.sourceVertices.isEmpty()) continue;
            this.initVertex(v);
        }
        for (i = 1; i <= this.vertices.size(); ++i) {
            v = this.vertices.get("vertex" + i);
            Assert.assertEquals((Object)expectedState, (Object)v.getState());
        }
    }

    private void initVertex(VertexImpl v) {
        Assert.assertEquals((Object)VertexState.NEW, (Object)v.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
    }

    private void startVertex(VertexImpl v) {
        this.startVertex(v, true);
    }

    private void killVertex(VertexImpl v) {
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v.getState());
        Assert.assertEquals((Object)v.getTerminationCause(), (Object)VertexTerminationCause.DAG_TERMINATED);
    }

    private void startVertex(VertexImpl v, boolean checkRunningState) {
        Assert.assertEquals((Object)VertexState.INITED, (Object)v.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        if (checkRunningState) {
            Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        }
    }

    @Test(timeout=5000L)
    public void testVertexInit() throws AMUserCodeException {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v3 = this.vertices.get("vertex3");
        Assert.assertEquals((Object)"x3.y3", (Object)v3.getProcessorName());
        Assert.assertTrue((boolean)v3.getJavaOpts().contains("foo"));
        Assert.assertEquals((long)2L, (long)v3.getInputSpecList(0).size());
        Assert.assertEquals((long)2L, (long)v3.getInputVerticesCount());
        Assert.assertEquals((long)2L, (long)v3.getOutputVerticesCount());
        Assert.assertEquals((long)2L, (long)v3.getOutputVerticesCount());
        Assert.assertTrue(("vertex1".equals(((InputSpec)v3.getInputSpecList(0).get(0)).getSourceVertexName()) || "vertex2".equals(((InputSpec)v3.getInputSpecList(0).get(0)).getSourceVertexName()) ? 1 : 0) != 0);
        Assert.assertTrue(("vertex1".equals(((InputSpec)v3.getInputSpecList(0).get(1)).getSourceVertexName()) || "vertex2".equals(((InputSpec)v3.getInputSpecList(0).get(1)).getSourceVertexName()) ? 1 : 0) != 0);
        Assert.assertTrue(("i3_v1".equals(((InputSpec)v3.getInputSpecList(0).get(0)).getInputDescriptor().getClassName()) || "i3_v2".equals(((InputSpec)v3.getInputSpecList(0).get(0)).getInputDescriptor().getClassName()) ? 1 : 0) != 0);
        Assert.assertTrue(("i3_v1".equals(((InputSpec)v3.getInputSpecList(0).get(1)).getInputDescriptor().getClassName()) || "i3_v2".equals(((InputSpec)v3.getInputSpecList(0).get(1)).getInputDescriptor().getClassName()) ? 1 : 0) != 0);
        Assert.assertTrue(("vertex4".equals(((OutputSpec)v3.getOutputSpecList(0).get(0)).getDestinationVertexName()) || "vertex5".equals(((OutputSpec)v3.getOutputSpecList(0).get(0)).getDestinationVertexName()) ? 1 : 0) != 0);
        Assert.assertTrue(("vertex4".equals(((OutputSpec)v3.getOutputSpecList(0).get(1)).getDestinationVertexName()) || "vertex5".equals(((OutputSpec)v3.getOutputSpecList(0).get(1)).getDestinationVertexName()) ? 1 : 0) != 0);
        Assert.assertTrue(("o3_v4".equals(((OutputSpec)v3.getOutputSpecList(0).get(0)).getOutputDescriptor().getClassName()) || "o3_v5".equals(((OutputSpec)v3.getOutputSpecList(0).get(0)).getOutputDescriptor().getClassName()) ? 1 : 0) != 0);
        Assert.assertTrue(("o3_v4".equals(((OutputSpec)v3.getOutputSpecList(0).get(1)).getOutputDescriptor().getClassName()) || "o3_v5".equals(((OutputSpec)v3.getOutputSpecList(0).get(1)).getOutputDescriptor().getClassName()) ? 1 : 0) != 0);
    }

    @Test(timeout=5000L)
    public void testNonExistVertexManager() throws TezException {
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithNonExistVertexManager();
        this.setupPostDagCreation();
        VertexImpl v1 = this.vertices.get("vertex1");
        v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        Assert.assertEquals((Object)VertexTerminationCause.INIT_FAILURE, (Object)v1.getTerminationCause());
        Assert.assertTrue((boolean)StringUtils.join((Collection)v1.getDiagnostics(), (String)"").contains("java.lang.ClassNotFoundException: non-exist-vertexmanager"));
    }

    @Test(timeout=5000L)
    public void testNonExistInputInitializer() throws TezException {
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithNonExistInputInitializer();
        this.setupPostDagCreation();
        VertexImpl v1 = this.vertices.get("vertex1");
        v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        Assert.assertEquals((Object)VertexTerminationCause.INIT_FAILURE, (Object)v1.getTerminationCause());
        Assert.assertTrue((boolean)StringUtils.join((Collection)v1.getDiagnostics(), (String)"").contains("java.lang.ClassNotFoundException: non-exist-input-initializer"));
    }

    @Test(timeout=5000L)
    public void testNonExistOutputCommitter() throws TezException {
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithNonExistOutputCommitter();
        this.setupPostDagCreation();
        VertexImpl v1 = this.vertices.get("vertex1");
        v1.handle(new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        Assert.assertEquals((Object)VertexTerminationCause.INIT_FAILURE, (Object)v1.getTerminationCause());
        Assert.assertTrue((boolean)StringUtils.join((Collection)v1.getDiagnostics(), (String)"").contains("java.lang.ClassNotFoundException: non-exist-output-committer"));
    }

    @Test(timeout=5000L)
    public void testVertexConfigureEvent() throws Exception {
        this.initAllVertices(VertexState.INITED);
        TestUpdateListener listener = new TestUpdateListener();
        this.updateTracker.registerForVertexUpdates("vertex3", EnumSet.of(org.apache.tez.dag.api.event.VertexState.CONFIGURED), listener);
        Assert.assertEquals((long)1L, (long)listener.events.size());
        Assert.assertEquals((Object)"vertex3", (Object)listener.events.get(0).getVertexName());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.CONFIGURED, (Object)listener.events.get(0).getVertexState());
        this.updateTracker.unregisterForVertexUpdates("vertex3", listener);
    }

    @Test(timeout=5000L)
    public void testVertexConfigureEventWithReconfigure() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithVMException("TestVMStateUpdate", VertexManagerWithException.VMExceptionLocation.NoExceptionDoReconfigure);
        this.setupPostDagCreation();
        TestUpdateListener listener = new TestUpdateListener();
        this.updateTracker.registerForVertexUpdates("vertex2", EnumSet.of(org.apache.tez.dag.api.event.VertexState.CONFIGURED), listener);
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        VertexImpl v2 = this.vertices.get("vertex2");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        initializerManager1.completeInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v2.getState());
        Assert.assertEquals((long)0L, (long)listener.events.size());
        this.startVertex(v1, true);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v2.getState());
        Assert.assertEquals((long)1L, (long)listener.events.size());
        Assert.assertEquals((Object)"vertex2", (Object)listener.events.get(0).getVertexName());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.CONFIGURED, (Object)listener.events.get(0).getVertexState());
        this.updateTracker.unregisterForVertexUpdates("vertex2", listener);
    }

    @Test(timeout=5000L)
    public void testVertexConfigureEventBadReconfigure() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v3 = this.vertices.get("vertex3");
        VertexImpl v2 = this.vertices.get("vertex2");
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(v2);
        this.startVertex(v1);
        try {
            v3.doneReconfiguringVertex();
            Assert.fail();
        }
        catch (IllegalStateException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("invoked only after vertexReconfigurationPlanned"));
        }
    }

    @Test(timeout=5000L)
    public void testVertexConfigureEventBadSetParallelism() throws Exception {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v3 = this.vertices.get("vertex3");
        VertexImpl v2 = this.vertices.get("vertex2");
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(v2);
        this.startVertex(v1);
        try {
            v3.reconfigureVertex(1, null, null);
            Assert.fail();
        }
        catch (IllegalStateException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("context.vertexReconfigurationPlanned() before re-configuring"));
        }
    }

    @Test(timeout=5000L)
    public void testVertexStart() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
    }

    @Test(timeout=5000L)
    public void testVertexGetTAAttempts() throws Exception {
        int i;
        int i2;
        this.initAllVertices(VertexState.INITED);
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(v1);
        VertexImpl v2 = this.vertices.get("vertex2");
        this.startVertex(v2);
        VertexImpl v3 = this.vertices.get("vertex3");
        VertexImpl v4 = this.vertices.get("vertex4");
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v4.getState());
        Assert.assertEquals((long)1L, (long)v4.sourceVertices.size());
        Edge e = (Edge)v4.sourceVertices.get(v3);
        TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)v3.getVertexId(), (int)0), (int)0);
        TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)v4.getVertexId(), (int)0), (int)0);
        for (int i3 = 0; i3 < 5; ++i3) {
            v4.handle((VertexEvent)new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(new TezEvent((org.apache.tez.runtime.api.Event)DataMovementEvent.create((int)0, null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
        }
        this.dispatcher.await();
        Assert.assertEquals((long)5L, (long)v4.pendingTaskEvents.size());
        LinkedList<VertexManagerPluginContext.ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
        for (i2 = 0; i2 < v4.getTotalTasks(); ++i2) {
            taskList.add(VertexManagerPluginContext.ScheduleTaskRequest.create((int)i2, null));
        }
        v4.scheduleTasks(taskList);
        this.dispatcher.await();
        Assert.assertEquals((long)5L, (long)v4.getOnDemandRouteEvents().size());
        for (i2 = 5; i2 < 11; ++i2) {
            v4.handle((VertexEvent)new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(new TezEvent((org.apache.tez.runtime.api.Event)DataMovementEvent.create((int)0, null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
        }
        this.dispatcher.await();
        Assert.assertEquals((long)11L, (long)v4.getOnDemandRouteEvents().size());
        EdgeManagerPluginOnDemand mockPlugin = (EdgeManagerPluginOnDemand)Mockito.mock(EdgeManagerPluginOnDemand.class);
        EdgeManagerPluginOnDemand.EventRouteMetadata mockRoute = EdgeManagerPluginOnDemand.EventRouteMetadata.create((int)1, (int[])new int[]{0});
        e.edgeManager = mockPlugin;
        Mockito.when((Object)mockPlugin.routeDataMovementEventToDestination(1, 0, 0)).thenReturn((Object)mockRoute);
        TaskAttemptEventInfo eventInfo = v4.getTaskAttemptTezEvents(v4TaId, 0, 0, 1);
        Assert.assertEquals((long)11L, (long)eventInfo.getNextFromEventId());
        Assert.assertEquals((long)0L, (long)eventInfo.getEvents().size());
        int fromEventId = 0;
        Mockito.when((Object)mockPlugin.routeDataMovementEventToDestination(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt())).thenReturn((Object)mockRoute);
        for (i = 0; i < 11; ++i) {
            eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 1);
            fromEventId = eventInfo.getNextFromEventId();
            Assert.assertEquals((long)(i + 1), (long)fromEventId);
            Assert.assertEquals((long)1L, (long)eventInfo.getEvents().size());
        }
        eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 1);
        Assert.assertEquals((long)11L, (long)eventInfo.getNextFromEventId());
        Assert.assertEquals((long)0L, (long)eventInfo.getEvents().size());
        fromEventId = 0;
        eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 100);
        fromEventId = eventInfo.getNextFromEventId();
        Assert.assertEquals((long)11L, (long)fromEventId);
        Assert.assertEquals((long)11L, (long)eventInfo.getEvents().size());
        fromEventId = 0;
        for (i = 1; i <= 2; ++i) {
            eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
            fromEventId = eventInfo.getNextFromEventId();
            Assert.assertEquals((long)(i * 5), (long)fromEventId);
            Assert.assertEquals((long)5L, (long)eventInfo.getEvents().size());
        }
        eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
        Assert.assertEquals((long)11L, (long)eventInfo.getNextFromEventId());
        Assert.assertEquals((long)1L, (long)eventInfo.getEvents().size());
        mockRoute = EdgeManagerPluginOnDemand.EventRouteMetadata.create((int)2, (int[])new int[]{0, 0});
        Mockito.when((Object)mockPlugin.routeDataMovementEventToDestination(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt())).thenReturn((Object)mockRoute);
        fromEventId = 0;
        int lastFromEventId = 0;
        for (int i4 = 1; i4 <= 4; ++i4) {
            eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
            fromEventId = eventInfo.getNextFromEventId();
            Assert.assertEquals((long)(i4 % 2 > 0 ? (lastFromEventId += 2) : (lastFromEventId += 3)), (long)fromEventId);
            Assert.assertEquals((long)5L, (long)eventInfo.getEvents().size());
        }
        eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
        Assert.assertEquals((long)11L, (long)eventInfo.getNextFromEventId());
        Assert.assertEquals((long)2L, (long)eventInfo.getEvents().size());
    }

    @Test(timeout=5000L)
    public void testVertexGetTAAttemptsObsoletion() throws Exception {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(v1);
        VertexImpl v2 = this.vertices.get("vertex2");
        this.startVertex(v2);
        VertexImpl v3 = this.vertices.get("vertex3");
        VertexImpl v4 = this.vertices.get("vertex4");
        LinkedList<VertexManagerPluginContext.ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
        for (int i = 0; i < v4.getTotalTasks(); ++i) {
            taskList.add(VertexManagerPluginContext.ScheduleTaskRequest.create((int)i, null));
        }
        v4.scheduleTasks(taskList);
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v4.getState());
        Assert.assertEquals((long)1L, (long)v4.sourceVertices.size());
        Edge e = (Edge)v4.sourceVertices.get(v3);
        TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)v3.getVertexId(), (int)0), (int)0);
        TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)v4.getVertexId(), (int)0), (int)0);
        for (int i = 0; i < 11; ++i) {
            v4.handle((VertexEvent)new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(new TezEvent((org.apache.tez.runtime.api.Event)DataMovementEvent.create((int)0, null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
        }
        this.dispatcher.await();
        Assert.assertEquals((long)11L, (long)v4.getOnDemandRouteEvents().size());
        EdgeManagerPluginOnDemand mockPlugin = (EdgeManagerPluginOnDemand)Mockito.mock(EdgeManagerPluginOnDemand.class);
        EdgeManagerPluginOnDemand.EventRouteMetadata mockRoute = EdgeManagerPluginOnDemand.EventRouteMetadata.create((int)1, (int[])new int[]{0});
        e.edgeManager = mockPlugin;
        Mockito.when((Object)mockPlugin.routeInputSourceTaskFailedEventToDestination(Matchers.anyInt(), Matchers.anyInt())).thenReturn((Object)mockRoute);
        Mockito.when((Object)mockPlugin.routeDataMovementEventToDestination(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt())).thenReturn((Object)mockRoute);
        v4.handle((VertexEvent)new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(new TezEvent((org.apache.tez.runtime.api.Event)InputFailedEvent.create((int)0, (int)0), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
        int fromEventId = 0;
        TaskAttemptEventInfo eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 100);
        fromEventId = eventInfo.getNextFromEventId();
        Assert.assertEquals((long)12L, (long)fromEventId);
        Assert.assertEquals((long)1L, (long)eventInfo.getEvents().size());
        Assert.assertEquals((Object)EventType.INPUT_FAILED_EVENT, (Object)((TezEvent)eventInfo.getEvents().get(0)).getEventType());
    }

    @Test(timeout=5000L)
    public void testVertexGetTAAttemptsObsoletionWithPendingRoutes() throws Exception {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(v1);
        VertexImpl v2 = this.vertices.get("vertex2");
        this.startVertex(v2);
        VertexImpl v3 = this.vertices.get("vertex3");
        VertexImpl v4 = this.vertices.get("vertex4");
        LinkedList<VertexManagerPluginContext.ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
        for (int i = 0; i < v4.getTotalTasks(); ++i) {
            taskList.add(VertexManagerPluginContext.ScheduleTaskRequest.create((int)i, null));
        }
        v4.scheduleTasks(taskList);
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v4.getState());
        Assert.assertEquals((long)1L, (long)v4.sourceVertices.size());
        Edge e = (Edge)v4.sourceVertices.get(v3);
        TezTaskAttemptID v3TaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)v3.getVertexId(), (int)0), (int)0);
        TezTaskAttemptID v4TaId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)v4.getVertexId(), (int)0), (int)0);
        for (int i = 0; i < 11; ++i) {
            v4.handle((VertexEvent)new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(new TezEvent((org.apache.tez.runtime.api.Event)DataMovementEvent.create((int)0, null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
        }
        this.dispatcher.await();
        Assert.assertEquals((long)11L, (long)v4.getOnDemandRouteEvents().size());
        EdgeManagerPluginOnDemand mockPlugin = (EdgeManagerPluginOnDemand)Mockito.mock(EdgeManagerPluginOnDemand.class);
        EdgeManagerPluginOnDemand.EventRouteMetadata mockFailedRoute = EdgeManagerPluginOnDemand.EventRouteMetadata.create((int)1, (int[])new int[]{0});
        e.edgeManager = mockPlugin;
        Mockito.when((Object)mockPlugin.routeInputSourceTaskFailedEventToDestination(Matchers.anyInt(), Matchers.anyInt())).thenReturn((Object)mockFailedRoute);
        EdgeManagerPluginOnDemand.EventRouteMetadata mockRoute = EdgeManagerPluginOnDemand.EventRouteMetadata.create((int)2, (int[])new int[]{0, 0});
        Mockito.when((Object)mockPlugin.routeDataMovementEventToDestination(Matchers.anyInt(), Matchers.anyInt(), Matchers.anyInt())).thenReturn((Object)mockRoute);
        int fromEventId = 0;
        TaskAttemptEventInfo eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
        fromEventId = eventInfo.getNextFromEventId();
        Assert.assertEquals((long)2L, (long)fromEventId);
        Assert.assertEquals((long)5L, (long)eventInfo.getEvents().size());
        v4.handle((VertexEvent)new VertexEventRouteEvent(v4.getVertexId(), Collections.singletonList(new TezEvent((org.apache.tez.runtime.api.Event)InputFailedEvent.create((int)0, (int)0), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, v3.getName(), v3.getName(), v3TaId)))));
        eventInfo = v4.getTaskAttemptTezEvents(v4TaId, fromEventId, 0, 5);
        fromEventId = eventInfo.getNextFromEventId();
        Assert.assertEquals((long)12L, (long)fromEventId);
        Assert.assertEquals((long)1L, (long)eventInfo.getEvents().size());
        Assert.assertEquals((Object)EventType.INPUT_FAILED_EVENT, (Object)((TezEvent)eventInfo.getEvents().get(0)).getEventType());
    }

    @Test(timeout=5000L)
    public void testVertexReconfigurePlannedAfterInit() throws Exception {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v3 = this.vertices.get("vertex3");
        try {
            v3.vertexReconfigurationPlanned();
            Assert.fail();
        }
        catch (IllegalStateException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("context.vertexReconfigurationPlanned() cannot be called after initialize()"));
        }
    }

    private void checkTasks(Vertex v, int numTasks) {
        Assert.assertEquals((long)numTasks, (long)v.getTotalTasks());
        Map tasks = v.getTasks();
        Assert.assertEquals((long)numTasks, (long)tasks.size());
        int i = 0;
        for (Task task : tasks.values()) {
            Assert.assertEquals((long)i, (long)task.getTaskId().getId());
            ++i;
        }
    }

    @Test(timeout=5000L)
    public void testVertexSetParallelismDecrease() throws Exception {
        VertexImpl v3 = this.vertices.get("vertex3");
        v3.vertexReconfigurationPlanned();
        this.initAllVertices(VertexState.INITED);
        Assert.assertEquals((long)2L, (long)v3.getTotalTasks());
        Assert.assertEquals((long)2L, (long)v3.getTasks().size());
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(this.vertices.get("vertex2"));
        this.startVertex(v1);
        EdgeManagerPluginDescriptor mockEdgeManagerDescriptor = EdgeManagerPluginDescriptor.create((String)EdgeManagerForTest.class.getName());
        EdgeProperty edgeProp = EdgeProperty.create((EdgeManagerPluginDescriptor)mockEdgeManagerDescriptor, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"));
        Map<String, EdgeProperty> edgeManagerDescriptors = Collections.singletonMap(v1.getName(), edgeProp);
        v3.reconfigureVertex(1, null, edgeManagerDescriptors);
        v3.doneReconfiguringVertex();
        Assert.assertTrue((boolean)(((Edge)v3.sourceVertices.get(v1)).getEdgeManager() instanceof EdgeManagerForTest));
        this.checkTasks((Vertex)v3, 1);
    }

    @Test(timeout=5000L)
    public void testVertexSetParallelismIncrease() throws Exception {
        VertexImpl v3 = this.vertices.get("vertex3");
        v3.vertexReconfigurationPlanned();
        this.initAllVertices(VertexState.INITED);
        Assert.assertEquals((long)2L, (long)v3.getTotalTasks());
        Assert.assertEquals((long)2L, (long)v3.getTasks().size());
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(this.vertices.get("vertex2"));
        this.startVertex(v1);
        EdgeManagerPluginDescriptor mockEdgeManagerDescriptor = EdgeManagerPluginDescriptor.create((String)EdgeManagerForTest.class.getName());
        EdgeProperty edgeProp = EdgeProperty.create((EdgeManagerPluginDescriptor)mockEdgeManagerDescriptor, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"));
        Map<String, EdgeProperty> edgeManagerDescriptors = Collections.singletonMap(v1.getName(), edgeProp);
        v3.reconfigureVertex(10, null, edgeManagerDescriptors);
        v3.doneReconfiguringVertex();
        Assert.assertTrue((boolean)(((Edge)v3.sourceVertices.get(v1)).getEdgeManager() instanceof EdgeManagerForTest));
        this.checkTasks((Vertex)v3, 10);
    }

    @Test(timeout=5000L)
    public void testVertexSetParallelismMultiple() throws Exception {
        VertexImpl v3 = this.vertices.get("vertex3");
        v3.vertexReconfigurationPlanned();
        this.initAllVertices(VertexState.INITED);
        Assert.assertEquals((long)2L, (long)v3.getTotalTasks());
        Map tasks = v3.getTasks();
        Assert.assertEquals((long)2L, (long)tasks.size());
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(this.vertices.get("vertex2"));
        this.startVertex(v1);
        v3.reconfigureVertex(10, null, null);
        this.checkTasks((Vertex)v3, 10);
        v3.reconfigureVertex(5, null, null);
        this.checkTasks((Vertex)v3, 5);
        v3.doneReconfiguringVertex();
    }

    @Test(timeout=5000L)
    public void testVertexSetParallelismMultipleFailAfterDone() throws Exception {
        VertexImpl v3 = this.vertices.get("vertex3");
        v3.vertexReconfigurationPlanned();
        this.initAllVertices(VertexState.INITED);
        Assert.assertEquals((long)2L, (long)v3.getTotalTasks());
        Map tasks = v3.getTasks();
        Assert.assertEquals((long)2L, (long)tasks.size());
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(this.vertices.get("vertex2"));
        this.startVertex(v1);
        v3.reconfigureVertex(10, null, null);
        this.checkTasks((Vertex)v3, 10);
        v3.doneReconfiguringVertex();
        try {
            v3.reconfigureVertex(5, null, null);
            Assert.fail();
        }
        catch (IllegalStateException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("Vertex is fully configured but still"));
        }
    }

    @Test(timeout=5000L)
    public void testVertexSetParallelismMultipleFailAfterSchedule() throws Exception {
        VertexImpl v3 = this.vertices.get("vertex3");
        v3.vertexReconfigurationPlanned();
        this.initAllVertices(VertexState.INITED);
        Assert.assertEquals((long)2L, (long)v3.getTotalTasks());
        Map tasks = v3.getTasks();
        Assert.assertEquals((long)2L, (long)tasks.size());
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(this.vertices.get("vertex2"));
        this.startVertex(v1);
        v3.reconfigureVertex(10, null, null);
        this.checkTasks((Vertex)v3, 10);
        v3.scheduleTasks(Collections.singletonList(VertexManagerPluginContext.ScheduleTaskRequest.create((int)0, null)));
        try {
            v3.reconfigureVertex(5, null, null);
            Assert.fail();
        }
        catch (TezUncheckedException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("setParallelism cannot be called after scheduling"));
        }
    }

    @Test(timeout=5000L)
    public void testVertexScheduleSendEvent() throws Exception {
        VertexImpl v3 = this.vertices.get("vertex3");
        v3.vertexReconfigurationPlanned();
        this.initAllVertices(VertexState.INITED);
        Assert.assertEquals((long)2L, (long)v3.getTotalTasks());
        Map tasks = v3.getTasks();
        Assert.assertEquals((long)2L, (long)tasks.size());
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(this.vertices.get("vertex2"));
        this.startVertex(v1);
        v3.reconfigureVertex(10, null, null);
        this.checkTasks((Vertex)v3, 10);
        this.taskEventDispatcher.events.clear();
        TaskLocationHint mockLocation = (TaskLocationHint)Mockito.mock(TaskLocationHint.class);
        v3.scheduleTasks(Collections.singletonList(VertexManagerPluginContext.ScheduleTaskRequest.create((int)0, (TaskLocationHint)mockLocation)));
        this.dispatcher.await();
        Assert.assertEquals((long)1L, (long)this.taskEventDispatcher.events.size());
        TaskEventScheduleTask event = (TaskEventScheduleTask)this.taskEventDispatcher.events.get(0);
        Assert.assertEquals((Object)mockLocation, (Object)event.getTaskLocationHint());
        Assert.assertNotNull((Object)event.getBaseTaskSpec());
        Assert.assertEquals((Object)"foobar", (Object)event.getBaseTaskSpec().getTaskConf().get("abc"));
    }

    @Test(timeout=5000L)
    public void testVertexSetParallelismFailAfterSchedule() throws Exception {
        VertexImpl v3 = this.vertices.get("vertex3");
        v3.vertexReconfigurationPlanned();
        this.initAllVertices(VertexState.INITED);
        Assert.assertEquals((long)2L, (long)v3.getTotalTasks());
        Map tasks = v3.getTasks();
        Assert.assertEquals((long)2L, (long)tasks.size());
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(this.vertices.get("vertex2"));
        this.startVertex(v1);
        v3.scheduleTasks(Collections.singletonList(VertexManagerPluginContext.ScheduleTaskRequest.create((int)0, null)));
        try {
            v3.reconfigureVertex(5, null, null);
            Assert.fail();
        }
        catch (TezUncheckedException e) {
            Assert.assertTrue((boolean)e.getMessage().contains("setParallelism cannot be called after scheduling"));
        }
    }

    @Test(timeout=5000L)
    public void testVertexPendingTaskEvents() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v3 = this.vertices.get("vertex3");
        VertexImpl v2 = this.vertices.get("vertex2");
        VertexImpl v1 = this.vertices.get("vertex1");
        this.startVertex(v1);
        TezTaskID t0_v2 = TezTaskID.getInstance((TezVertexID)v2.getVertexId(), (int)0);
        TezTaskAttemptID ta0_t0_v2 = TezTaskAttemptID.getInstance((TezTaskID)t0_v2, (int)0);
        LinkedList taskEvents = Lists.newLinkedList();
        TezEvent tezEvent1 = new TezEvent((org.apache.tez.runtime.api.Event)CompositeDataMovementEvent.create((int)0, (int)1, (ByteBuffer)ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", ta0_t0_v2));
        TezEvent tezEvent2 = new TezEvent((org.apache.tez.runtime.api.Event)DataMovementEvent.create((int)0, (ByteBuffer)ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", ta0_t0_v2));
        taskEvents.add(tezEvent1);
        taskEvents.add(tezEvent2);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v3.getVertexId(), (List)taskEvents));
        this.dispatcher.await();
        Assert.assertEquals((long)2L, (long)v3.pendingTaskEvents.size());
        v3.scheduleTasks(Collections.singletonList(VertexManagerPluginContext.ScheduleTaskRequest.create((int)0, null)));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)v3.pendingTaskEvents.size());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v3.getVertexId(), (List)taskEvents));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)v3.pendingTaskEvents.size());
    }

    @Test(timeout=5000L)
    public void testSetCustomEdgeManager() throws Exception {
        VertexImpl v5 = this.vertices.get("vertex5");
        v5.vertexReconfigurationPlanned();
        this.initAllVertices(VertexState.INITED);
        Edge edge = this.edges.get("e4");
        EdgeManagerPlugin em = edge.getEdgeManager();
        EdgeManagerForTest originalEm = (EdgeManagerForTest)em;
        Assert.assertTrue((boolean)Arrays.equals(this.edgePayload, originalEm.getEdgeManagerContext().getUserPayload().deepCopyAsArray()));
        UserPayload userPayload = UserPayload.create((ByteBuffer)ByteBuffer.wrap(new String("foo").getBytes()));
        EdgeManagerPluginDescriptor edgeManagerDescriptor = EdgeManagerPluginDescriptor.create((String)EdgeManagerForTest.class.getName());
        edgeManagerDescriptor.setUserPayload(userPayload);
        EdgeProperty edgeProp = EdgeProperty.create((EdgeManagerPluginDescriptor)edgeManagerDescriptor, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"));
        Vertex v3 = (Vertex)this.vertices.get("vertex3");
        Map<String, EdgeProperty> edgeManagerDescriptors = Collections.singletonMap(v3.getName(), edgeProp);
        v5.reconfigureVertex(v5.getTotalTasks() - 1, null, edgeManagerDescriptors);
        v5.doneReconfiguringVertex();
        VertexImpl v5Impl = v5;
        EdgeManagerPlugin modifiedEdgeManager = ((Edge)v5Impl.sourceVertices.get(v3)).getEdgeManager();
        Assert.assertNotNull((Object)modifiedEdgeManager);
        Assert.assertTrue((boolean)(modifiedEdgeManager instanceof EdgeManagerForTest));
        Assert.assertTrue((boolean)Arrays.equals(userPayload.deepCopyAsArray(), ((EdgeManagerForTest)modifiedEdgeManager).getUserPayload().deepCopyAsArray()));
    }

    @Test(timeout=5000L)
    public void testBasicVertexCompletion() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)v.getCompletedTasks());
        Assert.assertTrue((0.5f == v.getCompletedTaskProgress() ? 1 : 0) != 0);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)2L, (long)v.getCompletedTasks());
        Assert.assertTrue((1.0f == v.getCompletedTaskProgress() ? 1 : 0) != 0);
        Assert.assertTrue((v.initTimeRequested > 0L ? 1 : 0) != 0);
        Assert.assertTrue((v.initedTime > 0L ? 1 : 0) != 0);
        Assert.assertTrue((v.startTimeRequested > 0L ? 1 : 0) != 0);
        Assert.assertTrue((v.startedTime > 0L ? 1 : 0) != 0);
        Assert.assertTrue((v.finishTime > 0L ? 1 : 0) != 0);
    }

    @Test(timeout=5000L)
    @Ignore
    public void testDuplicateTaskCompletion() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
    }

    @Test(timeout=5000L)
    public void testVertexFailure() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v.getState());
        Assert.assertEquals((Object)VertexTerminationCause.OWN_TASK_FAILURE, (Object)v.getTerminationCause());
        String diagnostics = StringUtils.join((Collection)v.getDiagnostics(), (String)",").toLowerCase(Locale.ENGLISH);
        Assert.assertTrue((boolean)diagnostics.contains("task failed, taskid=" + t1.toString()));
    }

    @Test(timeout=5000L)
    public void testVertexKillDiagnosticsInInit() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v2 = this.vertices.get("vertex4");
        this.killVertex(v2);
        String diagnostics = StringUtils.join((Collection)v2.getDiagnostics(), (String)",").toLowerCase(Locale.ENGLISH);
        LOG.info("diagnostics v2: " + diagnostics);
        Assert.assertTrue((boolean)diagnostics.contains("vertex received kill in inited state"));
    }

    @Test(timeout=5000L)
    public void testVertexKillDiagnosticsInRunning() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v3 = this.vertices.get("vertex3");
        this.startVertex(this.vertices.get("vertex1"));
        this.startVertex(this.vertices.get("vertex2"));
        this.killVertex(v3);
        String diagnostics = StringUtils.join((Collection)v3.getDiagnostics(), (String)",").toLowerCase(Locale.ENGLISH);
        Assert.assertTrue((boolean)diagnostics.contains("vertex received kill while in running state"));
        Assert.assertEquals((Object)VertexTerminationCause.DAG_TERMINATED, (Object)v3.getTerminationCause());
        Assert.assertTrue((boolean)diagnostics.contains(v3.getTerminationCause().name().toLowerCase(Locale.ENGLISH)));
    }

    @Test(timeout=5000L)
    public void testVertexKillPending() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1), TaskState.KILLED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v.getState());
    }

    @Test(timeout=5000L)
    public void testVertexKill() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTermination(v.getVertexId(), VertexTerminationCause.DAG_TERMINATED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1), TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.KILLED, (Object)v.getState());
    }

    @Test(timeout=5000L)
    public void testKilledTasksHandling() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v.getState());
        Assert.assertEquals((Object)VertexTerminationCause.OWN_TASK_FAILURE, (Object)v.getTerminationCause());
        Assert.assertEquals((Object)TaskState.KILLED, (Object)v.getTask(t2).getState());
    }

    @Test(timeout=5000L)
    public void testVertexCommitterInit() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v2 = this.vertices.get("vertex2");
        Assert.assertNull((Object)v2.getOutputCommitter("output"));
        VertexImpl v6 = this.vertices.get("vertex6");
        Assert.assertTrue((boolean)(v6.getOutputCommitter("outputx") instanceof CountingOutputCommitter));
    }

    @Test(timeout=5000L)
    public void testVertexManagerInit() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v2 = this.vertices.get("vertex2");
        Assert.assertTrue((boolean)(v2.getVertexManager().getPlugin() instanceof ImmediateStartVertexManager));
        VertexImpl v6 = this.vertices.get("vertex6");
        Assert.assertTrue((boolean)(v6.getVertexManager().getPlugin() instanceof ShuffleVertexManager));
    }

    @Test(timeout=5000L)
    public void testVertexTaskFailure() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex6");
        this.startVertex(this.vertices.get("vertex1"));
        this.startVertex(this.vertices.get("vertex2"));
        CountingOutputCommitter committer = (CountingOutputCommitter)v.getOutputCommitter("outputx");
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.FAILED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v.getState());
        Assert.assertEquals((Object)VertexTerminationCause.OWN_TASK_FAILURE, (Object)v.getTerminationCause());
        Assert.assertEquals((long)0L, (long)committer.commitCounter);
        Assert.assertEquals((long)1L, (long)committer.abortCounter);
    }

    @Test(timeout=5000L)
    public void testVertexTaskAttemptProcessorFailure() throws Exception {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex1");
        this.startVertex(v);
        this.dispatcher.await();
        TaskAttemptImpl ta = (TaskAttemptImpl)v.getTask(0).getAttempts().values().iterator().next();
        ta.handle((TaskAttemptEvent)new TaskAttemptEventSchedule(ta.getID(), 2, 2));
        NodeId nid = NodeId.newInstance((String)"127.0.0.1", (int)0);
        ContainerId contId = ContainerId.newInstance((ApplicationAttemptId)this.appAttemptId, (int)3);
        Container container = (Container)Mockito.mock(Container.class);
        Mockito.when((Object)container.getId()).thenReturn((Object)contId);
        Mockito.when((Object)container.getNodeId()).thenReturn((Object)nid);
        Mockito.when((Object)container.getNodeHttpAddress()).thenReturn((Object)"localhost:0");
        AMContainerMap containers = new AMContainerMap((ContainerHeartbeatHandler)Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface)Mockito.mock(TaskCommunicatorManagerInterface.class), (ContainerSignatureMatcher)new ContainerContextMatcher(), this.appContext);
        containers.addContainerIfNew(container, 0, 0, 0);
        ((AppContext)Mockito.doReturn((Object)containers).when((Object)this.appContext)).getAllContainers();
        ta.handle((TaskAttemptEvent)new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
        Assert.assertEquals((Object)TaskAttemptStateInternal.RUNNING, (Object)ta.getInternalState());
        ta.handle((TaskAttemptEvent)new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "diag", TaskAttemptTerminationCause.APPLICATION_ERROR));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        Assert.assertEquals((Object)TaskAttemptTerminationCause.APPLICATION_ERROR, (Object)ta.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testVertexTaskAttemptInputFailure() throws Exception {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex1");
        this.startVertex(v);
        this.dispatcher.await();
        TaskAttemptImpl ta = (TaskAttemptImpl)v.getTask(0).getAttempts().values().iterator().next();
        ta.handle((TaskAttemptEvent)new TaskAttemptEventSchedule(ta.getID(), 2, 2));
        NodeId nid = NodeId.newInstance((String)"127.0.0.1", (int)0);
        ContainerId contId = ContainerId.newInstance((ApplicationAttemptId)this.appAttemptId, (int)3);
        Container container = (Container)Mockito.mock(Container.class);
        Mockito.when((Object)container.getId()).thenReturn((Object)contId);
        Mockito.when((Object)container.getNodeId()).thenReturn((Object)nid);
        Mockito.when((Object)container.getNodeHttpAddress()).thenReturn((Object)"localhost:0");
        AMContainerMap containers = new AMContainerMap((ContainerHeartbeatHandler)Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface)Mockito.mock(TaskCommunicatorManagerInterface.class), (ContainerSignatureMatcher)new ContainerContextMatcher(), this.appContext);
        containers.addContainerIfNew(container, 0, 0, 0);
        ((AppContext)Mockito.doReturn((Object)containers).when((Object)this.appContext)).getAllContainers();
        ta.handle((TaskAttemptEvent)new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
        Assert.assertEquals((Object)TaskAttemptStateInternal.RUNNING, (Object)ta.getInternalState());
        ta.handle((TaskAttemptEvent)new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "diag", TaskAttemptTerminationCause.INPUT_READ_ERROR));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        Assert.assertEquals((Object)TaskAttemptTerminationCause.INPUT_READ_ERROR, (Object)ta.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testVertexTaskAttemptOutputFailure() throws Exception {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex1");
        this.startVertex(v);
        this.dispatcher.await();
        TaskAttemptImpl ta = (TaskAttemptImpl)v.getTask(0).getAttempts().values().iterator().next();
        ta.handle((TaskAttemptEvent)new TaskAttemptEventSchedule(ta.getID(), 2, 2));
        NodeId nid = NodeId.newInstance((String)"127.0.0.1", (int)0);
        ContainerId contId = ContainerId.newInstance((ApplicationAttemptId)this.appAttemptId, (int)3);
        Container container = (Container)Mockito.mock(Container.class);
        Mockito.when((Object)container.getId()).thenReturn((Object)contId);
        Mockito.when((Object)container.getNodeId()).thenReturn((Object)nid);
        Mockito.when((Object)container.getNodeHttpAddress()).thenReturn((Object)"localhost:0");
        AMContainerMap containers = new AMContainerMap((ContainerHeartbeatHandler)Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface)Mockito.mock(TaskCommunicatorManagerInterface.class), (ContainerSignatureMatcher)new ContainerContextMatcher(), this.appContext);
        containers.addContainerIfNew(container, 0, 0, 0);
        ((AppContext)Mockito.doReturn((Object)containers).when((Object)this.appContext)).getAllContainers();
        ta.handle((TaskAttemptEvent)new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
        Assert.assertEquals((Object)TaskAttemptStateInternal.RUNNING, (Object)ta.getInternalState());
        ta.handle((TaskAttemptEvent)new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "diag", TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        Assert.assertEquals((Object)TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR, (Object)ta.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testSourceVertexStartHandling() {
        LOG.info("Testing testSourceVertexStartHandling");
        this.initAllVertices(VertexState.INITED);
        VertexImpl v6 = this.vertices.get("vertex6");
        VertexImpl v3 = this.vertices.get("vertex3");
        this.startVertex(this.vertices.get("vertex1"));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v3.getState());
        long v3StartTimeRequested = v3.startTimeRequested;
        Assert.assertEquals((long)1L, (long)v3.numStartedSourceVertices);
        Assert.assertTrue((v3StartTimeRequested > 0L ? 1 : 0) != 0);
        try {
            Thread.sleep(100L);
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
        this.startVertex(this.vertices.get("vertex2"));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v3.getState());
        Assert.assertEquals((long)2L, (long)v3.numStartedSourceVertices);
        Assert.assertTrue((v3.startTimeRequested > v3StartTimeRequested ? 1 : 0) != 0);
        LOG.info("Verifying v6 state " + v6.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v6.getState());
        Assert.assertEquals((long)3L, (long)v6.getDistanceFromRoot());
    }

    @Test(timeout=5000L)
    public void testSourceTaskAttemptCompletionEvents() {
        LOG.info("Testing testSourceTaskAttemptCompletionEvents");
        this.initAllVertices(VertexState.INITED);
        VertexImpl v4 = this.vertices.get("vertex4");
        VertexImpl v5 = this.vertices.get("vertex5");
        VertexImpl v6 = this.vertices.get("vertex6");
        this.startVertex(this.vertices.get("vertex1"));
        this.startVertex(this.vertices.get("vertex2"));
        this.dispatcher.await();
        LOG.info("Verifying v6 state " + v6.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v6.getState());
        TezTaskID t1_v4 = TezTaskID.getInstance((TezVertexID)v4.getVertexId(), (int)0);
        TezTaskID t2_v4 = TezTaskID.getInstance((TezVertexID)v4.getVertexId(), (int)1);
        TezTaskID t1_v5 = TezTaskID.getInstance((TezVertexID)v5.getVertexId(), (int)0);
        TezTaskID t2_v5 = TezTaskID.getInstance((TezVertexID)v5.getVertexId(), (int)1);
        TezTaskAttemptID ta1_t1_v4 = TezTaskAttemptID.getInstance((TezTaskID)t1_v4, (int)0);
        TezTaskAttemptID ta2_t1_v4 = TezTaskAttemptID.getInstance((TezTaskID)t1_v4, (int)0);
        TezTaskAttemptID ta1_t2_v4 = TezTaskAttemptID.getInstance((TezTaskID)t2_v4, (int)0);
        TezTaskAttemptID ta1_t1_v5 = TezTaskAttemptID.getInstance((TezTaskID)t1_v5, (int)0);
        TezTaskAttemptID ta1_t2_v5 = TezTaskAttemptID.getInstance((TezTaskID)t2_v5, (int)0);
        TezTaskAttemptID ta2_t2_v5 = TezTaskAttemptID.getInstance((TezTaskID)t2_v5, (int)0);
        v4.handle((VertexEvent)new VertexEventTaskAttemptCompleted(ta1_t1_v4, TaskAttemptStateInternal.FAILED));
        v4.handle((VertexEvent)new VertexEventTaskAttemptCompleted(ta2_t1_v4, TaskAttemptStateInternal.SUCCEEDED));
        v4.handle((VertexEvent)new VertexEventTaskAttemptCompleted(ta1_t2_v4, TaskAttemptStateInternal.SUCCEEDED));
        v5.handle((VertexEvent)new VertexEventTaskAttemptCompleted(ta1_t1_v5, TaskAttemptStateInternal.SUCCEEDED));
        v5.handle((VertexEvent)new VertexEventTaskAttemptCompleted(ta1_t2_v5, TaskAttemptStateInternal.FAILED));
        v5.handle((VertexEvent)new VertexEventTaskAttemptCompleted(ta2_t2_v5, TaskAttemptStateInternal.SUCCEEDED));
        v4.handle((VertexEvent)new VertexEventTaskCompleted(t1_v4, TaskState.SUCCEEDED));
        v4.handle((VertexEvent)new VertexEventTaskCompleted(t2_v4, TaskState.SUCCEEDED));
        v5.handle((VertexEvent)new VertexEventTaskCompleted(t1_v5, TaskState.SUCCEEDED));
        v5.handle((VertexEvent)new VertexEventTaskCompleted(t2_v5, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v4.getState());
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v5.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v6.getState());
        Assert.assertEquals((long)4L, (long)v6.numSuccessSourceAttemptCompletions);
    }

    @Test(timeout=5000L)
    public void testDAGEventGeneration() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_COMPLETED).intValue());
    }

    @Test(timeout=5000L)
    public void testTaskReschedule() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskReschedule(t1));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
    }

    @Test(timeout=5000L)
    public void testVertexSuccessToRunningAfterTaskScheduler() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_COMPLETED).intValue());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskReschedule(t1));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_RERUNNING).intValue());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)2L, (long)this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_COMPLETED).intValue());
    }

    @Test(timeout=5000L)
    public void testVertexSuccessToFailedAfterTaskScheduler() throws Exception {
        VertexImpl v = this.vertices.get("vertex2");
        ArrayList<DAGProtos.RootInputLeafOutputProto> outputs = new ArrayList<DAGProtos.RootInputLeafOutputProto>();
        outputs.add(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(CountingOutputCommitter.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom((byte[])new CountingOutputCommitter.CountingOutputCommitterConfig().toUserPayload())).build())).setName("output_v2").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output.class")).build());
        v.setAdditionalOutputs(outputs);
        this.initAllVertices(VertexState.INITED);
        this.startVertex(v);
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_COMPLETED).intValue());
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskReschedule(t1));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v.getState());
        Assert.assertEquals((long)2L, (long)this.dagEventDispatcher.eventCount.get(DAGEventType.DAG_VERTEX_COMPLETED).intValue());
    }

    @Test(timeout=5000L)
    public void testVertexCommit() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex6");
        this.startVertex(this.vertices.get("vertex1"));
        this.startVertex(this.vertices.get("vertex2"));
        CountingOutputCommitter committer = (CountingOutputCommitter)v.getOutputCommitter("outputx");
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)committer.commitCounter);
        Assert.assertEquals((long)0L, (long)committer.abortCounter);
        Assert.assertEquals((long)1L, (long)committer.initCounter);
        Assert.assertEquals((long)1L, (long)committer.setupCounter);
    }

    @Test(timeout=5000L)
    public void testTaskFailedAfterVertexSuccess() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex6");
        this.startVertex(this.vertices.get("vertex1"));
        this.startVertex(this.vertices.get("vertex2"));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        CountingOutputCommitter committer = (CountingOutputCommitter)v.getOutputCommitter("outputx");
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)committer.commitCounter);
        Assert.assertEquals((long)0L, (long)committer.abortCounter);
        Assert.assertEquals((long)1L, (long)committer.initCounter);
        Assert.assertEquals((long)1L, (long)committer.setupCounter);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.FAILED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)committer.commitCounter);
        Assert.assertEquals((long)1L, (long)committer.abortCounter);
    }

    @Test(timeout=5000L)
    public void testBadCommitter() throws Exception {
        VertexImpl v = this.vertices.get("vertex2");
        ArrayList<DAGProtos.RootInputLeafOutputProto> outputs = new ArrayList<DAGProtos.RootInputLeafOutputProto>();
        outputs.add(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(CountingOutputCommitter.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom((byte[])new CountingOutputCommitter.CountingOutputCommitterConfig(true, true, false).toUserPayload())).build())).setName("output_v2").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output.class")).build());
        v.setAdditionalOutputs(outputs);
        this.initAllVertices(VertexState.INITED);
        this.startVertex(v);
        CountingOutputCommitter committer = (CountingOutputCommitter)v.getOutputCommitter("output_v2");
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v.getState());
        Assert.assertEquals((Object)VertexTerminationCause.COMMIT_FAILURE, (Object)v.getTerminationCause());
        Assert.assertEquals((long)1L, (long)committer.commitCounter);
        Assert.assertEquals((long)1L, (long)committer.abortCounter);
        Assert.assertEquals((long)1L, (long)committer.initCounter);
        Assert.assertEquals((long)1L, (long)committer.setupCounter);
    }

    @Test(timeout=5000L)
    public void testBadCommitter2() throws Exception {
        VertexImpl v = this.vertices.get("vertex2");
        ArrayList<DAGProtos.RootInputLeafOutputProto> outputs = new ArrayList<DAGProtos.RootInputLeafOutputProto>();
        outputs.add(DAGProtos.RootInputLeafOutputProto.newBuilder().setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(CountingOutputCommitter.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom((byte[])new CountingOutputCommitter.CountingOutputCommitterConfig(true, true, true).toUserPayload())).build())).setName("output_v2").setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output.class")).build());
        v.setAdditionalOutputs(outputs);
        this.initAllVertices(VertexState.INITED);
        this.startVertex(v);
        CountingOutputCommitter committer = (CountingOutputCommitter)v.getOutputCommitter("output_v2");
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v.getState());
        Assert.assertEquals((Object)VertexTerminationCause.COMMIT_FAILURE, (Object)v.getTerminationCause());
        Assert.assertEquals((long)1L, (long)committer.commitCounter);
        Assert.assertEquals((long)1L, (long)committer.abortCounter);
        Assert.assertEquals((long)1L, (long)committer.initCounter);
        Assert.assertEquals((long)1L, (long)committer.setupCounter);
    }

    @Test(timeout=5000L)
    public void testVertexInitWithCustomVertexManager() throws Exception {
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGWithCustomVertexManager();
        this.setupPostDagCreation();
        int numTasks = 3;
        VertexImpl v1 = this.vertices.get("v1");
        VertexImpl v2 = this.vertices.get("v2");
        VertexImpl v3 = this.vertices.get("v3");
        this.initVertex(v1);
        this.initVertex(v2);
        this.dispatcher.await();
        Assert.assertEquals((long)-1L, (long)v1.getTotalTasks());
        Assert.assertTrue((0.0f == v1.getCompletedTaskProgress() ? 1 : 0) != 0);
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        Assert.assertEquals((long)-1L, (long)v2.getTotalTasks());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v2.getState());
        Assert.assertEquals((long)-1L, (long)v3.getTotalTasks());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v3.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals((long)-1L, (long)v1.getTotalTasks());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        v1.reconfigureVertex(numTasks, null, null);
        v2.reconfigureVertex(numTasks, null, null);
        this.dispatcher.await();
        Assert.assertEquals((long)numTasks, (long)v1.getTotalTasks());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((long)numTasks, (long)v2.getTotalTasks());
        Assert.assertEquals((Object)VertexState.INITED, (Object)v2.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v2.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v3.getState());
        Assert.assertTrue((v3.numStartedSourceVertices > 0 ? 1 : 0) != 0);
        long v3StartTimeRequested = v3.startTimeRequested;
        Assert.assertTrue((v3StartTimeRequested > 0L ? 1 : 0) != 0);
        v3.reconfigureVertex(numTasks, null, null);
        this.dispatcher.await();
        Assert.assertEquals((long)numTasks, (long)v3.getTotalTasks());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v3.getState());
        Assert.assertEquals((long)v3StartTimeRequested, (long)v3.startTimeRequested);
    }

    @Test(timeout=5000L)
    public void testVertexManagerHeuristic() throws TezException {
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithMixedEdges();
        this.setupPostDagCreation();
        this.initAllVertices(VertexState.INITED);
        Assert.assertEquals(ImmediateStartVertexManager.class, this.vertices.get("vertex1").getVertexManager().getPlugin().getClass());
        Assert.assertEquals(ShuffleVertexManager.class, this.vertices.get("vertex2").getVertexManager().getPlugin().getClass());
        Assert.assertEquals(InputReadyVertexManager.class, this.vertices.get("vertex3").getVertexManager().getPlugin().getClass());
        Assert.assertEquals(ImmediateStartVertexManager.class, this.vertices.get("vertex4").getVertexManager().getPlugin().getClass());
        Assert.assertEquals(ImmediateStartVertexManager.class, this.vertices.get("vertex5").getVertexManager().getPlugin().getClass());
        Assert.assertEquals(InputReadyVertexManager.class, this.vertices.get("vertex6").getVertexManager().getPlugin().getClass());
    }

    @Test(timeout=5000L)
    public void testVertexWithOneToOneSplit() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanForOneToOneSplit("TestInputInitializer", -1, true);
        this.setupPostDagCreation();
        int numTasks = 5;
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        VertexImpl v5 = this.vertices.get("vertex5");
        this.initVertex(v1);
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        EdgeManagerPluginDescriptor mockEdgeManagerDescriptor = EdgeManagerPluginDescriptor.create((String)EdgeManagerForTest.class.getName());
        Edge e = (Edge)v5.sourceVertices.get((Object)v1);
        Assert.assertNull((Object)e.getEdgeManager());
        e.setCustomEdgeManager(mockEdgeManagerDescriptor);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.INITED, (Object)this.vertices.get("vertex5").getState());
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        List<TaskLocationHint> v1Hints = this.createTaskLocationHints(numTasks);
        initializerManager1.completeInputInitialization(0, numTasks, v1Hints);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v1.getState());
        Assert.assertEquals((long)numTasks, (long)v1.getTotalTasks());
        Assert.assertEquals((Object)RootInputVertexManager.class.getName(), (Object)v1.getVertexManager().getPlugin().getClass().getName());
        for (int i = 0; i < v1Hints.size(); ++i) {
            Assert.assertEquals((Object)v1Hints.get(i), (Object)v1.getTaskLocationHints()[i]);
        }
        Assert.assertEquals((Object)true, (Object)initializerManager1.hasShutDown);
        this.startVertex(v1);
        Assert.assertEquals((long)numTasks, (long)this.vertices.get("vertex2").getTotalTasks());
        Assert.assertEquals((long)numTasks, (long)this.vertices.get("vertex3").getTotalTasks());
        Assert.assertEquals((long)numTasks, (long)this.vertices.get("vertex4").getTotalTasks());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)this.vertices.get("vertex4").getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)this.vertices.get("vertex6").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex1").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex2").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex3").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex5").getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)this.vertices.get("vertex4").getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)this.vertices.get("vertex6").getState());
        mockEdgeManagerDescriptor = EdgeManagerPluginDescriptor.create((String)EdgeManagerForTest.class.getName());
        e = (Edge)this.vertices.get((Object)"vertex6").sourceVertices.get(this.vertices.get("vertex4"));
        Assert.assertNull((Object)e.getEdgeManager());
        e.setCustomEdgeManager(mockEdgeManagerDescriptor);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex4").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex6").getState());
    }

    @Test(timeout=5000L)
    public void testVertexWithOneToOneSplitWhileRunning() throws Exception {
        int numTasks = 5;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanForOneToOneSplit(null, numTasks, false);
        this.setupPostDagCreation();
        VertexImpl v1 = this.vertices.get("vertex1");
        v1.vertexReconfigurationPlanned();
        this.initAllVertices(VertexState.INITED);
        v1.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create((String)VertexManagerPluginForTest.class.getName()), UserGroupInformation.getCurrentUser(), (Vertex)v1, this.appContext, (StateChangeNotifier)Mockito.mock(StateChangeNotifier.class));
        v1.vertexManager.initialize();
        this.startVertex(v1);
        this.dispatcher.await();
        Assert.assertEquals((long)numTasks, (long)this.vertices.get("vertex2").getTotalTasks());
        Assert.assertEquals((long)numTasks, (long)this.vertices.get("vertex3").getTotalTasks());
        Assert.assertEquals((long)numTasks, (long)this.vertices.get("vertex4").getTotalTasks());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex1").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex2").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex3").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex4").getState());
        int newNumTasks = 3;
        v1.reconfigureVertex(newNumTasks, null, null);
        v1.doneReconfiguringVertex();
        this.dispatcher.await();
        Assert.assertEquals((long)newNumTasks, (long)this.vertices.get("vertex2").getTotalTasks());
        Assert.assertEquals((long)newNumTasks, (long)this.vertices.get("vertex3").getTotalTasks());
        Assert.assertEquals((long)newNumTasks, (long)this.vertices.get("vertex4").getTotalTasks());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex1").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex2").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex3").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex4").getState());
    }

    @Test(timeout=5000L)
    public void testVertexWithOneToOneSplitWhileInited() throws Exception {
        int numTasks = 5;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanForOneToOneSplit(null, numTasks, false);
        this.setupPostDagCreation();
        VertexImpl v1 = this.vertices.get("vertex1");
        v1.vertexReconfigurationPlanned();
        this.initAllVertices(VertexState.INITED);
        v1.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create((String)VertexManagerPluginForTest.class.getName()), UserGroupInformation.getCurrentUser(), (Vertex)v1, this.appContext, (StateChangeNotifier)Mockito.mock(StateChangeNotifier.class));
        v1.vertexManager.initialize();
        Assert.assertEquals((long)numTasks, (long)this.vertices.get("vertex2").getTotalTasks());
        Assert.assertEquals((long)numTasks, (long)this.vertices.get("vertex3").getTotalTasks());
        Assert.assertEquals((long)numTasks, (long)this.vertices.get("vertex4").getTotalTasks());
        int newNumTasks = 3;
        v1.reconfigureVertex(newNumTasks, null, null);
        v1.doneReconfiguringVertex();
        this.dispatcher.await();
        Assert.assertEquals((long)newNumTasks, (long)this.vertices.get("vertex2").getTotalTasks());
        Assert.assertEquals((long)newNumTasks, (long)this.vertices.get("vertex3").getTotalTasks());
        Assert.assertEquals((long)newNumTasks, (long)this.vertices.get("vertex4").getTotalTasks());
        Assert.assertEquals((Object)VertexState.INITED, (Object)this.vertices.get("vertex1").getState());
        Assert.assertEquals((Object)VertexState.INITED, (Object)this.vertices.get("vertex2").getState());
        Assert.assertEquals((Object)VertexState.INITED, (Object)this.vertices.get("vertex3").getState());
        Assert.assertEquals((Object)VertexState.INITED, (Object)this.vertices.get("vertex4").getState());
        this.startVertex(v1);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex1").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex2").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex3").getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)this.vertices.get("vertex4").getState());
    }

    @Test(timeout=5000L)
    public void testVertexVMErrorReport() throws Exception {
        int numTasks = 5;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanForOneToOneSplit(null, numTasks, false);
        this.setupPostDagCreation();
        VertexImpl v1 = this.vertices.get("vertex1");
        this.initAllVertices(VertexState.INITED);
        VertexManagerPluginForTest.VertexManagerPluginForTestConfig config = new VertexManagerPluginForTest.VertexManagerPluginForTestConfig();
        config.setReconfigureOnStart(true);
        v1.vertexManager = new VertexManager((VertexManagerPluginDescriptor)VertexManagerPluginDescriptor.create((String)VertexManagerPluginForTest.class.getName()).setUserPayload(UserPayload.create((ByteBuffer)config.getPayload())), UserGroupInformation.getCurrentUser(), (Vertex)v1, this.appContext, (StateChangeNotifier)Mockito.mock(StateChangeNotifier.class));
        v1.vertexManager.initialize();
        this.startVertex(v1, false);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)this.vertices.get("vertex1").getState());
        Assert.assertEquals((Object)VertexTerminationCause.AM_USERCODE_FAILURE, (Object)this.vertices.get("vertex1").getTerminationCause());
        Assert.assertTrue((boolean)Joiner.on((String)":").join((Iterable)this.vertices.get("vertex1").getDiagnostics()).contains("context.vertexReconfigurationPlanned() before re-configuring"));
    }

    @Test(timeout=5000L)
    public void testInvalidEvent() {
        VertexImpl v = this.vertices.get("vertex2");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.ERROR, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)this.dagEventDispatcher.eventCount.get(DAGEventType.INTERNAL_ERROR).intValue());
    }

    @Test(timeout=5000L)
    public void testVertexWithInitializerFailure() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithInputInitializer("TestInputInitializer");
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        initializerManager1.failInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        Assert.assertEquals((Object)RootInputVertexManager.class.getName(), (Object)v1.getVertexManager().getPlugin().getClass().getName());
        Assert.assertEquals((Object)true, (Object)initializerManager1.hasShutDown);
        Assert.assertTrue((boolean)StringUtils.join((Collection)v1.getDiagnostics(), (String)",").contains(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE.name()));
        Assert.assertTrue((boolean)StringUtils.join((Collection)v1.getDiagnostics(), (String)",").contains("MockInitializerFailed"));
        VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex2");
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v2.getState());
        RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
        initializerManager2.failInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        Assert.assertEquals((Object)RootInputVertexManager.class.getName(), (Object)v2.getVertexManager().getPlugin().getClass().getName());
        Assert.assertEquals((Object)true, (Object)initializerManager2.hasShutDown);
        Assert.assertTrue((boolean)StringUtils.join((Collection)v1.getDiagnostics(), (String)",").contains(VertexTerminationCause.ROOT_INPUT_INIT_FAILURE.name()));
        Assert.assertTrue((boolean)StringUtils.join((Collection)v1.getDiagnostics(), (String)",").contains("MockInitializerFailed"));
    }

    @Test(timeout=10000L)
    public void testVertexWithInitializerParallelismSetTo0() throws InterruptedException, TezException {
        this.useCustomInitializer = true;
        this.customInitializer = new RootInitializerSettingParallelismTo0(null);
        RootInitializerSettingParallelismTo0 initializer = (RootInitializerSettingParallelismTo0)this.customInitializer;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithInitializer0Tasks(RootInitializerSettingParallelismTo0.class.getName());
        this.setupPostDagCreation();
        VertexImpl v1 = this.vertices.get("vertex1");
        VertexImpl v2 = this.vertices.get("vertex2");
        this.initVertex(v2);
        TezTaskID v2t1 = TezTaskID.getInstance((TezVertexID)v2.getVertexId(), (int)0);
        TezTaskAttemptID ta1V2T1 = TezTaskAttemptID.getInstance((TezTaskID)v2t1, (int)0);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)DataMovementEvent.create(null), new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex2", "vertex1", ta1V2T1));
        LinkedList<TezEvent> events = new LinkedList<TezEvent>();
        events.add(tezEvent);
        v1.handle((VertexEvent)new VertexEventRouteEvent(v1.getVertexId(), events));
        this.startVertex(v2);
        this.dispatcher.await();
        v2.handle((VertexEvent)new VertexEventTaskAttemptCompleted(ta1V2T1, TaskAttemptStateInternal.SUCCEEDED));
        v2.handle((VertexEvent)new VertexEventTaskCompleted(v2t1, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v2.getState());
        while (v1.getState() == VertexState.INITIALIZING || v1.getState() == VertexState.INITED) {
            initializer.go();
            Thread.sleep(10L);
        }
        while (v1.getState() != VertexState.SUCCEEDED) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v1.getState());
    }

    @Test(timeout=10000L)
    public void testInputInitializerVertexStateUpdates() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer)this.customInitializer;
        initializer.setNumVertexStateUpdateEvents(3);
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithRunningInitializer();
        this.setupPostDagCreation();
        VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex1");
        this.initVertex(v1);
        this.startVertex(v1);
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        for (TezTaskID taskId : v1.getTasks().keySet()) {
            v1.handle((VertexEvent)new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
        }
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v1.getState());
        initializer.waitForVertexStateUpdate();
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.CONFIGURED, (Object)((VertexStateUpdate)initializer.stateUpdates.get(0)).getVertexState());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.RUNNING, (Object)((VertexStateUpdate)initializer.stateUpdates.get(1)).getVertexState());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.SUCCEEDED, (Object)((VertexStateUpdate)initializer.stateUpdates.get(2)).getVertexState());
    }

    @Test(timeout=10000L)
    public void testInputInitializerEventMultipleAttempts() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer)this.customInitializer;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithRunningInitializer4();
        this.setupPostDagCreation();
        VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer v2 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex2");
        VertexImplWithRunningInputInitializer v3 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex3");
        this.initVertex(v1);
        this.startVertex(v1);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v3.getState());
        ByteBuffer payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 0);
        InputInitializerEvent event = InputInitializerEvent.create((String)"vertex3", (String)"input1", (ByteBuffer)payload);
        TezTaskID t0_v1 = TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0);
        TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance((TezTaskID)t0_v1, (int)0);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)event, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
        this.dispatcher.await();
        ByteBuffer expected = payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 1);
        event = InputInitializerEvent.create((String)"vertex3", (String)"input1", (ByteBuffer)payload);
        TezTaskAttemptID ta1_t0_v1 = TezTaskAttemptID.getInstance((TezTaskID)t0_v1, (int)1);
        tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)event, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta1_t0_v1));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)v3.pendingInitializerEvents.size());
        RootInputInitializerManager.InitializerWrapper initializerWrapper = v3.rootInputInitializerManager.getInitializerWrapper("input1");
        Assert.assertEquals((long)1L, (long)initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals((long)2L, (long)initializerWrapper.getPendingEvents().get((Object)v1.getName()).size());
        for (TezTaskID taskId : v1.getTasks().keySet()) {
            TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance((TezTaskID)taskId, (int)1);
            v1.handle((VertexEvent)new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
            v1.handle((VertexEvent)new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
            this.dispatcher.await();
            v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
        }
        this.dispatcher.await();
        while (v3.getState() != VertexState.RUNNING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v3.getState());
        Assert.assertEquals((long)1L, (long)initializer.initializerEvents.size());
        Assert.assertEquals((Object)expected, (Object)((InputInitializerEvent)initializer.initializerEvents.get(0)).getUserPayload());
    }

    @Test(timeout=10000L)
    public void testInputInitializerEventsMultipleSources() throws Exception {
        TezTaskAttemptID taskAttemptId;
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer)this.customInitializer;
        initializer.setNumExpectedEvents(4);
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithRunningInitializer4();
        this.setupPostDagCreation();
        VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer v2 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex2");
        VertexImplWithRunningInputInitializer v3 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex3");
        this.initVertex(v1);
        this.startVertex(v1);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v3.getState());
        LinkedList<ByteBuffer> expectedPayloads = new LinkedList<ByteBuffer>();
        ByteBuffer payload = ByteBuffer.allocate(12).putInt(0, 1).putInt(4, 0).putInt(8, 0);
        expectedPayloads.add(payload);
        InputInitializerEvent event = InputInitializerEvent.create((String)"vertex3", (String)"input1", (ByteBuffer)payload);
        TezTaskID t0_v1 = TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0);
        TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance((TezTaskID)t0_v1, (int)0);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)event, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)v3.pendingInitializerEvents.size());
        RootInputInitializerManager.InitializerWrapper initializerWrapper = v3.rootInputInitializerManager.getInitializerWrapper("input1");
        Assert.assertEquals((long)1L, (long)initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals((long)1L, (long)initializerWrapper.getPendingEvents().get((Object)v1.getName()).size());
        for (TezTaskID taskId : v1.getTasks().keySet()) {
            taskAttemptId = TezTaskAttemptID.getInstance((TezTaskID)taskId, (int)0);
            v1.handle((VertexEvent)new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
            v1.handle((VertexEvent)new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
            this.dispatcher.await();
            v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
        }
        this.dispatcher.await();
        Assert.assertEquals((long)1L, (long)initializer.initializerEvents.size());
        Assert.assertEquals((long)2L, (long)v2.getTotalTasks());
        for (Task task : v2.getTasks().values()) {
            TezTaskID taskId = task.getTaskId();
            TezTaskAttemptID attemptId = TezTaskAttemptID.getInstance((TezTaskID)taskId, (int)0);
            int numEventsFromTask = taskId.getId() + 1;
            for (int i = 0; i < numEventsFromTask; ++i) {
                payload = ByteBuffer.allocate(12).putInt(0, 2).putInt(4, taskId.getId()).putInt(8, i);
                expectedPayloads.add(payload);
                InputInitializerEvent event2 = InputInitializerEvent.create((String)"vertex3", (String)"input1", (ByteBuffer)payload);
                TezEvent tezEvent2 = new TezEvent((org.apache.tez.runtime.api.Event)event2, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", attemptId));
                this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v2.getVertexId(), Collections.singletonList(tezEvent2)));
                this.dispatcher.await();
            }
        }
        Assert.assertEquals((long)1L, (long)initializerWrapper.getPendingEvents().keySet().size());
        Assert.assertEquals((long)3L, (long)initializerWrapper.getPendingEvents().get((Object)v2.getName()).size());
        for (TezTaskID taskId : v2.getTasks().keySet()) {
            taskAttemptId = TezTaskAttemptID.getInstance((TezTaskID)taskId, (int)0);
            v2.handle((VertexEvent)new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
            v2.handle((VertexEvent)new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
            this.dispatcher.await();
            v2.stateChangeNotifier.taskSucceeded(v2.getName(), taskId, taskAttemptId.getId());
        }
        this.dispatcher.await();
        while (v3.getState() != VertexState.RUNNING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v3.getState());
        Assert.assertEquals((long)4L, (long)initializer.initializerEvents.size());
        Assert.assertTrue((boolean)initializer.initComplete.get());
        Assert.assertEquals((long)2L, (long)initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals((long)0L, (long)initializerWrapper.getPendingEvents().get((Object)v1.getName()).size());
        for (InputInitializerEvent initializerEvent : initializer.initializerEvents) {
            expectedPayloads.remove(initializerEvent.getUserPayload());
        }
        Assert.assertEquals((long)0L, (long)expectedPayloads.size());
    }

    @Test(timeout=10000L)
    public void testInputInitializerEventNoDirectConnection() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer)this.customInitializer;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithRunningInitializer4();
        this.setupPostDagCreation();
        VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer v2 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex2");
        VertexImplWithRunningInputInitializer v3 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex3");
        this.initVertex(v1);
        this.startVertex(v1);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v3.getState());
        InputInitializerEvent event = InputInitializerEvent.create((String)"vertex3", (String)"input1", null);
        TezTaskID t0_v1 = TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0);
        TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance((TezTaskID)t0_v1, (int)0);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)event, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)v3.pendingInitializerEvents.size());
        RootInputInitializerManager.InitializerWrapper initializerWrapper = v3.rootInputInitializerManager.getInitializerWrapper("input1");
        Assert.assertEquals((long)1L, (long)initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals((long)1L, (long)initializerWrapper.getPendingEvents().get((Object)v1.getName()).size());
        for (TezTaskID taskId : v1.getTasks().keySet()) {
            TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance((TezTaskID)taskId, (int)0);
            v1.handle((VertexEvent)new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
            v1.handle((VertexEvent)new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
            this.dispatcher.await();
            v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
        }
        this.dispatcher.await();
        while (v3.getState() != VertexState.RUNNING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v3.getState());
        Assert.assertEquals((long)1L, (long)initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals((long)0L, (long)initializerWrapper.getPendingEvents().get((Object)v1.getName()).size());
        Assert.assertTrue((boolean)initializer.eventReceived.get());
        Assert.assertEquals((long)3L, (long)initializer.stateUpdates.size());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.CONFIGURED, (Object)((VertexStateUpdate)initializer.stateUpdates.get(0)).getVertexState());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.RUNNING, (Object)((VertexStateUpdate)initializer.stateUpdates.get(1)).getVertexState());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.SUCCEEDED, (Object)((VertexStateUpdate)initializer.stateUpdates.get(2)).getVertexState());
    }

    @Test(timeout=10000L)
    public void testInputInitializerEventsAtNew() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer)this.customInitializer;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithRunningInitializer3();
        this.setupPostDagCreation();
        VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer v2 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex2");
        VertexImplWithRunningInputInitializer v3 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex3");
        this.initVertex(v1);
        this.startVertex(v1);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.NEW, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.NEW, (Object)v3.getState());
        InputInitializerEvent event = InputInitializerEvent.create((String)"vertex3", (String)"input1", null);
        TezTaskID t0_v1 = TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0);
        TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance((TezTaskID)t0_v1, (int)0);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)event, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", ta0_t0_v1));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
        this.dispatcher.await();
        Assert.assertEquals((long)1L, (long)v3.pendingInitializerEvents.size());
        for (TezTaskID taskId : v1.getTasks().keySet()) {
            TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance((TezTaskID)taskId, (int)0);
            TaskImpl task = (TaskImpl)v1.getTask(taskId);
            task.handle(new TaskEvent(taskId, TaskEventType.T_ATTEMPT_LAUNCHED));
            task.handle((TaskEvent)new TaskEventTASucceeded(taskAttemptId));
            v1.handle((VertexEvent)new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
            v1.handle((VertexEvent)new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
            this.dispatcher.await();
            v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
        }
        this.dispatcher.await();
        Assert.assertEquals((long)1L, (long)v3.pendingInitializerEvents.size());
        Assert.assertEquals((Object)VertexState.NEW, (Object)v3.getState());
        this.initVertex(v2);
        this.startVertex(v2);
        this.dispatcher.await();
        while (v3.getState() != VertexState.RUNNING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v3.getState());
        Assert.assertEquals((long)0L, (long)v3.pendingInitializerEvents.size());
        Assert.assertTrue((boolean)initializer.eventReceived.get());
        Assert.assertEquals((long)3L, (long)initializer.stateUpdates.size());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.CONFIGURED, (Object)((VertexStateUpdate)initializer.stateUpdates.get(0)).getVertexState());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.RUNNING, (Object)((VertexStateUpdate)initializer.stateUpdates.get(1)).getVertexState());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.SUCCEEDED, (Object)((VertexStateUpdate)initializer.stateUpdates.get(2)).getVertexState());
    }

    @Test(timeout=10000L)
    public void testInputInitializerEvents() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null);
        EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer)this.customInitializer;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithRunningInitializer();
        this.setupPostDagCreation();
        VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer v2 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex2");
        this.initVertex(v1);
        this.startVertex(v1);
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v2.getState());
        this.dispatcher.await();
        while (!initializer.initStarted.get()) {
            Thread.sleep(10L);
        }
        Assert.assertFalse((boolean)initializer.eventReceived.get());
        Assert.assertFalse((boolean)initializer.initComplete.get());
        InputInitializerEvent event = InputInitializerEvent.create((String)"vertex2", (String)"input1", null);
        TezTaskID t0_v1 = TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0);
        TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance((TezTaskID)t0_v1, (int)0);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)event, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex2", ta0_t0_v1));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)v2.pendingInitializerEvents.size());
        RootInputInitializerManager.InitializerWrapper initializerWrapper = v2.rootInputInitializerManager.getInitializerWrapper("input1");
        Assert.assertEquals((long)1L, (long)initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals((long)1L, (long)initializerWrapper.getPendingEvents().get((Object)v1.getName()).size());
        for (TezTaskID taskId : v1.getTasks().keySet()) {
            TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance((TezTaskID)taskId, (int)0);
            v1.handle((VertexEvent)new VertexEventTaskAttemptCompleted(taskAttemptId, TaskAttemptStateInternal.SUCCEEDED));
            v1.handle((VertexEvent)new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
            this.dispatcher.await();
            v1.stateChangeNotifier.taskSucceeded(v1.getName(), taskId, taskAttemptId.getId());
        }
        while (!initializer.eventReceived.get()) {
            Thread.sleep(10L);
        }
        while (!initializer.initComplete.get()) {
            Thread.sleep(10L);
        }
        while (v2.getState() != VertexState.RUNNING) {
            Thread.sleep(10L);
        }
        Assert.assertEquals((long)1L, (long)initializerWrapper.getFirstSuccessfulAttemptMap().size());
        Assert.assertEquals((long)0L, (long)initializerWrapper.getPendingEvents().get((Object)v1.getName()).size());
    }

    @Test(timeout=5000L)
    public void testTaskSchedulingWithCustomEdges() throws TezException {
        this.setupPreDagCreation();
        this.dagPlan = this.createCustomDAGWithCustomEdges();
        this.setupPostDagCreation();
        VertexImpl m2 = this.vertices.get("M2");
        VertexImpl m7 = this.vertices.get("M7");
        VertexImpl r3 = this.vertices.get("R3");
        VertexImpl m5 = this.vertices.get("M5");
        VertexImpl m8 = this.vertices.get("M8");
        VertexImpl m9 = this.vertices.get("M9");
        this.initVertex(m2);
        this.initVertex(m7);
        this.initVertex(m8);
        this.initVertex(m9);
        Assert.assertTrue((boolean)m7.getState().equals((Object)VertexState.INITED));
        Assert.assertTrue((boolean)m9.getState().equals((Object)VertexState.INITED));
        Assert.assertTrue((boolean)m5.getState().equals((Object)VertexState.INITED));
        Assert.assertTrue((boolean)m8.getState().equals((Object)VertexState.INITED));
        Assert.assertTrue((boolean)(m5.getVertexManager().getPlugin() instanceof ImmediateStartVertexManager));
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(m9.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(m2.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertTrue((boolean)r3.getState().equals((Object)VertexState.RUNNING));
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(m7.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertTrue((boolean)m5.getState().equals((Object)VertexState.INITED));
        for (Task task : m5.getTasks().values()) {
            Assert.assertTrue((boolean)task.getState().equals((Object)TaskState.NEW));
        }
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(m8.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertTrue((boolean)m9.getState().equals((Object)VertexState.SUCCEEDED));
        Assert.assertTrue((boolean)m5.getState().equals((Object)VertexState.RUNNING));
        for (Task task : m5.getTasks().values()) {
            Assert.assertTrue((boolean)task.getState().equals((Object)TaskState.SCHEDULED));
        }
    }

    private DAGProtos.DAGPlan createCustomDAGWithCustomEdges() {
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("TestSamplerDAG").addVertex(DAGProtos.VertexPlan.newBuilder().setName("M2").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M2.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("M2.class").build()).addOutEdgeId("M2_R3").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("M8").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M8.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("M8.class").build()).addOutEdgeId("M8_M5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("M9").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M9.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(0).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("M9.class").build()).addOutEdgeId("M9_M5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("R3").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M2.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(10).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("R3.class").build()).addInEdgeId("M2_R3").addOutEdgeId("R3_M5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("M5").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M5.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(10).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("M5.class").build()).addInEdgeId("R3_M5").addInEdgeId("M7_M5").addInEdgeId("M8_M5").addInEdgeId("M9_M5").addOutEdgeId("M5_R6").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("M7").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M7.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host4").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(10).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("M7.class").build()).addOutEdgeId("M7_M5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("R6").setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("R6.class")).setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("R6.class").build()).addInEdgeId("M5_R6").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M2_R3")).setInputVertexName("M2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M2_R3.class")).setOutputVertexName("R3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("M2_R3").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("R3_M5")).setInputVertexName("R3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("R3_M5.class")).setOutputVertexName("M5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("R3_M5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M7_M5")).setInputVertexName("M7").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M7_M5.class")).setOutputVertexName("M5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("M7_M5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M5_R6")).setInputVertexName("M5").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M5_R6.class")).setOutputVertexName("R6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("M5_R6").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M9_M5")).setInputVertexName("M9").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M9_M5.class")).setOutputVertexName("M5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.BROADCAST).setId("M9_M5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M8_M5")).setInputVertexName("M8").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("M8_M5.class")).setEdgeManager(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(EdgeManagerForTest.class.getName()).setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder().setUserPayload(ByteString.copyFrom((byte[])this.edgePayload))).build()).setOutputVertexName("M5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM).setId("M8_M5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    @Test(timeout=5000L)
    public void testVertexWithMultipleInitializers1() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithMultipleInitializers("TestInputInitializer");
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        List<TaskLocationHint> v1Hints = this.createTaskLocationHints(5);
        initializerManager1.completeInputInitialization(0, 5, v1Hints);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        Assert.assertEquals((long)1L, (long)v1.numInitializerCompletionsHandled);
        initializerManager1.completeInputInitialization(1);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v1.getState());
        Assert.assertEquals((long)2L, (long)v1.numInitializerCompletionsHandled);
    }

    @Test(timeout=5000L)
    public void testVertexWithMultipleInitializers2() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithMultipleInitializers("TestInputInitializer");
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        List<TaskLocationHint> v1Hints = this.createTaskLocationHints(5);
        initializerManager1.completeInputInitialization(1);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        Assert.assertEquals((long)1L, (long)v1.numInitializerCompletionsHandled);
        initializerManager1.completeInputInitialization(0, 5, v1Hints);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v1.getState());
        Assert.assertEquals((long)2L, (long)v1.numInitializerCompletionsHandled);
    }

    @Test(timeout=500000L)
    public void testVertexWithInitializerSuccess() throws Exception {
        int i;
        int i2;
        int i3;
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithInputInitializer("TestInputInitializer");
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        List<TaskLocationHint> v1Hints = this.createTaskLocationHints(5);
        initializerManager1.completeInputInitialization(0, 5, v1Hints);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v1.getState());
        Assert.assertEquals((long)5L, (long)v1.getTotalTasks());
        Assert.assertEquals((Object)RootInputVertexManager.class.getName(), (Object)v1.getVertexManager().getPlugin().getClass().getName());
        for (i3 = 0; i3 < v1Hints.size(); ++i3) {
            Assert.assertEquals((Object)v1Hints.get(i3), (Object)v1.getTaskLocationHints()[i3]);
        }
        Assert.assertEquals((Object)true, (Object)initializerManager1.hasShutDown);
        for (i3 = 0; i3 < 5; ++i3) {
            List inputSpecs = v1.getInputSpecList(i3);
            Assert.assertEquals((long)1L, (long)inputSpecs.size());
            Assert.assertEquals((long)1L, (long)((InputSpec)inputSpecs.get(0)).getPhysicalEdgeCount());
        }
        LinkedList<VertexManagerPluginContext.ScheduleTaskRequest> taskList = new LinkedList<VertexManagerPluginContext.ScheduleTaskRequest>();
        for (i2 = 0; i2 < v1.getTotalTasks(); ++i2) {
            taskList.add(VertexManagerPluginContext.ScheduleTaskRequest.create((int)i2, null));
        }
        v1.scheduleTasks(taskList);
        this.dispatcher.await();
        for (i2 = 0; i2 < v1.getTotalTasks(); ++i2) {
            Assert.assertEquals((long)1L, (long)v1.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance((TezTaskID)v1.getTask(i2).getTaskId(), (int)0), 0, 0, 100).getEvents().size());
        }
        VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex2");
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v2.getState());
        LinkedList events = Lists.newLinkedList();
        TezTaskID t0_v1 = TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0);
        TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance((TezTaskID)t0_v1, (int)0);
        events.add(new TezEvent((org.apache.tez.runtime.api.Event)VertexManagerEvent.create((String)"vertex2", (ByteBuffer)ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2", ta0_t0_v1)));
        events.add(new TezEvent((org.apache.tez.runtime.api.Event)InputDataInformationEvent.createWithSerializedPayload((int)0, (ByteBuffer)ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex2", "NULL_VERTEX", null)));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v2.getVertexId(), (List)events));
        this.dispatcher.await();
        RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
        List<TaskLocationHint> v2Hints = this.createTaskLocationHints(10);
        initializerManager2.completeInputInitialization(0, 10, v2Hints);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v2.getState());
        Assert.assertEquals((long)10L, (long)v2.getTotalTasks());
        Assert.assertEquals((Object)RootInputVertexManager.class.getName(), (Object)v2.getVertexManager().getPlugin().getClass().getName());
        for (i = 0; i < v2Hints.size(); ++i) {
            Assert.assertEquals((Object)v2Hints.get(i), (Object)v2.getTaskLocationHints()[i]);
        }
        Assert.assertEquals((Object)true, (Object)initializerManager2.hasShutDown);
        taskList = new LinkedList();
        for (i = 0; i < v2.getTotalTasks(); ++i) {
            taskList.add(VertexManagerPluginContext.ScheduleTaskRequest.create((int)i, null));
        }
        v2.scheduleTasks(taskList);
        this.dispatcher.await();
        for (i = 0; i < v2.getTotalTasks(); ++i) {
            Assert.assertEquals((long)(i == 0 ? 2 : 1), (long)v2.getTaskAttemptTezEvents(TezTaskAttemptID.getInstance((TezTaskID)v2.getTask(i).getTaskId(), (int)0), 0, 0, 100).getEvents().size());
        }
        for (i = 0; i < 10; ++i) {
            List inputSpecs = v1.getInputSpecList(i);
            Assert.assertEquals((long)1L, (long)inputSpecs.size());
            Assert.assertEquals((long)1L, (long)((InputSpec)inputSpecs.get(0)).getPhysicalEdgeCount());
        }
    }

    @Test(timeout=5000L)
    public void testVertexWithInitializerSuccessLegacyRouting() throws Exception {
        int i;
        int i2;
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithInputInitializer("TestInputInitializer");
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        List<TaskLocationHint> v1Hints = this.createTaskLocationHints(5);
        initializerManager1.completeInputInitialization(0, 5, v1Hints);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v1.getState());
        Assert.assertEquals((long)5L, (long)v1.getTotalTasks());
        Assert.assertEquals((Object)RootInputVertexManager.class.getName(), (Object)v1.getVertexManager().getPlugin().getClass().getName());
        for (i2 = 0; i2 < v1Hints.size(); ++i2) {
            Assert.assertEquals((Object)v1Hints.get(i2), (Object)v1.getTaskLocationHints()[i2]);
        }
        Assert.assertEquals((Object)true, (Object)initializerManager1.hasShutDown);
        for (i2 = 0; i2 < 5; ++i2) {
            List inputSpecs = v1.getInputSpecList(i2);
            Assert.assertEquals((long)1L, (long)inputSpecs.size());
            Assert.assertEquals((long)1L, (long)((InputSpec)inputSpecs.get(0)).getPhysicalEdgeCount());
        }
        Assert.assertEquals((long)5L, (long)v1.pendingTaskEvents.size());
        VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex2");
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v2.getState());
        LinkedList events = Lists.newLinkedList();
        TezTaskID t0_v1 = TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0);
        TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance((TezTaskID)t0_v1, (int)0);
        events.add(new TezEvent((org.apache.tez.runtime.api.Event)VertexManagerEvent.create((String)"vertex2", (ByteBuffer)ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.PROCESSOR, "vertex1", "vertex2", ta0_t0_v1)));
        events.add(new TezEvent((org.apache.tez.runtime.api.Event)InputDataInformationEvent.createWithSerializedPayload((int)0, (ByteBuffer)ByteBuffer.wrap(new byte[0])), new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex2", "NULL_VERTEX", null)));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v2.getVertexId(), (List)events));
        this.dispatcher.await();
        Assert.assertEquals((long)1L, (long)v2.pendingTaskEvents.size());
        RootInputInitializerManagerControlled initializerManager2 = v2.getRootInputInitializerManager();
        List<TaskLocationHint> v2Hints = this.createTaskLocationHints(10);
        initializerManager2.completeInputInitialization(0, 10, v2Hints);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v2.getState());
        Assert.assertEquals((long)10L, (long)v2.getTotalTasks());
        Assert.assertEquals((Object)RootInputVertexManager.class.getName(), (Object)v2.getVertexManager().getPlugin().getClass().getName());
        for (i = 0; i < v2Hints.size(); ++i) {
            Assert.assertEquals((Object)v2Hints.get(i), (Object)v2.getTaskLocationHints()[i]);
        }
        Assert.assertEquals((Object)true, (Object)initializerManager2.hasShutDown);
        Assert.assertEquals((long)11L, (long)v2.pendingTaskEvents.size());
        for (i = 0; i < 10; ++i) {
            List inputSpecs = v1.getInputSpecList(i);
            Assert.assertEquals((long)1L, (long)inputSpecs.size());
            Assert.assertEquals((long)1L, (long)((InputSpec)inputSpecs.get(0)).getPhysicalEdgeCount());
        }
    }

    @Test(timeout=5000L)
    public void testVertexWithInputDistributor() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithInputDistributor("TestInputInitializer");
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        VertexImplWithControlledInitializerManager v2 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex2");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v2.getState());
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        byte[] payload = new byte[]{};
        initializerManager1.completeInputDistribution(payload);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v1.getState());
        Assert.assertEquals((Object)true, (Object)initializerManager1.hasShutDown);
        Assert.assertEquals((long)2L, (long)v1.getTotalTasks());
        Assert.assertArrayEquals((byte[])payload, (byte[])((InputSpec)v1.getInputSpecList(0).get(0)).getInputDescriptor().getUserPayload().deepCopyAsArray());
        EdgeManagerPluginDescriptor mockEdgeManagerDescriptor = EdgeManagerPluginDescriptor.create((String)EdgeManagerForTest.class.getName());
        Edge e = (Edge)v2.sourceVertices.get((Object)v1);
        Assert.assertNull((Object)e.getEdgeManager());
        e.setCustomEdgeManager(mockEdgeManagerDescriptor);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.INITED, (Object)v2.getState());
    }

    @Test(timeout=5000L)
    public void testVertexRootInputSpecUpdateAll() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithInputInitializer("TestInputInitializer");
        this.setupPostDagCreation();
        int expectedNumTasks = 5;
        VertexImplWithControlledInitializerManager v3 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex3");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v3.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v3.getState());
        RootInputInitializerManagerControlled initializerManager1 = v3.getRootInputInitializerManager();
        initializerManager1.completeInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v3.getState());
        Assert.assertEquals((long)expectedNumTasks, (long)v3.getTotalTasks());
        Assert.assertEquals((Object)RootInputSpecUpdaterVertexManager.class.getName(), (Object)v3.getVertexManager().getPlugin().getClass().getName());
        Assert.assertEquals((Object)true, (Object)initializerManager1.hasShutDown);
        for (int i = 0; i < expectedNumTasks; ++i) {
            List inputSpecs = v3.getInputSpecList(i);
            Assert.assertEquals((long)1L, (long)inputSpecs.size());
            Assert.assertEquals((long)4L, (long)((InputSpec)inputSpecs.get(0)).getPhysicalEdgeCount());
        }
    }

    @Test(timeout=5000L)
    public void testVertexRootInputSpecUpdatePerTask() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithInputInitializer("TestInputInitializer");
        this.setupPostDagCreation();
        int expectedNumTasks = 5;
        VertexImplWithControlledInitializerManager v4 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex4");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v4.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v4.getState());
        RootInputInitializerManagerControlled initializerManager1 = v4.getRootInputInitializerManager();
        initializerManager1.completeInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v4.getState());
        Assert.assertEquals((long)expectedNumTasks, (long)v4.getTotalTasks());
        Assert.assertEquals((Object)RootInputSpecUpdaterVertexManager.class.getName(), (Object)v4.getVertexManager().getPlugin().getClass().getName());
        Assert.assertEquals((Object)true, (Object)initializerManager1.hasShutDown);
        for (int i = 0; i < expectedNumTasks; ++i) {
            List inputSpecs = v4.getInputSpecList(i);
            Assert.assertEquals((long)1L, (long)inputSpecs.size());
            Assert.assertEquals((long)(i + 1), (long)((InputSpec)inputSpecs.get(0)).getPhysicalEdgeCount());
        }
    }

    private List<TaskLocationHint> createTaskLocationHints(int numTasks) {
        ArrayList locationHints = Lists.newArrayListWithCapacity((int)numTasks);
        for (int i = 0; i < numTasks; ++i) {
            TaskLocationHint taskLocationHint = TaskLocationHint.createTaskLocationHint((Set)Sets.newSet((Object[])new String[]{"host" + i}), null);
            locationHints.add(taskLocationHint);
        }
        return locationHints;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test(timeout=5000L)
    public void testVertexWithNoTasks() {
        block3: {
            TezVertexID vId = null;
            try {
                TezDAGID invalidDagId = TezDAGID.getInstance((ApplicationId)this.dagId.getApplicationId(), (int)1000);
                vId = TezVertexID.getInstance((TezDAGID)invalidDagId, (int)1);
                DAGProtos.VertexPlan vPlan = this.invalidDagPlan.getVertex(0);
                VertexImpl v = new VertexImpl(vId, vPlan, vPlan.getName(), this.conf, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.clock, this.thh, true, this.appContext, this.vertexLocationHint, null, taskSpecificLaunchCmdOption, (StateChangeNotifier)this.updateTracker, new Configuration(false));
                v.setInputVertices(new HashMap());
                this.vertexIdMap.put(vId, v);
                this.vertices.put(v.getName(), v);
                v.handle(new VertexEvent(vId, VertexEventType.V_INIT));
                this.dispatcher.await();
                Assert.assertEquals((Object)VertexState.INITED, (Object)v.getState());
                Assert.assertTrue((0.0f == v.getCompletedTaskProgress() ? 1 : 0) != 0);
                v.handle(new VertexEvent(vId, VertexEventType.V_START));
                this.dispatcher.await();
                Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
                Assert.assertTrue((1.0f == v.getCompletedTaskProgress() ? 1 : 0) != 0);
                if (vId == null) break block3;
                this.vertexIdMap.remove(vId);
            }
            catch (Throwable throwable) {
                if (vId != null) {
                    this.vertexIdMap.remove(vId);
                }
                throw throwable;
            }
        }
    }

    @Test(timeout=5000L)
    public void testVertexGroupInput() throws TezException {
        this.setupPreDagCreation();
        this.dagPlan = this.createVertexGroupDAGPlan();
        this.setupPostDagCreation();
        VertexImpl vA = this.vertices.get("A");
        VertexImpl vB = this.vertices.get("B");
        VertexImpl vC = this.vertices.get("C");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vB.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertNull((Object)vA.getGroupInputSpecList(0));
        Assert.assertNull((Object)vB.getGroupInputSpecList(0));
        List groupInSpec = vC.getGroupInputSpecList(0);
        Assert.assertEquals((long)1L, (long)groupInSpec.size());
        Assert.assertEquals((Object)"Group", (Object)((GroupInputSpec)groupInSpec.get(0)).getGroupName());
        Assert.assertTrue((boolean)((GroupInputSpec)groupInSpec.get(0)).getGroupVertices().contains("A"));
        Assert.assertTrue((boolean)((GroupInputSpec)groupInSpec.get(0)).getGroupVertices().contains("B"));
        ((GroupInputSpec)groupInSpec.get(0)).getMergedInputDescriptor().getClassName().equals("Group.class");
    }

    @Test(timeout=5000L)
    public void testStartWithUninitializedCustomEdge() throws Exception {
        this.setupPreDagCreation();
        this.dagPlan = this.createSamplerDAGPlan(true);
        this.setupPostDagCreation();
        VertexImpl vA = this.vertices.get("A");
        VertexImpl vB = this.vertices.get("B");
        VertexImpl vC = this.vertices.get("C");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)vA.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)vB.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)vC.getState());
        EdgeManagerPluginDescriptor mockEdgeManagerDescriptor = EdgeManagerPluginDescriptor.create((String)EdgeManagerForTest.class.getName());
        Edge e = (Edge)vC.sourceVertices.get(vA);
        Assert.assertNull((Object)e.getEdgeManager());
        e.setCustomEdgeManager(mockEdgeManagerDescriptor);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vA.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)vB.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)vC.getState());
        EdgeProperty edgeProp = EdgeProperty.create((EdgeManagerPluginDescriptor)mockEdgeManagerDescriptor, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"));
        HashMap edges = Maps.newHashMap();
        edges.put("B", edgeProp);
        vC.reconfigureVertex(2, this.vertexLocationHint, (Map)edges);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vA.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vB.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vC.getState());
        Assert.assertNotNull((Object)vA.getTask(0));
        Assert.assertNotNull((Object)vB.getTask(0));
        Assert.assertNotNull((Object)vC.getTask(0));
    }

    @Test(timeout=5000L)
    public void testVertexConfiguredDoneByVMBeforeEdgeDefined() throws Exception {
        this.setupPreDagCreation();
        this.dagPlan = this.createSamplerDAGPlan(true);
        this.setupPostDagCreation();
        VertexImpl vA = this.vertices.get("A");
        VertexImpl vB = this.vertices.get("B");
        VertexImpl vC = this.vertices.get("C");
        TestUpdateListener listener = new TestUpdateListener();
        this.updateTracker.registerForVertexUpdates(vB.getName(), EnumSet.of(org.apache.tez.dag.api.event.VertexState.CONFIGURED), listener);
        vB.vertexManager = new VertexManager(VertexManagerPluginDescriptor.create((String)VertexManagerPluginForTest.class.getName()), UserGroupInformation.getCurrentUser(), (Vertex)vB, this.appContext, (StateChangeNotifier)Mockito.mock(StateChangeNotifier.class));
        vB.vertexReconfigurationPlanned();
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)vA.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)vB.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)vC.getState());
        EdgeManagerPluginDescriptor mockEdgeManagerDescriptor = EdgeManagerPluginDescriptor.create((String)EdgeManagerForTest.class.getName());
        Edge e = (Edge)vC.sourceVertices.get(vA);
        Assert.assertNull((Object)e.getEdgeManager());
        e.setCustomEdgeManager(mockEdgeManagerDescriptor);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vA.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)vB.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)vC.getState());
        vB.doneReconfiguringVertex();
        Assert.assertEquals((long)0L, (long)listener.events.size());
        EdgeProperty edgeProp = EdgeProperty.create((EdgeManagerPluginDescriptor)mockEdgeManagerDescriptor, (EdgeProperty.DataSourceType)EdgeProperty.DataSourceType.PERSISTED, (EdgeProperty.SchedulingType)EdgeProperty.SchedulingType.SEQUENTIAL, (OutputDescriptor)OutputDescriptor.create((String)"Out"), (InputDescriptor)InputDescriptor.create((String)"In"));
        HashMap edges = Maps.newHashMap();
        edges.put("B", edgeProp);
        vC.reconfigureVertex(2, this.vertexLocationHint, (Map)edges);
        this.dispatcher.await();
        Assert.assertEquals((long)1L, (long)listener.events.size());
        Assert.assertEquals((Object)vB.getName(), (Object)listener.events.get(0).getVertexName());
        Assert.assertEquals((Object)org.apache.tez.dag.api.event.VertexState.CONFIGURED, (Object)listener.events.get(0).getVertexState());
        this.updateTracker.unregisterForVertexUpdates(vB.getName(), listener);
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vA.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vB.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vC.getState());
        Assert.assertNotNull((Object)vA.getTask(0));
        Assert.assertNotNull((Object)vB.getTask(0));
        Assert.assertNotNull((Object)vC.getTask(0));
    }

    @Test(timeout=5000L)
    public void testInitStartRace() throws TezException {
        this.setupPreDagCreation();
        this.dagPlan = this.createSamplerDAGPlan(false);
        this.setupPostDagCreation();
        VertexImpl vA = this.vertices.get("A");
        VertexImpl vB = this.vertices.get("B");
        VertexImpl vC = this.vertices.get("C");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vA.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vB.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vC.getState());
    }

    @Test(timeout=5000L)
    public void testInitStartRace2() throws TezException {
        this.setupPreDagCreation();
        this.dagPlan = this.createSamplerDAGPlan2();
        this.setupPostDagCreation();
        VertexImpl vA = this.vertices.get("A");
        VertexImpl vB = this.vertices.get("B");
        VertexImpl vC = this.vertices.get("C");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_START));
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vB.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vB.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vA.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vB.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vC.getState());
    }

    @Test(timeout=5000L)
    public void testTez2684() throws IOException, TezException {
        this.setupPreDagCreation();
        this.dagPlan = this.createSamplerDAGPlan2();
        this.setupPostDagCreation();
        VertexImpl vA = this.vertices.get("A");
        VertexImpl vB = this.vertices.get("B");
        VertexImpl vC = this.vertices.get("C");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vA.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vA.getState());
        Assert.assertEquals((Object)VertexState.NEW, (Object)vB.getState());
        Assert.assertEquals((Object)VertexState.NEW, (Object)vC.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vB.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)vB.getState());
        Assert.assertEquals((Object)VertexState.INITED, (Object)vC.getState());
        long[] sizes = new long[]{100000000L};
        VertexManagerEvent vmEvent = this.getVertexManagerEvent(sizes, 1060000000L, "B");
        TezTaskAttemptID taId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)vC.getVertexId(), (int)1), (int)1);
        EventMetaData sourceInfo = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "B", "C", taId);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)vmEvent, sourceInfo);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(vC.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{tezEvent})));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)vC.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(vB.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)vC.getState());
    }

    VertexManagerEvent getVertexManagerEvent(long[] sizes, long totalSize, String vertexName) throws IOException {
        ByteBuffer payload = null;
        if (sizes != null) {
            RoaringBitmap partitionStats = ShuffleUtils.getPartitionStatsForPhysicalOutput((long[])sizes);
            DataOutputBuffer dout = new DataOutputBuffer();
            partitionStats.serialize((DataOutput)dout);
            ByteString partitionStatsBytes = TezCommonUtils.compressByteArrayToByteString((byte[])dout.getData());
            payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder().setOutputSize(totalSize).setPartitionStats(partitionStatsBytes).build().toByteString().asReadOnlyByteBuffer();
        } else {
            payload = ShuffleUserPayloads.VertexManagerEventPayloadProto.newBuilder().setOutputSize(totalSize).build().toByteString().asReadOnlyByteBuffer();
        }
        VertexManagerEvent vmEvent = VertexManagerEvent.create((String)vertexName, (ByteBuffer)payload);
        return vmEvent;
    }

    @Test(timeout=5000L)
    public void testVMEventBeforeVertexInitialized() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithCountingVM();
        this.setupPostDagCreation();
        VertexImpl v1 = this.vertices.get("vertex1");
        VertexImpl v2 = this.vertices.get("vertex2");
        VertexImpl v3 = this.vertices.get("vertex3");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v1.getState());
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.NEW, (Object)v3.getState());
        VertexManagerEvent vmEvent = VertexManagerEvent.create((String)"vertex3", (ByteBuffer)ByteBuffer.wrap(new byte[0]));
        TezEvent tezVmEvent = new TezEvent((org.apache.tez.runtime.api.Event)vmEvent, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", null, TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)1), (int)1)));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent)));
        this.dispatcher.await();
        Assert.assertEquals((long)1L, (long)v3.pendingVmEvents.size());
        Assert.assertEquals((long)0L, (long)InvocationCountingVertexManager.numVmEventsReceived.get());
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v2.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v3.getState());
        Assert.assertEquals((long)0L, (long)v3.pendingVmEvents.size());
        Assert.assertEquals((long)1L, (long)InvocationCountingVertexManager.numVmEventsReceived.get());
        vmEvent = VertexManagerEvent.create((String)"vertex3", (ByteBuffer)ByteBuffer.wrap(new byte[0]));
        tezVmEvent = new TezEvent((org.apache.tez.runtime.api.Event)vmEvent, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", null, TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)1), (int)2)));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezVmEvent)));
        this.dispatcher.await();
        Assert.assertEquals((long)0L, (long)v3.pendingVmEvents.size());
        Assert.assertEquals((long)2L, (long)InvocationCountingVertexManager.numVmEventsReceived.get());
    }

    @Test(timeout=5000L)
    public void testExceptionFromVM_Initialize() throws TezException {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithVMException("TestInputInitializer", VertexManagerWithException.VMExceptionLocation.Initialize);
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        String diagnostics = StringUtils.join((Collection)v1.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(VertexManagerWithException.VMExceptionLocation.Initialize.name()));
        Assert.assertEquals((Object)VertexTerminationCause.AM_USERCODE_FAILURE, (Object)v1.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testExceptionFromVM_OnRootVertexInitialized() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithVMException("TestInputInitializer", VertexManagerWithException.VMExceptionLocation.OnRootVertexInitialized);
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        initializerManager1.completeInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals(VertexManagerWithException.class, v1.vertexManager.getPlugin().getClass());
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        Assert.assertTrue((boolean)initializerManager1.hasShutDown);
        String diagnostics = StringUtils.join((Collection)v1.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(VertexManagerWithException.VMExceptionLocation.OnRootVertexInitialized.name()));
        Assert.assertEquals((Object)VertexTerminationCause.AM_USERCODE_FAILURE, (Object)v1.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testExceptionFromVM_OnVertexStarted() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithVMException("TestInputInitializer", VertexManagerWithException.VMExceptionLocation.OnVertexStarted);
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        initializerManager1.completeInputInitialization();
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        Assert.assertEquals(VertexManagerWithException.class, v1.vertexManager.getPlugin().getClass());
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        String diagnostics = StringUtils.join((Collection)v1.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(VertexManagerWithException.VMExceptionLocation.OnVertexStarted.name()));
        Assert.assertEquals((Object)VertexTerminationCause.AM_USERCODE_FAILURE, (Object)v1.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testExceptionFromVM_OnSourceTaskCompleted() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithVMException("TestInputInitializer", VertexManagerWithException.VMExceptionLocation.OnSourceTaskCompleted);
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        VertexImpl v2 = this.vertices.get("vertex2");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        initializerManager1.completeInputInitialization();
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_START));
        this.dispatcher.await();
        TezTaskAttemptID taId = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0), (int)0);
        v1.getEventHandler().handle((Event)new VertexEventTaskAttemptCompleted(taId, TaskAttemptStateInternal.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals(VertexManagerWithException.class, v1.vertexManager.getPlugin().getClass());
        Assert.assertEquals(VertexManagerWithException.class, v2.vertexManager.getPlugin().getClass());
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        String diagnostics = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(VertexManagerWithException.VMExceptionLocation.OnSourceTaskCompleted.name()));
        Assert.assertEquals((Object)VertexTerminationCause.AM_USERCODE_FAILURE, (Object)v2.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testExceptionFromVM_OnVertexManagerEventReceived() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithVMException("TestInputInitializer", VertexManagerWithException.VMExceptionLocation.OnVertexManagerEventReceived);
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        initializerManager1.completeInputInitialization();
        VertexManagerEvent vmEvent = VertexManagerEvent.create((String)v1.getName(), (ByteBuffer)ByteBuffer.wrap(new byte[0]));
        TezTaskAttemptID taId1 = TezTaskAttemptID.getInstance((TezTaskID)TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0), (int)0);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)vmEvent, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, v1.getName(), null, taId1));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v1.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{tezEvent})));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        String diagnostics = StringUtils.join((Collection)v1.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(VertexManagerWithException.VMExceptionLocation.OnVertexManagerEventReceived.name()));
    }

    @Test(timeout=5000L)
    public void testExceptionFromVM_OnVertexManagerVertexStateUpdated() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithVMException("TestVMStateUpdate", VertexManagerWithException.VMExceptionLocation.OnVertexManagerVertexStateUpdated);
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        VertexImpl v2 = this.vertices.get("vertex2");
        this.dispatcher.getEventHandler().handle((Event)new VertexEvent(v1.getVertexId(), VertexEventType.V_INIT));
        this.dispatcher.await();
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        initializerManager1.completeInputInitialization();
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v2.getState());
        this.startVertex(v1, false);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        String diagnostics = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(VertexManagerWithException.VMExceptionLocation.OnVertexManagerVertexStateUpdated.name()));
        Assert.assertEquals((Object)VertexTerminationCause.AM_USERCODE_FAILURE, (Object)v2.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testExceptionFromII_Initialize() throws InterruptedException, TezException {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.Initialize);
        EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer)this.customInitializer;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithIIException();
        this.setupPostDagCreation();
        VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex1");
        this.initVertex(v1);
        while (v1.getState() != VertexState.FAILED) {
            Thread.sleep(10L);
        }
        String diagnostics = StringUtils.join((Collection)v1.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(IIExceptionLocation.Initialize.name()));
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        Assert.assertEquals((Object)VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, (Object)v1.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testExceptionFromII_InitFailedAfterInitialized() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithIIException();
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.initVertex(v1);
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        initializerManager1.completeInputInitialization(0);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.INITED, (Object)v1.getState());
        String errorMsg = "ErrorWhenInitFailureAtInited";
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRootInputFailed(v1.getVertexId(), "input1", new AMUserCodeException(AMUserCodeException.Source.InputInitializer, (Throwable)new Exception(errorMsg))));
        this.dispatcher.await();
        String diagnostics = StringUtils.join((Collection)v1.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(errorMsg));
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        Assert.assertEquals((Object)VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, (Object)v1.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testExceptionFromII_InitFailedAfterRunning() throws Exception {
        this.useCustomInitializer = true;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithIIException();
        this.setupPostDagCreation();
        VertexImplWithControlledInitializerManager v1 = (VertexImplWithControlledInitializerManager)this.vertices.get("vertex1");
        this.initVertex(v1);
        RootInputInitializerManagerControlled initializerManager1 = v1.getRootInputInitializerManager();
        initializerManager1.completeInputInitialization(0);
        this.dispatcher.await();
        this.startVertex(v1);
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        String errorMsg = "ErrorWhenInitFailureAtRunning";
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRootInputFailed(v1.getVertexId(), "input1", new AMUserCodeException(AMUserCodeException.Source.InputInitializer, (Throwable)new Exception(errorMsg))));
        this.dispatcher.await();
        String diagnostics = StringUtils.join((Collection)v1.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(errorMsg));
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v1.getState());
        Assert.assertEquals((Object)VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, (Object)v1.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testExceptionFromII_HandleInputInitializerEvent() throws Exception {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.HandleInputInitializerEvent);
        EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer)this.customInitializer;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithRunningInitializer();
        this.setupPostDagCreation();
        VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer v2 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex2");
        this.initVertex(v1);
        this.startVertex(v1);
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.INITIALIZING, (Object)v2.getState());
        this.dispatcher.await();
        while (!initializer.initStarted.get()) {
            Thread.sleep(10L);
        }
        Assert.assertFalse((boolean)initializer.eventReceived.get());
        Assert.assertFalse((boolean)initializer.initComplete.get());
        InputInitializerEvent event = InputInitializerEvent.create((String)"vertex2", (String)"input1", null);
        TezTaskID t0_v1 = TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0);
        TezTaskAttemptID ta0_t0_v1 = TezTaskAttemptID.getInstance((TezTaskID)t0_v1, (int)0);
        TezEvent tezEvent = new TezEvent((org.apache.tez.runtime.api.Event)event, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex2", ta0_t0_v1));
        this.dispatcher.getEventHandler().handle((Event)new TaskEvent(t0_v1, TaskEventType.T_ATTEMPT_LAUNCHED));
        this.dispatcher.getEventHandler().handle((Event)new TaskEventTASucceeded(ta0_t0_v1));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRouteEvent(v1.getVertexId(), Collections.singletonList(tezEvent)));
        this.dispatcher.await();
        String diagnostics = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(IIExceptionLocation.HandleInputInitializerEvent.name()));
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        Assert.assertEquals((Object)VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, (Object)v2.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testExceptionFromII_OnVertexStateUpdated() throws InterruptedException, TezException {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
        EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer)this.customInitializer;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithRunningInitializer();
        this.setupPostDagCreation();
        VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer v2 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex2");
        this.initVertex(v1);
        while (!initializer.initStarted.get()) {
            Thread.sleep(10L);
        }
        this.startVertex(v1);
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        String diagnostics = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(IIExceptionLocation.OnVertexStateUpdated.name()));
        Assert.assertEquals((Object)VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, (Object)v2.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testExceptionFromII_InitSucceededAfterInitFailure() throws InterruptedException, TezException {
        this.useCustomInitializer = true;
        this.customInitializer = new EventHandlingRootInputInitializer(null, IIExceptionLocation.OnVertexStateUpdated);
        EventHandlingRootInputInitializer initializer = (EventHandlingRootInputInitializer)this.customInitializer;
        this.setupPreDagCreation();
        this.dagPlan = this.createDAGPlanWithRunningInitializer();
        this.setupPostDagCreation();
        VertexImplWithRunningInputInitializer v1 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex1");
        VertexImplWithRunningInputInitializer v2 = (VertexImplWithRunningInputInitializer)this.vertices.get("vertex2");
        this.initVertex(v1);
        while (!initializer.initStarted.get()) {
            Thread.sleep(10L);
        }
        this.startVertex(v1);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventRootInputInitialized(v2.getVertexId(), "input1", null));
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v2.getState());
        String diagnostics = StringUtils.join((Collection)v2.getDiagnostics(), (String)",");
        Assert.assertTrue((boolean)diagnostics.contains(IIExceptionLocation.OnVertexStateUpdated.name()));
        Assert.assertEquals((Object)VertexTerminationCause.ROOT_INPUT_INIT_FAILURE, (Object)v2.getTerminationCause());
    }

    @Test(timeout=5000L)
    public void testCompletedStatsCache() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        this.dispatcher.getEventHandler().handle((Event)new TaskEventTALaunched(TezTaskAttemptID.getInstance((TezTaskID)t1, (int)0)));
        this.dispatcher.getEventHandler().handle((Event)new TaskEventTASucceeded(TezTaskAttemptID.getInstance((TezTaskID)t1, (int)0)));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.await();
        VertexStatistics stats = v.getStatistics();
        Assert.assertTrue((boolean)v.completedTasksStatsCache.taskSet.get(0));
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskReschedule(t1));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        Assert.assertTrue((v.completedTasksStatsCache.taskSet.cardinality() == 0 ? 1 : 0) != 0);
    }

    @Test(timeout=5000L)
    public void testRouteEvent_RecoveredEvent() throws IOException {
        ((AppContext)Mockito.doReturn((Object)this.historyEventHandler).when((Object)this.appContext)).getHistoryHandler();
        ((AppContext)Mockito.doReturn((Object)true).when((Object)this.appContext)).isRecoveryEnabled();
        this.initAllVertices(VertexState.INITED);
        VertexImpl v1 = this.vertices.get("vertex1");
        VertexImpl v2 = this.vertices.get("vertex2");
        VertexImpl v3 = this.vertices.get("vertex3");
        this.startVertex(v1);
        this.startVertex(v2);
        TezTaskID taskId = TezTaskID.getInstance((TezVertexID)v1.getVertexId(), (int)0);
        v1.handle((VertexEvent)new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
        DataMovementEvent dmEvent = DataMovementEvent.create((int)0, (ByteBuffer)ByteBuffer.wrap(new byte[0]));
        TezTaskAttemptID taId = TezTaskAttemptID.getInstance((TezTaskID)taskId, (int)0);
        TezEvent tezEvent1 = new TezEvent((org.apache.tez.runtime.api.Event)dmEvent, new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", taId));
        v1.handle((VertexEvent)new VertexEventRouteEvent(v1.getVertexId(), (List)Lists.newArrayList((Object[])new TezEvent[]{tezEvent1})));
        this.dispatcher.await();
        Assert.assertTrue((v3.pendingTaskEvents.size() != 0 ? 1 : 0) != 0);
        v3.scheduleTasks((List)Lists.newArrayList((Object[])new VertexManagerPluginContext.ScheduleTaskRequest[]{VertexManagerPluginContext.ScheduleTaskRequest.create((int)0, null)}));
        this.dispatcher.await();
        Assert.assertTrue((v3.pendingTaskEvents.size() == 0 ? 1 : 0) != 0);
    }

    private void verifyHistoryEvents(List<DAGHistoryEvent> events, HistoryEventType eventType, int expectedTimes) {
        int actualTimes = 0;
        LOG.info("");
        for (DAGHistoryEvent event : events) {
            LOG.info(event.getHistoryEvent().getEventType() + "");
            if (event.getHistoryEvent().getEventType() != eventType) continue;
            ++actualTimes;
        }
        Assert.assertEquals((long)actualTimes, (long)expectedTimes);
    }

    @Test(timeout=5000L)
    public void testCounterLimits() {
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        TezTaskID t1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID t2 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        for (int i = 0; i < 2; ++i) {
            TezCounters ctrs = new TezCounters();
            for (int j = 0; j < 75; ++j) {
                ctrs.findCounter("g", "c" + i + "_" + j).increment(1L);
            }
            Task t = v.getTask(i);
            ((TaskImpl)t).setCounters(ctrs);
        }
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t1, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v.getState());
        Assert.assertEquals((long)1L, (long)v.getCompletedTasks());
        Assert.assertTrue((0.5f == v.getCompletedTaskProgress() ? 1 : 0) != 0);
        this.dispatcher.getEventHandler().handle((Event)new VertexEventTaskCompleted(t2, TaskState.SUCCEEDED));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.FAILED, (Object)v.getState());
        Assert.assertEquals((long)2L, (long)v.getCompletedTasks());
        System.out.println(v.getDiagnostics());
        Assert.assertTrue((String)"Diagnostics should contain counter limits error message", (boolean)StringUtils.join((Collection)v.getDiagnostics(), (String)",").contains("Counters limit exceeded"));
    }

    @Test(timeout=5000L)
    public void testFirstTaskStartTime() {
        VertexImpl v = this.vertices.get("vertex1");
        Assert.assertEquals((long)v.getFirstTaskStartTime(), (long)-1L);
        v.reportTaskStartTime(100L);
        Assert.assertEquals((long)v.getFirstTaskStartTime(), (long)100L);
        v.reportTaskStartTime(50L);
        Assert.assertEquals((long)v.getFirstTaskStartTime(), (long)50L);
        v.reportTaskStartTime(200L);
        Assert.assertEquals((long)v.getFirstTaskStartTime(), (long)50L);
    }

    @Test(timeout=5000L)
    public void testLastTaskFinishTime() {
        NodeId nid = NodeId.newInstance((String)"127.0.0.1", (int)0);
        ContainerId contId = ContainerId.newContainerId((ApplicationAttemptId)this.appAttemptId, (long)3L);
        Container container = (Container)Mockito.mock(Container.class);
        Mockito.when((Object)container.getId()).thenReturn((Object)contId);
        Mockito.when((Object)container.getNodeId()).thenReturn((Object)nid);
        Mockito.when((Object)container.getNodeHttpAddress()).thenReturn((Object)"localhost:0");
        AMContainerMap containers = new AMContainerMap((ContainerHeartbeatHandler)Mockito.mock(ContainerHeartbeatHandler.class), (TaskCommunicatorManagerInterface)Mockito.mock(TaskCommunicatorManagerInterface.class), (ContainerSignatureMatcher)new ContainerContextMatcher(), this.appContext);
        containers.addContainerIfNew(container, 0, 0, 0);
        ((AppContext)Mockito.doReturn((Object)containers).when((Object)this.appContext)).getAllContainers();
        this.dispatcher.register(DAGEventType.class, (EventHandler)this.dagEventDispatcher);
        this.initAllVertices(VertexState.INITED);
        VertexImpl v = this.vertices.get("vertex2");
        this.startVertex(v);
        TezTaskID tid0 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)0);
        TezTaskID tid1 = TezTaskID.getInstance((TezVertexID)v.getVertexId(), (int)1);
        TaskImpl task0 = (TaskImpl)v.getTask(tid0);
        TaskImpl task1 = (TaskImpl)v.getTask(tid1);
        TezTaskAttemptID taskAttemptId0 = TezTaskAttemptID.getInstance((TezTaskID)task0.getTaskId(), (int)0);
        TezTaskAttemptID taskAttemptId1 = TezTaskAttemptID.getInstance((TezTaskID)task1.getTaskId(), (int)0);
        TaskAttemptImpl taskAttempt0 = (TaskAttemptImpl)task0.getAttempt(taskAttemptId0);
        TaskAttemptImpl taskAttempt1 = (TaskAttemptImpl)task1.getAttempt(taskAttemptId1);
        Assert.assertEquals((long)v.getLastTaskFinishTime(), (long)-1L);
        taskAttempt0.handle((TaskAttemptEvent)new TaskAttemptEventSchedule(taskAttemptId0, 0, 0));
        taskAttempt0.handle((TaskAttemptEvent)new TaskAttemptEventStartedRemotely(taskAttemptId0, contId, null));
        taskAttempt0.handle(new TaskAttemptEvent(taskAttemptId0, TaskAttemptEventType.TA_DONE));
        Assert.assertEquals((long)v.getLastTaskFinishTime(), (long)-1L);
        taskAttempt1.handle((TaskAttemptEvent)new TaskAttemptEventSchedule(taskAttemptId1, 0, 0));
        taskAttempt1.handle((TaskAttemptEvent)new TaskAttemptEventStartedRemotely(taskAttemptId1, contId, null));
        taskAttempt1.handle(new TaskAttemptEvent(taskAttemptId1, TaskAttemptEventType.TA_DONE));
        this.dispatcher.await();
        Assert.assertEquals((Object)VertexState.SUCCEEDED, (Object)v.getState());
        Assert.assertTrue((v.getLastTaskFinishTime() > 0L ? 1 : 0) != 0);
    }

    static {
        Limits.reset();
        Configuration conf = new Configuration(false);
        conf.setInt("tez.counters.max", 100);
        conf.setInt("tez.counters.max.groups", 100);
        Limits.setConfiguration((Configuration)conf);
    }

    private static interface ContextSettableInputInitialzier {
        public void setContext(InputInitializerContext var1);
    }

    @InterfaceAudience.Private
    public static class EventHandlingRootInputInitializer
    extends InputInitializer
    implements ContextSettableInputInitialzier {
        final AtomicBoolean initStarted = new AtomicBoolean(false);
        final AtomicBoolean eventReceived = new AtomicBoolean(false);
        final AtomicBoolean initComplete = new AtomicBoolean(false);
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition eventCondition = this.lock.newCondition();
        private final List<VertexStateUpdate> stateUpdates = new LinkedList<VertexStateUpdate>();
        private int numExpectedVertexStateUpdate = 1;
        private Object waitForVertexStateUpdate = new Object();
        private final List<InputInitializerEvent> initializerEvents = new LinkedList<InputInitializerEvent>();
        private volatile InputInitializerContext context;
        private volatile int numExpectedEvents = 1;
        private IIExceptionLocation exLocation = null;

        public EventHandlingRootInputInitializer(InputInitializerContext initializerContext) {
            super(initializerContext);
        }

        public EventHandlingRootInputInitializer(InputInitializerContext initializerContext, IIExceptionLocation exLocation) {
            super(initializerContext);
            this.exLocation = exLocation;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public List<org.apache.tez.runtime.api.Event> initialize() throws Exception {
            this.context.registerForVertexStateUpdates("vertex1", null);
            this.initStarted.set(true);
            if (this.exLocation == IIExceptionLocation.Initialize) {
                throw new Exception(this.exLocation.name());
            }
            this.lock.lock();
            try {
                if (!this.eventReceived.get()) {
                    this.eventCondition.await();
                }
            }
            finally {
                this.lock.unlock();
            }
            this.initComplete.set(true);
            InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload((int)0, (ByteBuffer)ByteBuffer.wrap(new byte[]{0}));
            LinkedList<org.apache.tez.runtime.api.Event> eventList = new LinkedList<org.apache.tez.runtime.api.Event>();
            eventList.add((org.apache.tez.runtime.api.Event)diEvent);
            return eventList;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
            if (this.exLocation == IIExceptionLocation.HandleInputInitializerEvent) {
                throw new Exception(this.exLocation.name());
            }
            this.initializerEvents.addAll(events);
            if (this.initializerEvents.size() == this.numExpectedEvents) {
                this.eventReceived.set(true);
                this.lock.lock();
                try {
                    this.eventCondition.signal();
                }
                finally {
                    this.lock.unlock();
                }
            }
        }

        @Override
        public void setContext(InputInitializerContext context) {
            this.context = context;
        }

        public void setNumExpectedEvents(int numEvents) {
            this.numExpectedEvents = numEvents;
        }

        public void setNumVertexStateUpdateEvents(int numEvents) {
            this.numExpectedVertexStateUpdate = numEvents;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
            if (this.exLocation == IIExceptionLocation.OnVertexStateUpdated) {
                throw new RuntimeException(this.exLocation.name());
            }
            this.stateUpdates.add(stateUpdate);
            if (this.stateUpdates.size() == this.numExpectedVertexStateUpdate) {
                Object object = this.waitForVertexStateUpdate;
                synchronized (object) {
                    this.waitForVertexStateUpdate.notify();
                }
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void waitForVertexStateUpdate() throws InterruptedException {
            if (this.stateUpdates.size() < this.numExpectedVertexStateUpdate) {
                Object object = this.waitForVertexStateUpdate;
                synchronized (object) {
                    this.waitForVertexStateUpdate.wait();
                }
            }
        }
    }

    public static enum IIExceptionLocation {
        Initialize,
        HandleInputInitializerEvent,
        OnVertexStateUpdated;

    }

    @InterfaceAudience.Private
    public static class VertexManagerWithException
    extends RootInputVertexManager {
        private VMExceptionLocation exLocation;

        public VertexManagerWithException(VertexManagerPluginContext context) {
            super(context);
        }

        public void initialize() {
            super.initialize();
            this.exLocation = VMExceptionLocation.valueOf(new String(this.getContext().getUserPayload().deepCopyAsArray()));
            if (this.exLocation == VMExceptionLocation.Initialize) {
                throw new RuntimeException(this.exLocation.name());
            }
            if (this.exLocation == VMExceptionLocation.NoExceptionDoReconfigure) {
                this.getContext().vertexReconfigurationPlanned();
            }
        }

        public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<org.apache.tez.runtime.api.Event> events) {
            if (this.exLocation == VMExceptionLocation.OnRootVertexInitialized) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onRootVertexInitialized(inputName, inputDescriptor, events);
        }

        public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
            if (this.exLocation == VMExceptionLocation.OnSourceTaskCompleted) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onSourceTaskCompleted(attempt);
        }

        public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
            if (this.exLocation == VMExceptionLocation.OnVertexStarted) {
                throw new RuntimeException(this.exLocation.name());
            }
            super.onVertexStarted(completions);
            if (this.exLocation == VMExceptionLocation.NoExceptionDoReconfigure) {
                this.getContext().doneReconfiguringVertex();
            }
        }

        public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
            super.onVertexManagerEventReceived(vmEvent);
            if (this.exLocation == VMExceptionLocation.OnVertexManagerEventReceived) {
                throw new RuntimeException(this.exLocation.name());
            }
        }

        public void onVertexStateUpdated(VertexStateUpdate stateUpdate) {
            super.onVertexStateUpdated(stateUpdate);
            if (this.exLocation == VMExceptionLocation.OnVertexManagerVertexStateUpdated) {
                throw new RuntimeException(this.exLocation.name());
            }
        }

        public static enum VMExceptionLocation {
            NoExceptionDoReconfigure,
            OnRootVertexInitialized,
            OnSourceTaskCompleted,
            OnVertexStarted,
            OnVertexManagerEventReceived,
            OnVertexManagerVertexStateUpdated,
            Initialize;

        }
    }

    public static class InvocationCountingVertexManager
    extends VertexManagerPlugin {
        static final AtomicInteger numVmEventsReceived = new AtomicInteger(0);
        static final AtomicInteger numInitializedInputs = new AtomicInteger(0);

        public InvocationCountingVertexManager(VertexManagerPluginContext context) {
            super(context);
        }

        public void initialize() throws Exception {
        }

        public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
            numVmEventsReceived.incrementAndGet();
        }

        public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<org.apache.tez.runtime.api.Event> events) throws Exception {
            numInitializedInputs.incrementAndGet();
        }
    }

    @InterfaceAudience.Private
    public static class RootInitializerSettingParallelismTo0
    extends InputInitializer {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition condition = this.lock.newCondition();

        public RootInitializerSettingParallelismTo0(InputInitializerContext initializerContext) {
            super(initializerContext);
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public List<org.apache.tez.runtime.api.Event> initialize() throws Exception {
            InputConfigureVertexTasksEvent event = InputConfigureVertexTasksEvent.create((int)0, null, (InputSpecUpdate)InputSpecUpdate.getDefaultSinglePhysicalInputSpecUpdate());
            LinkedList<org.apache.tez.runtime.api.Event> events = new LinkedList<org.apache.tez.runtime.api.Event>();
            events.add((org.apache.tez.runtime.api.Event)event);
            this.lock.lock();
            try {
                this.condition.await();
            }
            finally {
                this.lock.unlock();
            }
            LOG.info("Received signal to proceed. Returning event to set parallelism to 0");
            return events;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void go() {
            this.lock.lock();
            try {
                LOG.info("Signallying initializer to proceed");
                this.condition.signal();
            }
            finally {
                this.lock.unlock();
            }
        }

        public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
        }
    }

    @InterfaceAudience.Private
    public static class RootInputSpecUpdaterVertexManager
    extends VertexManagerPlugin {
        private static final int NUM_TASKS = 5;

        public RootInputSpecUpdaterVertexManager(VertexManagerPluginContext context) {
            super(context);
        }

        public void initialize() {
        }

        public void onVertexStarted(List<TaskAttemptIdentifier> completions) {
        }

        public void onSourceTaskCompleted(TaskAttemptIdentifier attempt) {
        }

        public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
        }

        public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, List<org.apache.tez.runtime.api.Event> events) {
            HashMap<String, InputSpecUpdate> map = new HashMap<String, InputSpecUpdate>();
            if (this.getContext().getUserPayload().deepCopyAsArray()[0] == 0) {
                map.put("input3", InputSpecUpdate.createAllTaskInputSpecUpdate((int)4));
            } else {
                LinkedList<Integer> pInputList = new LinkedList<Integer>();
                for (int i = 1; i <= 5; ++i) {
                    pInputList.add(i);
                }
                map.put("input4", InputSpecUpdate.createPerTaskInputSpecUpdate(pInputList));
            }
            this.getContext().reconfigureVertex(map, null, 5);
        }
    }

    private static class RootInputInitializerManagerControlled
    extends RootInputInitializerManager {
        private List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs;
        private final EventHandler eventHandler;
        private final DrainDispatcher dispatcher;
        private final TezVertexID vertexID;
        private volatile boolean hasShutDown = false;

        public RootInputInitializerManagerControlled(Vertex vertex, AppContext appContext, EventHandler eventHandler, DrainDispatcher dispatcher, StateChangeNotifier tracker) throws IOException {
            super(vertex, appContext, UserGroupInformation.getCurrentUser(), tracker);
            this.eventHandler = eventHandler;
            this.dispatcher = dispatcher;
            this.vertexID = vertex.getVertexId();
        }

        public void runInputInitializers(List<RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor>> inputs) {
            this.inputs = inputs;
        }

        protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input, InputInitializerContext context) {
            return new InputInitializer(context){

                public List<org.apache.tez.runtime.api.Event> initialize() throws Exception {
                    return null;
                }

                public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
                }
            };
        }

        public void shutdown() {
            this.hasShutDown = true;
        }

        public void failInputInitialization() throws TezException {
            super.runInputInitializers(this.inputs);
            this.eventHandler.handle((Event)new VertexEventRootInputFailed(this.vertexID, this.inputs.get(0).getName(), new AMUserCodeException(AMUserCodeException.Source.InputInitializer, (Throwable)new RuntimeException("MockInitializerFailed"))));
            this.dispatcher.await();
        }

        public void completeInputInitialization() {
            this.eventHandler.handle((Event)new VertexEventRootInputInitialized(this.vertexID, this.inputs.get(0).getName(), null));
            this.dispatcher.await();
        }

        public void completeInputDistribution(byte[] payload) {
            ArrayList events = Lists.newArrayListWithCapacity((int)1);
            InputUpdatePayloadEvent event = InputUpdatePayloadEvent.create((ByteBuffer)ByteBuffer.wrap(payload));
            events.add(event);
            this.eventHandler.handle((Event)new VertexEventRootInputInitialized(this.vertexID, this.inputs.get(0).getName(), (List)events));
            this.dispatcher.await();
        }

        public void completeInputInitialization(int initializerIndex) {
            this.eventHandler.handle((Event)new VertexEventRootInputInitialized(this.vertexID, this.inputs.get(initializerIndex).getName(), null));
            this.dispatcher.await();
        }

        public void completeInputInitialization(int initializerIndex, int targetTasks, List<TaskLocationHint> locationHints) {
            ArrayList events = Lists.newArrayListWithCapacity((int)(targetTasks + 1));
            InputConfigureVertexTasksEvent configEvent = InputConfigureVertexTasksEvent.create((int)targetTasks, (VertexLocationHint)VertexLocationHint.create(locationHints), null);
            events.add(configEvent);
            for (int i = 0; i < targetTasks; ++i) {
                InputDataInformationEvent diEvent = InputDataInformationEvent.createWithSerializedPayload((int)i, null);
                events.add(diEvent);
            }
            this.eventHandler.handle((Event)new VertexEventRootInputInitialized(this.vertexID, this.inputs.get(initializerIndex).getName(), (List)events));
            this.dispatcher.await();
        }
    }

    private static class RootInputInitializerManagerWithRunningInitializer
    extends RootInputInitializerManager {
        private final InputInitializer presetInitializer;

        public RootInputInitializerManagerWithRunningInitializer(Vertex vertex, AppContext appContext, InputInitializer presetInitializer, StateChangeNotifier tracker) throws IOException {
            super(vertex, appContext, UserGroupInformation.getCurrentUser(), tracker);
            this.presetInitializer = presetInitializer;
        }

        protected InputInitializer createInitializer(RootInputLeafOutput<InputDescriptor, InputInitializerDescriptor> input, InputInitializerContext context) {
            if (this.presetInitializer instanceof ContextSettableInputInitialzier) {
                ((ContextSettableInputInitialzier)this.presetInitializer).setContext(context);
            }
            return this.presetInitializer;
        }
    }

    private static class VertexImplWithControlledInitializerManager
    extends VertexImpl {
        private final DrainDispatcher dispatcher;
        private RootInputInitializerManagerControlled rootInputInitializerManager;

        public VertexImplWithControlledInitializerManager(TezVertexID vertexId, DAGProtos.VertexPlan vertexPlan, String vertexName, Configuration conf, EventHandler eventHandler, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher, StateChangeNotifier updateTracker, Configuration dagConf) {
            super(vertexId, vertexPlan, vertexName, conf, eventHandler, taskCommunicatorManagerInterface, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, updateTracker, dagConf);
            this.dispatcher = dispatcher;
        }

        protected RootInputInitializerManager createRootInputInitializerManager(String dagName, String vertexName, TezVertexID vertexID, EventHandler eventHandler, int numTasks, int numNodes, Resource taskResource, Resource totalResource) {
            try {
                this.rootInputInitializerManager = new RootInputInitializerManagerControlled((Vertex)this, this.getAppContext(), eventHandler, this.dispatcher, this.stateChangeNotifier);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return this.rootInputInitializerManager;
        }

        RootInputInitializerManagerControlled getRootInputInitializerManager() {
            return this.rootInputInitializerManager;
        }
    }

    private static class VertexImplWithRunningInputInitializer
    extends VertexImpl {
        private RootInputInitializerManagerWithRunningInitializer rootInputInitializerManager;
        private final InputInitializer presetInitializer;

        public VertexImplWithRunningInputInitializer(TezVertexID vertexId, DAGProtos.VertexPlan vertexPlan, String vertexName, Configuration conf, EventHandler eventHandler, TaskCommunicatorManagerInterface taskCommunicatorManagerInterface, Clock clock, TaskHeartbeatHandler thh, AppContext appContext, VertexLocationHint vertexLocationHint, DrainDispatcher dispatcher, InputInitializer presetInitializer, StateChangeNotifier updateTracker, Configuration dagConf) {
            super(vertexId, vertexPlan, vertexName, conf, eventHandler, taskCommunicatorManagerInterface, clock, thh, true, appContext, vertexLocationHint, null, taskSpecificLaunchCmdOption, updateTracker, dagConf);
            this.presetInitializer = presetInitializer;
        }

        protected RootInputInitializerManager createRootInputInitializerManager(String dagName, String vertexName, TezVertexID vertexID, EventHandler eventHandler, int numTasks, int numNodes, Resource taskResource, Resource totalResource) {
            try {
                this.rootInputInitializerManager = new RootInputInitializerManagerWithRunningInitializer((Vertex)this, this.getAppContext(), this.presetInitializer, this.stateChangeNotifier);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            return this.rootInputInitializerManager;
        }
    }

    class TestUpdateListener
    implements VertexStateUpdateListener {
        List<VertexStateUpdate> events = Lists.newLinkedList();

        TestUpdateListener() {
        }

        public void onStateUpdated(VertexStateUpdate event) {
            this.events.add(event);
        }
    }

    private class VertexEventDispatcher
    implements EventHandler<VertexEvent> {
        private VertexEventDispatcher() {
        }

        public void handle(VertexEvent event) {
            VertexImpl vertex = (VertexImpl)TestVertexImpl.this.vertexIdMap.get(event.getVertexId());
            vertex.handle((Event)event);
        }
    }

    private class DagEventDispatcher
    implements EventHandler<DAGEvent> {
        public Map<DAGEventType, Integer> eventCount = new HashMap<DAGEventType, Integer>();

        private DagEventDispatcher() {
        }

        public void handle(DAGEvent event) {
            int count = 1;
            if (this.eventCount.containsKey(event.getType())) {
                count = this.eventCount.get(event.getType()) + 1;
            }
            this.eventCount.put((DAGEventType)event.getType(), count);
        }
    }

    private class TaskEventDispatcher
    implements EventHandler<TaskEvent> {
        List<TaskEvent> events = Lists.newArrayList();

        private TaskEventDispatcher() {
        }

        public void handle(TaskEvent event) {
            this.events.add(event);
            VertexImpl vertex = (VertexImpl)TestVertexImpl.this.vertexIdMap.get(event.getTaskID().getVertexID());
            Task task = vertex.getTask(event.getTaskID());
            if (task != null) {
                ((EventHandler)task).handle((Event)event);
            } else {
                LOG.warn("Task null for vertex: " + vertex.getName() + " taskId: " + event.getTaskID() + ". Please check if this is important for the test");
            }
        }
    }

    private class TaskAttemptEventDispatcher
    implements EventHandler<TaskAttemptEvent> {
        private TaskAttemptEventDispatcher() {
        }

        public void handle(TaskAttemptEvent event) {
            VertexImpl vertex = (VertexImpl)TestVertexImpl.this.vertexIdMap.get(event.getTaskAttemptID().getTaskID().getVertexID());
            Task task = vertex.getTask(event.getTaskAttemptID().getTaskID());
            ((EventHandler)task.getAttempt(event.getTaskAttemptID())).handle((Event)event);
        }
    }

    public static class CountingOutputCommitter
    extends OutputCommitter {
        public int initCounter = 0;
        public int setupCounter = 0;
        public int commitCounter = 0;
        public int abortCounter = 0;
        private boolean throwError = false;
        private boolean throwErrorOnAbort = false;
        private boolean throwRuntimeException = false;

        public CountingOutputCommitter(OutputCommitterContext context) {
            super(context);
        }

        public void initialize() throws IOException {
            if (this.getContext().getUserPayload() != null && this.getContext().getUserPayload().hasPayload()) {
                CountingOutputCommitterConfig conf = new CountingOutputCommitterConfig(this.getContext().getUserPayload());
                this.throwError = conf.throwError;
                this.throwErrorOnAbort = conf.throwErrorOnAbort;
                this.throwRuntimeException = conf.throwRuntimeException;
            }
            ++this.initCounter;
        }

        public void setupOutput() throws IOException {
            ++this.setupCounter;
        }

        public void commitOutput() throws IOException {
            ++this.commitCounter;
            if (this.throwError) {
                if (!this.throwRuntimeException) {
                    throw new IOException("I can throwz exceptions in commit");
                }
                throw new RuntimeException("I can throwz exceptions in commit");
            }
        }

        public void abortOutput(VertexStatus.State finalState) throws IOException {
            ++this.abortCounter;
            if (this.throwErrorOnAbort) {
                if (!this.throwRuntimeException) {
                    throw new IOException("I can throwz exceptions in abort");
                }
                throw new RuntimeException("I can throwz exceptions in abort");
            }
        }

        public static class CountingOutputCommitterConfig
        implements Writable {
            boolean throwError = false;
            boolean throwErrorOnAbort = false;
            boolean throwRuntimeException = false;

            public CountingOutputCommitterConfig() {
            }

            public CountingOutputCommitterConfig(boolean throwError, boolean throwErrorOnAbort, boolean throwRuntimeException) {
                this.throwError = throwError;
                this.throwErrorOnAbort = throwErrorOnAbort;
                this.throwRuntimeException = throwRuntimeException;
            }

            public CountingOutputCommitterConfig(UserPayload payload) throws IOException {
                DataInputByteBuffer in = new DataInputByteBuffer();
                in.reset(new ByteBuffer[]{payload.getPayload()});
                this.readFields((DataInput)in);
            }

            public void write(DataOutput out) throws IOException {
                out.writeBoolean(this.throwError);
                out.writeBoolean(this.throwErrorOnAbort);
                out.writeBoolean(this.throwRuntimeException);
            }

            public void readFields(DataInput in) throws IOException {
                this.throwError = in.readBoolean();
                this.throwErrorOnAbort = in.readBoolean();
                this.throwRuntimeException = in.readBoolean();
            }

            public byte[] toUserPayload() throws IOException {
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(bos);
                this.write(out);
                return bos.toByteArray();
            }
        }
    }
}

