package org.apache.nemo.runtime.master.resource;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.protobuf.ByteString;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.concurrent.NotThreadSafe;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.nemo.common.ir.executionproperty.ResourceSpecification;
import org.apache.nemo.common.ir.vertex.executionproperty.ResourceSlotProperty;
import org.apache.nemo.runtime.common.RuntimeIdManager;
import org.apache.nemo.runtime.common.comm.ControlMessage;
import org.apache.nemo.runtime.common.message.MessageSender;
import org.apache.nemo.runtime.common.plan.Task;
import org.apache.reef.driver.context.ActiveContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Master-side bookkeeping for a single executor.
 *
 * <p>Tracks which tasks are currently running on the executor (split by whether the task
 * complies with the {@link ResourceSlotProperty}), which have completed and which have failed,
 * and forwards control messages to the executor through the supplied {@link MessageSender}.
 *
 * <p>This class performs no synchronization of its own; it is annotated {@link NotThreadSafe}.
 */
@NotThreadSafe
public final class DefaultExecutorRepresenter implements ExecutorRepresenter {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultExecutorRepresenter.class);

    /** Shared mapper used only to build the JSON node in {@link #toString()}. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private final String executorId;
    private final ResourceSpecification resourceSpecification;
    /** Running tasks whose {@link ResourceSlotProperty} is true or absent, keyed by task id. */
    private final Map<String, Task> runningComplyingTasks = new HashMap<>();
    /** Running tasks whose {@link ResourceSlotProperty} is false, keyed by task id. */
    private final Map<String, Task> runningNonComplyingTasks = new HashMap<>();
    /** Attempt index recorded for each task at the time it was scheduled. */
    private final Map<Task, Integer> runningTaskToAttempt = new HashMap<>();
    private final Set<Task> completeTasks = new HashSet<>();
    private final Set<Task> failedTasks = new HashSet<>();
    private final MessageSender<ControlMessage.Message> messageSender;
    private final ActiveContext activeContext;
    private final ExecutorService serializationExecutorService;
    private final String nodeName;

    /**
     * Creates a representer for one executor.
     *
     * @param executorId                   the id returned by {@link #getExecutorId()}
     * @param resourceSpecification        source of the executor's capacity and container type
     * @param messageSender                sender used by {@link #sendControlMessage}
     * @param activeContext                context that is closed by {@link #shutDown()}
     * @param serializationExecutorService executor on which tasks are serialized and sent
     * @param nodeName                     the name returned by {@link #getNodeName()}
     */
    public DefaultExecutorRepresenter(final String executorId,
                                      final ResourceSpecification resourceSpecification,
                                      final MessageSender<ControlMessage.Message> messageSender,
                                      final ActiveContext activeContext,
                                      final ExecutorService serializationExecutorService,
                                      final String nodeName) {
        this.executorId = executorId;
        this.resourceSpecification = resourceSpecification;
        this.messageSender = messageSender;
        this.activeContext = activeContext;
        this.serializationExecutorService = serializationExecutorService;
        this.nodeName = nodeName;
    }

    /**
     * Marks every currently running task as failed and forgets it as running.
     *
     * @return the ids of all tasks that were running when this method was called
     */
    @Override
    public Set<String> onExecutorFailed() {
        failedTasks.addAll(runningComplyingTasks.values());
        failedTasks.addAll(runningNonComplyingTasks.values());
        final Set<String> failedTaskIds = Stream
            .concat(runningComplyingTasks.keySet().stream(), runningNonComplyingTasks.keySet().stream())
            .collect(Collectors.toSet());
        runningComplyingTasks.clear();
        runningNonComplyingTasks.clear();
        // Nothing is running any more, so the attempt bookkeeping must be dropped as well;
        // the per-task failure/completion handlers remove their entry in the same way.
        runningTaskToAttempt.clear();
        return failedTaskIds;
    }

    /**
     * Records the task as running on this executor and asynchronously ships it to the executor.
     *
     * <p>The task is serialized and sent from {@code serializationExecutorService}, not from the
     * calling thread.
     *
     * @param task the task that has been scheduled to this executor
     */
    @Override
    public void onTaskScheduled(final Task task) {
        // A task without the property is treated as complying.
        final boolean compliesWithSlot = task.getPropertyValue(ResourceSlotProperty.class).orElse(true);
        final Map<String, Task> runningTasks = compliesWithSlot ? runningComplyingTasks : runningNonComplyingTasks;
        runningTasks.put(task.getTaskId(), task);
        runningTaskToAttempt.put(task, task.getAttemptIdx());
        // A re-scheduled task is no longer considered failed on this executor.
        failedTasks.remove(task);

        serializationExecutorService.execute(() -> {
            final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = ControlMessage.ScheduleTaskMsg.newBuilder()
                .setTask(ByteString.copyFrom(SerializationUtils.serialize(task)))
                .build();
            sendControlMessage(ControlMessage.Message.newBuilder()
                .setId(RuntimeIdManager.generateMessageId())
                .setListenerId("EXECUTOR_MESSAGE_LISTENER_ID")
                .setType(ControlMessage.MessageType.ScheduleTask)
                .setScheduleTaskMsg(scheduleTaskMsg)
                .build());
        });
    }

    /**
     * Sends the given control message through this representer's message sender.
     *
     * @param message the message to send
     */
    @Override
    public void sendControlMessage(final ControlMessage.Message message) {
        messageSender.send(message);
    }

    /**
     * Moves a running task to the set of completed tasks.
     *
     * @param taskId id of the task that finished
     * @throws RuntimeException if no running task with that id is known
     */
    @Override
    public void onTaskExecutionComplete(final String taskId) {
        final Task finished = removeFromRunningTasks(taskId);
        runningTaskToAttempt.remove(finished);
        completeTasks.add(finished);
    }

    /**
     * Moves a running task to the set of failed tasks.
     *
     * @param taskId id of the task that failed
     * @throws RuntimeException if no running task with that id is known
     */
    @Override
    public void onTaskExecutionFailed(final String taskId) {
        final Task failed = removeFromRunningTasks(taskId);
        runningTaskToAttempt.remove(failed);
        failedTasks.add(failed);
    }

    /**
     * @return the capacity reported by the resource specification
     */
    @Override
    public int getExecutorCapacity() {
        return resourceSpecification.getCapacity();
    }

    /**
     * @return a newly built set containing every running task, complying or not
     */
    @Override
    public Set<Task> getRunningTasks() {
        return Stream
            .concat(runningComplyingTasks.values().stream(), runningNonComplyingTasks.values().stream())
            .collect(Collectors.toSet());
    }

    /**
     * @return the number of running tasks, complying and non-complying combined
     */
    @Override
    public int getNumOfRunningTasks() {
        return getNumOfComplyingRunningTasks() + getNumOfNonComplyingRunningTasks();
    }

    /**
     * @return the number of running tasks recorded as complying
     */
    @Override
    public int getNumOfComplyingRunningTasks() {
        return runningComplyingTasks.size();
    }

    /**
     * @return the number of running tasks recorded as non-complying
     */
    private int getNumOfNonComplyingRunningTasks() {
        return runningNonComplyingTasks.size();
    }

    /**
     * @return the executor id given at construction
     */
    @Override
    public String getExecutorId() {
        return executorId;
    }

    /**
     * @return the container type reported by the resource specification
     */
    @Override
    public String getContainerType() {
        return resourceSpecification.getContainerType();
    }

    /**
     * @return the node name given at construction
     */
    @Override
    public String getNodeName() {
        return nodeName;
    }

    /**
     * Closes the active context associated with this executor.
     */
    @Override
    public void shutDown() {
        activeContext.close();
    }

    /**
     * @return a JSON object string with the executor id, the running tasks and the failed tasks
     */
    @Override
    public String toString() {
        final ObjectNode node = OBJECT_MAPPER.createObjectNode();
        node.put("executorId", executorId);
        node.put("runningTasks", getRunningTasks().toString());
        node.put("failedTasks", failedTasks.toString());
        return node.toString();
    }

    /**
     * Removes the task with the given id from whichever running-task map holds it.
     *
     * @param taskId id of the task to remove
     * @return the removed task
     * @throws RuntimeException if neither running-task map contains the id
     */
    private Task removeFromRunningTasks(final String taskId) {
        // Values are never null (they are only inserted in onTaskScheduled), so a null
        // result from remove() means the key was absent from that map.
        Task removed = runningComplyingTasks.remove(taskId);
        if (removed == null) {
            removed = runningNonComplyingTasks.remove(taskId);
        }
        if (removed == null) {
            throw new RuntimeException(String.format("Task %s not found in its DefaultExecutorRepresenter", taskId));
        }
        return removed;
    }
}
