package org.apache.hadoop.yarn.server.resourcemanager;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;

/* loaded from: input_file:org/apache/hadoop/yarn/server/resourcemanager/OpportunisticContainerAllocatorAMService.class */
public class OpportunisticContainerAllocatorAMService extends ApplicationMasterService implements DistributedSchedulingAMProtocol, EventHandler<SchedulerEvent> {
    private static final Log LOG = LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
    private final NodeQueueLoadMonitor nodeMonitor;
    private final ConcurrentHashMap<String, Set<NodeId>> rackToNode;
    private final ConcurrentHashMap<String, Set<NodeId>> hostToNode;
    private final int k;

    public OpportunisticContainerAllocatorAMService(RMContext rMContext, YarnScheduler yarnScheduler) {
        super(OpportunisticContainerAllocatorAMService.class.getName(), rMContext, yarnScheduler);
        int i;
        int i2;
        this.rackToNode = new ConcurrentHashMap<>();
        this.hostToNode = new ConcurrentHashMap<>();
        this.k = rMContext.getYarnConfiguration().getInt("yarn.opportunistic-container-allocation.nodes-used", 10);
        long j = rMContext.getYarnConfiguration().getLong("yarn.nm-container-queuing.sorting-nodes-interval-ms", 1000L);
        NodeQueueLoadMonitor.LoadComparator valueOf = NodeQueueLoadMonitor.LoadComparator.valueOf(rMContext.getYarnConfiguration().get("yarn.nm-container-queuing.load-comparator", "QUEUE_LENGTH"));
        NodeQueueLoadMonitor nodeQueueLoadMonitor = new NodeQueueLoadMonitor(j, valueOf);
        float f = rMContext.getYarnConfiguration().getFloat("yarn.nm-container-queuing.queue-limit-stdev", 1.0f);
        if (valueOf == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
            i = rMContext.getYarnConfiguration().getInt("yarn.nm-container-queuing.min-queue-length", 1);
            i2 = rMContext.getYarnConfiguration().getInt("yarn.nm-container-queuing.max-queue-length", 10);
        } else {
            i = rMContext.getYarnConfiguration().getInt("yarn.nm-container-queuing.min-queue-wait-time-ms", 1);
            i2 = rMContext.getYarnConfiguration().getInt("yarn.nm-container-queuing.max-queue-wait-time-ms", 10);
        }
        nodeQueueLoadMonitor.initThresholdCalculator(f, i, i2);
        this.nodeMonitor = nodeQueueLoadMonitor;
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService
    public Server getServer(YarnRPC yarnRPC, Configuration configuration, InetSocketAddress inetSocketAddress, AMRMTokenSecretManager aMRMTokenSecretManager) {
        if (!YarnConfiguration.isDistSchedulingEnabled(configuration)) {
            return super.getServer(yarnRPC, configuration, inetSocketAddress, aMRMTokenSecretManager);
        }
        RPC.Server server = yarnRPC.getServer(DistributedSchedulingAMProtocol.class, this, inetSocketAddress, configuration, aMRMTokenSecretManager, configuration.getInt("yarn.resourcemanager.scheduler.client.thread-count", 50));
        server.addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER, ApplicationMasterProtocolPB.class, ApplicationMasterProtocol.ApplicationMasterProtocolService.newReflectiveBlockingService(new ApplicationMasterProtocolPBServiceImpl(this)));
        return server;
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService
    public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest registerApplicationMasterRequest) throws YarnException, IOException {
        return super.registerApplicationMaster(registerApplicationMasterRequest);
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService
    public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest finishApplicationMasterRequest) throws YarnException, IOException {
        return super.finishApplicationMaster(finishApplicationMasterRequest);
    }

    @Override // org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService
    public AllocateResponse allocate(AllocateRequest allocateRequest) throws YarnException, IOException {
        return super.allocate(allocateRequest);
    }

    public RegisterDistributedSchedulingAMResponse registerApplicationMasterForDistributedScheduling(RegisterApplicationMasterRequest registerApplicationMasterRequest) throws YarnException, IOException {
        RegisterApplicationMasterResponse registerApplicationMaster = registerApplicationMaster(registerApplicationMasterRequest);
        RegisterDistributedSchedulingAMResponse registerDistributedSchedulingAMResponse = (RegisterDistributedSchedulingAMResponse) this.recordFactory.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
        registerDistributedSchedulingAMResponse.setRegisterResponse(registerApplicationMaster);
        registerDistributedSchedulingAMResponse.setMinContainerResource(Resource.newInstance(getConfig().getInt("yarn.opportunistic-containers.min-memory-mb", 512), getConfig().getInt("yarn.opportunistic-containers.min-vcores", 1)));
        registerDistributedSchedulingAMResponse.setMaxContainerResource(Resource.newInstance(getConfig().getInt("yarn.opportunistic-containers.max-memory-mb", 2048), getConfig().getInt("yarn.opportunistic-containers.max-vcores", 4)));
        registerDistributedSchedulingAMResponse.setIncrContainerResource(Resource.newInstance(getConfig().getInt("yarn.opportunistic-containers.incr-memory-mb", 512), getConfig().getInt("yarn.opportunistic-containers.incr-vcores", 1)));
        registerDistributedSchedulingAMResponse.setContainerTokenExpiryInterval(getConfig().getInt("yarn.opportunistic-containers.container-token-expiry-ms", 600000));
        registerDistributedSchedulingAMResponse.setContainerIdStart(this.rmContext.getEpoch() << 40);
        registerDistributedSchedulingAMResponse.setNodesForScheduling(this.nodeMonitor.selectLeastLoadedNodes(this.k));
        return registerDistributedSchedulingAMResponse;
    }

    public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(DistributedSchedulingAllocateRequest distributedSchedulingAllocateRequest) throws YarnException, IOException {
        for (Container container : distributedSchedulingAllocateRequest.getAllocatedContainers()) {
            SchedulerApplicationAttempt currentAttemptForContainer = ((AbstractYarnScheduler) this.rmContext.getScheduler()).getCurrentAttemptForContainer(container.getId());
            RMContainerImpl rMContainerImpl = new RMContainerImpl(container, currentAttemptForContainer.getApplicationAttemptId(), container.getNodeId(), currentAttemptForContainer.getUser(), this.rmContext, true);
            currentAttemptForContainer.addRMContainer(container.getId(), rMContainerImpl);
            rMContainerImpl.handle((Event) new RMContainerEvent(container.getId(), RMContainerEventType.LAUNCHED));
        }
        AllocateResponse allocate = allocate(distributedSchedulingAllocateRequest.getAllocateRequest());
        DistributedSchedulingAllocateResponse distributedSchedulingAllocateResponse = (DistributedSchedulingAllocateResponse) this.recordFactory.newRecordInstance(DistributedSchedulingAllocateResponse.class);
        distributedSchedulingAllocateResponse.setAllocateResponse(allocate);
        distributedSchedulingAllocateResponse.setNodesForScheduling(this.nodeMonitor.selectLeastLoadedNodes(this.k));
        return distributedSchedulingAllocateResponse;
    }

    private void addToMapping(ConcurrentHashMap<String, Set<NodeId>> concurrentHashMap, String str, NodeId nodeId) {
        if (str != null) {
            concurrentHashMap.putIfAbsent(str, new HashSet());
            Set<NodeId> set = concurrentHashMap.get(str);
            synchronized (set) {
                set.add(nodeId);
            }
        }
    }

    private void removeFromMapping(ConcurrentHashMap<String, Set<NodeId>> concurrentHashMap, String str, NodeId nodeId) {
        if (str != null) {
            Set<NodeId> set = concurrentHashMap.get(str);
            synchronized (set) {
                set.remove(nodeId);
            }
        }
    }

    public void handle(SchedulerEvent schedulerEvent) {
        switch ((SchedulerEventType) schedulerEvent.getType()) {
            case NODE_ADDED:
                if (!(schedulerEvent instanceof NodeAddedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                NodeAddedSchedulerEvent nodeAddedSchedulerEvent = (NodeAddedSchedulerEvent) schedulerEvent;
                this.nodeMonitor.addNode(nodeAddedSchedulerEvent.getContainerReports(), nodeAddedSchedulerEvent.getAddedRMNode());
                addToMapping(this.rackToNode, nodeAddedSchedulerEvent.getAddedRMNode().getRackName(), nodeAddedSchedulerEvent.getAddedRMNode().getNodeID());
                addToMapping(this.hostToNode, nodeAddedSchedulerEvent.getAddedRMNode().getHostName(), nodeAddedSchedulerEvent.getAddedRMNode().getNodeID());
                return;
            case NODE_REMOVED:
                if (!(schedulerEvent instanceof NodeRemovedSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                NodeRemovedSchedulerEvent nodeRemovedSchedulerEvent = (NodeRemovedSchedulerEvent) schedulerEvent;
                this.nodeMonitor.removeNode(nodeRemovedSchedulerEvent.getRemovedRMNode());
                removeFromMapping(this.rackToNode, nodeRemovedSchedulerEvent.getRemovedRMNode().getRackName(), nodeRemovedSchedulerEvent.getRemovedRMNode().getNodeID());
                removeFromMapping(this.hostToNode, nodeRemovedSchedulerEvent.getRemovedRMNode().getHostName(), nodeRemovedSchedulerEvent.getRemovedRMNode().getNodeID());
                return;
            case NODE_UPDATE:
                if (!(schedulerEvent instanceof NodeUpdateSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                this.nodeMonitor.updateNode(((NodeUpdateSchedulerEvent) schedulerEvent).getRMNode());
                return;
            case NODE_RESOURCE_UPDATE:
                if (!(schedulerEvent instanceof NodeResourceUpdateSchedulerEvent)) {
                    throw new RuntimeException("Unexpected event type: " + schedulerEvent);
                }
                NodeResourceUpdateSchedulerEvent nodeResourceUpdateSchedulerEvent = (NodeResourceUpdateSchedulerEvent) schedulerEvent;
                this.nodeMonitor.updateNodeResource(nodeResourceUpdateSchedulerEvent.getRMNode(), nodeResourceUpdateSchedulerEvent.getResourceOption());
                return;
            case APP_ADDED:
            case APP_REMOVED:
            case APP_ATTEMPT_ADDED:
            case APP_ATTEMPT_REMOVED:
            case CONTAINER_EXPIRED:
            case NODE_LABELS_UPDATE:
                return;
            default:
                LOG.error("Unknown event arrived atOpportunisticContainerAllocatorAMService: " + schedulerEvent.toString());
                return;
        }
    }

    public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
        return this.nodeMonitor.getThresholdCalculator();
    }
}
