/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.app.rm.AMSchedulerEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class TestDAGImpl {
    // Shared logger for every test and helper in this class.
    private static final Log LOG = LogFactory.getLog(TestDAGImpl.class);
    // Primary fixture: plan produced by createTestDAGPlan() and its id, both assigned in setup().
    private DAGProtos.DAGPlan dagPlan;
    private TezDAGID dagId;
    // Static because the static createGroupDAGPlan() reads it; re-created by every setup() call.
    private static Configuration conf;
    // Event dispatcher created and started in setup(), drained and stopped in teardown().
    private DrainDispatcher dispatcher;
    private Credentials fsTokens;
    // Mocked context handed to the primary DAG.
    private AppContext appContext;
    private ACLManager aclManager;
    private ApplicationAttemptId appAttemptId;
    // Primary DAG under test, built from dagPlan.
    private DAGImpl dag;
    // Handlers registered with the dispatcher in setup().
    private TaskEventDispatcher taskEventDispatcher;
    private VertexEventDispatcher vertexEventDispatcher;
    private DagEventDispatcher dagEventDispatcher;
    // Passed to every DAGImpl constructor; no assignment is visible in this part of the file.
    private TaskAttemptListener taskAttemptListener;
    // Passed to every DAGImpl constructor; no assignment is visible in this part of the file.
    private TaskHeartbeatHandler thh;
    private Clock clock = new SystemClock();
    private DAGFinishEventHandler dagFinishEventHandler;
    // "MRR" fixture: plan produced by createTestMRRDAGPlan(), with its own mocked context.
    private AppContext mrrAppContext;
    private DAGProtos.DAGPlan mrrDagPlan;
    private DAGImpl mrrDag;
    private TezDAGID mrrDagId;
    // Vertex-group fixture: plan produced by createGroupDAGPlan(), with its own mocked context.
    private AppContext groupAppContext;
    private DAGProtos.DAGPlan groupDagPlan;
    private DAGImpl groupDag;
    private TezDAGID groupDagId;
    // Custom-edge fixture: populated only when a test calls setupDAGWithCustomEdge(...).
    private DAGProtos.DAGPlan dagPlanWithCustomEdge;
    private DAGImpl dagWithCustomEdge;
    private TezDAGID dagWithCustomEdgeId;
    private AppContext dagWithCustomEdgeAppContext;
    // Mock returned by every fixture context's getHistoryHandler().
    private HistoryEventHandler historyEventHandler;
    private TaskAttemptEventDispatcher taskAttemptEventDispatcher;

    /**
     * Resolves a DAG id to the fixture DAG that owns it.
     *
     * <p>Ids are compared in a fixed order: primary, MRR, group, custom-edge.
     *
     * @param curDAGId id to look up
     * @return the fixture DAG whose id equals {@code curDAGId}
     * @throws RuntimeException if the id matches none of the fixture DAGs
     */
    private DAGImpl chooseDAG(TezDAGID curDAGId) {
        TezDAGID[] fixtureIds = {this.dagId, this.mrrDagId, this.groupDagId, this.dagWithCustomEdgeId};
        DAGImpl[] fixtureDags = {this.dag, this.mrrDag, this.groupDag, this.dagWithCustomEdge};
        for (int slot = 0; slot < fixtureIds.length; ++slot) {
            if (curDAGId.equals(fixtureIds[slot])) {
                return fixtureDags[slot];
            }
        }
        throw new RuntimeException("Invalid event, unknown dag, dagId=" + curDAGId);
    }

    private DAGProtos.DAGPlan createTestMRRDAGPlan() {
        LOG.info((Object)"Setting up MRR dag plan");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output1").build()).setName("output1").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output2").build()).setName("output2").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).addInEdgeId("e1").addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").bui
ld()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output3").build()).setName("output3").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).addInEdgeId("e2").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex2").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    /**
     * Builds the plan for the vertex-group fixture through the public DAG API.
     *
     * <p>{@code vertex1} and {@code vertex2} form the group {@code "uv12"}; the group and
     * {@code vertex3} each get a data sink named {@code "uvOut"} that uses
     * {@code TotalCountingOutputCommitter}; a single group-input edge connects the group
     * to {@code vertex3}. The static {@code conf} field is passed to {@code createDag}.
     *
     * @return the plan produced by {@code DAG.createDag}
     */
    static DAGProtos.DAGPlan createGroupDAGPlan() {
        LOG.info("Setting up group dag plan");

        int taskCount = 1;
        Resource taskResource = Resource.newInstance(1, 1);
        org.apache.tez.dag.api.Vertex first = org.apache.tez.dag.api.Vertex.create(
            "vertex1", ProcessorDescriptor.create("Processor"), taskCount, taskResource);
        org.apache.tez.dag.api.Vertex second = org.apache.tez.dag.api.Vertex.create(
            "vertex2", ProcessorDescriptor.create("Processor"), taskCount, taskResource);
        org.apache.tez.dag.api.Vertex third = org.apache.tez.dag.api.Vertex.create(
            "vertex3", ProcessorDescriptor.create("Processor"), taskCount, taskResource);

        DAG groupedDag = DAG.create("testDag");
        String groupName = "uv12";
        OutputCommitterDescriptor committer =
            OutputCommitterDescriptor.create(TotalCountingOutputCommitter.class.getName());
        VertexGroup group = groupedDag.createVertexGroup(
            groupName, new org.apache.tez.dag.api.Vertex[]{first, second});

        // The same sink name and descriptors are attached to the group and to vertex3.
        OutputDescriptor sinkOutput = OutputDescriptor.create("output.class");
        group.addDataSink("uvOut", DataSinkDescriptor.create(sinkOutput, committer, null));
        third.addDataSink("uvOut", DataSinkDescriptor.create(sinkOutput, committer, null));

        EdgeProperty edgeProperty = EdgeProperty.create(
            EdgeProperty.DataMovementType.SCATTER_GATHER,
            EdgeProperty.DataSourceType.PERSISTED,
            EdgeProperty.SchedulingType.SEQUENTIAL,
            OutputDescriptor.create("dummy output class"),
            InputDescriptor.create("dummy input class"));
        GroupInputEdge groupEdge = GroupInputEdge.create(
            group, third, edgeProperty, InputDescriptor.create("merge.class"));

        groupedDag.addVertex(first);
        groupedDag.addVertex(second);
        groupedDag.addVertex(third);
        groupedDag.addEdge(groupEdge);
        return groupedDag.createDag(conf, null, null, null, true);
    }

    public static DAGProtos.DAGPlan createTestDAGPlan() {
        LOG.info((Object)"Setting up dag plan");
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addInEdgeId("e1").addInEdgeId("e2").addOutEdgeId("e3").addOutEdgeId("e4").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex4").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host4").addRack("rack4").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x4.y4").build()).addInEdgeId("e3").addOutEdgeId("e5").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex5").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocatio
nHint.newBuilder().addHost("host5").addRack("rack5").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x5.y5").build()).addInEdgeId("e4").addOutEdgeId("e6").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex6").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host6").addRack("rack6").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(2).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x6.y6").build()).addInEdgeId("e5").addInEdgeId("e6").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v1")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3_v2")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i4_v3")).setInputVertexName("vertex3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o3_v4")).setOutputVertexName("vertex4").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER
_GATHER).setId("e3").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i5_v3")).setInputVertexName("vertex3").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o3_v5")).setOutputVertexName("vertex5").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e4").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i6_v4")).setInputVertexName("vertex4").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o4")).setOutputVertexName("vertex6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e5").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i6_v5")).setInputVertexName("vertex5").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o5")).setOutputVertexName("vertex6").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e6").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    /**
     * Builds a two-vertex plan ({@code vertex1 -(e1)-> vertex2}) whose only edge uses the
     * CUSTOM data-movement type with {@code CustomizedEdgeManager} as its edge manager.
     *
     * @param exLocation its {@code name()} is stored, UTF-8 encoded, as the edge manager's
     *                   user payload
     * @return the assembled plan, named {@code "testverteximpl"}
     */
    private DAGProtos.DAGPlan createDAGWithCustomEdge(CustomizedEdgeManager.ExceptionLocation exLocation) {
        LOG.info("Setting up dag plan");

        DAGProtos.VertexPlan sourceVertex = DAGProtos.VertexPlan.newBuilder()
            .setName("vertex1")
            .setType(DAGProtos.PlanVertexType.NORMAL)
            .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                .addHost("host1")
                .addRack("rack1")
                .build())
            .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                .setNumTasks(1)
                .setVirtualCores(4)
                .setMemoryMb(1024)
                .setJavaOpts("")
                .setTaskModule("x1.y1")
                .build())
            .addOutEdgeId("e1")
            .build();

        DAGProtos.VertexPlan destinationVertex = DAGProtos.VertexPlan.newBuilder()
            .setName("vertex2")
            .setType(DAGProtos.PlanVertexType.NORMAL)
            .setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                .setClassName("x2.y2"))
            .addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder()
                .addHost("host2")
                .addRack("rack2")
                .build())
            .setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder()
                .setNumTasks(2)
                .setVirtualCores(4)
                .setMemoryMb(1024)
                .setJavaOpts("foo")
                .setTaskModule("x2.y2")
                .build())
            .addInEdgeId("e1")
            .build();

        // The edge manager descriptor carries the requested exception location as its payload.
        DAGProtos.EdgePlan customEdge = DAGProtos.EdgePlan.newBuilder()
            .setEdgeManager(DAGProtos.TezEntityDescriptorProto.newBuilder()
                .setClassName(CustomizedEdgeManager.class.getName())
                .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
                    .setUserPayload(ByteString.copyFromUtf8(exLocation.name()))))
            .setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder()
                .setClassName("v1_v2"))
            .setInputVertexName("vertex1")
            .setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder()
                .setClassName("o1"))
            .setOutputVertexName("vertex2")
            .setDataMovementType(DAGProtos.PlanEdgeDataMovementType.CUSTOM)
            .setId("e1")
            .setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED)
            .setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL)
            .build();

        return DAGProtos.DAGPlan.newBuilder()
            .setName("testverteximpl")
            .addVertex(sourceVertex)
            .addVertex(destinationVertex)
            .addEdge(customEdge)
            .build();
    }

    /**
     * One-time class setup: delegates to
     * {@code MockDNSToSwitchMapping.initializeMockRackResolver()} before any test runs.
     */
    @BeforeClass
    public static void beforeClass() {
        MockDNSToSwitchMapping.initializeMockRackResolver();
    }

    /**
     * Per-test fixture setup.
     *
     * <p>Creates a fresh configuration (container reuse disabled), a drain dispatcher, and
     * three DAGs (primary, MRR, group), each with its own mocked {@code AppContext}. It then
     * registers the event handlers and starts the dispatcher. For the MRR and group
     * contexts only {@code getAMACLManager()} is stubbed before the DAG is constructed;
     * the remaining stubs are installed afterwards, so the statement order here matters.
     */
    @Before
    public void setup() {
        conf = new Configuration();
        conf.setBoolean("tez.am.container.reuse.enabled", false);
        this.appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(100L, 1), 1);
        ApplicationId appId = this.appAttemptId.getApplicationId();

        // ---- primary DAG ----
        this.dagId = TezDAGID.getInstance(appId, 1);
        Assert.assertNotNull(this.dagId);
        this.dagPlan = TestDAGImpl.createTestDAGPlan();
        this.dispatcher = new DrainDispatcher();
        this.fsTokens = new Credentials();
        this.appContext = Mockito.mock(AppContext.class);
        this.historyEventHandler = Mockito.mock(HistoryEventHandler.class);
        this.aclManager = new ACLManager("amUser");
        Mockito.doReturn(conf).when(this.appContext).getAMConf();
        Mockito.doReturn(this.appAttemptId).when(this.appContext).getApplicationAttemptId();
        Mockito.doReturn(appId).when(this.appContext).getApplicationID();
        Mockito.doReturn(this.dagId).when(this.appContext).getCurrentDAGID();
        Mockito.doReturn(this.historyEventHandler).when(this.appContext).getHistoryHandler();
        Mockito.doReturn(this.aclManager).when(this.appContext).getAMACLManager();
        this.dag = new DAGImpl(this.dagId, conf, this.dagPlan, this.dispatcher.getEventHandler(),
            this.taskAttemptListener, this.fsTokens, this.clock, "user", this.thh, this.appContext);
        Mockito.doReturn(this.dag).when(this.appContext).getCurrentDAG();

        // ---- MRR DAG ----
        this.mrrAppContext = Mockito.mock(AppContext.class);
        Mockito.doReturn(this.aclManager).when(this.mrrAppContext).getAMACLManager();
        this.mrrDagId = TezDAGID.getInstance(appId, 2);
        this.mrrDagPlan = this.createTestMRRDAGPlan();
        this.mrrDag = new DAGImpl(this.mrrDagId, conf, this.mrrDagPlan, this.dispatcher.getEventHandler(),
            this.taskAttemptListener, this.fsTokens, this.clock, "user", this.thh, this.mrrAppContext);
        Mockito.doReturn(conf).when(this.mrrAppContext).getAMConf();
        Mockito.doReturn(this.mrrDag).when(this.mrrAppContext).getCurrentDAG();
        Mockito.doReturn(this.appAttemptId).when(this.mrrAppContext).getApplicationAttemptId();
        Mockito.doReturn(appId).when(this.mrrAppContext).getApplicationID();
        Mockito.doReturn(this.historyEventHandler).when(this.mrrAppContext).getHistoryHandler();

        // ---- vertex-group DAG ----
        this.groupAppContext = Mockito.mock(AppContext.class);
        Mockito.doReturn(this.aclManager).when(this.groupAppContext).getAMACLManager();
        this.groupDagId = TezDAGID.getInstance(appId, 3);
        this.groupDagPlan = TestDAGImpl.createGroupDAGPlan();
        this.groupDag = new DAGImpl(this.groupDagId, conf, this.groupDagPlan, this.dispatcher.getEventHandler(),
            this.taskAttemptListener, this.fsTokens, this.clock, "user", this.thh, this.groupAppContext);
        Mockito.doReturn(conf).when(this.groupAppContext).getAMConf();
        Mockito.doReturn(this.groupDag).when(this.groupAppContext).getCurrentDAG();
        Mockito.doReturn(this.appAttemptId).when(this.groupAppContext).getApplicationAttemptId();
        Mockito.doReturn(appId).when(this.groupAppContext).getApplicationID();
        Mockito.doReturn(this.historyEventHandler).when(this.groupAppContext).getHistoryHandler();
        // The committer's counter is static, so it has to be reset for every test.
        TotalCountingOutputCommitter.totalCommitCounter = 0;

        // ---- event routing ----
        this.taskEventDispatcher = new TaskEventDispatcher();
        this.dispatcher.register(TaskEventType.class, this.taskEventDispatcher);
        this.taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
        this.dispatcher.register(TaskAttemptEventType.class, this.taskAttemptEventDispatcher);
        this.vertexEventDispatcher = new VertexEventDispatcher();
        this.dispatcher.register(VertexEventType.class, this.vertexEventDispatcher);
        this.dagEventDispatcher = new DagEventDispatcher();
        this.dispatcher.register(DAGEventType.class, this.dagEventDispatcher);
        this.dagFinishEventHandler = new DAGFinishEventHandler();
        this.dispatcher.register(DAGAppMasterEventType.class, this.dagFinishEventHandler);
        this.dispatcher.register(TaskEventType.class, new TaskEventHandler());
        this.dispatcher.init(conf);
        this.dispatcher.start();
    }

    /**
     * Per-test cleanup: drains and stops the dispatcher, then drops the primary fixture.
     *
     * <p>The dispatcher is stopped even when draining throws, so a failing test cannot
     * leave its dispatcher running into the next one. The null guard covers the case
     * where {@code setup()} failed before the dispatcher was created.
     */
    @After
    public void teardown() {
        try {
            if (this.dispatcher != null) {
                try {
                    this.dispatcher.await();
                } finally {
                    this.dispatcher.stop();
                }
            }
        } finally {
            this.dagPlan = null;
            this.dag = null;
        }
    }

    /**
     * Creates the custom-edge fixture (DAG id 4) and registers the extra handlers it needs.
     *
     * <p>As in {@code setup()}, only {@code getAMACLManager()} is stubbed before the
     * {@code DAGImpl} is constructed; the other context stubs follow afterwards.
     *
     * @param exLocation forwarded to {@code createDAGWithCustomEdge}
     */
    private void setupDAGWithCustomEdge(CustomizedEdgeManager.ExceptionLocation exLocation) {
        ApplicationId appId = this.appAttemptId.getApplicationId();
        this.dagWithCustomEdgeId = TezDAGID.getInstance(appId, 4);
        this.dagPlanWithCustomEdge = this.createDAGWithCustomEdge(exLocation);

        AppContext customContext = Mockito.mock(AppContext.class);
        this.dagWithCustomEdgeAppContext = customContext;
        Mockito.doReturn(this.aclManager).when(customContext).getAMACLManager();
        this.dagWithCustomEdge = new DAGImpl(this.dagWithCustomEdgeId, conf, this.dagPlanWithCustomEdge,
            this.dispatcher.getEventHandler(), this.taskAttemptListener, this.fsTokens, this.clock,
            "user", this.thh, customContext);
        Mockito.doReturn(conf).when(customContext).getAMConf();
        Mockito.doReturn(this.dagWithCustomEdge).when(customContext).getCurrentDAG();
        Mockito.doReturn(this.appAttemptId).when(customContext).getApplicationAttemptId();
        Mockito.doReturn(this.appAttemptId.getApplicationId()).when(customContext).getApplicationID();
        Mockito.doReturn(this.historyEventHandler).when(customContext).getHistoryHandler();

        this.dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDisptacher2());
        this.dispatcher.register(AMSchedulerEventType.class, new AMSchedulerEventHandler());
    }

    /**
     * Delivers {@code DAG_INIT} directly to the DAG (bypassing the dispatcher) and asserts
     * that it ends up in {@code INITED}.
     *
     * @param impl the DAG to initialise
     */
    private void initDAG(DAGImpl impl) {
        DAGEvent initEvent = new DAGEvent(impl.getID(), DAGEventType.DAG_INIT);
        impl.handle(initEvent);
        Assert.assertEquals(DAGState.INITED, impl.getState());
    }

    /**
     * Sends a start event for the DAG through the dispatcher, waits for the dispatcher to
     * drain, and asserts that the DAG is {@code RUNNING}.
     *
     * @param impl the DAG to start
     */
    private void startDAG(DAGImpl impl) {
        DAGEventStartDag startEvent = new DAGEventStartDag(impl.getID(), null);
        this.dispatcher.getEventHandler().handle(startEvent);
        this.dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, impl.getState());
    }

    /** Initialising the primary DAG must yield all six planned vertices. */
    @Test(timeout=5000L)
    public void testDAGInit() {
        this.initDAG(this.dag);
        long vertexCount = this.dag.getTotalVertices();
        Assert.assertEquals(6L, vertexCount);
    }

    /**
     * With the custom edge manager configured for the {@code Initialize} location, the DAG
     * must be {@code FAILED} after {@code DAG_INIT} and stay {@code FAILED} after a
     * subsequent {@code DAG_START}.
     */
    @Test(timeout=5000L)
    public void testDAGInitFailed() {
        this.setupDAGWithCustomEdge(CustomizedEdgeManager.ExceptionLocation.Initialize);
        DAGImpl failingDag = this.dagWithCustomEdge;

        failingDag.handle(new DAGEvent(failingDag.getID(), DAGEventType.DAG_INIT));
        Assert.assertEquals(DAGState.FAILED, failingDag.getState());

        // A start event delivered to the failed DAG must not change its state.
        failingDag.handle(new DAGEvent(failingDag.getID(), DAGEventType.DAG_START));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.FAILED, failingDag.getState());
    }

    /**
     * After start, every vertex of the primary DAG must be {@code RUNNING} and report the
     * expected distance from the root: 0 for vertices 0-1, 1 for vertex 2, 2 for vertices
     * 3-4 and 3 for vertex 5. The distances are logged afterwards.
     */
    @Test(timeout=5000L)
    public void testDAGStart() {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();

        for (int idx = 0; idx < 6; ++idx) {
            Vertex vertex = this.dag.getVertex(TezVertexID.getInstance(this.dagId, idx));
            Assert.assertEquals(VertexState.RUNNING, vertex.getState());

            long expectedDistance;
            if (idx < 2) {
                expectedDistance = 0L;
            } else if (idx == 2) {
                expectedDistance = 1L;
            } else if (idx < 5) {
                expectedDistance = 2L;
            } else {
                expectedDistance = 3L;
            }
            Assert.assertEquals(expectedDistance, (long) vertex.getDistanceFromRoot());
        }

        for (int idx = 0; idx < 6; ++idx) {
            TezVertexID vertexId = TezVertexID.getInstance(this.dagId, idx);
            int distance = this.dag.getVertex(vertexId).getDistanceFromRoot();
            LOG.info("Distance from root: v" + idx + ":" + distance);
        }
    }

    /**
     * Reporting both tasks of vertex index 1 as {@code SUCCEEDED} must move that vertex to
     * {@code SUCCEEDED} and raise the DAG's successful-vertex count to one.
     */
    @Test(timeout=5000L)
    public void testVertexCompletion() {
        this.initDAG(this.dag);
        this.startDAG(this.dag);
        this.dispatcher.await();

        TezVertexID vertexId = TezVertexID.getInstance(this.dagId, 1);
        Vertex vertex = this.dag.getVertex(vertexId);

        for (int taskIdx = 0; taskIdx < 2; ++taskIdx) {
            TezTaskID taskId = TezTaskID.getInstance(vertexId, taskIdx);
            this.dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(taskId, TaskState.SUCCEEDED));
        }
        this.dispatcher.await();

        Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        Assert.assertEquals(1L, (long) this.dag.getSuccessfulVertices());
    }

    /**
     * With the custom edge manager configured for the
     * {@code GetNumDestinationTaskPhysicalInputs} location, the first attempt of
     * {@code vertex2}'s task 0 must be {@code FAILED} and its diagnostics must mention
     * that location's name.
     */
    @Test(timeout=5000L)
    public void testEdgeManager_GetNumDestinationTaskPhysicalInputs() {
        CustomizedEdgeManager.ExceptionLocation location =
            CustomizedEdgeManager.ExceptionLocation.GetNumDestinationTaskPhysicalInputs;
        this.setupDAGWithCustomEdge(location);

        TezDAGID customDagId = this.dagWithCustomEdge.getID();
        this.dispatcher.getEventHandler().handle(new DAGEvent(customDagId, DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle(new DAGEventStartDag(this.dagWithCustomEdge.getID(), null));
        this.dispatcher.await();

        VertexImpl destination = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex2");
        LOG.info(destination.getTasks().size());
        Task firstTask = destination.getTask(0);
        TezTaskAttemptID firstAttemptId = TezTaskAttemptID.getInstance(firstTask.getTaskId(), 0);
        TaskAttemptImpl firstAttempt = (TaskAttemptImpl) firstTask.getAttempt(firstAttemptId);

        Assert.assertEquals(TaskAttemptStateInternal.FAILED, firstAttempt.getInternalState());
        String diagnostics = StringUtils.join(firstAttempt.getDiagnostics(), ",");
        Assert.assertTrue(diagnostics.contains(location.name()));
    }

    /**
     * With the custom edge manager configured for the
     * {@code GetNumSourceTaskPhysicalOutputs} location, the DAG must be {@code FAILED},
     * the first attempt of {@code vertex1}'s task 0 must be {@code FAILED}, and its
     * diagnostics must mention that location's name.
     */
    @Test(timeout=5000L)
    public void testEdgeManager_GetNumSourceTaskPhysicalOutputs() {
        CustomizedEdgeManager.ExceptionLocation location =
            CustomizedEdgeManager.ExceptionLocation.GetNumSourceTaskPhysicalOutputs;
        this.setupDAGWithCustomEdge(location);

        TezDAGID customDagId = this.dagWithCustomEdge.getID();
        this.dispatcher.getEventHandler().handle(new DAGEvent(customDagId, DAGEventType.DAG_INIT));
        this.dispatcher.getEventHandler().handle(new DAGEventStartDag(this.dagWithCustomEdge.getID(), null));
        this.dispatcher.await();
        Assert.assertEquals(DAGState.FAILED, this.dagWithCustomEdge.getState());

        VertexImpl source = (VertexImpl) this.dagWithCustomEdge.getVertex("vertex1");
        Task firstTask = source.getTask(0);
        TezTaskAttemptID firstAttemptId = TezTaskAttemptID.getInstance(firstTask.getTaskId(), 0);
        TaskAttemptImpl firstAttempt = (TaskAttemptImpl) firstTask.getAttempt(firstAttemptId);

        Assert.assertEquals(TaskAttemptStateInternal.FAILED, firstAttempt.getInternalState());
        String diagnostics = StringUtils.join(firstAttempt.getDiagnostics(), ",");
        Assert.assertTrue(diagnostics.contains(location.name()));
    }

    /**
     * Sets up the custom-edge DAG with {@code RouteDataMovementEventToDestination} as the
     * exception location, confirms the DAG is RUNNING after start, then routes a
     * {@link DataMovementEvent} to vertex2 and expects vertex2 to be FAILED, vertex1 to
     * be KILLED, and vertex2's diagnostics to contain the location's name.
     */
    @Test(timeout = 5000L)
    public void testEdgeManager_RouteDataMovementEventToDestination() {
        String expectedLocationName = CustomizedEdgeManager.ExceptionLocation.RouteDataMovementEventToDestination.name();
        setupDAGWithCustomEdge(CustomizedEdgeManager.ExceptionLocation.RouteDataMovementEventToDestination);

        dispatcher.getEventHandler().handle(new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), null));
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());

        VertexImpl vertexOne = (VertexImpl) dagWithCustomEdge.getVertex("vertex1");
        VertexImpl vertexTwo = (VertexImpl) dagWithCustomEdge.getVertex("vertex2");
        dispatcher.await();

        // Build an empty-payload data movement event attributed to attempt 0 of vertex2's task 0.
        Task firstTask = vertexTwo.getTask(0);
        TaskAttemptImpl firstAttempt =
            (TaskAttemptImpl) firstTask.getAttempt(TezTaskAttemptID.getInstance(firstTask.getTaskId(), 0));
        DataMovementEvent dataMovementEvent = DataMovementEvent.create(ByteBuffer.wrap(new byte[0]));
        EventMetaData sourceInfo =
            new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", "vertex2", firstAttempt.getID());
        TezEvent routedEvent = new TezEvent(dataMovementEvent, sourceInfo);

        dispatcher.getEventHandler().handle(
            new VertexEventRouteEvent(vertexTwo.getVertexId(), Lists.newArrayList(routedEvent)));
        dispatcher.await();

        Assert.assertEquals(VertexState.FAILED, vertexTwo.getState());
        Assert.assertEquals(VertexState.KILLED, vertexOne.getState());
        String joinedDiagnostics = StringUtils.join(vertexTwo.getDiagnostics(), ",");
        Assert.assertTrue(joinedDiagnostics.contains(expectedLocationName));
    }

    /**
     * Sets up the custom-edge DAG with {@code RouteInputSourceTaskFailedEventToDestination}
     * as the exception location, confirms the DAG is RUNNING after start, then routes an
     * {@link InputFailedEvent} to vertex2 and expects vertex2 to be FAILED, vertex1 to be
     * KILLED, and vertex2's diagnostics to contain the location's name.
     */
    @Test(timeout = 5000L)
    public void testEdgeManager_RouteInputSourceTaskFailedEventToDestination() {
        String expectedLocationName =
            CustomizedEdgeManager.ExceptionLocation.RouteInputSourceTaskFailedEventToDestination.name();
        setupDAGWithCustomEdge(CustomizedEdgeManager.ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);

        dispatcher.getEventHandler().handle(new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), null));
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());

        VertexImpl vertexOne = (VertexImpl) dagWithCustomEdge.getVertex("vertex1");
        VertexImpl vertexTwo = (VertexImpl) dagWithCustomEdge.getVertex("vertex2");
        dispatcher.await();

        // Build an input-failed event attributed to attempt 0 of vertex2's task 0.
        Task firstTask = vertexTwo.getTask(0);
        TaskAttemptImpl firstAttempt =
            (TaskAttemptImpl) firstTask.getAttempt(TezTaskAttemptID.getInstance(firstTask.getTaskId(), 0));
        InputFailedEvent inputFailedEvent = InputFailedEvent.create(0, 1);
        EventMetaData sourceInfo =
            new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", "vertex2", firstAttempt.getID());
        TezEvent routedEvent = new TezEvent(inputFailedEvent, sourceInfo);

        dispatcher.getEventHandler().handle(
            new VertexEventRouteEvent(vertexTwo.getVertexId(), Lists.newArrayList(routedEvent)));
        dispatcher.await();

        Assert.assertEquals(VertexState.FAILED, vertexTwo.getState());
        Assert.assertEquals(VertexState.KILLED, vertexOne.getState());
        String joinedDiagnostics = StringUtils.join(vertexTwo.getDiagnostics(), ",");
        Assert.assertTrue(joinedDiagnostics.contains(expectedLocationName));
    }

    /**
     * Sets up the custom-edge DAG with {@code GetNumDestinationConsumerTasks} as the
     * exception location, confirms the DAG is RUNNING after start, then routes an
     * {@link InputReadErrorEvent} to vertex2 and expects vertex2 to be FAILED, vertex1 to
     * be KILLED, and vertex2's diagnostics to contain the location's name.
     */
    @Test(timeout = 5000L)
    public void testEdgeManager_GetNumDestinationConsumerTasks() {
        String expectedLocationName = CustomizedEdgeManager.ExceptionLocation.GetNumDestinationConsumerTasks.name();
        setupDAGWithCustomEdge(CustomizedEdgeManager.ExceptionLocation.GetNumDestinationConsumerTasks);

        dispatcher.getEventHandler().handle(new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), null));
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());

        VertexImpl vertexOne = (VertexImpl) dagWithCustomEdge.getVertex("vertex1");
        VertexImpl vertexTwo = (VertexImpl) dagWithCustomEdge.getVertex("vertex2");
        dispatcher.await();

        // Build an input-read-error event attributed to attempt 0 of vertex2's task 0.
        Task firstTask = vertexTwo.getTask(0);
        TaskAttemptImpl firstAttempt =
            (TaskAttemptImpl) firstTask.getAttempt(TezTaskAttemptID.getInstance(firstTask.getTaskId(), 0));
        InputReadErrorEvent readErrorEvent = InputReadErrorEvent.create("", 0, 0);
        EventMetaData sourceInfo =
            new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex2", "vertex1", firstAttempt.getID());
        TezEvent routedEvent = new TezEvent(readErrorEvent, sourceInfo);

        dispatcher.getEventHandler().handle(
            new VertexEventRouteEvent(vertexTwo.getVertexId(), Lists.newArrayList(routedEvent)));
        dispatcher.await();

        Assert.assertEquals(VertexState.FAILED, vertexTwo.getState());
        Assert.assertEquals(VertexState.KILLED, vertexOne.getState());
        String joinedDiagnostics = StringUtils.join(vertexTwo.getDiagnostics(), ",");
        Assert.assertTrue(joinedDiagnostics.contains(expectedLocationName));
    }

    /**
     * Sets up the custom-edge DAG with {@code RouteInputErrorEventToSource} as the
     * exception location, confirms the DAG is RUNNING after start, then routes an
     * {@link InputReadErrorEvent} to vertex2 and expects vertex2 to be FAILED, vertex1 to
     * be KILLED, and vertex2's diagnostics to contain the location's name.
     */
    @Test(timeout = 5000L)
    public void testEdgeManager_RouteInputErrorEventToSource() {
        String expectedLocationName = CustomizedEdgeManager.ExceptionLocation.RouteInputErrorEventToSource.name();
        setupDAGWithCustomEdge(CustomizedEdgeManager.ExceptionLocation.RouteInputErrorEventToSource);

        dispatcher.getEventHandler().handle(new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
        dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(), null));
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());

        VertexImpl vertexOne = (VertexImpl) dagWithCustomEdge.getVertex("vertex1");
        VertexImpl vertexTwo = (VertexImpl) dagWithCustomEdge.getVertex("vertex2");
        dispatcher.await();

        // Build an input-read-error event attributed to attempt 0 of vertex2's task 0.
        Task firstTask = vertexTwo.getTask(0);
        TaskAttemptImpl firstAttempt =
            (TaskAttemptImpl) firstTask.getAttempt(TezTaskAttemptID.getInstance(firstTask.getTaskId(), 0));
        InputReadErrorEvent readErrorEvent = InputReadErrorEvent.create("", 0, 0);
        EventMetaData sourceInfo =
            new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex2", "vertex1", firstAttempt.getID());
        TezEvent routedEvent = new TezEvent(readErrorEvent, sourceInfo);

        dispatcher.getEventHandler().handle(
            new VertexEventRouteEvent(vertexTwo.getVertexId(), Lists.newArrayList(routedEvent)));
        dispatcher.await();

        Assert.assertEquals(VertexState.FAILED, vertexTwo.getState());
        Assert.assertEquals(VertexState.KILLED, vertexOne.getState());
        String joinedDiagnostics = StringUtils.join(vertexTwo.getDiagnostics(), ",");
        Assert.assertTrue(joinedDiagnostics.contains(expectedLocationName));
    }

    /**
     * Starts the group DAG and reports task 0 of vertex1, vertex2 and vertex3 as
     * SUCCEEDED in turn, checking after each that the vertex is SUCCEEDED and the DAG's
     * successful-vertex count has advanced. Finally expects the DAG to be SUCCEEDED with
     * a total commit count of 2.
     */
    @Test(timeout = 5000L)
    public void testGroupDAGCompletionWithCommitSuccess() {
        initDAG(groupDag);
        startDAG(groupDag);
        dispatcher.await();

        for (int vertexNumber = 1; vertexNumber <= 3; vertexNumber++) {
            Vertex vertex = groupDag.getVertex("vertex" + vertexNumber);
            TezTaskID firstTaskId = TezTaskID.getInstance(vertex.getVertexId(), 0);
            dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(firstTaskId, TaskState.SUCCEEDED));
            dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
            Assert.assertEquals(vertexNumber, groupDag.getSuccessfulVertices());
        }

        Assert.assertEquals(3, groupDag.getSuccessfulVertices());
        Assert.assertEquals(DAGState.SUCCEEDED, groupDag.getState());
        Assert.assertEquals(2, TotalCountingOutputCommitter.totalCommitCounter);
    }

    /**
     * With "tez.am.commit-all-outputs-on-dag-success" set to false, reports vertex1 as
     * completed, then re-running, then vertex2 as completed, and expects no commit to
     * have been counted. After vertex1 is reported completed again, expects one commit
     * and two successful vertices.
     */
    @Test(timeout = 5000L)
    public void testGroupDAGWithVertexReRunning() {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        initDAG(groupDag);
        startDAG(groupDag);
        dispatcher.await();

        Vertex vertexOne = groupDag.getVertex("vertex1");
        Vertex vertexTwo = groupDag.getVertex("vertex2");

        // vertex1 succeeds, is flagged as re-running, and only then vertex2 succeeds.
        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(vertexOne.getVertexId(), VertexState.SUCCEEDED));
        dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(vertexOne.getVertexId()));
        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(vertexTwo.getVertexId(), VertexState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(0, TotalCountingOutputCommitter.totalCommitCounter);

        // vertex1 completes a second time.
        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(vertexOne.getVertexId(), VertexState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
        Assert.assertEquals(2, groupDag.getSuccessfulVertices());
    }

    /**
     * With "tez.am.commit-all-outputs-on-dag-success" set to false, reports vertex1 and
     * vertex2 as completed and expects one commit to have been counted. A subsequent
     * re-running event for vertex1 is then expected to leave the DAG FAILED with
     * termination cause VERTEX_RERUN_AFTER_COMMIT.
     */
    @Test(timeout = 5000L)
    public void testGroupDAGWithVertexReRunningAfterCommit() {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        initDAG(groupDag);
        startDAG(groupDag);
        dispatcher.await();

        Vertex vertexOne = groupDag.getVertex("vertex1");
        Vertex vertexTwo = groupDag.getVertex("vertex2");

        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(vertexOne.getVertexId(), VertexState.SUCCEEDED));
        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(vertexTwo.getVertexId(), VertexState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);

        // Re-run of vertex1 arrives only after the commit above has been counted.
        dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(vertexOne.getVertexId()));
        dispatcher.await();
        Assert.assertEquals(DAGState.FAILED, groupDag.getState());
        Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, groupDag.getTerminationCause());
    }

    /**
     * Runs the MRR DAG to success. After vertex1 and vertex2 succeed, every output
     * committer is expected to have been initialised and set up once but neither
     * committed nor aborted. After vertex3 succeeds the DAG is expected to be SUCCEEDED
     * and every committer to have been committed exactly once.
     */
    @Test(timeout = 5000L)
    public void testDAGCompletionWithCommitSuccess() {
        initDAG(mrrDag);
        dispatcher.await();
        startDAG(mrrDag);
        dispatcher.await();

        for (int vertexNumber = 1; vertexNumber <= 2; vertexNumber++) {
            Vertex vertex = mrrDag.getVertex("vertex" + vertexNumber);
            TezTaskID firstTaskId = TezTaskID.getInstance(vertex.getVertexId(), 0);
            dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(firstTaskId, TaskState.SUCCEEDED));
            dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
            Assert.assertEquals(vertexNumber, mrrDag.getSuccessfulVertices());
        }

        // Nothing has been committed or aborted yet.
        for (Vertex vertex : mrrDag.vertices.values()) {
            for (OutputCommitter rawCommitter : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter counting = (TestVertexImpl.CountingOutputCommitter) rawCommitter;
                Assert.assertEquals(0, counting.abortCounter);
                Assert.assertEquals(0, counting.commitCounter);
                Assert.assertEquals(1, counting.initCounter);
                Assert.assertEquals(1, counting.setupCounter);
            }
        }

        Vertex lastVertex = mrrDag.getVertex("vertex3");
        dispatcher.getEventHandler().handle(
            new VertexEventTaskCompleted(TezTaskID.getInstance(lastVertex.getVertexId(), 0), TaskState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, lastVertex.getState());
        Assert.assertEquals(3, mrrDag.getSuccessfulVertices());
        Assert.assertEquals(DAGState.SUCCEEDED, mrrDag.getState());

        // Every committer has now been committed once.
        for (Vertex vertex : mrrDag.vertices.values()) {
            for (OutputCommitter rawCommitter : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter counting = (TestVertexImpl.CountingOutputCommitter) rawCommitter;
                Assert.assertEquals(0, counting.abortCounter);
                Assert.assertEquals(1, counting.commitCounter);
                Assert.assertEquals(1, counting.initCounter);
                Assert.assertEquals(1, counting.setupCounter);
            }
        }
    }

    /**
     * Replaces vertex3's additional outputs with a single "output3" whose committer is a
     * {@code CountingOutputCommitter} configured with flags (true, false, false), then
     * runs the MRR DAG. After all three vertices succeed the DAG is expected to be
     * FAILED with termination cause COMMIT_FAILURE and every committer to have been
     * aborted once.
     *
     * @throws IOException declared by the test; presumably raised while building the
     *         committer payload (confirm against CountingOutputCommitterConfig)
     */
    @Test(timeout = 5000L)
    public void testDAGCompletionWithCommitFailure() throws IOException {
        initDAG(mrrDag);
        dispatcher.await();

        Vertex badVertex = mrrDag.getVertex("vertex3");
        List<DAGProtos.RootInputLeafOutputProto> outputs = new ArrayList<DAGProtos.RootInputLeafOutputProto>();
        // NOTE(review): the (true, false, false) flags presumably make commit fail - confirm
        // against CountingOutputCommitterConfig.
        outputs.add(DAGProtos.RootInputLeafOutputProto.newBuilder()
            .setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder()
                .setClassName(TestVertexImpl.CountingOutputCommitter.class.getName())
                .setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
                    .setUserPayload(ByteString.copyFrom(
                        new TestVertexImpl.CountingOutputCommitter.CountingOutputCommitterConfig(true, false, false)
                            .toUserPayload()))
                    .build()))
            .setName("output3")
            .setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output.class"))
            .build());
        badVertex.setAdditionalOutputs(outputs);

        startDAG(mrrDag);
        dispatcher.await();

        for (int vertexNumber = 1; vertexNumber <= 2; vertexNumber++) {
            Vertex vertex = mrrDag.getVertex("vertex" + vertexNumber);
            TezTaskID firstTaskId = TezTaskID.getInstance(vertex.getVertexId(), 0);
            dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(firstTaskId, TaskState.SUCCEEDED));
            dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
            Assert.assertEquals(vertexNumber, mrrDag.getSuccessfulVertices());
        }

        // Nothing has been committed or aborted yet.
        for (Vertex vertex : mrrDag.vertices.values()) {
            for (OutputCommitter rawCommitter : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter counting = (TestVertexImpl.CountingOutputCommitter) rawCommitter;
                Assert.assertEquals(0, counting.abortCounter);
                Assert.assertEquals(0, counting.commitCounter);
                Assert.assertEquals(1, counting.initCounter);
                Assert.assertEquals(1, counting.setupCounter);
            }
        }

        Vertex lastVertex = mrrDag.getVertex("vertex3");
        dispatcher.getEventHandler().handle(
            new VertexEventTaskCompleted(TezTaskID.getInstance(lastVertex.getVertexId(), 0), TaskState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, lastVertex.getState());
        Assert.assertEquals(3, mrrDag.getSuccessfulVertices());
        Assert.assertEquals(DAGState.FAILED, mrrDag.getState());
        Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, mrrDag.getTerminationCause());

        // Every committer has been aborted once; the commit counter is not checked here.
        for (Vertex vertex : mrrDag.vertices.values()) {
            for (OutputCommitter rawCommitter : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter counting = (TestVertexImpl.CountingOutputCommitter) rawCommitter;
                Assert.assertEquals(1, counting.abortCounter);
                Assert.assertEquals(1, counting.initCounter);
                Assert.assertEquals(1, counting.setupCounter);
            }
        }
    }

    /**
     * Runs the MRR DAG until vertex1 and vertex2 have succeeded, then sends
     * V_INTERNAL_ERROR to vertex3. Expects vertex3 and the DAG to be in ERROR and every
     * output committer to have been aborted once and never committed.
     */
    @Test(timeout = 5000L)
    public void testDAGErrorAbortAllOutputs() {
        initDAG(mrrDag);
        dispatcher.await();
        startDAG(mrrDag);
        dispatcher.await();

        for (int vertexNumber = 1; vertexNumber <= 2; vertexNumber++) {
            Vertex vertex = mrrDag.getVertex("vertex" + vertexNumber);
            TezTaskID firstTaskId = TezTaskID.getInstance(vertex.getVertexId(), 0);
            dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(firstTaskId, TaskState.SUCCEEDED));
            dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
            Assert.assertEquals(vertexNumber, mrrDag.getSuccessfulVertices());
        }

        // Nothing has been committed or aborted yet.
        for (Vertex vertex : mrrDag.vertices.values()) {
            for (OutputCommitter rawCommitter : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter counting = (TestVertexImpl.CountingOutputCommitter) rawCommitter;
                Assert.assertEquals(0, counting.abortCounter);
                Assert.assertEquals(0, counting.commitCounter);
                Assert.assertEquals(1, counting.initCounter);
                Assert.assertEquals(1, counting.setupCounter);
            }
        }

        Vertex erroredVertex = mrrDag.getVertex("vertex3");
        dispatcher.getEventHandler().handle(
            new VertexEvent(erroredVertex.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
        dispatcher.await();
        Assert.assertEquals(VertexState.ERROR, erroredVertex.getState());
        Assert.assertEquals(DAGState.ERROR, mrrDag.getState());

        // All outputs, including those of the succeeded vertices, are aborted.
        for (Vertex vertex : mrrDag.vertices.values()) {
            for (OutputCommitter rawCommitter : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter counting = (TestVertexImpl.CountingOutputCommitter) rawCommitter;
                Assert.assertEquals(1, counting.abortCounter);
                Assert.assertEquals(0, counting.commitCounter);
                Assert.assertEquals(1, counting.initCounter);
                Assert.assertEquals(1, counting.setupCounter);
            }
        }
    }

    /**
     * With "tez.am.commit-all-outputs-on-dag-success" set to false, expects the
     * committers of vertex1 and vertex2 to be committed once as soon as each vertex
     * succeeds. After V_INTERNAL_ERROR is sent to vertex3, expects the DAG to be in
     * ERROR, vertex3's committers to be aborted and not committed, and the other
     * vertices' committers to stay committed and not aborted.
     */
    @Test(timeout = 5000L)
    public void testDAGErrorAbortNonSuccessfulOutputs() {
        conf.setBoolean("tez.am.commit-all-outputs-on-dag-success", false);
        initDAG(mrrDag);
        dispatcher.await();
        startDAG(mrrDag);
        dispatcher.await();

        for (int vertexNumber = 1; vertexNumber <= 2; vertexNumber++) {
            Vertex vertex = mrrDag.getVertex("vertex" + vertexNumber);
            TezTaskID firstTaskId = TezTaskID.getInstance(vertex.getVertexId(), 0);
            dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(firstTaskId, TaskState.SUCCEEDED));
            dispatcher.await();
            Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
            Assert.assertEquals(vertexNumber, mrrDag.getSuccessfulVertices());

            // This vertex's outputs are already committed at this point.
            for (OutputCommitter rawCommitter : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter counting = (TestVertexImpl.CountingOutputCommitter) rawCommitter;
                Assert.assertEquals(0, counting.abortCounter);
                Assert.assertEquals(1, counting.commitCounter);
                Assert.assertEquals(1, counting.initCounter);
                Assert.assertEquals(1, counting.setupCounter);
            }
        }

        Vertex errorVertex = mrrDag.getVertex("vertex3");
        dispatcher.getEventHandler().handle(
            new VertexEvent(errorVertex.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
        dispatcher.await();
        Assert.assertEquals(VertexState.ERROR, errorVertex.getState());
        dispatcher.await();
        Assert.assertEquals(DAGState.ERROR, mrrDag.getState());

        for (Vertex vertex : mrrDag.vertices.values()) {
            for (OutputCommitter rawCommitter : vertex.getOutputCommitters().values()) {
                TestVertexImpl.CountingOutputCommitter counting = (TestVertexImpl.CountingOutputCommitter) rawCommitter;
                if (vertex == errorVertex) {
                    // The errored vertex: aborted, never committed.
                    Assert.assertEquals(1, counting.abortCounter);
                    Assert.assertEquals(0, counting.commitCounter);
                    Assert.assertEquals(1, counting.initCounter);
                    Assert.assertEquals(1, counting.setupCounter);
                } else {
                    // Vertices that had succeeded: committed, never aborted.
                    Assert.assertEquals(0, counting.abortCounter);
                    Assert.assertEquals(1, counting.commitCounter);
                    Assert.assertEquals(1, counting.initCounter);
                    Assert.assertEquals(1, counting.setupCounter);
                }
            }
        }
    }

    /**
     * Replaces the DAG's scheduler with a Mockito mock and drives vertex 1 through
     * SUCCEEDED, back to RUNNING via a task reschedule, and to SUCCEEDED again. The
     * DAG's successful/completed vertex counters are expected to go 1, 0, 1, and the
     * mock scheduler's {@code vertexCompleted} is expected to have been invoked exactly
     * once both after the first success and at the end.
     */
    @Test(timeout = 5000L)
    public void testVertexReRunning() {
        initDAG(dag);
        dag.dagScheduler = Mockito.mock(DAGScheduler.class);
        startDAG(dag);
        dispatcher.await();

        TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
        Vertex vertex = dag.getVertex(vertexId);

        // Both tasks (0 and 1) of the vertex succeed.
        dispatcher.getEventHandler().handle(
            new VertexEventTaskCompleted(TezTaskID.getInstance(vertexId, 0), TaskState.SUCCEEDED));
        dispatcher.getEventHandler().handle(
            new VertexEventTaskCompleted(TezTaskID.getInstance(vertexId, 1), TaskState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        Assert.assertEquals(1, dag.getSuccessfulVertices());
        Assert.assertEquals(1, dag.numCompletedVertices);
        Mockito.verify(dag.dagScheduler, Mockito.times(1)).vertexCompleted(vertex);

        // Task 0 is rescheduled; the vertex no longer counts as completed.
        dispatcher.getEventHandler().handle(new VertexEventTaskReschedule(TezTaskID.getInstance(vertexId, 0)));
        dispatcher.await();
        Assert.assertEquals(VertexState.RUNNING, vertex.getState());
        Assert.assertEquals(0, dag.getSuccessfulVertices());
        Assert.assertEquals(0, dag.numCompletedVertices);

        // Task 0 succeeds again.
        dispatcher.getEventHandler().handle(
            new VertexEventTaskCompleted(TezTaskID.getInstance(vertexId, 0), TaskState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, vertex.getState());
        Assert.assertEquals(1, dag.getSuccessfulVertices());
        Assert.assertEquals(1, dag.numCompletedVertices);
        Mockito.verify(dag.dagScheduler, Mockito.times(1)).vertexCompleted(vertex);
    }

    /**
     * Starts the DAG, sends DAG_KILL straight away, and expects the DAG and each of
     * vertices 0 through 5 to be KILLED.
     *
     * NOTE(review): unlike its siblings this method carries no {@code @Test} annotation,
     * so a JUnit 4 runner will not execute it. Confirm whether that is intentional
     * before enabling it.
     */
    public void testKillStartedDAG() {
        initDAG(dag);
        startDAG(dag);
        dispatcher.await();

        dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
        dispatcher.await();
        Assert.assertEquals(DAGState.KILLED, dag.getState());

        for (int vertexIndex = 0; vertexIndex < 6; vertexIndex++) {
            Vertex vertex = dag.getVertex(TezVertexID.getInstance(dagId, vertexIndex));
            Assert.assertEquals(VertexState.KILLED, vertex.getState());
        }
    }

    /**
     * Delivers a task-0 SUCCEEDED event directly to vertex 1 and then to vertex 0, and
     * expects vertex 0 to be SUCCEEDED while vertex 1 is still RUNNING. After DAG_KILL,
     * expects the DAG and vertex 1 to be TERMINATING, vertex 0 to remain SUCCEEDED,
     * vertices 2 through 5 to be KILLED, and one successful vertex to be counted.
     */
    @Test(timeout = 5000L)
    public void testKillRunningDAG() {
        initDAG(dag);
        startDAG(dag);
        dispatcher.await();

        // These two events are handed to the vertices directly rather than via the dispatcher.
        TezVertexID runningVertexId = TezVertexID.getInstance(dagId, 1);
        Vertex runningVertex = dag.getVertex(runningVertexId);
        ((EventHandler) runningVertex).handle(
            new VertexEventTaskCompleted(TezTaskID.getInstance(runningVertexId, 0), TaskState.SUCCEEDED));

        TezVertexID finishedVertexId = TezVertexID.getInstance(dagId, 0);
        Vertex finishedVertex = dag.getVertex(finishedVertexId);
        ((EventHandler) finishedVertex).handle(
            new VertexEventTaskCompleted(TezTaskID.getInstance(finishedVertexId, 0), TaskState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(VertexState.SUCCEEDED, finishedVertex.getState());
        Assert.assertEquals(VertexState.RUNNING, runningVertex.getState());

        dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
        dispatcher.await();
        Assert.assertEquals(DAGState.TERMINATING, dag.getState());
        Assert.assertEquals(VertexState.SUCCEEDED, finishedVertex.getState());
        Assert.assertEquals(VertexState.TERMINATING, runningVertex.getState());

        for (int vertexIndex = 2; vertexIndex < 6; vertexIndex++) {
            Vertex vertex = dag.getVertex(TezVertexID.getInstance(dagId, vertexIndex));
            Assert.assertEquals(VertexState.KILLED, vertex.getState());
        }
        Assert.assertEquals(1, dag.getSuccessfulVertices());
    }

    /**
     * Sends a start event for the DAG without calling {@code initDAG} first and expects
     * the DAG to end up in the ERROR state.
     */
    @Test(timeout = 5000L)
    public void testInvalidEvent() {
        DAGEventStartDag startWithoutInit = new DAGEventStartDag(dagId, null);
        dispatcher.getEventHandler().handle(startWithoutInit);
        dispatcher.await();

        Assert.assertEquals(DAGState.ERROR, dag.getState());
    }

    /**
     * Currently ignored. Reports vertex 0 as SUCCEEDED six times and expects the DAG to
     * stay RUNNING with a single successful vertex; then reports vertices 1 through 5
     * as SUCCEEDED and expects the DAG to be SUCCEEDED with six successful vertices.
     */
    @Test(timeout = 5000L)
    @Ignore
    public void testVertexSuccessfulCompletionUpdates() {
        initDAG(dag);
        startDAG(dag);
        dispatcher.await();

        // The same completion is posted repeatedly; it is expected to be counted once.
        for (int repeat = 0; repeat < 6; repeat++) {
            dispatcher.getEventHandler().handle(
                new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
        }
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dag.getState());
        Assert.assertEquals(1, dag.getSuccessfulVertices());

        for (int vertexIndex = 1; vertexIndex <= 5; vertexIndex++) {
            dispatcher.getEventHandler().handle(
                new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, vertexIndex), VertexState.SUCCEEDED));
        }
        dispatcher.await();
        Assert.assertEquals(DAGState.SUCCEEDED, dag.getState());
        Assert.assertEquals(6, dag.getSuccessfulVertices());
    }

    /**
     * Reports vertices 0 and 1 as SUCCEEDED and vertex 2 as FAILED. Expects the DAG to
     * be FAILED with two successful vertices, and vertices 3 through 5 to be KILLED.
     */
    @Test(timeout = 5000L)
    public void testVertexFailureHandling() {
        initDAG(dag);
        startDAG(dag);
        dispatcher.await();

        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, 2), VertexState.FAILED));
        dispatcher.await();
        Assert.assertEquals(DAGState.FAILED, dag.getState());
        Assert.assertEquals(2, dag.getSuccessfulVertices());

        for (int vertexIndex = 3; vertexIndex < 6; vertexIndex++) {
            Vertex vertex = dag.getVertex(TezVertexID.getInstance(dagId, vertexIndex));
            Assert.assertEquals(VertexState.KILLED, vertex.getState());
        }
    }

    /**
     * Posts DAG_KILL after vertices 0 and 1 have been reported SUCCEEDED, followed by
     * SUCCEEDED completions for vertices 2 through 5. Expects the DAG to be KILLED with
     * termination cause DAG_KILL, six successful vertices, DAG_KILL as every vertex's
     * termination cause, and exactly one DAG finish event recorded.
     */
    @Test(timeout = 5000L)
    public void testDAGKill() {
        initDAG(dag);
        startDAG(dag);
        dispatcher.await();

        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // The kill is queued between the completion of vertex 1 and those of vertices 2-5.
        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
        dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
        for (int vertexIndex = 2; vertexIndex < 6; vertexIndex++) {
            dispatcher.getEventHandler().handle(
                new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, vertexIndex), VertexState.SUCCEEDED));
        }
        dispatcher.await();

        Assert.assertEquals(DAGState.KILLED, dag.getState());
        Assert.assertEquals(DAGTerminationCause.DAG_KILL, dag.getTerminationCause());
        Assert.assertEquals(6, dag.getSuccessfulVertices());
        for (Vertex vertex : dag.getVertices().values()) {
            Assert.assertEquals(VertexTerminationCause.DAG_KILL, vertex.getTerminationCause());
        }
        Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
    }

    /**
     * Posts DAG_KILL after vertices 0 and 1 have been reported SUCCEEDED, followed by
     * SUCCEEDED completions for vertices 2 through 4 only. Expects the DAG to be KILLED
     * at that point, and still KILLED after vertex 5 is finally reported as KILLED,
     * with five successful vertices, DAG_KILL as vertex 5's termination cause, and
     * exactly one DAG finish event recorded.
     */
    @Test(timeout = 5000L)
    public void testDAGKillPending() {
        initDAG(dag);
        startDAG(dag);
        dispatcher.await();

        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        // The kill is queued between the completion of vertex 1 and those of vertices 2-4;
        // vertex 5 has not reported anything yet.
        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
        dispatcher.getEventHandler().handle(new DAGEvent(dagId, DAGEventType.DAG_KILL));
        for (int vertexIndex = 2; vertexIndex < 5; vertexIndex++) {
            dispatcher.getEventHandler().handle(
                new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, vertexIndex), VertexState.SUCCEEDED));
        }
        dispatcher.await();
        Assert.assertEquals(DAGState.KILLED, dag.getState());

        // The last vertex reports in as KILLED.
        dispatcher.getEventHandler().handle(
            new DAGEventVertexCompleted(TezVertexID.getInstance(dagId, 5), VertexState.KILLED));
        dispatcher.await();
        Assert.assertEquals(DAGState.KILLED, dag.getState());
        Assert.assertEquals(5, dag.getSuccessfulVertices());
        // Expected value goes first so that a failure message labels the two values correctly.
        Assert.assertEquals(
            VertexTerminationCause.DAG_KILL,
            dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause());
        Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
    }

    /**
     * Edge manager plugin that throws an {@link Exception} from exactly one of
     * its methods. The method that throws is chosen by the
     * {@link ExceptionLocation} name carried in the plugin's user payload;
     * every other method returns 0 or does nothing.
     */
    public static class CustomizedEdgeManager extends EdgeManagerPlugin {
        /** Location at which this instance throws, decoded from the user payload. */
        private ExceptionLocation exLocation;

        /**
         * Builds a descriptor for this plugin class whose user payload holds
         * the name of the given location (encoded with the platform default
         * charset, matching the decoding done in the constructor).
         *
         * @param exLocation the method in which the created plugin should throw
         * @return a descriptor naming this class and carrying the payload
         */
        public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exLocation) {
            EdgeManagerPluginDescriptor descriptor = EdgeManagerPluginDescriptor.create(CustomizedEdgeManager.class.getName());
            byte[] encodedName = exLocation.name().getBytes();
            UserPayload payload = UserPayload.create(ByteBuffer.wrap(encodedName));
            return (EdgeManagerPluginDescriptor)descriptor.setUserPayload(payload);
        }

        /**
         * Reads the location name back out of the context's user payload.
         *
         * @param context plugin context supplying the user payload
         */
        public CustomizedEdgeManager(EdgeManagerPluginContext context) {
            super(context);
            byte[] rawName = context.getUserPayload().deepCopyAsArray();
            String locationName = new String(rawName);
            this.exLocation = ExceptionLocation.valueOf(locationName);
        }

        /** Throws when configured for {@link ExceptionLocation#Initialize}. */
        public void initialize() throws Exception {
            this.throwIfConfiguredFor(ExceptionLocation.Initialize);
        }

        /** Throws when configured for this method; otherwise returns 0. */
        public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception {
            this.throwIfConfiguredFor(ExceptionLocation.GetNumDestinationTaskPhysicalInputs);
            return 0;
        }

        /** Throws when configured for this method; otherwise returns 0. */
        public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception {
            this.throwIfConfiguredFor(ExceptionLocation.GetNumSourceTaskPhysicalOutputs);
            return 0;
        }

        /** Throws when configured for this method; otherwise leaves the map untouched. */
        public void routeDataMovementEventToDestination(DataMovementEvent event, int sourceTaskIndex, int sourceOutputIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {
            this.throwIfConfiguredFor(ExceptionLocation.RouteDataMovementEventToDestination);
        }

        /** Throws when configured for this method; otherwise leaves the map untouched. */
        public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex, Map<Integer, List<Integer>> destinationTaskAndInputIndices) throws Exception {
            this.throwIfConfiguredFor(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination);
        }

        /** Throws when configured for this method; otherwise returns 0. */
        public int getNumDestinationConsumerTasks(int sourceTaskIndex) throws Exception {
            this.throwIfConfiguredFor(ExceptionLocation.GetNumDestinationConsumerTasks);
            return 0;
        }

        /** Throws when configured for this method; otherwise returns 0. */
        public int routeInputErrorEventToSource(InputReadErrorEvent event, int destinationTaskIndex, int destinationFailedInputIndex) throws Exception {
            this.throwIfConfiguredFor(ExceptionLocation.RouteInputErrorEventToSource);
            return 0;
        }

        /**
         * Throws an exception whose message is the configured location's name
         * if, and only if, that location equals {@code location}.
         */
        private void throwIfConfiguredFor(ExceptionLocation location) throws Exception {
            if (this.exLocation == location) {
                throw new Exception(this.exLocation.name());
            }
        }

        /** The plugin methods that can be selected to throw. */
        public static enum ExceptionLocation {
            Initialize,
            GetNumDestinationTaskPhysicalInputs,
            GetNumSourceTaskPhysicalOutputs,
            RouteDataMovementEventToDestination,
            RouteInputSourceTaskFailedEventToDestination,
            GetNumDestinationConsumerTasks,
            RouteInputErrorEventToSource;
        }
    }

    /** Handler for {@code AMSchedulerEvent}s that ignores every event it receives. */
    private class AMSchedulerEventHandler implements EventHandler<AMSchedulerEvent> {

        /** Does nothing with the event. */
        public void handle(AMSchedulerEvent event) {
            // Intentionally empty.
        }
    }

    /**
     * Output committer that, in addition to the behavior inherited from
     * {@code TestVertexImpl.CountingOutputCommitter}, counts every
     * {@link #commitOutput()} call in a static counter shared by all instances.
     */
    public static class TotalCountingOutputCommitter extends TestVertexImpl.CountingOutputCommitter {
        /** Number of commitOutput() calls made across all instances. */
        static int totalCommitCounter;

        public TotalCountingOutputCommitter(OutputCommitterContext context) {
            super(context);
        }

        /** Bumps the shared counter first, then delegates to the superclass. */
        @Override
        public void commitOutput() throws IOException {
            totalCommitCounter += 1;
            super.commitOutput();
        }
    }

    /** Handler that counts how many {@code DAGAppMasterEventDAGFinished} events it has received. */
    private class DAGFinishEventHandler implements EventHandler<DAGAppMasterEventDAGFinished> {
        /** Number of events handled so far; starts at zero. */
        public int dagFinishEvents;

        /** Records one more finished event; the event's contents are not inspected. */
        public void handle(DAGAppMasterEventDAGFinished event) {
            this.dagFinishEvents += 1;
        }
    }

    /**
     * Forwards each {@code VertexEvent} to the vertex it names, looking the
     * vertex up in the DAG returned by {@code chooseDAG} for the event's DAG id.
     */
    private class VertexEventDispatcher implements EventHandler<VertexEvent> {

        /** Resolves the target vertex and hands the event to it. */
        public void handle(VertexEvent event) {
            DAGImpl owningDag = TestDAGImpl.this.chooseDAG(event.getVertexId().getDAGId());
            Vertex target = owningDag.getVertex(event.getVertexId());
            // The vertex is invoked through its EventHandler view.
            ((EventHandler)target).handle((Event)event);
        }
    }

    /**
     * Forwards each {@code TaskAttemptEvent} to the task attempt it names,
     * walking DAG -> vertex -> task -> attempt from the ids in the event.
     */
    private class TaskAttemptEventDisptacher2 implements EventHandler<TaskAttemptEvent> {

        /** Resolves the target attempt and hands the event to it. */
        public void handle(TaskAttemptEvent event) {
            DAGImpl owningDag = TestDAGImpl.this.chooseDAG(event.getTaskAttemptID().getTaskID().getVertexID().getDAGId());
            TaskAttempt target = owningDag
                    .getVertex(event.getTaskAttemptID().getTaskID().getVertexID())
                    .getTask(event.getTaskAttemptID().getTaskID())
                    .getAttempt(event.getTaskAttemptID());
            // The attempt is invoked through its EventHandler view.
            ((EventHandler)target).handle((Event)event);
        }
    }

    /** Handler for {@code TaskAttemptEvent}s that ignores every event it receives. */
    private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {

        /** Does nothing with the event. */
        public void handle(TaskAttemptEvent event) {
            // Intentionally empty.
        }
    }

    /**
     * Forwards each {@code TaskEvent} to the task it names, walking
     * DAG -> vertex -> task from the ids in the event.
     */
    private class TaskEventDispatcher implements EventHandler<TaskEvent> {

        /** Resolves the target task and hands the event to it. */
        public void handle(TaskEvent event) {
            DAGImpl owningDag = TestDAGImpl.this.chooseDAG(event.getTaskID().getVertexID().getDAGId());
            Task target = owningDag
                    .getVertex(event.getTaskID().getVertexID())
                    .getTask(event.getTaskID());
            // The task is invoked through its EventHandler view.
            ((EventHandler)target).handle((Event)event);
        }
    }

    /** Handler for {@code TaskEvent}s that ignores every event it receives. */
    private class TaskEventHandler implements EventHandler<TaskEvent> {

        /** Does nothing with the event. */
        public void handle(TaskEvent event) {
            // Intentionally empty.
        }
    }

    /**
     * Forwards each {@code DAGEvent} to the DAG returned by {@code chooseDAG}
     * for the event's DAG id.
     */
    private class DagEventDispatcher implements EventHandler<DAGEvent> {

        /** Looks up the DAG for the event and lets it handle the event. */
        public void handle(DAGEvent event) {
            TestDAGImpl.this.chooseDAG(event.getDAGId()).handle(event);
        }
    }
}

