package org.apache.hadoop.yarn.server.resourcemanager;

import com.google.protobuf.RpcController;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.AllocateResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.FinishApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocolPB;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateRequestPBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.DistributedSchedulingAllocateResponsePBImpl;
import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterDistributedSchedulingAMResponsePBImpl;
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AMLivelinessMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetricsForCustomResources;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWSConsts;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.xerces.impl.io.UTF16Reader;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* JADX WARN: Classes with same name are omitted:
  input_file:test-classes/org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.class
 */
/* loaded from: input_file:hadoop-yarn-server-resourcemanager-2.10.1-tests.jar:org/apache/hadoop/yarn/server/resourcemanager/TestOpportunisticContainerAllocatorAMService.class */
public class TestOpportunisticContainerAllocatorAMService {
    private static final int GB = 1024;
    private MockRM rm;
    private DrainDispatcher dispatcher;

    @Before
    public void createAndStartRM() {
        YarnConfiguration yarnConfiguration = new YarnConfiguration(new CapacitySchedulerConfiguration());
        yarnConfiguration.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        yarnConfiguration.setBoolean("yarn.resourcemanager.opportunistic-container-allocation.enabled", true);
        yarnConfiguration.setInt("yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms", 100);
        startRM(yarnConfiguration);
    }

    public void createAndStartRMWithAutoUpdateContainer() {
        YarnConfiguration yarnConfiguration = new YarnConfiguration(new CapacitySchedulerConfiguration());
        yarnConfiguration.setBoolean("yarn.resourcemanager.auto-update.containers", true);
        yarnConfiguration.setClass("yarn.resourcemanager.scheduler.class", CapacityScheduler.class, ResourceScheduler.class);
        yarnConfiguration.setBoolean("yarn.resourcemanager.opportunistic-container-allocation.enabled", true);
        yarnConfiguration.setInt("yarn.resourcemanager.nm-container-queuing.sorting-nodes-interval-ms", 100);
        startRM(yarnConfiguration);
    }

    private void startRM(YarnConfiguration yarnConfiguration) {
        this.dispatcher = new DrainDispatcher();
        this.rm = new MockRM(yarnConfiguration) { // from class: org.apache.hadoop.yarn.server.resourcemanager.TestOpportunisticContainerAllocatorAMService.1
            @Override // org.apache.hadoop.yarn.server.resourcemanager.MockRM, org.apache.hadoop.yarn.server.resourcemanager.ResourceManager
            protected Dispatcher createDispatcher() {
                return TestOpportunisticContainerAllocatorAMService.this.dispatcher;
            }
        };
        this.rm.start();
    }

    @After
    public void stopRM() {
        if (this.rm != null) {
            this.rm.stop();
        }
    }

    @Test(timeout = 600000)
    public void testContainerPromoteAndDemoteBeforeContainerStart() throws Exception {
        HashMap hashMap = new HashMap();
        MockNM mockNM = new MockNM("h1:1234", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        hashMap.put(mockNM.getNodeId(), mockNM);
        MockNM mockNM2 = new MockNM("h1:4321", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        hashMap.put(mockNM2.getNodeId(), mockNM2);
        MockNM mockNM3 = new MockNM("h2:1234", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        hashMap.put(mockNM3.getNodeId(), mockNM3);
        MockNM mockNM4 = new MockNM("h2:4321", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        hashMap.put(mockNM4.getNodeId(), mockNM4);
        mockNM.registerNode();
        mockNM2.registerNode();
        mockNM3.registerNode();
        mockNM4.registerNode();
        OpportunisticContainerAllocatorAMService opportunisticContainerAllocatorAMService = (OpportunisticContainerAllocatorAMService) this.rm.getApplicationMasterService();
        RMApp submitApp = this.rm.submitApp(1024, "app", RMWSConsts.USER, (Map<ApplicationAccessType, String>) null, "default");
        submitApp.getCurrentAppAttempt().getAppAttemptId();
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(submitApp, this.rm, mockNM2);
        ResourceScheduler resourceScheduler = this.rm.getResourceScheduler();
        RMNode rMNode = this.rm.getRMContext().getRMNodes().get(mockNM.getNodeId());
        RMNode rMNode2 = this.rm.getRMContext().getRMNodes().get(mockNM2.getNodeId());
        RMNode rMNode3 = this.rm.getRMContext().getRMNodes().get(mockNM3.getNodeId());
        RMNode rMNode4 = this.rm.getRMContext().getRMNodes().get(mockNM4.getNodeId());
        mockNM.nodeHeartbeat(true);
        mockNM2.nodeHeartbeat(true);
        mockNM3.nodeHeartbeat(true);
        mockNM4.nodeHeartbeat(true);
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode));
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode2));
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode3));
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode4));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode2));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode3));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode4));
        mockNM.nodeHeartbeat(true);
        mockNM2.nodeHeartbeat(true);
        mockNM3.nodeHeartbeat(true);
        mockNM4.nodeHeartbeat(true);
        Thread.sleep(1000L);
        QueueMetrics metrics = ((CapacityScheduler) resourceScheduler).getRootQueue().getMetrics();
        verifyMetrics(metrics, 15360L, 15, TestQueueMetricsForCustomResources.GB, 1, 1);
        List allocatedContainers = launchAndRegisterAM.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2, true, (String) null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true))), null).getAllocatedContainers();
        Assert.assertEquals(2L, allocatedContainers.size());
        Container container = (Container) allocatedContainers.get(0);
        MockNM mockNM5 = (MockNM) hashMap.get(container.getNodeId());
        MockNM mockNM6 = null;
        for (NodeId nodeId : hashMap.keySet()) {
            if (nodeId.getHost().equals(mockNM5.getNodeId().getHost()) && nodeId.getPort() != mockNM5.getNodeId().getPort()) {
                mockNM6 = (MockNM) hashMap.get(nodeId);
            }
        }
        verifyMetrics(metrics, 15360L, 15, TestQueueMetricsForCustomResources.GB, 1, 1);
        launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, (Resource) null, ExecutionType.GUARANTEED)));
        mockNM6.nodeHeartbeat(true);
        this.rm.drainEvents();
        Assert.assertEquals(0L, launchAndRegisterAM.allocate(null, null).getUpdatedContainers().size());
        this.dispatcher.waitForEventThreadToWait();
        this.rm.drainEvents();
        verifyMetrics(metrics, 15360L, 15, TestQueueMetricsForCustomResources.GB, 1, 1);
        AllocateResponse sendContainerUpdateRequest = launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, (Resource) null, ExecutionType.GUARANTEED)));
        Assert.assertEquals(0L, sendContainerUpdateRequest.getUpdatedContainers().size());
        Assert.assertEquals(1L, sendContainerUpdateRequest.getUpdateErrors().size());
        Assert.assertEquals(RMServerUtils.UPDATE_OUTSTANDING_ERROR, ((UpdateContainerError) sendContainerUpdateRequest.getUpdateErrors().get(0)).getReason());
        Assert.assertEquals(container.getId(), ((UpdateContainerError) sendContainerUpdateRequest.getUpdateErrors().get(0)).getUpdateContainerRequest().getContainerId());
        AllocateResponse sendContainerUpdateRequest2 = launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(1, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, (Resource) null, ExecutionType.GUARANTEED)));
        Assert.assertEquals(0L, sendContainerUpdateRequest2.getUpdatedContainers().size());
        Assert.assertEquals(1L, sendContainerUpdateRequest2.getUpdateErrors().size());
        Assert.assertEquals("INCORRECT_CONTAINER_VERSION_ERROR", ((UpdateContainerError) sendContainerUpdateRequest2.getUpdateErrors().get(0)).getReason());
        Assert.assertEquals(0L, ((UpdateContainerError) sendContainerUpdateRequest2.getUpdateErrors().get(0)).getCurrentContainerVersion());
        Assert.assertEquals(container.getId(), ((UpdateContainerError) sendContainerUpdateRequest2.getUpdateErrors().get(0)).getUpdateContainerRequest().getContainerId());
        mockNM5.nodeHeartbeat(true);
        this.rm.drainEvents();
        AllocateResponse allocate = launchAndRegisterAM.allocate(null, null);
        Assert.assertEquals(1L, allocate.getUpdatedContainers().size());
        Container container2 = ((UpdatedContainer) allocate.getUpdatedContainers().get(0)).getContainer();
        Assert.assertEquals(ExecutionType.GUARANTEED, container2.getExecutionType());
        Assert.assertEquals(container2.getId(), container.getId());
        Assert.assertEquals(container2.getVersion(), container.getVersion() + 1);
        verifyMetrics(metrics, 14336L, 14, 2048L, 2, 2);
        mockNM.nodeHeartbeat(true);
        mockNM2.nodeHeartbeat(true);
        mockNM3.nodeHeartbeat(true);
        mockNM4.nodeHeartbeat(true);
        this.rm.drainEvents();
        Assert.assertEquals(RMContainerState.ACQUIRED, ((CapacityScheduler) resourceScheduler).getApplicationAttempt(container2.getId().getApplicationAttemptId()).getRMContainer(container2.getId()).getState());
        AllocateResponse sendContainerUpdateRequest3 = launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(container2.getVersion(), container2.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, (Resource) null, ExecutionType.OPPORTUNISTIC)));
        Assert.assertEquals(1L, sendContainerUpdateRequest3.getUpdatedContainers().size());
        Container container3 = ((UpdatedContainer) sendContainerUpdateRequest3.getUpdatedContainers().get(0)).getContainer();
        Assert.assertEquals(ExecutionType.OPPORTUNISTIC, container3.getExecutionType());
        Assert.assertEquals(container3.getId(), container.getId());
        Assert.assertEquals(container3.getVersion(), container.getVersion() + 2);
        this.dispatcher.waitForEventThreadToWait();
        this.rm.drainEvents();
        verifyMetrics(metrics, 15360L, 15, TestQueueMetricsForCustomResources.GB, 1, 1);
    }

    @Test(timeout = 60000)
    public void testContainerPromoteAfterContainerStart() throws Exception {
        HashMap hashMap = new HashMap();
        MockNM mockNM = new MockNM("h1:1234", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        hashMap.put(mockNM.getNodeId(), mockNM);
        MockNM mockNM2 = new MockNM("h2:1234", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        hashMap.put(mockNM2.getNodeId(), mockNM2);
        mockNM.registerNode();
        mockNM2.registerNode();
        OpportunisticContainerAllocatorAMService opportunisticContainerAllocatorAMService = (OpportunisticContainerAllocatorAMService) this.rm.getApplicationMasterService();
        RMApp submitApp = this.rm.submitApp(1024, "app", RMWSConsts.USER, (Map<ApplicationAccessType, String>) null, "default");
        ApplicationAttemptId appAttemptId = submitApp.getCurrentAppAttempt().getAppAttemptId();
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(submitApp, this.rm, mockNM2);
        ResourceScheduler resourceScheduler = this.rm.getResourceScheduler();
        RMNode rMNode = this.rm.getRMContext().getRMNodes().get(mockNM.getNodeId());
        RMNode rMNode2 = this.rm.getRMContext().getRMNodes().get(mockNM2.getNodeId());
        mockNM.nodeHeartbeat(true);
        mockNM2.nodeHeartbeat(true);
        ((RMNodeImpl) rMNode).setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
        ((RMNodeImpl) rMNode2).setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
        ((CapacityScheduler) resourceScheduler).getApplicationAttempt(appAttemptId).getOpportunisticContainerContext();
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode));
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode2));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode2));
        mockNM.nodeHeartbeat(true);
        mockNM2.nodeHeartbeat(true);
        Thread.sleep(1000L);
        QueueMetrics metrics = ((CapacityScheduler) resourceScheduler).getRootQueue().getMetrics();
        verifyMetrics(metrics, 7168L, 7, TestQueueMetricsForCustomResources.GB, 1, 1);
        List allocatedContainers = launchAndRegisterAM.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2, true, (String) null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true))), null).getAllocatedContainers();
        Assert.assertEquals(2L, allocatedContainers.size());
        Container container = (Container) allocatedContainers.get(0);
        MockNM mockNM3 = (MockNM) hashMap.get(container.getNodeId());
        mockNM3.nodeHeartbeat(Arrays.asList(ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), true);
        this.rm.drainEvents();
        Assert.assertEquals(RMContainerState.RUNNING, ((CapacityScheduler) resourceScheduler).getApplicationAttempt(container.getId().getApplicationAttemptId()).getRMContainer(container.getId()).getState());
        verifyMetrics(metrics, 7168L, 7, TestQueueMetricsForCustomResources.GB, 1, 1);
        launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, (Resource) null, ExecutionType.GUARANTEED)));
        verifyMetrics(metrics, 7168L, 7, TestQueueMetricsForCustomResources.GB, 1, 1);
        AllocateResponse sendContainerUpdateRequest = launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, (Resource) null, ExecutionType.GUARANTEED)));
        Assert.assertEquals(0L, sendContainerUpdateRequest.getUpdatedContainers().size());
        Assert.assertEquals(1L, sendContainerUpdateRequest.getUpdateErrors().size());
        Assert.assertEquals(RMServerUtils.UPDATE_OUTSTANDING_ERROR, ((UpdateContainerError) sendContainerUpdateRequest.getUpdateErrors().get(0)).getReason());
        Assert.assertEquals(container.getId(), ((UpdateContainerError) sendContainerUpdateRequest.getUpdateErrors().get(0)).getUpdateContainerRequest().getContainerId());
        mockNM3.nodeHeartbeat(Arrays.asList(ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), true);
        this.rm.drainEvents();
        AllocateResponse allocate = launchAndRegisterAM.allocate(null, null);
        Assert.assertEquals(1L, allocate.getUpdatedContainers().size());
        Container container2 = ((UpdatedContainer) allocate.getUpdatedContainers().get(0)).getContainer();
        Assert.assertEquals(ExecutionType.GUARANTEED, container2.getExecutionType());
        Assert.assertEquals(container2.getId(), container.getId());
        Assert.assertEquals(container2.getVersion(), container.getVersion() + 1);
        Assert.assertEquals(RMContainerState.RUNNING, ((CapacityScheduler) resourceScheduler).getApplicationAttempt(container2.getId().getApplicationAttemptId()).getRMContainer(container2.getId()).getState());
        verifyMetrics(metrics, 6144L, 6, 2048L, 2, 2);
    }

    @Test(timeout = 600000)
    public void testContainerPromoteAfterContainerComplete() throws Exception {
        HashMap hashMap = new HashMap();
        MockNM mockNM = new MockNM("h1:1234", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        hashMap.put(mockNM.getNodeId(), mockNM);
        MockNM mockNM2 = new MockNM("h2:1234", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        hashMap.put(mockNM2.getNodeId(), mockNM2);
        mockNM.registerNode();
        mockNM2.registerNode();
        OpportunisticContainerAllocatorAMService opportunisticContainerAllocatorAMService = (OpportunisticContainerAllocatorAMService) this.rm.getApplicationMasterService();
        RMApp submitApp = this.rm.submitApp(1024, "app", RMWSConsts.USER, (Map<ApplicationAccessType, String>) null, "default");
        ApplicationAttemptId appAttemptId = submitApp.getCurrentAppAttempt().getAppAttemptId();
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(submitApp, this.rm, mockNM2);
        ResourceScheduler resourceScheduler = this.rm.getResourceScheduler();
        RMNode rMNode = this.rm.getRMContext().getRMNodes().get(mockNM.getNodeId());
        RMNode rMNode2 = this.rm.getRMContext().getRMNodes().get(mockNM2.getNodeId());
        mockNM.nodeHeartbeat(true);
        mockNM2.nodeHeartbeat(true);
        ((RMNodeImpl) rMNode).setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
        ((RMNodeImpl) rMNode2).setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
        ((CapacityScheduler) resourceScheduler).getApplicationAttempt(appAttemptId).getOpportunisticContainerContext();
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode));
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode2));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode2));
        mockNM.nodeHeartbeat(true);
        mockNM2.nodeHeartbeat(true);
        Thread.sleep(1000L);
        QueueMetrics metrics = ((CapacityScheduler) resourceScheduler).getRootQueue().getMetrics();
        verifyMetrics(metrics, 7168L, 7, TestQueueMetricsForCustomResources.GB, 1, 1);
        List allocatedContainers = launchAndRegisterAM.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2, true, (String) null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true))), null).getAllocatedContainers();
        Assert.assertEquals(2L, allocatedContainers.size());
        Container container = (Container) allocatedContainers.get(0);
        MockNM mockNM3 = (MockNM) hashMap.get(container.getNodeId());
        mockNM3.nodeHeartbeat(Arrays.asList(ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), true);
        this.rm.drainEvents();
        Assert.assertEquals(RMContainerState.RUNNING, ((CapacityScheduler) resourceScheduler).getApplicationAttempt(container.getId().getApplicationAttemptId()).getRMContainer(container.getId()).getState());
        mockNM3.nodeHeartbeat(Arrays.asList(ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.COMPLETE, "", 0)), true);
        this.rm.drainEvents();
        Assert.assertNull(((CapacityScheduler) resourceScheduler).getApplicationAttempt(container.getId().getApplicationAttemptId()).getRMContainer(container.getId()));
        verifyMetrics(metrics, 7168L, 7, TestQueueMetricsForCustomResources.GB, 1, 1);
        AllocateResponse sendContainerUpdateRequest = launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, (Resource) null, ExecutionType.GUARANTEED)));
        Assert.assertEquals(1L, sendContainerUpdateRequest.getCompletedContainersStatuses().size());
        Assert.assertEquals(container.getId(), ((ContainerStatus) sendContainerUpdateRequest.getCompletedContainersStatuses().get(0)).getContainerId());
        Assert.assertEquals(0L, sendContainerUpdateRequest.getUpdatedContainers().size());
        Assert.assertEquals(1L, sendContainerUpdateRequest.getUpdateErrors().size());
        Assert.assertEquals("INVALID_CONTAINER_ID", ((UpdateContainerError) sendContainerUpdateRequest.getUpdateErrors().get(0)).getReason());
        Assert.assertEquals(container.getId(), ((UpdateContainerError) sendContainerUpdateRequest.getUpdateErrors().get(0)).getUpdateContainerRequest().getContainerId());
        verifyMetrics(metrics, 7168L, 7, TestQueueMetricsForCustomResources.GB, 1, 1);
    }

    @Test(timeout = 600000)
    public void testContainerAutoUpdateContainer() throws Exception {
        this.rm.stop();
        createAndStartRMWithAutoUpdateContainer();
        MockNM mockNM = new MockNM("h1:1234", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        mockNM.registerNode();
        OpportunisticContainerAllocatorAMService opportunisticContainerAllocatorAMService = (OpportunisticContainerAllocatorAMService) this.rm.getApplicationMasterService();
        RMApp submitApp = this.rm.submitApp(1024, "app", RMWSConsts.USER, (Map<ApplicationAccessType, String>) null, "default");
        ApplicationAttemptId appAttemptId = submitApp.getCurrentAppAttempt().getAppAttemptId();
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(submitApp, this.rm, mockNM);
        ResourceScheduler resourceScheduler = this.rm.getResourceScheduler();
        RMNode rMNode = this.rm.getRMContext().getRMNodes().get(mockNM.getNodeId());
        mockNM.nodeHeartbeat(true);
        ((RMNodeImpl) rMNode).setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
        ((CapacityScheduler) resourceScheduler).getApplicationAttempt(appAttemptId).getOpportunisticContainerContext();
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode));
        mockNM.nodeHeartbeat(true);
        Thread.sleep(1000L);
        List allocatedContainers = launchAndRegisterAM.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2, true, (String) null, ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true))), null).getAllocatedContainers();
        allocatedContainers.addAll(launchAndRegisterAM.allocate(null, null).getAllocatedContainers());
        Assert.assertEquals(2L, allocatedContainers.size());
        Container container = (Container) allocatedContainers.get(0);
        mockNM.nodeHeartbeat(Arrays.asList(ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), true);
        this.rm.drainEvents();
        Assert.assertEquals(RMContainerState.RUNNING, ((CapacityScheduler) resourceScheduler).getApplicationAttempt(container.getId().getApplicationAttemptId()).getRMContainer(container.getId()).getState());
        launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(0, container.getId(), ContainerUpdateType.PROMOTE_EXECUTION_TYPE, (Resource) null, ExecutionType.GUARANTEED)));
        mockNM.nodeHeartbeat(Arrays.asList(ContainerStatus.newInstance(container.getId(), ExecutionType.OPPORTUNISTIC, ContainerState.RUNNING, "", 0)), true);
        this.rm.drainEvents();
        AllocateResponse allocate = launchAndRegisterAM.allocate(new ArrayList(), new ArrayList());
        Assert.assertEquals(1L, allocate.getUpdatedContainers().size());
        UpdatedContainer updatedContainer = (UpdatedContainer) allocate.getUpdatedContainers().get(0);
        Assert.assertEquals(container.getId(), updatedContainer.getContainer().getId());
        Assert.assertEquals(ExecutionType.GUARANTEED, updatedContainer.getContainer().getExecutionType());
        NodeHeartbeatResponse nodeHeartbeat = mockNM.nodeHeartbeat(true);
        Assert.assertEquals(1L, nodeHeartbeat.getContainersToUpdate().size());
        Container container2 = (Container) nodeHeartbeat.getContainersToUpdate().get(0);
        Assert.assertEquals(container.getId(), container2.getId());
        Assert.assertEquals(ExecutionType.GUARANTEED, container2.getExecutionType());
        AllocateResponse sendContainerUpdateRequest = launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(1, container.getId(), ContainerUpdateType.INCREASE_RESOURCE, Resources.createResource(2048, 1), (ExecutionType) null)));
        NodeHeartbeatResponse nodeHeartbeat2 = mockNM.nodeHeartbeat(Arrays.asList(ContainerStatus.newInstance(container.getId(), ExecutionType.GUARANTEED, ContainerState.RUNNING, "", 0)), true);
        this.rm.drainEvents();
        if (sendContainerUpdateRequest.getUpdatedContainers().size() == 0) {
            sendContainerUpdateRequest = launchAndRegisterAM.allocate(new ArrayList(), new ArrayList());
        }
        Assert.assertEquals(1L, sendContainerUpdateRequest.getUpdatedContainers().size());
        UpdatedContainer updatedContainer2 = (UpdatedContainer) sendContainerUpdateRequest.getUpdatedContainers().get(0);
        Assert.assertEquals(container.getId(), updatedContainer2.getContainer().getId());
        Assert.assertEquals(Resource.newInstance(2048, 1), updatedContainer2.getContainer().getResource());
        this.rm.drainEvents();
        if (nodeHeartbeat2.getContainersToUpdate().size() == 0) {
            nodeHeartbeat2 = mockNM.nodeHeartbeat(true);
        }
        Assert.assertEquals(1L, nodeHeartbeat2.getContainersToUpdate().size());
        Assert.assertEquals(Resource.newInstance(2048, 1), ((Container) nodeHeartbeat2.getContainersToUpdate().get(0)).getResource());
        Assert.assertEquals(1L, launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(2, container.getId(), ContainerUpdateType.DECREASE_RESOURCE, Resources.createResource(1024, 1), (ExecutionType) null))).getUpdatedContainers().size());
        this.rm.drainEvents();
        NodeHeartbeatResponse nodeHeartbeat3 = mockNM.nodeHeartbeat(true);
        Assert.assertEquals(1L, nodeHeartbeat3.getContainersToUpdate().size());
        Assert.assertEquals(Resource.newInstance(1024, 1), ((Container) nodeHeartbeat3.getContainersToUpdate().get(0)).getResource());
        mockNM.nodeHeartbeat(true);
        AllocateResponse sendContainerUpdateRequest2 = launchAndRegisterAM.sendContainerUpdateRequest(Arrays.asList(UpdateContainerRequest.newInstance(3, container.getId(), ContainerUpdateType.DEMOTE_EXECUTION_TYPE, (Resource) null, ExecutionType.OPPORTUNISTIC)));
        NodeHeartbeatResponse nodeHeartbeat4 = mockNM.nodeHeartbeat(Arrays.asList(ContainerStatus.newInstance(container.getId(), ExecutionType.GUARANTEED, ContainerState.RUNNING, "", 0)), true);
        this.rm.drainEvents();
        if (sendContainerUpdateRequest2.getUpdatedContainers().size() == 0) {
            sendContainerUpdateRequest2 = launchAndRegisterAM.allocate(new ArrayList(), new ArrayList());
        }
        Assert.assertEquals(1L, sendContainerUpdateRequest2.getUpdatedContainers().size());
        Assert.assertEquals(ExecutionType.OPPORTUNISTIC, ((UpdatedContainer) sendContainerUpdateRequest2.getUpdatedContainers().get(0)).getContainer().getExecutionType());
        if (nodeHeartbeat4.getContainersToUpdate().size() == 0) {
            nodeHeartbeat4 = mockNM.nodeHeartbeat(true);
        }
        Assert.assertEquals(1L, nodeHeartbeat4.getContainersToUpdate().size());
        Assert.assertEquals(ExecutionType.OPPORTUNISTIC, ((Container) nodeHeartbeat4.getContainersToUpdate().get(0)).getExecutionType());
    }

    private void verifyMetrics(QueueMetrics queueMetrics, long j, int i, long j2, int i2, int i3) {
        Assert.assertEquals(j, queueMetrics.getAvailableMB());
        Assert.assertEquals(i, queueMetrics.getAvailableVirtualCores());
        Assert.assertEquals(j2, queueMetrics.getAllocatedMB());
        Assert.assertEquals(i2, queueMetrics.getAllocatedVirtualCores());
        Assert.assertEquals(i3, queueMetrics.getAllocatedContainers());
    }

    @Test(timeout = 60000)
    public void testNodeRemovalDuringAllocate() throws Exception {
        MockNM mockNM = new MockNM("h1:1234", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        MockNM mockNM2 = new MockNM("h2:1234", UTF16Reader.DEFAULT_BUFFER_SIZE, this.rm.getResourceTrackerService());
        mockNM.registerNode();
        mockNM2.registerNode();
        OpportunisticContainerAllocatorAMService opportunisticContainerAllocatorAMService = (OpportunisticContainerAllocatorAMService) this.rm.getApplicationMasterService();
        RMApp submitApp = this.rm.submitApp(1024, "app", RMWSConsts.USER, (Map<ApplicationAccessType, String>) null, "default");
        ApplicationAttemptId appAttemptId = submitApp.getCurrentAppAttempt().getAppAttemptId();
        MockAM launchAndRegisterAM = MockRM.launchAndRegisterAM(submitApp, this.rm, mockNM2);
        ResourceScheduler resourceScheduler = this.rm.getResourceScheduler();
        RMNode rMNode = this.rm.getRMContext().getRMNodes().get(mockNM.getNodeId());
        RMNode rMNode2 = this.rm.getRMContext().getRMNodes().get(mockNM2.getNodeId());
        mockNM.nodeHeartbeat(true);
        mockNM2.nodeHeartbeat(true);
        ((RMNodeImpl) rMNode).setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
        ((RMNodeImpl) rMNode2).setOpportunisticContainersStatus(getOppurtunisticStatus(-1, 100));
        OpportunisticContainerContext opportunisticContainerContext = ((CapacityScheduler) resourceScheduler).getApplicationAttempt(appAttemptId).getOpportunisticContainerContext();
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode));
        opportunisticContainerAllocatorAMService.handle(new NodeAddedSchedulerEvent(rMNode2));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode));
        opportunisticContainerAllocatorAMService.handle(new NodeUpdateSchedulerEvent(rMNode2));
        for (int i = 0; i < 10; i++) {
            launchAndRegisterAM.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2)), null);
            if (opportunisticContainerContext.getNodeMap().size() == 2) {
                break;
            }
            Thread.sleep(50L);
        }
        Assert.assertEquals(2L, opportunisticContainerContext.getNodeMap().size());
        resourceScheduler.handle(new NodeRemovedSchedulerEvent(rMNode));
        for (int i2 = 0; i2 < 10; i2++) {
            try {
                launchAndRegisterAM.allocate(Arrays.asList(ResourceRequest.newInstance(Priority.newInstance(1), "*", Resources.createResource(1024), 2)), null);
            } catch (Exception e) {
                Assert.fail("Allocate request should be handled on node removal");
            }
            if (opportunisticContainerContext.getNodeMap().size() == 1) {
                break;
            }
            Thread.sleep(50L);
        }
        Assert.assertEquals(1L, opportunisticContainerContext.getNodeMap().size());
    }

    private OpportunisticContainersStatus getOppurtunisticStatus(int i, int i2) {
        OpportunisticContainersStatus opportunisticContainersStatus = (OpportunisticContainersStatus) Mockito.mock(OpportunisticContainersStatus.class);
        Mockito.when(Integer.valueOf(opportunisticContainersStatus.getEstimatedQueueWaitTime())).thenReturn(Integer.valueOf(i));
        Mockito.when(Integer.valueOf(opportunisticContainersStatus.getWaitQueueLength())).thenReturn(Integer.valueOf(i2));
        return opportunisticContainersStatus;
    }

    @Test
    public void testRPCWrapping() throws Exception {
        final Configuration configuration = new Configuration();
        configuration.set("yarn.ipc.rpc.class", HadoopYarnProtoRPC.class.getName());
        YarnRPC create = YarnRPC.create(configuration);
        InetSocketAddress createSocketAddr = NetUtils.createSocketAddr("localhost:0");
        configuration.setSocketAddr("yarn.resourcemanager.scheduler.address", createSocketAddr);
        RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory((Configuration) null);
        RMContextImpl rMContextImpl = new RMContextImpl() { // from class: org.apache.hadoop.yarn.server.resourcemanager.TestOpportunisticContainerAllocatorAMService.2
            @Override // org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl, org.apache.hadoop.yarn.server.resourcemanager.RMContext
            public AMLivelinessMonitor getAMLivelinessMonitor() {
                return null;
            }

            @Override // org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl, org.apache.hadoop.yarn.server.resourcemanager.RMContext
            public Configuration getYarnConfiguration() {
                return new YarnConfiguration();
            }

            @Override // org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl, org.apache.hadoop.yarn.server.resourcemanager.RMContext
            public RMContainerTokenSecretManager getContainerTokenSecretManager() {
                return new RMContainerTokenSecretManager(configuration);
            }

            @Override // org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl, org.apache.hadoop.yarn.server.resourcemanager.RMContext
            public ResourceScheduler getScheduler() {
                return new FifoScheduler();
            }
        };
        Container container = (Container) recordFactory.newRecordInstance(Container.class);
        container.setExecutionType(ExecutionType.OPPORTUNISTIC);
        container.setId(ContainerId.newContainerId(ApplicationAttemptId.newInstance(ApplicationId.newInstance(12345L, 1), 2), 3L));
        AllocateRequestPBImpl allocateRequestPBImpl = (AllocateRequestPBImpl) recordFactory.newRecordInstance(AllocateRequest.class);
        allocateRequestPBImpl.setAskList(Arrays.asList(ResourceRequest.newInstance(Priority.UNDEFINED, "a", Resource.newInstance(1, 2), 1, true, "exp", ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC, true))));
        OpportunisticContainerAllocatorAMService createService = createService(recordFactory, rMContextImpl, container);
        configuration.setBoolean("yarn.nodemanager.distributed-scheduling.enabled", true);
        Server server = createService.getServer(create, configuration, createSocketAddr, null);
        server.start();
        RPC.setProtocolEngine(configuration, ApplicationMasterProtocolPB.class, ProtobufRpcEngine.class);
        ApplicationMasterProtocolPB applicationMasterProtocolPB = (ApplicationMasterProtocolPB) RPC.getProxy(ApplicationMasterProtocolPB.class, 1L, NetUtils.getConnectAddress(server), configuration);
        Assert.assertEquals("dummyQueue", new RegisterApplicationMasterResponsePBImpl(applicationMasterProtocolPB.registerApplicationMaster((RpcController) null, ((RegisterApplicationMasterRequestPBImpl) recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class)).getProto())).getQueue());
        Assert.assertEquals(false, Boolean.valueOf(new FinishApplicationMasterResponsePBImpl(applicationMasterProtocolPB.finishApplicationMaster((RpcController) null, ((FinishApplicationMasterRequestPBImpl) recordFactory.newRecordInstance(FinishApplicationMasterRequest.class)).getProto())).getIsUnregistered()));
        List allocatedContainers = new AllocateResponsePBImpl(applicationMasterProtocolPB.allocate((RpcController) null, ((AllocateRequestPBImpl) recordFactory.newRecordInstance(AllocateRequest.class)).getProto())).getAllocatedContainers();
        Assert.assertEquals(1L, allocatedContainers.size());
        Assert.assertEquals(ExecutionType.OPPORTUNISTIC, ((Container) allocatedContainers.get(0)).getExecutionType());
        Assert.assertEquals(12345L, r0.getNumClusterNodes());
        RPC.setProtocolEngine(configuration, DistributedSchedulingAMProtocolPB.class, ProtobufRpcEngine.class);
        DistributedSchedulingAMProtocolPB distributedSchedulingAMProtocolPB = (DistributedSchedulingAMProtocolPB) RPC.getProxy(DistributedSchedulingAMProtocolPB.class, 1L, NetUtils.getConnectAddress(server), configuration);
        RegisterDistributedSchedulingAMResponsePBImpl registerDistributedSchedulingAMResponsePBImpl = new RegisterDistributedSchedulingAMResponsePBImpl(distributedSchedulingAMProtocolPB.registerApplicationMasterForDistributedScheduling((RpcController) null, ((RegisterApplicationMasterRequestPBImpl) recordFactory.newRecordInstance(RegisterApplicationMasterRequest.class)).getProto()));
        Assert.assertEquals(54321L, registerDistributedSchedulingAMResponsePBImpl.getContainerIdStart());
        Assert.assertEquals(4L, registerDistributedSchedulingAMResponsePBImpl.getMaxContainerResource().getVirtualCores());
        Assert.assertEquals(TestQueueMetricsForCustomResources.GB, registerDistributedSchedulingAMResponsePBImpl.getMinContainerResource().getMemorySize());
        Assert.assertEquals(2L, registerDistributedSchedulingAMResponsePBImpl.getIncrContainerResource().getVirtualCores());
        DistributedSchedulingAllocateRequestPBImpl distributedSchedulingAllocateRequestPBImpl = (DistributedSchedulingAllocateRequestPBImpl) recordFactory.newRecordInstance(DistributedSchedulingAllocateRequest.class);
        distributedSchedulingAllocateRequestPBImpl.setAllocateRequest(allocateRequestPBImpl);
        distributedSchedulingAllocateRequestPBImpl.setAllocatedContainers(Arrays.asList(container));
        Assert.assertEquals("h1", ((RemoteNode) new DistributedSchedulingAllocateResponsePBImpl(distributedSchedulingAMProtocolPB.allocateForDistributedScheduling((RpcController) null, distributedSchedulingAllocateRequestPBImpl.getProto())).getNodesForScheduling().get(0)).getNodeId().getHost());
        Assert.assertEquals(false, Boolean.valueOf(new FinishApplicationMasterResponsePBImpl(distributedSchedulingAMProtocolPB.finishApplicationMaster((RpcController) null, ((FinishApplicationMasterRequestPBImpl) recordFactory.newRecordInstance(FinishApplicationMasterRequest.class)).getProto())).getIsUnregistered()));
    }

    private OpportunisticContainerAllocatorAMService createService(final RecordFactory recordFactory, RMContext rMContext, final Container container) {
        return new OpportunisticContainerAllocatorAMService(rMContext, null) { // from class: org.apache.hadoop.yarn.server.resourcemanager.TestOpportunisticContainerAllocatorAMService.3
            @Override // org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService
            public RegisterApplicationMasterResponse registerApplicationMaster(RegisterApplicationMasterRequest registerApplicationMasterRequest) throws YarnException, IOException {
                RegisterApplicationMasterResponse registerApplicationMasterResponse = (RegisterApplicationMasterResponse) recordFactory.newRecordInstance(RegisterApplicationMasterResponse.class);
                registerApplicationMasterResponse.setQueue("dummyQueue");
                return registerApplicationMasterResponse;
            }

            @Override // org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService
            public FinishApplicationMasterResponse finishApplicationMaster(FinishApplicationMasterRequest finishApplicationMasterRequest) throws YarnException, IOException {
                FinishApplicationMasterResponse finishApplicationMasterResponse = (FinishApplicationMasterResponse) recordFactory.newRecordInstance(FinishApplicationMasterResponse.class);
                finishApplicationMasterResponse.setIsUnregistered(false);
                return finishApplicationMasterResponse;
            }

            @Override // org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService
            public AllocateResponse allocate(AllocateRequest allocateRequest) throws YarnException, IOException {
                AllocateResponse allocateResponse = (AllocateResponse) recordFactory.newRecordInstance(AllocateResponse.class);
                allocateResponse.setNumClusterNodes(12345);
                allocateResponse.setAllocatedContainers(Arrays.asList(container));
                return allocateResponse;
            }

            @Override // org.apache.hadoop.yarn.server.resourcemanager.OpportunisticContainerAllocatorAMService
            public RegisterDistributedSchedulingAMResponse registerApplicationMasterForDistributedScheduling(RegisterApplicationMasterRequest registerApplicationMasterRequest) throws YarnException, IOException {
                RegisterDistributedSchedulingAMResponse registerDistributedSchedulingAMResponse = (RegisterDistributedSchedulingAMResponse) recordFactory.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
                registerDistributedSchedulingAMResponse.setContainerIdStart(54321L);
                registerDistributedSchedulingAMResponse.setMaxContainerResource(Resource.newInstance(UTF16Reader.DEFAULT_BUFFER_SIZE, 4));
                registerDistributedSchedulingAMResponse.setMinContainerResource(Resource.newInstance(1024, 1));
                registerDistributedSchedulingAMResponse.setIncrContainerResource(Resource.newInstance(2048, 2));
                return registerDistributedSchedulingAMResponse;
            }

            @Override // org.apache.hadoop.yarn.server.resourcemanager.OpportunisticContainerAllocatorAMService
            public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(DistributedSchedulingAllocateRequest distributedSchedulingAllocateRequest) throws YarnException, IOException {
                List askList = distributedSchedulingAllocateRequest.getAllocateRequest().getAskList();
                List allocatedContainers = distributedSchedulingAllocateRequest.getAllocatedContainers();
                Assert.assertEquals(1L, allocatedContainers.size());
                Assert.assertEquals(ExecutionType.OPPORTUNISTIC, ((Container) allocatedContainers.get(0)).getExecutionType());
                Assert.assertEquals(1L, askList.size());
                Assert.assertTrue(((ResourceRequest) askList.get(0)).getExecutionTypeRequest().getEnforceExecutionType());
                DistributedSchedulingAllocateResponse distributedSchedulingAllocateResponse = (DistributedSchedulingAllocateResponse) recordFactory.newRecordInstance(DistributedSchedulingAllocateResponse.class);
                distributedSchedulingAllocateResponse.setNodesForScheduling(Arrays.asList(RemoteNode.newInstance(NodeId.newInstance("h1", 1234), "http://h1:4321")));
                return distributedSchedulingAllocateResponse;
            }
        };
    }
}
