/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app.dag.impl;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.RecoveryParser;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.TestStateChangeNotifier;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventRecoverEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.CallableEventDispatcher;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.TestDAGImpl;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.app.rm.AMSchedulerEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.TaskSchedulerManager;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.DAGInitializedEvent;
import org.apache.tez.dag.history.events.DAGStartedEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
import org.apache.tez.dag.history.events.VertexConfigurationDoneEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestDAGRecovery {
    // Logger keyed on this test class so its output is attributed to TestDAGRecovery.
    private static final Logger LOG = LoggerFactory.getLogger(TestDAGRecovery.class);
    // Shared configuration; replaced with a fresh instance by setup() before every test.
    private static Configuration conf;
    // Dispatcher that routes the events produced while the DAG under test recovers.
    private DrainDispatcher dispatcher;
    // Mocked executor; setup() stubs submit() to forward its argument to the dispatcher.
    private ListeningExecutorService execService;
    private Credentials fsTokens;
    // Mocked application context handed to the DAG under test.
    private AppContext appContext;
    private ACLManager aclManager;
    private ApplicationAttemptId appAttemptId;
    // Per-event-type handlers registered on the dispatcher in setup().
    private TaskEventDispatcher taskEventDispatcher;
    private VertexEventDispatcher vertexEventDispatcher;
    private DagEventDispatcher dagEventDispatcher;
    // NOTE(review): never assigned in the visible code, so the DAGImpl constructor receives null - confirm this is intended.
    private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
    private TaskHeartbeatHandler thh;
    private Clock clock = new SystemClock();
    private DAGFinishEventHandler dagFinishEventHandler;
    // Plan built by createDAGPlan() and the DAG instance created from it.
    private DAGProtos.DAGPlan dagPlan;
    private DAGImpl dag;
    private TezDAGID dagId;
    private UserGroupInformation ugi;
    private MockHistoryEventHandler historyEventHandler;
    private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
    private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance((int)8192, (int)10));
    // Mocked recovery data; individual tests stub the getters they need before sending the recover event.
    private RecoveryParser.DAGRecoveryData dagRecoveryData = (RecoveryParser.DAGRecoveryData)Mockito.mock(RecoveryParser.DAGRecoveryData.class);
    // Identifiers for vertex 0 / vertex 1 of the DAG and their first task and first attempt (assigned in setup()).
    private TezVertexID v1Id;
    private TezTaskID t1v1Id;
    private TezTaskAttemptID ta1t1v1Id;
    private TezVertexID v2Id;
    private TezTaskID t1v2Id;
    private TezTaskAttemptID ta1t1v2Id;
    private Random rand = new Random();
    // Synthetic timestamps; each is an earlier timestamp plus a random offset in [0, 100) ms.
    // The DAG and vertex values are passed into the recovery history events built by the tests.
    private long dagInitedTime = System.currentTimeMillis() + (long)this.rand.nextInt(100);
    private long dagStartedTime = this.dagInitedTime + (long)this.rand.nextInt(100);
    private long v1InitedTime = this.dagStartedTime + (long)this.rand.nextInt(100);
    private long v1StartedTime = this.v1InitedTime + (long)this.rand.nextInt(100);
    // Task count reported by the recovered vertex1 initialization/configuration events.
    private int v1NumTask = 10;
    private long t1StartedTime = this.v1StartedTime + (long)this.rand.nextInt(100);
    private long t1FinishedTime = this.t1StartedTime + (long)this.rand.nextInt(100);
    // Note: the attempt launch time is offset from the task start time, not from the task finish time.
    private long ta1LaunchTime = this.t1StartedTime + (long)this.rand.nextInt(100);
    private long ta1FinishedTime = this.ta1LaunchTime + (long)this.rand.nextInt(100);

    /**
     * Calls {@code MockDNSToSwitchMapping.initializeMockRackResolver()} once, before any
     * test of this class runs.
     */
    @BeforeClass
    public static void beforeClass() {
        MockDNSToSwitchMapping.initializeMockRackResolver();
    }

    /**
     * Builds a fresh fixture before every test: configuration, ids, a mocked
     * {@code AppContext}, the {@code DAGImpl} under test and a started {@code DrainDispatcher}
     * with one handler registered per event type.
     */
    @Before
    public void setup() {
        conf = new Configuration();
        conf.setBoolean("tez.am.container.reuse.enabled", false);
        this.appAttemptId = ApplicationAttemptId.newInstance((ApplicationId)ApplicationId.newInstance((long)100L, (int)1), (int)1);
        this.dagId = TezDAGID.getInstance((ApplicationId)this.appAttemptId.getApplicationId(), (int)1);
        Assert.assertNotNull((Object)this.dagId);
        this.dagPlan = this.createDAGPlan();
        this.dispatcher = new DrainDispatcher();
        this.fsTokens = new Credentials();
        this.appContext = (AppContext)Mockito.mock(AppContext.class);
        this.execService = (ListeningExecutorService)Mockito.mock(ListeningExecutorService.class);
        this.thh = (TaskHeartbeatHandler)Mockito.mock(TaskHeartbeatHandler.class);
        final ListenableFuture mockFuture = (ListenableFuture)Mockito.mock(ListenableFuture.class);
        Mockito.when((Object)this.appContext.getHadoopShim()).thenReturn((Object)new DefaultHadoopShim());
        Mockito.when((Object)this.appContext.getApplicationID()).thenReturn((Object)this.appAttemptId.getApplicationId());
        Mockito.when((Object)this.appContext.getClock()).thenReturn((Object)new SystemClock());
        // Anything submitted to the mocked executor is treated as a CallableEvent and handed
        // to the dispatcher's event handler instead of being run on a thread pool.
        ((ListeningExecutorService)Mockito.doAnswer((Answer)new Answer(){

            @Override
            public ListenableFuture<Void> answer(InvocationOnMock invocation) {
                Object[] args = invocation.getArguments();
                CallableEvent e = (CallableEvent)args[0];
                TestDAGRecovery.this.dispatcher.getEventHandler().handle((Event)e);
                return mockFuture;
            }
        }).when((Object)this.execService)).submit((Callable)Matchers.any());
        ((AppContext)Mockito.doReturn((Object)this.execService).when((Object)this.appContext)).getExecService();
        this.historyEventHandler = new MockHistoryEventHandler(this.appContext);
        this.aclManager = new ACLManager("amUser");
        // Wire the remaining AppContext getters to the fixture objects.
        ((AppContext)Mockito.doReturn((Object)conf).when((Object)this.appContext)).getAMConf();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId).when((Object)this.appContext)).getApplicationAttemptId();
        ((AppContext)Mockito.doReturn((Object)this.appAttemptId.getApplicationId()).when((Object)this.appContext)).getApplicationID();
        ((AppContext)Mockito.doReturn((Object)this.dagId).when((Object)this.appContext)).getCurrentDAGID();
        ((AppContext)Mockito.doReturn((Object)((Object)this.historyEventHandler)).when((Object)this.appContext)).getHistoryHandler();
        ((AppContext)Mockito.doReturn((Object)this.aclManager).when((Object)this.appContext)).getAMACLManager();
        ((AppContext)Mockito.doReturn((Object)this.dagRecoveryData).when((Object)this.appContext)).getDAGRecoveryData();
        this.dag = new DAGImpl(this.dagId, conf, this.dagPlan, this.dispatcher.getEventHandler(), this.taskCommunicatorManagerInterface, this.fsTokens, this.clock, "user", this.thh, this.appContext);
        this.dag.entityUpdateTracker = new TestStateChangeNotifier.StateChangeNotifierForTest((DAG)this.dag);
        ((AppContext)Mockito.doReturn((Object)this.dag).when((Object)this.appContext)).getCurrentDAG();
        this.ugi = (UserGroupInformation)Mockito.mock(UserGroupInformation.class);
        // The result used to be stored in an unused local that shadowed the ugi field; the
        // call itself is kept so the setup sequence is unchanged.
        this.dag.getDagUGI();
        ((AppContext)Mockito.doReturn((Object)this.clusterInfo).when((Object)this.appContext)).getClusterInfo();
        TaskSchedulerManager mockTaskScheduler = (TaskSchedulerManager)Mockito.mock(TaskSchedulerManager.class);
        ((AppContext)Mockito.doReturn((Object)mockTaskScheduler).when((Object)this.appContext)).getTaskScheduler();
        // Ids for vertex 0 / vertex 1 and the first task and first attempt of each.
        this.v1Id = TezVertexID.getInstance((TezDAGID)this.dagId, (int)0);
        this.t1v1Id = TezTaskID.getInstance((TezVertexID)this.v1Id, (int)0);
        this.ta1t1v1Id = TezTaskAttemptID.getInstance((TezTaskID)this.t1v1Id, (int)0);
        this.v2Id = TezVertexID.getInstance((TezDAGID)this.dagId, (int)1);
        this.t1v2Id = TezTaskID.getInstance((TezVertexID)this.v2Id, (int)0);
        this.ta1t1v2Id = TezTaskAttemptID.getInstance((TezTaskID)this.t1v2Id, (int)0);
        // One handler per event type, registered before the dispatcher is initialised and started.
        this.dispatcher.register(CallableEventType.class, (EventHandler)new CallableEventDispatcher());
        this.taskEventDispatcher = new TaskEventDispatcher();
        this.dispatcher.register(TaskEventType.class, (EventHandler)this.taskEventDispatcher);
        this.taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
        this.dispatcher.register(TaskAttemptEventType.class, (EventHandler)this.taskAttemptEventDispatcher);
        this.vertexEventDispatcher = new VertexEventDispatcher();
        this.dispatcher.register(VertexEventType.class, (EventHandler)this.vertexEventDispatcher);
        this.dagEventDispatcher = new DagEventDispatcher();
        this.dispatcher.register(DAGEventType.class, (EventHandler)this.dagEventDispatcher);
        this.dagFinishEventHandler = new DAGFinishEventHandler();
        this.dispatcher.register(DAGAppMasterEventType.class, (EventHandler)this.dagFinishEventHandler);
        this.dispatcher.register(AMSchedulerEventType.class, (EventHandler)new AMSchedulerEventDispatcher());
        this.dispatcher.init(conf);
        this.dispatcher.start();
        ((AppContext)Mockito.doReturn((Object)this.dispatcher.getEventHandler()).when((Object)this.appContext)).getEventHandler();
        // Raises the log4j root logger to DEBUG for the whole JVM, not just this test.
        LogManager.getRootLogger().setLevel(Level.DEBUG);
    }

    private DAGProtos.DAGPlan createDAGPlan() {
        DAGProtos.DAGPlan dag = DAGProtos.DAGPlan.newBuilder().setName("testverteximpl").addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex1").setType(DAGProtos.PlanVertexType.NORMAL).addInputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setName("input1").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(MockInputInitializer.class.getName()).build())).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host1").addRack("rack1").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(-1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x1.y1").build()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output1").build()).setName("output1").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).addOutEdgeId("e1").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex2").setType(DAGProtos.PlanVertexType.NORMAL).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("host2").addRack("rack2").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("").setTaskModule("x2.y2").build()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output2").build()).setName("output2").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(RecoveryNotSupportedOutputCommitter.class.getName()))).addOutEdgeId("e2").build()).addVertex(DAGProtos.VertexPlan.newBuilder().setName("vertex3").setType(DAGProtos.PlanVertexType.NORMAL).setProcessorDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("x3.y3")).addTaskLocationHint(DAGProtos.PlanTaskLocationHint.newBuilder().addHost("h
ost3").addRack("rack3").build()).setTaskConfig(DAGProtos.PlanTaskConfiguration.newBuilder().setNumTasks(1).setVirtualCores(4).setMemoryMb(1024).setJavaOpts("foo").setTaskModule("x3.y3").build()).addOutputs(DAGProtos.RootInputLeafOutputProto.newBuilder().setIODescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("output3").build()).setName("output3").setControllerDescriptor(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName(TestVertexImpl.CountingOutputCommitter.class.getName()))).addInEdgeId("e1").addInEdgeId("e2").build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i2")).setInputVertexName("vertex1").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o1")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e1").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).addEdge(DAGProtos.EdgePlan.newBuilder().setEdgeDestination(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("i3")).setInputVertexName("vertex2").setEdgeSource(DAGProtos.TezEntityDescriptorProto.newBuilder().setClassName("o2")).setOutputVertexName("vertex3").setDataMovementType(DAGProtos.PlanEdgeDataMovementType.SCATTER_GATHER).setId("e2").setDataSourceType(DAGProtos.PlanEdgeDataSourceType.PERSISTED).setSchedulingType(DAGProtos.PlanEdgeSchedulingType.SEQUENTIAL).build()).build();
        return dag;
    }

    /**
     * Drains and stops the dispatcher, shuts the executor down, stops the DAG's entity update
     * tracker when a DAG was created, and drops the plan and DAG references.
     */
    @After
    public void teardown() {
        dispatcher.await();
        dispatcher.stop();
        execService.shutdownNow();
        dagPlan = null;

        DAGImpl finishedDag = dag;
        if (finishedDag != null) {
            finishedDag.entityUpdateTracker.stop();
        }
        dag = null;
    }

    /**
     * Sends a recover event carrying {@code DAGState.SUCCEEDED} and expects the DAG and each
     * of its three vertices to report SUCCEEDED once the dispatcher has drained.
     */
    @Test(timeout=5000L)
    public void testDAGRecoverFromDesiredSucceeded() {
        DAGEvent recoverToSucceeded = new DAGEventRecoverEvent(dagId, DAGState.SUCCEEDED, dagRecoveryData);
        dag.handle(recoverToSucceeded);
        dispatcher.await();

        Assert.assertEquals(DAGState.SUCCEEDED, dag.getState());
        Assert.assertEquals(3L, (long) dag.getVertices().size());
        for (String vertexName : new String[] {"vertex1", "vertex2", "vertex3"}) {
            Assert.assertEquals(VertexState.SUCCEEDED, dag.getVertex(vertexName).getState());
        }
    }

    /**
     * Sends a recover event carrying {@code DAGState.FAILED} and expects the DAG and each of
     * its three vertices to report FAILED once the dispatcher has drained.
     */
    @Test(timeout=5000L)
    public void testDAGRecoverFromDesiredFailed() {
        DAGEvent recoverToFailed = new DAGEventRecoverEvent(dagId, DAGState.FAILED, dagRecoveryData);
        dag.handle(recoverToFailed);
        dispatcher.await();

        Assert.assertEquals(DAGState.FAILED, dag.getState());
        Assert.assertEquals(3L, (long) dag.getVertices().size());
        for (String vertexName : new String[] {"vertex1", "vertex2", "vertex3"}) {
            Assert.assertEquals(VertexState.FAILED, dag.getVertex(vertexName).getState());
        }
    }

    /**
     * Sends a recover event carrying {@code DAGState.KILLED} and expects the DAG and each of
     * its three vertices to report KILLED once the dispatcher has drained.
     */
    @Test(timeout=5000L)
    public void testDAGRecoverFromDesiredKilled() {
        DAGEvent recoverToKilled = new DAGEventRecoverEvent(dagId, DAGState.KILLED, dagRecoveryData);
        dag.handle(recoverToKilled);
        dispatcher.await();

        Assert.assertEquals(DAGState.KILLED, dag.getState());
        Assert.assertEquals(3L, (long) dag.getVertices().size());
        for (String vertexName : new String[] {"vertex1", "vertex2", "vertex3"}) {
            Assert.assertEquals(VertexState.KILLED, dag.getVertex(vertexName).getState());
        }
    }

    /**
     * Sends a recover event carrying {@code DAGState.ERROR} and expects the DAG and each of
     * its three vertices to report ERROR once the dispatcher has drained.
     */
    @Test(timeout=5000L)
    public void testDAGRecoverFromDesiredError() {
        DAGEvent recoverToError = new DAGEventRecoverEvent(dagId, DAGState.ERROR, dagRecoveryData);
        dag.handle(recoverToError);
        dispatcher.await();

        Assert.assertEquals(DAGState.ERROR, dag.getState());
        Assert.assertEquals(3L, (long) dag.getVertices().size());
        for (String vertexName : new String[] {"vertex1", "vertex2", "vertex3"}) {
            Assert.assertEquals(VertexState.ERROR, dag.getVertex(vertexName).getState());
        }
    }

    /**
     * Sends a recover event without stubbing any recovered events on the mocked recovery data
     * and expects the DAG to be RUNNING afterwards.
     */
    @Test(timeout=5000L)
    public void testDAGRecoverFromNew() {
        DAGEvent recoverEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
        dag.handle(recoverEvent);
        dispatcher.await();

        Assert.assertEquals(DAGState.RUNNING, dag.getState());
    }

    /**
     * Stubs a recovered {@code DAGInitializedEvent}, recovers the DAG, and expects it to be
     * RUNNING with its init time taken from the recovered event.
     */
    @Test(timeout=5000L)
    public void testDAGRecoverFromInited() {
        DAGInitializedEvent recoveredInit = new DAGInitializedEvent(dagId, dagInitedTime, "user", "dagName", null);
        Mockito.doReturn(recoveredInit).when(dagRecoveryData).getDAGInitializedEvent();

        DAGEvent recoverEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
        dag.handle(recoverEvent);
        dispatcher.await();

        Assert.assertEquals(DAGState.RUNNING, dag.getState());
        Assert.assertEquals(dagInitedTime, (long) dag.initTime);
    }

    /**
     * Stubs recovered {@code DAGInitializedEvent} and {@code DAGStartedEvent} instances,
     * recovers the DAG, and expects it to be RUNNING with both its init time and start time
     * taken from the recovered events.
     */
    @Test(timeout=5000L)
    public void testDAGRecoverFromStarted() {
        DAGInitializedEvent recoveredInit = new DAGInitializedEvent(dagId, dagInitedTime, "user", "dagName", null);
        Mockito.doReturn(recoveredInit).when(dagRecoveryData).getDAGInitializedEvent();
        DAGStartedEvent recoveredStart = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");
        Mockito.doReturn(recoveredStart).when(dagRecoveryData).getDAGStartedEvent();

        DAGEvent recoverEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
        dag.handle(recoverEvent);
        dispatcher.await();

        Assert.assertEquals(DAGState.RUNNING, dag.getState());
        Assert.assertEquals(dagInitedTime, (long) dag.initTime);
        Assert.assertEquals(dagStartedTime, (long) dag.startTime);
    }

    /**
     * Stubs the mocked recovery data so that it returns a DAG-initialized and a DAG-started
     * event built from {@code dagInitedTime} and {@code dagStartedTime}.
     */
    private void initMockDAGRecoveryDataForVertex() {
        DAGInitializedEvent recoveredInit = new DAGInitializedEvent(dagId, dagInitedTime, "user", "dagName", null);
        DAGStartedEvent recoveredStart = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");

        Mockito.doReturn(recoveredInit).when(dagRecoveryData).getDAGInitializedEvent();
        Mockito.doReturn(recoveredStart).when(dagRecoveryData).getDAGStartedEvent();
    }

    /**
     * Recovers a DAG that has recovered init/start events but no vertex recovery data, and
     * expects vertex1 INITIALIZING, vertex2 RUNNING and vertex3 INITED.
     */
    @Test(timeout=5000L)
    public void testVertexRecoverFromNew() {
        initMockDAGRecoveryDataForVertex();

        DAGEvent recoverEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
        dag.handle(recoverEvent);
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
        Assert.assertEquals(VertexState.INITIALIZING, vertex1.getState());
        Assert.assertEquals(VertexState.RUNNING, vertex2.getState());
        Assert.assertEquals(VertexState.INITED, vertex3.getState());
    }

    /**
     * Recovers a DAG whose vertex1 recovery data holds only a {@code VertexInitializedEvent}
     * (no configuration-done, started or finished event), and expects vertex1 INITIALIZING,
     * vertex2 RUNNING and vertex3 INITED.
     */
    @Test(timeout=5000L)
    public void testVertexRecoverFromInited() {
        initMockDAGRecoveryDataForVertex();

        ArrayList noInitGeneratedEvents = new ArrayList();
        VertexInitializedEvent recoveredV1Init = new VertexInitializedEvent(
            v1Id, "vertex1", 0L, v1InitedTime, v1NumTask, "", null, noInitGeneratedEvents, null);
        RecoveryParser.VertexRecoveryData v1RecoveryData =
            new RecoveryParser.VertexRecoveryData(recoveredV1Init, null, null, null, null, false);
        Mockito.doReturn(v1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);

        DAGEvent recoverEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
        dag.handle(recoverEvent);
        dispatcher.await();
        Assert.assertEquals(DAGState.RUNNING, dag.getState());

        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
        Assert.assertEquals(VertexState.INITIALIZING, vertex1.getState());
        Assert.assertEquals(VertexState.RUNNING, vertex2.getState());
        Assert.assertEquals(VertexState.INITED, vertex3.getState());
    }

    /**
     * Recovers a DAG whose vertex1 recovery data holds a {@code VertexInitializedEvent} and a
     * {@code VertexConfigurationDoneEvent}, and expects all three vertices to be RUNNING with
     * vertex1 keeping the recovered init time and task count.
     *
     * <p>The test sleeps for one second after the dispatcher drains before asserting. If that
     * sleep is interrupted, the interrupt flag is restored and the interruption is logged
     * rather than being swallowed.
     */
    @Test
    public void testVertexRecoverFromInitedAndReconfigureDone() {
        this.initMockDAGRecoveryDataForVertex();
        ArrayList inputGeneratedTezEvents = new ArrayList();
        VertexInitializedEvent v1InitedEvent = new VertexInitializedEvent(this.v1Id, "vertex1", 0L, this.v1InitedTime, this.v1NumTask, "", null, inputGeneratedTezEvents, null);
        VertexConfigurationDoneEvent v1ReconfigureDoneEvent = new VertexConfigurationDoneEvent(this.v1Id, 0L, this.v1NumTask, null, null, null, true);
        RecoveryParser.VertexRecoveryData vertexRecoveryData = new RecoveryParser.VertexRecoveryData(v1InitedEvent, v1ReconfigureDoneEvent, null, null, new HashMap(), false);
        ((RecoveryParser.DAGRecoveryData)Mockito.doReturn((Object)vertexRecoveryData).when((Object)this.dagRecoveryData)).getVertexRecoveryData(this.v1Id);
        DAGEventRecoverEvent recoveryEvent = new DAGEventRecoverEvent(this.dagId, this.dagRecoveryData);
        this.dag.handle((DAGEvent)recoveryEvent);
        this.dispatcher.await();
        try {
            // Extra settle time after the dispatcher has drained, before the states are checked.
            Thread.sleep(1000L);
        }
        catch (InterruptedException e) {
            // Keep the interruption visible to the caller instead of dropping it.
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while waiting before checking recovered vertex states", e);
        }
        VertexImpl v1 = (VertexImpl)this.dag.getVertex("vertex1");
        VertexImpl v2 = (VertexImpl)this.dag.getVertex("vertex2");
        VertexImpl v3 = (VertexImpl)this.dag.getVertex("vertex3");
        Assert.assertEquals((Object)DAGState.RUNNING, (Object)this.dag.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v1.getState());
        Assert.assertEquals((long)this.v1InitedTime, (long)v1.initedTime);
        Assert.assertEquals((long)this.v1NumTask, (long)v1.getTotalTasks());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v2.getState());
        Assert.assertEquals((Object)VertexState.RUNNING, (Object)v3.getState());
    }

    /**
     * Recovers a DAG whose vertex1 recovery data holds initialized, configuration-done and
     * started events, and expects all three vertices to be RUNNING with vertex1 keeping the
     * recovered init time, start time and task count.
     */
    @Test(timeout=5000L)
    public void testVertexRecoverFromStart() {
        initMockDAGRecoveryDataForVertex();

        ArrayList noInitGeneratedEvents = new ArrayList();
        VertexInitializedEvent recoveredV1Init = new VertexInitializedEvent(
            v1Id, "vertex1", 0L, v1InitedTime, v1NumTask, "", null, noInitGeneratedEvents, null);
        VertexConfigurationDoneEvent recoveredV1ConfigDone =
            new VertexConfigurationDoneEvent(v1Id, 0L, v1NumTask, null, null, null, true);
        VertexStartedEvent recoveredV1Start = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
        RecoveryParser.VertexRecoveryData v1RecoveryData = new RecoveryParser.VertexRecoveryData(
            recoveredV1Init, recoveredV1ConfigDone, recoveredV1Start, null, new HashMap(), false);
        Mockito.doReturn(v1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);

        DAGEvent recoverEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
        dag.handle(recoverEvent);
        dispatcher.await();

        VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
        VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
        VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
        Assert.assertEquals(DAGState.RUNNING, dag.getState());
        Assert.assertEquals(VertexState.RUNNING, vertex1.getState());
        Assert.assertEquals(v1InitedTime, (long) vertex1.initedTime);
        Assert.assertEquals(v1StartedTime, (long) vertex1.startedTime);
        Assert.assertEquals((long) v1NumTask, (long) vertex1.getTotalTasks());
        Assert.assertEquals(VertexState.RUNNING, vertex2.getState());
        Assert.assertEquals(VertexState.RUNNING, vertex3.getState());
    }

    /**
     * Stubs the mocked recovery data for the task-level tests: vertex1 recovery data holding
     * initialized, configuration-done and started events, plus DAG-initialized and DAG-started
     * events.
     */
    private void initMockDAGRecoveryDataForTask() {
        // Vertex-level history for vertex1.
        ArrayList noInitGeneratedEvents = new ArrayList();
        VertexInitializedEvent recoveredV1Init = new VertexInitializedEvent(
            v1Id, "vertex1", 0L, v1InitedTime, v1NumTask, "", null, noInitGeneratedEvents, null);
        HashMap emptyRootInputSpecs = new HashMap();
        VertexConfigurationDoneEvent recoveredV1ConfigDone =
            new VertexConfigurationDoneEvent(v1Id, 0L, v1NumTask, null, null, emptyRootInputSpecs, true);
        VertexStartedEvent recoveredV1Start = new VertexStartedEvent(v1Id, 0L, v1StartedTime);
        RecoveryParser.VertexRecoveryData v1RecoveryData = new RecoveryParser.VertexRecoveryData(
            recoveredV1Init, recoveredV1ConfigDone, recoveredV1Start, null, new HashMap(), false);

        // DAG-level history.
        DAGInitializedEvent recoveredDagInit = new DAGInitializedEvent(dagId, dagInitedTime, "user", "dagName", null);
        DAGStartedEvent recoveredDagStart = new DAGStartedEvent(dagId, dagStartedTime, "user", "dagName");

        Mockito.doReturn(v1RecoveryData).when(dagRecoveryData).getVertexRecoveryData(v1Id);
        Mockito.doReturn(recoveredDagInit).when(dagRecoveryData).getDAGInitializedEvent();
        Mockito.doReturn(recoveredDagStart).when(dagRecoveryData).getDAGStartedEvent();
    }

    /**
     * Recovers a DAG whose first vertex1 task has only a {@code TaskFinishedEvent} in state
     * KILLED, and expects that task to be KILLED and vertex1 to count one completed task.
     */
    @Test(timeout=5000L)
    public void testTaskRecoverFromKilled() {
        initMockDAGRecoveryDataForTask();

        TaskFinishedEvent recoveredTaskFinish =
            new TaskFinishedEvent(t1v1Id, "v1", 0L, 0L, null, TaskState.KILLED, "", null, 4);
        RecoveryParser.TaskRecoveryData t1RecoveryData =
            new RecoveryParser.TaskRecoveryData(null, recoveredTaskFinish, null);
        Mockito.doReturn(t1RecoveryData).when(dagRecoveryData).getTaskRecoveryData(t1v1Id);

        DAGEvent recoverEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
        dag.handle(recoverEvent);
        dispatcher.await();

        VertexImpl recoveredVertex = (VertexImpl) dag.getVertex(v1Id);
        TaskImpl recoveredTask = (TaskImpl) recoveredVertex.getTask(t1v1Id);
        Assert.assertEquals(TaskStateInternal.KILLED, recoveredTask.getInternalState());
        Assert.assertEquals(1L, (long) recoveredVertex.getCompletedTasks());
    }

    /**
     * Recovers a DAG whose first vertex1 task has only a {@code TaskStartedEvent}, and expects
     * that task to end up in the SCHEDULED internal state.
     */
    @Test(timeout=5000L)
    public void testTaskRecoverFromStarted() {
        initMockDAGRecoveryDataForTask();

        TaskStartedEvent recoveredTaskStart = new TaskStartedEvent(t1v1Id, "v1", 0L, 0L);
        RecoveryParser.TaskRecoveryData t1RecoveryData =
            new RecoveryParser.TaskRecoveryData(recoveredTaskStart, null, null);
        Mockito.doReturn(t1RecoveryData).when(dagRecoveryData).getTaskRecoveryData(t1v1Id);

        DAGEvent recoverEvent = new DAGEventRecoverEvent(dagId, dagRecoveryData);
        dag.handle(recoverEvent);
        dispatcher.await();

        VertexImpl recoveredVertex = (VertexImpl) dag.getVertex(v1Id);
        TaskImpl recoveredTask = (TaskImpl) recoveredVertex.getTask(t1v1Id);
        Assert.assertEquals(TaskStateInternal.SCHEDULED, recoveredTask.getInternalState());
    }

    /**
     * Recovers the DAG when task {@code t1v1Id} has started and
     * finished(SUCCEEDED) events and its attempt {@code ta1t1v1Id} has started
     * and finished(SUCCEEDED) events, the latter carrying one generated
     * {@code DataMovementEvent}. Expects vertex v1 to be RUNNING with one
     * completed task, and both the task and the attempt to be SUCCEEDED.
     */
    @Test(timeout=5000L)
    public void testTaskRecoverFromSucceeded() {
        this.initMockDAGRecoveryDataForTask();

        // Task-level history.
        TaskStartedEvent taskStarted = new TaskStartedEvent(this.t1v1Id, "v1", 0L, 0L);
        TaskFinishedEvent taskFinished = new TaskFinishedEvent(this.t1v1Id, "v1", 0L, 0L, null, TaskState.SUCCEEDED, "", null, 4);

        // Attempt-level history.
        ContainerId container = Mockito.mock(ContainerId.class);
        NodeId node = Mockito.mock(NodeId.class);
        TaskAttemptStartedEvent attemptStarted = new TaskAttemptStartedEvent(this.ta1t1v1Id, "v1", 0L, container, node, "", "", "");
        // NOTE(review): the metadata names attempt ta1t1v2Id although the event is
        // attached to ta1t1v1Id on "vertex1" -- confirm whether this is intentional.
        EventMetaData outputMeta = new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex1", "vertex3", this.ta1t1v2Id);
        ArrayList<TezEvent> generatedEvents = new ArrayList<TezEvent>();
        generatedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), outputMeta));
        TaskAttemptFinishedEvent attemptFinished = new TaskAttemptFinishedEvent(this.ta1t1v1Id, "v1", 0L, 0L, TaskAttemptState.SUCCEEDED, null, null, "", null, null, generatedEvents, 0L, null, 0L, null, null, null, null, null);
        RecoveryParser.TaskAttemptRecoveryData attemptData = new RecoveryParser.TaskAttemptRecoveryData(attemptStarted, attemptFinished);

        // Wire attempt data into the task data and expose both through the mocked DAG recovery data.
        HashMap<TezTaskAttemptID, RecoveryParser.TaskAttemptRecoveryData> attemptDataById = new HashMap<TezTaskAttemptID, RecoveryParser.TaskAttemptRecoveryData>();
        attemptDataById.put(this.ta1t1v1Id, attemptData);
        RecoveryParser.TaskRecoveryData taskData = new RecoveryParser.TaskRecoveryData(taskStarted, taskFinished, attemptDataById);
        Mockito.doReturn(taskData).when(this.dagRecoveryData).getTaskRecoveryData(this.t1v1Id);
        Mockito.doReturn(attemptData).when(this.dagRecoveryData).getTaskAttemptRecoveryData(this.ta1t1v1Id);

        // Kick off recovery and wait until the dispatcher has drained.
        DAGEventRecoverEvent recoverEvent = new DAGEventRecoverEvent(this.dagId, this.dagRecoveryData);
        this.dag.handle(recoverEvent);
        this.dispatcher.await();

        VertexImpl v1 = (VertexImpl)this.dag.getVertex(this.v1Id);
        TaskImpl recoveredTask = (TaskImpl)v1.getTask(this.t1v1Id);
        TaskAttemptImpl recoveredAttempt = (TaskAttemptImpl)recoveredTask.getAttempt(this.ta1t1v1Id);
        Assert.assertEquals(VertexState.RUNNING, v1.getState());
        Assert.assertEquals(1L, v1.getCompletedTasks());
        Assert.assertEquals(TaskStateInternal.SUCCEEDED, recoveredTask.getInternalState());
        Assert.assertEquals(TaskAttemptStateInternal.SUCCEEDED, recoveredAttempt.getInternalState());
    }

    /**
     * Stubs {@code dagRecoveryData} with the baseline history shared by the
     * task-attempt recovery tests: DAG initialized and started events, and
     * recovery data for vertex v1 (initialized, configuration-done and started
     * events) containing task {@code t1v1Id} with only a started event.
     */
    private void initMockDAGRecoveryDataForTaskAttempt() {
        // DAG-level history.
        DAGInitializedEvent dagInited = new DAGInitializedEvent(this.dagId, this.dagInitedTime, "user", "dagName", null);
        DAGStartedEvent dagStarted = new DAGStartedEvent(this.dagId, this.dagStartedTime, "user", "dagName");

        // Task t1 of vertex v1: started, not finished, no attempt data.
        TaskStartedEvent taskStarted = new TaskStartedEvent(this.t1v1Id, "vertex1", 0L, this.t1StartedTime);
        HashMap<TezTaskID, RecoveryParser.TaskRecoveryData> taskDataById = new HashMap<TezTaskID, RecoveryParser.TaskRecoveryData>();
        taskDataById.put(this.t1v1Id, new RecoveryParser.TaskRecoveryData(taskStarted, null, null));

        // Vertex v1: initialized, configured and started, with empty generated events / input specs.
        ArrayList initGeneratedEvents = new ArrayList();
        HashMap inputSpecs = new HashMap();
        VertexInitializedEvent vertexInited = new VertexInitializedEvent(this.v1Id, "vertex1", 0L, this.v1InitedTime, this.v1NumTask, "", null, initGeneratedEvents, null);
        VertexConfigurationDoneEvent vertexConfigured = new VertexConfigurationDoneEvent(this.v1Id, 0L, this.v1NumTask, null, null, inputSpecs, true);
        VertexStartedEvent vertexStarted = new VertexStartedEvent(this.v1Id, 0L, this.v1StartedTime);
        RecoveryParser.VertexRecoveryData vertexData = new RecoveryParser.VertexRecoveryData(vertexInited, vertexConfigured, vertexStarted, null, taskDataById, false);

        Mockito.doReturn(vertexData).when(this.dagRecoveryData).getVertexRecoveryData(this.v1Id);
        Mockito.doReturn(dagInited).when(this.dagRecoveryData).getDAGInitializedEvent();
        Mockito.doReturn(dagStarted).when(this.dagRecoveryData).getDAGStartedEvent();
    }

    /**
     * Recovers attempt {@code ta1t1v1Id} when only a finished(FAILED,
     * CONTAINER_LAUNCH_FAILED) event was recorded and no started event exists.
     * Expects the attempt to be FAILED with that termination cause and the
     * recorded finish time, no TASK_ATTEMPT_STARTED or TASK_ATTEMPT_FINISHED
     * history events to be emitted, one failed attempt counted on the task, and
     * two attempts present on the task.
     */
    @Test(timeout=5000L)
    public void testTARecoverFromNewToFailed() {
        this.initMockDAGRecoveryDataForTaskAttempt();

        // Attempt history: finished(FAILED) only.
        TaskAttemptFinishedEvent failedEvent = new TaskAttemptFinishedEvent(this.ta1t1v1Id, "v1", this.ta1LaunchTime, this.ta1FinishedTime, TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, "", null, null, null, 0L, null, 0L, null, null, null, null, null);
        RecoveryParser.TaskAttemptRecoveryData attemptData = new RecoveryParser.TaskAttemptRecoveryData(null, failedEvent);
        Mockito.doReturn(attemptData).when(this.dagRecoveryData).getTaskAttemptRecoveryData(this.ta1t1v1Id);

        // Kick off recovery and wait until the dispatcher has drained.
        DAGEventRecoverEvent recoverEvent = new DAGEventRecoverEvent(this.dagId, this.dagRecoveryData);
        this.dag.handle(recoverEvent);
        this.dispatcher.await();

        TaskImpl recoveredTask = (TaskImpl)this.dag.getVertex(this.v1Id).getTask(this.t1v1Id);
        TaskAttemptImpl recoveredAttempt = (TaskAttemptImpl)recoveredTask.getAttempt(this.ta1t1v1Id);
        Assert.assertEquals(TaskAttemptStateInternal.FAILED, recoveredAttempt.getInternalState());
        Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED, recoveredAttempt.getTerminationCause());
        this.historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_STARTED);
        this.historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
        Assert.assertEquals(1L, recoveredTask.failedAttempts);
        Assert.assertEquals(2L, recoveredTask.getAttempts().size());
        Assert.assertEquals(this.ta1FinishedTime, recoveredAttempt.getFinishTime());
    }

    /**
     * Recovers attempt {@code ta1t1v1Id} when only a finished(KILLED,
     * TERMINATED_BY_CLIENT) event was recorded and no started event exists.
     * Expects the attempt to be KILLED with that termination cause and the
     * recorded finish time, no TASK_ATTEMPT_STARTED or TASK_ATTEMPT_FINISHED
     * history events to be emitted, and no failed attempts counted on the task.
     */
    @Test(timeout=5000L)
    public void testTARecoverFromNewToKilled() {
        this.initMockDAGRecoveryDataForTaskAttempt();

        // Attempt history: finished(KILLED) only.
        TaskAttemptFinishedEvent killedEvent = new TaskAttemptFinishedEvent(this.ta1t1v1Id, "v1", this.ta1LaunchTime, this.ta1FinishedTime, TaskAttemptState.KILLED, null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, null, null, 0L, null, 0L, null, null, null, null, null);
        RecoveryParser.TaskAttemptRecoveryData attemptData = new RecoveryParser.TaskAttemptRecoveryData(null, killedEvent);
        Mockito.doReturn(attemptData).when(this.dagRecoveryData).getTaskAttemptRecoveryData(this.ta1t1v1Id);

        // Kick off recovery and wait until the dispatcher has drained.
        DAGEventRecoverEvent recoverEvent = new DAGEventRecoverEvent(this.dagId, this.dagRecoveryData);
        this.dag.handle(recoverEvent);
        this.dispatcher.await();

        TaskImpl recoveredTask = (TaskImpl)this.dag.getVertex(this.v1Id).getTask(this.t1v1Id);
        TaskAttemptImpl recoveredAttempt = (TaskAttemptImpl)recoveredTask.getAttempt(this.ta1t1v1Id);
        Assert.assertEquals(TaskAttemptStateInternal.KILLED, recoveredAttempt.getInternalState());
        Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, recoveredAttempt.getTerminationCause());
        this.historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_STARTED);
        this.historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
        Assert.assertEquals(0L, recoveredTask.failedAttempts);
        Assert.assertEquals(this.ta1FinishedTime, recoveredAttempt.getFinishTime());
    }

    /**
     * Recovers attempt {@code ta1t1v1Id} when a started event was recorded but
     * no finished event. Expects the attempt to end up KILLED with cause
     * TERMINATED_AT_RECOVERY, to keep the recorded launch time, and exactly one
     * TASK_ATTEMPT_FINISHED (and no TASK_ATTEMPT_STARTED) history event to be emitted.
     */
    @Test(timeout=5000L)
    public void testTARecoverFromRunning() {
        this.initMockDAGRecoveryDataForTaskAttempt();

        // Attempt history: started only.
        ContainerId container = Mockito.mock(ContainerId.class);
        NodeId node = Mockito.mock(NodeId.class);
        TaskAttemptStartedEvent startedEvent = new TaskAttemptStartedEvent(this.ta1t1v1Id, "v1", this.ta1LaunchTime, container, node, "", "", "");
        RecoveryParser.TaskAttemptRecoveryData attemptData = new RecoveryParser.TaskAttemptRecoveryData(startedEvent, null);
        Mockito.doReturn(attemptData).when(this.dagRecoveryData).getTaskAttemptRecoveryData(this.ta1t1v1Id);

        // Kick off recovery and wait until the dispatcher has drained.
        DAGEventRecoverEvent recoverEvent = new DAGEventRecoverEvent(this.dagId, this.dagRecoveryData);
        this.dag.handle(recoverEvent);
        this.dispatcher.await();

        TaskImpl recoveredTask = (TaskImpl)this.dag.getVertex(this.v1Id).getTask(this.t1v1Id);
        TaskAttemptImpl recoveredAttempt = (TaskAttemptImpl)recoveredTask.getAttempt(this.ta1t1v1Id);
        Assert.assertEquals(TaskAttemptStateInternal.KILLED, recoveredAttempt.getInternalState());
        Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, recoveredAttempt.getTerminationCause());
        this.historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_STARTED);
        this.historyEventHandler.verifyHistoryEvent(1, HistoryEventType.TASK_ATTEMPT_FINISHED);
        Assert.assertEquals(this.ta1LaunchTime, recoveredAttempt.getLaunchTime());
    }

    /**
     * Recovers attempt {@code ta1t1v1Id} when both started and
     * finished(SUCCEEDED) events were recorded, the finished event carrying one
     * generated {@code DataMovementEvent}. Expects the attempt and its task to
     * be SUCCEEDED, the recorded launch/finish times to be restored, and no
     * TASK_ATTEMPT_FINISHED history event to be emitted.
     */
    @Test(timeout=5000L)
    public void testTARecoverFromSucceeded() {
        this.initMockDAGRecoveryDataForTaskAttempt();

        // Attempt history: started + finished(SUCCEEDED) with one generated event.
        ContainerId container = Mockito.mock(ContainerId.class);
        NodeId node = Mockito.mock(NodeId.class);
        TaskAttemptStartedEvent startedEvent = new TaskAttemptStartedEvent(this.ta1t1v1Id, "v1", this.ta1LaunchTime, container, node, "", "", "");
        EventMetaData eventMeta = new EventMetaData(EventMetaData.EventProducerConsumerType.INPUT, "vertex1", "vertex3", this.ta1t1v1Id);
        ArrayList<TezEvent> generatedEvents = new ArrayList<TezEvent>();
        generatedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), eventMeta));
        TaskAttemptFinishedEvent succeededEvent = new TaskAttemptFinishedEvent(this.ta1t1v1Id, "v1", this.ta1LaunchTime, this.ta1FinishedTime, TaskAttemptState.SUCCEEDED, null, null, "", null, null, generatedEvents, 0L, null, 0L, null, null, null, null, null);
        RecoveryParser.TaskAttemptRecoveryData attemptData = new RecoveryParser.TaskAttemptRecoveryData(startedEvent, succeededEvent);
        Mockito.doReturn(attemptData).when(this.dagRecoveryData).getTaskAttemptRecoveryData(this.ta1t1v1Id);

        // Kick off recovery and wait until the dispatcher has drained.
        DAGEventRecoverEvent recoverEvent = new DAGEventRecoverEvent(this.dagId, this.dagRecoveryData);
        this.dag.handle(recoverEvent);
        this.dispatcher.await();

        TaskImpl recoveredTask = (TaskImpl)this.dag.getVertex(this.v1Id).getTask(this.t1v1Id);
        TaskAttemptImpl recoveredAttempt = (TaskAttemptImpl)recoveredTask.getAttempt(this.ta1t1v1Id);
        Assert.assertEquals(TaskAttemptStateInternal.SUCCEEDED, recoveredAttempt.getInternalState());
        this.historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
        Assert.assertEquals(TaskStateInternal.SUCCEEDED, recoveredTask.getInternalState());
        Assert.assertEquals(this.ta1LaunchTime, recoveredAttempt.getLaunchTime());
        Assert.assertEquals(this.ta1FinishedTime, recoveredAttempt.getFinishTime());
    }

    /**
     * Recovers attempt {@code ta1t1v2Id} of vertex v2 when both started and
     * finished(SUCCEEDED) events were recorded. Expects the attempt to end up
     * KILLED rather than SUCCEEDED, one TASK_ATTEMPT_FINISHED history event to
     * be emitted, the task to be RUNNING with two attempts, and the recorded
     * launch/finish times to be kept on the first attempt.
     *
     * <p>NOTE(review): judging by the test name, vertex v2 is presumably set up
     * elsewhere in this class with {@code RecoveryNotSupportedOutputCommitter}
     * -- confirm. Unlike the sibling tests this one declares no timeout;
     * confirm that is intentional.
     */
    @Test
    public void testTARecoverFromSucceeded_OutputCommitterRecoveryNotSupported() throws Exception {
        this.initMockDAGRecoveryDataForTaskAttempt();

        // Attempt history for ta1 of task t1 in vertex v2: started + finished(SUCCEEDED).
        ContainerId container = Mockito.mock(ContainerId.class);
        NodeId node = Mockito.mock(NodeId.class);
        TaskAttemptStartedEvent attemptStarted = new TaskAttemptStartedEvent(this.ta1t1v2Id, "vertex2", this.ta1LaunchTime, container, node, "", "", "");
        EventMetaData outputMeta = new EventMetaData(EventMetaData.EventProducerConsumerType.OUTPUT, "vertex2", "vertex3", this.ta1t1v2Id);
        ArrayList<TezEvent> generatedEvents = new ArrayList<TezEvent>();
        generatedEvents.add(new TezEvent(DataMovementEvent.create(ByteBuffer.wrap(new byte[0])), outputMeta));
        TaskAttemptFinishedEvent attemptFinished = new TaskAttemptFinishedEvent(this.ta1t1v2Id, "vertex2", this.ta1LaunchTime, this.ta1FinishedTime, TaskAttemptState.SUCCEEDED, null, null, "", null, null, generatedEvents, 0L, null, 0L, null, null, null, null, null);
        RecoveryParser.TaskAttemptRecoveryData attemptData = new RecoveryParser.TaskAttemptRecoveryData(attemptStarted, attemptFinished);
        Mockito.doReturn(attemptData).when(this.dagRecoveryData).getTaskAttemptRecoveryData(this.ta1t1v2Id);

        // Task history for t1 of v2: started only, owning the attempt data above.
        HashMap<TezTaskAttemptID, RecoveryParser.TaskAttemptRecoveryData> attemptDataById = new HashMap<TezTaskAttemptID, RecoveryParser.TaskAttemptRecoveryData>();
        attemptDataById.put(this.ta1t1v2Id, attemptData);
        TaskStartedEvent taskStarted = new TaskStartedEvent(this.t1v2Id, "vertex2", 0L, this.t1StartedTime);
        RecoveryParser.TaskRecoveryData taskData = new RecoveryParser.TaskRecoveryData(taskStarted, null, attemptDataById);
        HashMap<TezTaskID, RecoveryParser.TaskRecoveryData> taskDataById = new HashMap<TezTaskID, RecoveryParser.TaskRecoveryData>();
        taskDataById.put(this.t1v2Id, taskData);
        Mockito.doReturn(taskData).when(this.dagRecoveryData).getTaskRecoveryData(this.t1v2Id);

        // Vertex history for v2 (reuses the v1 timing / task-count fixture values).
        VertexInitializedEvent vertexInited = new VertexInitializedEvent(this.v2Id, "vertex2", 0L, this.v1InitedTime, this.v1NumTask, "", null, null, null);
        VertexConfigurationDoneEvent vertexConfigured = new VertexConfigurationDoneEvent(this.v2Id, 0L, this.v1NumTask, null, null, null, false);
        VertexStartedEvent vertexStarted = new VertexStartedEvent(this.v2Id, 0L, this.v1StartedTime);
        RecoveryParser.VertexRecoveryData vertexData = new RecoveryParser.VertexRecoveryData(vertexInited, vertexConfigured, vertexStarted, null, taskDataById, false);
        Mockito.doReturn(vertexData).when(this.dagRecoveryData).getVertexRecoveryData(this.v2Id);

        // Kick off recovery and wait until the dispatcher has drained.
        DAGEventRecoverEvent recoverEvent = new DAGEventRecoverEvent(this.dagId, this.dagRecoveryData);
        this.dag.handle(recoverEvent);
        this.dispatcher.await();

        TaskImpl recoveredTask = (TaskImpl)this.dag.getVertex(this.v2Id).getTask(this.t1v2Id);
        TaskAttemptImpl recoveredAttempt = (TaskAttemptImpl)recoveredTask.getAttempt(this.ta1t1v2Id);
        Assert.assertEquals(TaskAttemptStateInternal.KILLED, recoveredAttempt.getInternalState());
        this.historyEventHandler.verifyHistoryEvent(1, HistoryEventType.TASK_ATTEMPT_FINISHED);
        Assert.assertEquals(TaskStateInternal.RUNNING, recoveredTask.getInternalState());
        Assert.assertEquals(2L, recoveredTask.getAttempts().size());
        Assert.assertEquals(this.ta1LaunchTime, recoveredAttempt.getLaunchTime());
        Assert.assertEquals(this.ta1FinishedTime, recoveredAttempt.getFinishTime());
    }

    /**
     * Recovers attempt {@code ta1t1v1Id} when both started and
     * finished(FAILED, INPUT_READ_ERROR) events were recorded. Expects the
     * attempt to be FAILED with that termination cause and the recorded
     * launch/finish times, the task to be SCHEDULED with two attempts, and no
     * TASK_ATTEMPT_FINISHED history event to be emitted.
     */
    @Test(timeout=5000L)
    public void testTARecoverFromFailed() {
        this.initMockDAGRecoveryDataForTaskAttempt();

        // Attempt history: started + finished(FAILED).
        ContainerId container = Mockito.mock(ContainerId.class);
        NodeId node = Mockito.mock(NodeId.class);
        TaskAttemptStartedEvent startedEvent = new TaskAttemptStartedEvent(this.ta1t1v1Id, "v1", this.ta1LaunchTime, container, node, "", "", "");
        TaskAttemptFinishedEvent failedEvent = new TaskAttemptFinishedEvent(this.ta1t1v1Id, "v1", this.ta1LaunchTime, this.ta1FinishedTime, TaskAttemptState.FAILED, TaskFailureType.NON_FATAL, TaskAttemptTerminationCause.INPUT_READ_ERROR, "", null, null, null, 0L, null, 0L, null, null, null, null, null);
        RecoveryParser.TaskAttemptRecoveryData attemptData = new RecoveryParser.TaskAttemptRecoveryData(startedEvent, failedEvent);
        Mockito.doReturn(attemptData).when(this.dagRecoveryData).getTaskAttemptRecoveryData(this.ta1t1v1Id);

        // Kick off recovery and wait until the dispatcher has drained.
        DAGEventRecoverEvent recoverEvent = new DAGEventRecoverEvent(this.dagId, this.dagRecoveryData);
        this.dag.handle(recoverEvent);
        this.dispatcher.await();

        TaskImpl recoveredTask = (TaskImpl)this.dag.getVertex(this.v1Id).getTask(this.t1v1Id);
        TaskAttemptImpl recoveredAttempt = (TaskAttemptImpl)recoveredTask.getAttempt(this.ta1t1v1Id);
        Assert.assertEquals(TaskAttemptStateInternal.FAILED, recoveredAttempt.getInternalState());
        Assert.assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, recoveredAttempt.getTerminationCause());
        Assert.assertEquals(TaskStateInternal.SCHEDULED, recoveredTask.getInternalState());
        Assert.assertEquals(2L, recoveredTask.getAttempts().size());
        this.historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
        Assert.assertEquals(this.ta1LaunchTime, recoveredAttempt.getLaunchTime());
        Assert.assertEquals(this.ta1FinishedTime, recoveredAttempt.getFinishTime());
    }

    /**
     * Recovers attempt {@code ta1t1v1Id} when both started and
     * finished(KILLED, TERMINATED_BY_CLIENT) events were recorded. Expects the
     * attempt to be KILLED with that termination cause and the recorded
     * launch/finish times, and no TASK_ATTEMPT_FINISHED history event to be emitted.
     */
    @Test(timeout=5000L)
    public void testTARecoverFromKilled() {
        this.initMockDAGRecoveryDataForTaskAttempt();

        // Attempt history: started + finished(KILLED).
        TaskAttemptStartedEvent taStartedEvent = new TaskAttemptStartedEvent(this.ta1t1v1Id, "v1", this.ta1LaunchTime, Mockito.mock(ContainerId.class), Mockito.mock(NodeId.class), "", "", "");
        // The finished event's start time is the attempt's launch time, matching the
        // started event above and the sibling tests (it previously passed
        // ta1FinishedTime for both the start and the finish time).
        TaskAttemptFinishedEvent taFinishedEvent = new TaskAttemptFinishedEvent(this.ta1t1v1Id, "v1", this.ta1LaunchTime, this.ta1FinishedTime, TaskAttemptState.KILLED, null, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, "", null, null, null, 0L, null, 0L, null, null, null, null, null);
        RecoveryParser.TaskAttemptRecoveryData taRecoveryData = new RecoveryParser.TaskAttemptRecoveryData(taStartedEvent, taFinishedEvent);
        Mockito.doReturn(taRecoveryData).when(this.dagRecoveryData).getTaskAttemptRecoveryData(this.ta1t1v1Id);

        // Kick off recovery and wait until the dispatcher has drained.
        this.dag.handle(new DAGEventRecoverEvent(this.dagId, this.dagRecoveryData));
        this.dispatcher.await();

        TaskImpl task = (TaskImpl)this.dag.getVertex(this.v1Id).getTask(this.t1v1Id);
        TaskAttemptImpl taskAttempt = (TaskAttemptImpl)task.getAttempt(this.ta1t1v1Id);
        Assert.assertEquals(TaskAttemptStateInternal.KILLED, taskAttempt.getInternalState());
        Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, taskAttempt.getTerminationCause());
        this.historyEventHandler.verifyHistoryEvent(0, HistoryEventType.TASK_ATTEMPT_FINISHED);
        Assert.assertEquals(this.ta1LaunchTime, taskAttempt.getLaunchTime());
        Assert.assertEquals(this.ta1FinishedTime, taskAttempt.getFinishTime());
    }

    /**
     * {@code OutputCommitter} stub whose lifecycle callbacks do nothing and
     * which reports that task recovery is not supported.
     */
    public static class RecoveryNotSupportedOutputCommitter extends OutputCommitter {

        /**
         * @param committerContext context handed straight to the superclass
         */
        public RecoveryNotSupportedOutputCommitter(OutputCommitterContext committerContext) {
            super(committerContext);
        }

        /** No-op. */
        public void initialize() throws Exception {
            // intentionally empty
        }

        /** No-op. */
        public void setupOutput() throws Exception {
            // intentionally empty
        }

        /** No-op. */
        public void commitOutput() throws Exception {
            // intentionally empty
        }

        /** No-op; {@code finalState} is ignored. */
        public void abortOutput(VertexStatus.State finalState) throws Exception {
            // intentionally empty
        }

        /**
         * @return always {@code false}
         */
        public boolean isTaskRecoverySupported() {
            return false;
        }
    }

    /**
     * {@code InputInitializer} stub whose {@link #initialize()} never returns
     * normally and which ignores all initializer events.
     */
    public static class MockInputInitializer extends InputInitializer {

        /**
         * @param initializerContext context handed straight to the superclass
         */
        public MockInputInitializer(InputInitializerContext initializerContext) {
            super(initializerContext);
        }

        /**
         * Sleeps in one-second steps forever; the only way out is an exception
         * thrown by {@code Thread.sleep}.
         */
        public List<org.apache.tez.runtime.api.Event> initialize() throws Exception {
            for (;;) {
                Thread.sleep(1000L);
            }
        }

        /** No-op; {@code events} are ignored. */
        public void handleInputInitializerEvent(List<InputInitializerEvent> events) throws Exception {
            // intentionally empty
        }
    }

    /**
     * {@code HistoryEventHandler} that records every history event passed to
     * {@link #handleCriticalEvent(DAGHistoryEvent)} in an in-memory list so
     * tests can count events by type.
     */
    private static class MockHistoryEventHandler extends HistoryEventHandler {
        // Events in the order they were handed to handleCriticalEvent.
        private List<HistoryEvent> historyEvents = new ArrayList<HistoryEvent>();

        /**
         * @param context application context handed straight to the superclass
         */
        public MockHistoryEventHandler(AppContext context) {
            super(context);
        }

        /** Appends the wrapped history event to the recorded list. */
        public void handleCriticalEvent(DAGHistoryEvent event) throws IOException {
            HistoryEvent wrapped = event.getHistoryEvent();
            this.historyEvents.add(wrapped);
        }

        /**
         * @return the live list of recorded events (not a copy)
         */
        public List<HistoryEvent> getHistoryEvents() {
            return this.historyEvents;
        }

        /**
         * Asserts that exactly {@code expectedTimes} recorded events have the
         * given type.
         *
         * @param expectedTimes expected number of matching events
         * @param eventType     event type to count
         */
        public void verifyHistoryEvent(int expectedTimes, HistoryEventType eventType) {
            int matches = 0;
            for (HistoryEvent recorded : this.historyEvents) {
                if (recorded.getEventType() == eventType) {
                    matches++;
                }
            }
            Assert.assertEquals(expectedTimes, matches);
        }
    }

    /** Handler for {@code AMSchedulerEvent}s that discards every event. */
    private class AMSchedulerEventDispatcher implements EventHandler<AMSchedulerEvent> {

        private AMSchedulerEventDispatcher() {
            // nothing to initialize
        }

        /** No-op; {@code event} is ignored. */
        public void handle(AMSchedulerEvent event) {
            // intentionally empty
        }
    }

    /** Handler that counts how many {@code DAGAppMasterEventDAGFinished} events it has received. */
    private class DAGFinishEventHandler implements EventHandler<DAGAppMasterEventDAGFinished> {
        // Number of events handled so far; starts at 0 (the int default).
        public int dagFinishEvents;

        private DAGFinishEventHandler() {
            // nothing to initialize
        }

        /** Increments {@link #dagFinishEvents}; the event's contents are not inspected. */
        public void handle(DAGAppMasterEventDAGFinished event) {
            this.dagFinishEvents += 1;
        }
    }

    /** Forwards each {@code VertexEvent} to the vertex of the enclosing test's DAG that it targets. */
    private class VertexEventDispatcher implements EventHandler<VertexEvent> {

        private VertexEventDispatcher() {
            // nothing to initialize
        }

        /** Looks up the vertex by the event's vertex id and lets it handle the event. */
        public void handle(VertexEvent event) {
            ((VertexImpl)TestDAGRecovery.this.dag.getVertex(event.getVertexId())).handle(event);
        }
    }

    /**
     * Forwards each {@code TaskAttemptEvent} to the task attempt it targets,
     * resolved through the enclosing test's DAG (vertex, then task, then attempt).
     */
    private class TaskAttemptEventDispatcher
    implements EventHandler<TaskAttemptEvent> {
        private TaskAttemptEventDispatcher() {
        }

        /**
         * Resolves the attempt named by the event and delivers the event to it.
         *
         * @param event event carrying the id of the target task attempt
         */
        public void handle(TaskAttemptEvent event) {
            TezTaskAttemptID attemptId = event.getTaskAttemptID();
            TezTaskID taskId = attemptId.getTaskID();
            Vertex vertex = TestDAGRecovery.this.dag.getVertex(taskId.getVertexID());
            Task task = vertex.getTask(taskId);
            TaskAttempt ta = task.getAttempt(attemptId);
            // The TaskAttempt type is not declared as an EventHandler here, so a cast is
            // required; use the parameterized type instead of a raw EventHandler so the
            // event argument stays type-checked.
            @SuppressWarnings("unchecked")
            EventHandler<TaskAttemptEvent> attemptHandler = (EventHandler<TaskAttemptEvent>)ta;
            attemptHandler.handle(event);
        }
    }

    /** Forwards each {@code TaskEvent} to the task of the enclosing test's DAG that it targets. */
    private class TaskEventDispatcher implements EventHandler<TaskEvent> {

        private TaskEventDispatcher() {
            // nothing to initialize
        }

        /** Resolves the task via its vertex and lets it handle the event. */
        public void handle(TaskEvent event) {
            TezTaskID taskId = event.getTaskID();
            Vertex owner = TestDAGRecovery.this.dag.getVertex(taskId.getVertexID());
            TaskImpl target = (TaskImpl)owner.getTask(taskId);
            target.handle(event);
        }
    }

    /** Forwards each {@code DAGEvent} to the enclosing test's DAG. */
    private class DagEventDispatcher implements EventHandler<DAGEvent> {

        private DagEventDispatcher() {
            // nothing to initialize
        }

        /** Hands the event to the DAG under test. */
        public void handle(DAGEvent event) {
            TestDAGRecovery.this.dag.handle(event);
        }
    }
}

