/*
 * Decompiled with CFR 0.152.
 */
package org.apache.tez.dag.app;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.Utils;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.TaskAttemptEventInfo;
import org.apache.tez.dag.app.TaskCommunicatorContextImpl;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskCommunicatorWrapper;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.TezLocalTaskCommunicatorImpl;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@InterfaceAudience.Private
public class TaskCommunicatorManager
extends AbstractService
implements TaskCommunicatorManagerInterface {
    private static final Logger LOG = LoggerFactory.getLogger(TaskCommunicatorManager.class);
    private final AppContext context;
    private final TaskCommunicatorWrapper[] taskCommunicators;
    private final TaskCommunicatorContext[] taskCommunicatorContexts;
    protected final ServicePluginLifecycleAbstractService[] taskCommunicatorServiceWrappers;
    protected final TaskHeartbeatHandler taskHeartbeatHandler;
    protected final ContainerHeartbeatHandler containerHeartbeatHandler;
    private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
    private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts = new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
    private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers = new ConcurrentHashMap<ContainerId, ContainerInfo>();
    private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null);

    @InterfaceAudience.Private
    @VisibleForTesting
    public TaskCommunicatorManager(TaskCommunicator taskCommunicator, AppContext appContext, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
        super(TaskCommunicatorManager.class.getName());
        this.context = appContext;
        this.taskHeartbeatHandler = thh;
        this.containerHeartbeatHandler = chh;
        this.taskCommunicators = new TaskCommunicatorWrapper[]{new TaskCommunicatorWrapper(taskCommunicator)};
        this.taskCommunicatorContexts = new TaskCommunicatorContext[]{taskCommunicator.getContext()};
        this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[]{new ServicePluginLifecycleAbstractService<TaskCommunicator>(taskCommunicator)};
    }

    public TaskCommunicatorManager(AppContext context, TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh, List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException {
        super(TaskCommunicatorManager.class.getName());
        this.context = context;
        this.taskHeartbeatHandler = thh;
        this.containerHeartbeatHandler = chh;
        Preconditions.checkArgument((taskCommunicatorDescriptors != null && !taskCommunicatorDescriptors.isEmpty() ? 1 : 0) != 0, (Object)"TaskCommunicators must be specified");
        this.taskCommunicators = new TaskCommunicatorWrapper[taskCommunicatorDescriptors.size()];
        this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
        this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
        for (int i = 0; i < taskCommunicatorDescriptors.size(); ++i) {
            UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload();
            this.taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i);
            this.taskCommunicators[i] = new TaskCommunicatorWrapper(this.createTaskCommunicator(taskCommunicatorDescriptors.get(i), i));
            this.taskCommunicatorServiceWrappers[i] = new ServicePluginLifecycleAbstractService<TaskCommunicator>(this.taskCommunicators[i].getTaskCommunicator());
        }
    }

    public void serviceStart() {
        for (int i = 0; i < this.taskCommunicators.length; ++i) {
            this.taskCommunicatorServiceWrappers[i].init(this.getConfig());
            this.taskCommunicatorServiceWrappers[i].start();
        }
    }

    public void serviceStop() {
        for (int i = 0; i < this.taskCommunicators.length; ++i) {
            this.taskCommunicatorServiceWrappers[i].stop();
        }
    }

    @VisibleForTesting
    TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor, int taskCommIndex) throws TezException {
        if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
            return this.createDefaultTaskCommunicator(this.taskCommunicatorContexts[taskCommIndex]);
        }
        if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezUberServicePluginName())) {
            return this.createUberTaskCommunicator(this.taskCommunicatorContexts[taskCommIndex]);
        }
        return this.createCustomTaskCommunicator(this.taskCommunicatorContexts[taskCommIndex], taskCommDescriptor);
    }

    @VisibleForTesting
    TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
        LOG.info("Creating Default Task Communicator");
        return new TezTaskCommunicatorImpl(taskCommunicatorContext);
    }

    @VisibleForTesting
    TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
        LOG.info("Creating Default Local Task Communicator");
        return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext);
    }

    @VisibleForTesting
    TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext, NamedEntityDescriptor taskCommDescriptor) throws TezException {
        LOG.info("Creating TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(), (Object)taskCommDescriptor.getClassName());
        Class taskCommClazz = ReflectionUtils.getClazz((String)taskCommDescriptor.getClassName());
        try {
            Constructor ctor = taskCommClazz.getConstructor(TaskCommunicatorContext.class);
            return (TaskCommunicator)ctor.newInstance(taskCommunicatorContext);
        }
        catch (IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new TezUncheckedException((Throwable)e);
        }
    }

    public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request) throws IOException, TezException {
        ContainerId containerId = ConverterUtils.toContainerId((String)request.getContainerIdentifier());
        if (LOG.isDebugEnabled()) {
            LOG.debug("Received heartbeat from container, request=" + request);
        }
        if (!this.registeredContainers.containsKey(containerId)) {
            LOG.warn("Received task heartbeat from unknown container with id: " + containerId + ", asking it to die");
            return this.RESPONSE_SHOULD_DIE;
        }
        this.pingContainerHeartbeatHandler(containerId);
        TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
        TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
        if (taskAttemptID != null) {
            ContainerId containerIdFromMap = (ContainerId)this.registeredAttempts.get(taskAttemptID);
            if (containerIdFromMap == null || !containerIdFromMap.equals((Object)containerId)) {
                LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats");
                return this.RESPONSE_SHOULD_DIE;
            }
            List<TezEvent> inEvents = request.getEvents();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Ping from " + taskAttemptID.toString() + " events: " + (inEvents != null ? inEvents.size() : -1));
            }
            long currTime = this.context.getClock().getTime();
            ArrayList<TezEvent> taFinishedEvents = new ArrayList<TezEvent>();
            ArrayList<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
            ArrayList<TezEvent> eventsForVertex = new ArrayList<TezEvent>();
            TaskAttemptEventStatusUpdate taskAttemptEvent = null;
            boolean readErrorReported = false;
            for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
                tezEvent.setEventReceivedTime(currTime);
                EventType eventType = tezEvent.getEventType();
                if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
                    taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID, (TaskStatusUpdateEvent)tezEvent.getEvent());
                    continue;
                }
                if (eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT || eventType == EventType.TASK_ATTEMPT_FAILED_EVENT) {
                    taFinishedEvents.add(tezEvent);
                    continue;
                }
                if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
                    readErrorReported = true;
                }
                if (eventType == EventType.DATA_MOVEMENT_EVENT || eventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT || eventType == EventType.ROOT_INPUT_INITIALIZER_EVENT || eventType == EventType.VERTEX_MANAGER_EVENT) {
                    taGeneratedEvents.add(tezEvent);
                }
                eventsForVertex.add(tezEvent);
            }
            if (taskAttemptEvent != null) {
                taskAttemptEvent.setReadErrorReported(readErrorReported);
                this.sendEvent((Event<?>)taskAttemptEvent);
            }
            if (!taGeneratedEvents.isEmpty()) {
                this.sendEvent((Event<?>)new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents));
            }
            Preconditions.checkArgument((taFinishedEvents.size() <= 1 ? 1 : 0) != 0, (Object)"Multiple TaskAttemptFinishedEvent");
            block11: for (TezEvent e : taFinishedEvents) {
                EventMetaData sourceMeta = e.getSourceInfo();
                switch (e.getEventType()) {
                    case TASK_ATTEMPT_FAILED_EVENT: {
                        TaskAttemptTerminationCause errCause = null;
                        switch (sourceMeta.getEventGenerator()) {
                            case INPUT: {
                                errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
                                break;
                            }
                            case PROCESSOR: {
                                errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
                                break;
                            }
                            case OUTPUT: {
                                errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
                                break;
                            }
                            case SYSTEM: {
                                errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
                                break;
                            }
                            default: {
                                throw new TezUncheckedException("Unknown EventProducerConsumerType: " + sourceMeta.getEventGenerator());
                            }
                        }
                        TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent)e.getEvent();
                        this.sendEvent((Event<?>)new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_FAILED, "Error: " + taskFailedEvent.getDiagnostics(), errCause));
                        continue block11;
                    }
                    case TASK_ATTEMPT_COMPLETED_EVENT: {
                        this.sendEvent((Event<?>)new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
                        continue block11;
                    }
                }
                throw new TezUncheckedException("Unhandled tez event type: " + e.getEventType());
            }
            if (!eventsForVertex.isEmpty()) {
                TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
                this.sendEvent((Event<?>)new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
            }
            this.taskHeartbeatHandler.pinged(taskAttemptID);
            eventInfo = this.context.getCurrentDAG().getVertex(taskAttemptID.getTaskID().getVertexID()).getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(), request.getMaxEvents());
        }
        return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
    }

    public void taskAlive(TezTaskAttemptID taskAttemptId) {
        this.taskHeartbeatHandler.pinged(taskAttemptId);
    }

    public void containerAlive(ContainerId containerId) {
        this.pingContainerHeartbeatHandler(containerId);
    }

    public void taskStartedRemotely(TezTaskAttemptID taskAttemptID, ContainerId containerId) {
        this.sendEvent((Event<?>)new TaskAttemptEventStartedRemotely(taskAttemptID, containerId, null));
        this.pingContainerHeartbeatHandler(containerId);
    }

    public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, String diagnostics) {
        this.sendEvent((Event<?>)new TaskAttemptEventAttemptKilled(taskAttemptId, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason((TaskAttemptEndReason)taskAttemptEndReason)));
    }

    public void taskFailed(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason, String diagnostics) {
        this.sendEvent((Event<?>)new TaskAttemptEventAttemptFailed(taskAttemptId, TaskAttemptEventType.TA_FAILED, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason((TaskAttemptEndReason)taskAttemptEndReason)));
    }

    public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) {
        try {
            this.taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
        }
        catch (Exception e) {
            String msg = "Error in TaskCommunicator when handling vertex state update notification, communicator=" + Utils.getTaskCommIdentifierString(taskCommIndex, this.context) + ", vertexName=" + event.getVertexName() + ", vertexState=" + event.getVertexState();
            LOG.error(msg, (Throwable)e);
            this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, msg, e));
        }
    }

    public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
        this.taskHeartbeatHandler.progressing(taskAttemptId);
        this.pingContainerHeartbeatHandler(taskAttemptId);
        DAG job = this.context.getCurrentDAG();
        Task task = job.getVertex(taskAttemptId.getTaskID().getVertexID()).getTask(taskAttemptId.getTaskID());
        return task.canCommit(taskAttemptId);
    }

    @Override
    public void dagComplete(DAG dag) {
        for (int i = 0; i < this.taskCommunicators.length; ++i) {
            try {
                ((TaskCommunicatorContextImpl)this.taskCommunicatorContexts[i]).dagCompleteStart(dag);
                this.taskCommunicators[i].dagComplete(dag.getID().getId());
                ((TaskCommunicatorContextImpl)this.taskCommunicatorContexts[i]).dagCompleteEnd();
                continue;
            }
            catch (Exception e) {
                String msg = "Error in TaskCommunicator when notifying for DAG completion, communicator=" + Utils.getTaskCommIdentifierString(i, this.context);
                LOG.error(msg, (Throwable)e);
                this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, msg, e));
            }
        }
    }

    @Override
    public void dagSubmitted() {
    }

    @Override
    public void registerRunningContainer(ContainerId containerId, int taskCommId) {
        ContainerInfo oldInfo;
        if (LOG.isDebugEnabled()) {
            LOG.debug("ContainerId: " + containerId + " registered with TaskAttemptListener");
        }
        if ((oldInfo = this.registeredContainers.put(containerId, NULL_CONTAINER_INFO)) != null) {
            throw new TezUncheckedException("Multiple registrations for containerId: " + containerId);
        }
        NodeId nodeId = this.context.getAllContainers().get(containerId).getContainer().getNodeId();
        try {
            this.taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(), nodeId.getPort());
        }
        catch (Exception e) {
            String msg = "Error in TaskCommunicator when registering running Container, communicator=" + Utils.getTaskCommIdentifierString(taskCommId, this.context) + ", containerId=" + containerId + ", nodeId=" + nodeId;
            LOG.error(msg, (Throwable)e);
            this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, msg, e));
        }
    }

    @Override
    public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
        }
        ContainerInfo containerInfo = (ContainerInfo)this.registeredContainers.remove(containerId);
        if (containerInfo.taskAttemptId != null) {
            this.registeredAttempts.remove(containerInfo.taskAttemptId);
        }
        try {
            this.taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
        }
        catch (Exception e) {
            String msg = "Error in TaskCommunicator when unregistering Container, communicator=" + Utils.getTaskCommIdentifierString(taskCommId, this.context) + ", containerId=" + containerId;
            LOG.error(msg, (Throwable)e);
            this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, msg, e));
        }
    }

    @Override
    public void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId) {
        ContainerInfo containerInfo = (ContainerInfo)this.registeredContainers.get(containerId);
        if (containerInfo == null) {
            throw new TezUncheckedException("Registering task attempt: " + amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
        }
        if (containerInfo.taskAttemptId != null) {
            throw new TezUncheckedException("Registering task attempt: " + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + " with existing assignment to: " + containerInfo.taskAttemptId);
        }
        this.registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID()));
        ContainerId containerIdFromMap = this.registeredAttempts.put(amContainerTask.getTask().getTaskAttemptID(), containerId);
        if (containerIdFromMap != null) {
            throw new TezUncheckedException("Registering task attempt: " + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId + " when already assigned to: " + containerIdFromMap);
        }
        try {
            this.taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(), amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(), amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
        }
        catch (Exception e) {
            String msg = "Error in TaskCommunicator when registering Task Attempt, communicator=" + Utils.getTaskCommIdentifierString(taskCommId, this.context) + ", containerId=" + containerId + ", taskId=" + amContainerTask.getTask().getTaskAttemptID();
            LOG.error(msg, (Throwable)e);
            this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, msg, e));
        }
    }

    @Override
    public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) {
        ContainerId containerId = (ContainerId)this.registeredAttempts.remove(attemptId);
        if (containerId == null) {
            LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
            return;
        }
        ContainerInfo containerInfo = (ContainerInfo)this.registeredContainers.get(containerId);
        if (containerInfo == null) {
            LOG.warn("Unregister task attempt: " + attemptId + " from non-registered container: " + containerId);
            return;
        }
        this.registeredContainers.put(containerId, NULL_CONTAINER_INFO);
        try {
            this.taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
        }
        catch (Exception e) {
            String msg = "Error in TaskCommunicator when unregistering Task Attempt, communicator=" + Utils.getTaskCommIdentifierString(taskCommId, this.context) + ", containerId=" + containerId + ", taskId=" + attemptId;
            LOG.error(msg, (Throwable)e);
            this.sendEvent((Event<?>)new DAGAppMasterEventUserServiceFatalError(DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR, msg, e));
        }
    }

    @Override
    public TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex) {
        return this.taskCommunicators[taskCommIndex];
    }

    private void pingContainerHeartbeatHandler(ContainerId containerId) {
        this.containerHeartbeatHandler.pinged(containerId);
    }

    private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
        ContainerId containerId = (ContainerId)this.registeredAttempts.get(taskAttemptId);
        if (containerId != null) {
            this.containerHeartbeatHandler.pinged(containerId);
        } else {
            LOG.warn("Handling communication from attempt: " + taskAttemptId + ", ContainerId not known for this attempt");
        }
    }

    private void sendEvent(Event<?> event) {
        this.context.getEventHandler().handle(event);
    }

    private static final class ContainerInfo {
        TezTaskAttemptID taskAttemptId;

        ContainerInfo(TezTaskAttemptID taskAttemptId) {
            this.taskAttemptId = taskAttemptId;
        }
    }
}

