package org.apache.hadoop.yarn.server.nodemanager.amrmproxy;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.registry.client.api.RegistryOperations;
import org.apache.hadoop.registry.client.impl.FSRegistryOperationsService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.PreemptionContainer;
import org.apache.hadoop.yarn.api.records.PreemptionContract;
import org.apache.hadoop.yarn.api.records.PreemptionMessage;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MockResourceManagerFacade;
import org.apache.hadoop.yarn.server.federation.policies.manager.UniformBroadcastPolicyManager;
import org.apache.hadoop.yarn.server.federation.store.impl.MemoryFederationStateStore;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterDeregisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegisterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.nodemanager.Context;
import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
import org.apache.hadoop.yarn.server.nodemanager.nodelabels.AbstractNodeLabelsProvider;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
import org.apache.hadoop.yarn.util.Records;
import org.apache.xerces.dom3.as.ASDataType;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:test-classes/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.class */
public class TestFederationInterceptor extends BaseAMRMProxyTest {
    private static final Logger LOG = LoggerFactory.getLogger(TestFederationInterceptor.class);
    public static final String HOME_SC_ID = "SC-home";
    private TestableFederationInterceptor interceptor;
    private MemoryFederationStateStore stateStore;
    private NMStateStoreService nmStateStore;
    private RegistryOperations registry;
    private Context nmContext;
    private int testAppId;
    private ApplicationAttemptId attemptId;
    private volatile int lastResponseId;

    /* loaded from: input_file:test-classes/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor$ConcurrentRegisterAMCallable.class */
    public class ConcurrentRegisterAMCallable implements Callable<RegisterApplicationMasterResponse> {
        public ConcurrentRegisterAMCallable() {
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // java.util.concurrent.Callable
        public RegisterApplicationMasterResponse call() throws Exception {
            RegisterApplicationMasterResponse registerApplicationMasterResponse;
            try {
                registerApplicationMasterResponse = TestFederationInterceptor.this.interceptor.registerApplicationMaster(RegisterApplicationMasterRequest.newInstance((String) null, ASDataType.COMPLEX_DATATYPE, (String) null));
                TestFederationInterceptor.this.lastResponseId = 0;
            } catch (Exception e) {
                TestFederationInterceptor.LOG.info("Register thread exception", e);
                registerApplicationMasterResponse = null;
            }
            return registerApplicationMasterResponse;
        }
    }

    @Override // org.apache.hadoop.yarn.server.nodemanager.amrmproxy.BaseAMRMProxyTest
    public void setUp() throws IOException {
        super.setUp();
        this.interceptor = new TestableFederationInterceptor();
        this.stateStore = new MemoryFederationStateStore();
        this.stateStore.init(getConf());
        FederationStateStoreFacade.getInstance().reinitialize(this.stateStore, getConf());
        this.nmStateStore = new NMMemoryStateStoreService();
        this.nmStateStore.init(getConf());
        this.nmStateStore.start();
        this.registry = new FSRegistryOperationsService();
        this.registry.init(getConf());
        this.registry.start();
        this.testAppId = 1;
        this.attemptId = getApplicationAttemptId(this.testAppId);
        this.nmContext = new NodeManager.NMContext(null, null, null, null, this.nmStateStore, false, getConf());
        this.interceptor.init(new AMRMProxyApplicationContextImpl(this.nmContext, getConf(), this.attemptId, "test-user", null, null, null, this.registry));
        this.interceptor.cleanupRegistry();
        this.lastResponseId = 0;
    }

    @Override // org.apache.hadoop.yarn.server.nodemanager.amrmproxy.BaseAMRMProxyTest
    public void tearDown() {
        this.interceptor.cleanupRegistry();
        this.interceptor.shutdown();
        this.registry.stop();
        super.tearDown();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.hadoop.yarn.server.nodemanager.amrmproxy.BaseAMRMProxyTest
    public YarnConfiguration createConfiguration() {
        YarnConfiguration yarnConfiguration = new YarnConfiguration();
        yarnConfiguration.setBoolean("yarn.nodemanager.amrmproxy.enabled", true);
        yarnConfiguration.setBoolean("yarn.federation.enabled", true);
        String name = PassThroughRequestInterceptor.class.getName();
        yarnConfiguration.set("yarn.nodemanager.amrmproxy.interceptor-class.pipeline", name + AbstractNodeLabelsProvider.NODE_LABELS_SEPRATOR + name + AbstractNodeLabelsProvider.NODE_LABELS_SEPRATOR + TestableFederationInterceptor.class.getName());
        yarnConfiguration.set("yarn.federation.policy-manager", UniformBroadcastPolicyManager.class.getName());
        yarnConfiguration.set("yarn.resourcemanager.cluster-id", HOME_SC_ID);
        yarnConfiguration.setInt("yarn.federation.cache-ttl.secs", 0);
        yarnConfiguration.setLong("yarn.federation.amrmproxy.subcluster.timeout.ms", 500L);
        return yarnConfiguration;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void registerSubCluster(SubClusterId subClusterId) throws YarnException {
        this.stateStore.registerSubCluster(SubClusterRegisterRequest.newInstance(SubClusterInfo.newInstance(subClusterId, "1.2.3.4:1", "1.2.3.4:2", "1.2.3.4:3", "1.2.3.4:4", SubClusterState.SC_RUNNING, 0L, "capacity")));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void deRegisterSubCluster(SubClusterId subClusterId) throws YarnException {
        this.stateStore.deregisterSubCluster(SubClusterDeregisterRequest.newInstance(subClusterId, SubClusterState.SC_UNREGISTERED));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public List<Container> getContainersAndAssert(int i, int i2) throws Exception {
        AllocateRequest allocateRequest = (AllocateRequest) Records.newRecord(AllocateRequest.class);
        ArrayList arrayList = new ArrayList(i);
        ArrayList arrayList2 = new ArrayList(i);
        for (int i3 = 0; i3 < i; i3++) {
            arrayList2.add(createResourceRequest("test-node-" + Integer.toString(i3), 6000, 2, i3 % 5, 1));
        }
        allocateRequest.setAskList(arrayList2);
        allocateRequest.setResponseId(this.lastResponseId);
        AllocateResponse allocate = this.interceptor.allocate(allocateRequest);
        Assert.assertNotNull("allocate() returned null response", allocate);
        checkAMRMToken(allocate.getAMRMToken());
        this.lastResponseId = allocate.getResponseId();
        arrayList.addAll(allocate.getAllocatedContainers());
        LOG.info("Number of allocated containers in the original request: " + Integer.toString(allocate.getAllocatedContainers().size()));
        int i4 = 0;
        while (arrayList.size() < i2) {
            int i5 = i4;
            i4++;
            if (i5 >= 10) {
                break;
            }
            AllocateRequest allocateRequest2 = (AllocateRequest) Records.newRecord(AllocateRequest.class);
            allocateRequest2.setResponseId(this.lastResponseId);
            AllocateResponse allocate2 = this.interceptor.allocate(allocateRequest2);
            Assert.assertNotNull("allocate() returned null response", allocate2);
            checkAMRMToken(allocate2.getAMRMToken());
            this.lastResponseId = allocate2.getResponseId();
            this.interceptor.drainAllAsyncQueue(false);
            arrayList.addAll(allocate2.getAllocatedContainers());
            LOG.info("Number of allocated containers in this request: " + Integer.toString(allocate2.getAllocatedContainers().size()));
            LOG.info("Total number of allocated containers: " + Integer.toString(arrayList.size()));
            Thread.sleep(10L);
        }
        Assert.assertEquals(i2, arrayList.size());
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void releaseContainersAndAssert(List<Container> list) throws Exception {
        Assert.assertTrue(list.size() > 0);
        AllocateRequest allocateRequest = (AllocateRequest) Records.newRecord(AllocateRequest.class);
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<Container> it = list.iterator();
        while (it.hasNext()) {
            arrayList.add(it.next().getId());
        }
        allocateRequest.setReleaseList(arrayList);
        allocateRequest.setResponseId(this.lastResponseId);
        AllocateResponse allocate = this.interceptor.allocate(allocateRequest);
        Assert.assertNotNull(allocate);
        checkAMRMToken(allocate.getAMRMToken());
        this.lastResponseId = allocate.getResponseId();
        ArrayList arrayList2 = new ArrayList();
        List<ContainerId> completedContainerIds = getCompletedContainerIds(allocate.getCompletedContainersStatuses());
        arrayList2.addAll(completedContainerIds);
        LOG.info("Number of containers received in the original request: " + Integer.toString(completedContainerIds.size()));
        int i = 0;
        while (arrayList2.size() < arrayList.size()) {
            int i2 = i;
            i++;
            if (i2 >= 10) {
                break;
            }
            AllocateRequest allocateRequest2 = (AllocateRequest) Records.newRecord(AllocateRequest.class);
            allocateRequest2.setResponseId(this.lastResponseId);
            AllocateResponse allocate2 = this.interceptor.allocate(allocateRequest2);
            Assert.assertNotNull(allocate2);
            checkAMRMToken(allocate2.getAMRMToken());
            this.lastResponseId = allocate2.getResponseId();
            this.interceptor.drainAllAsyncQueue(false);
            List<ContainerId> completedContainerIds2 = getCompletedContainerIds(allocate2.getCompletedContainersStatuses());
            arrayList2.addAll(completedContainerIds2);
            LOG.info("Number of containers received in this request: " + Integer.toString(completedContainerIds2.size()));
            LOG.info("Total number of containers received: " + Integer.toString(arrayList2.size()));
            Thread.sleep(10L);
        }
        Assert.assertEquals(arrayList.size(), arrayList2.size());
    }

    private void checkAMRMToken(Token token) {
        if (token != null) {
            Assert.assertTrue(token.getKind().equals(Integer.toString(0)));
        }
    }

    @Test
    public void testMultipleSubClusters() throws Exception {
        this.interceptor.getUGIWithToken(this.interceptor.getAttemptId()).doAs(new PrivilegedExceptionAction<Object>() { // from class: org.apache.hadoop.yarn.server.nodemanager.amrmproxy.TestFederationInterceptor.1
            @Override // java.security.PrivilegedExceptionAction
            public Object run() throws Exception {
                RegisterApplicationMasterRequest registerApplicationMasterRequest = (RegisterApplicationMasterRequest) Records.newRecord(RegisterApplicationMasterRequest.class);
                registerApplicationMasterRequest.setHost(Integer.toString(TestFederationInterceptor.this.testAppId));
                registerApplicationMasterRequest.setRpcPort(0);
                registerApplicationMasterRequest.setTrackingUrl("");
                Assert.assertNotNull(TestFederationInterceptor.this.interceptor.registerApplicationMaster(registerApplicationMasterRequest));
                TestFederationInterceptor.this.lastResponseId = 0;
                Assert.assertEquals(0L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance("SC-1"));
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance("SC-2"));
                List containersAndAssert = TestFederationInterceptor.this.getContainersAndAssert(3, 3 * 2);
                Assert.assertEquals(2L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                TestFederationInterceptor.this.deRegisterSubCluster(SubClusterId.newInstance("SC-2"));
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance("SC-3"));
                containersAndAssert.addAll(TestFederationInterceptor.this.getContainersAndAssert(1, 1 * 2));
                Assert.assertEquals(3L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                TestFederationInterceptor.this.deRegisterSubCluster(SubClusterId.newInstance("SC-1"));
                TestFederationInterceptor.this.deRegisterSubCluster(SubClusterId.newInstance("SC-3"));
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance(TestFederationInterceptor.HOME_SC_ID));
                containersAndAssert.addAll(TestFederationInterceptor.this.getContainersAndAssert(2, 2 * 1));
                Assert.assertEquals(3L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                TestFederationInterceptor.this.releaseContainersAndAssert(containersAndAssert);
                FinishApplicationMasterRequest finishApplicationMasterRequest = (FinishApplicationMasterRequest) Records.newRecord(FinishApplicationMasterRequest.class);
                finishApplicationMasterRequest.setDiagnostics("");
                finishApplicationMasterRequest.setTrackingUrl("");
                finishApplicationMasterRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
                FinishApplicationMasterResponse finishApplicationMaster = TestFederationInterceptor.this.interceptor.finishApplicationMaster(finishApplicationMasterRequest);
                Assert.assertNotNull(finishApplicationMaster);
                Assert.assertEquals(true, Boolean.valueOf(finishApplicationMaster.getIsUnregistered()));
                return null;
            }
        });
    }

    @Test
    public void testReregister() throws Exception {
        this.interceptor.getUGIWithToken(this.interceptor.getAttemptId()).doAs(new PrivilegedExceptionAction<Object>() { // from class: org.apache.hadoop.yarn.server.nodemanager.amrmproxy.TestFederationInterceptor.2
            @Override // java.security.PrivilegedExceptionAction
            public Object run() throws Exception {
                RegisterApplicationMasterRequest registerApplicationMasterRequest = (RegisterApplicationMasterRequest) Records.newRecord(RegisterApplicationMasterRequest.class);
                registerApplicationMasterRequest.setHost(Integer.toString(TestFederationInterceptor.this.testAppId));
                registerApplicationMasterRequest.setRpcPort(0);
                registerApplicationMasterRequest.setTrackingUrl("");
                Assert.assertNotNull(TestFederationInterceptor.this.interceptor.registerApplicationMaster(registerApplicationMasterRequest));
                TestFederationInterceptor.this.lastResponseId = 0;
                Assert.assertEquals(0L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance("SC-1"));
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance(TestFederationInterceptor.HOME_SC_ID));
                TestFederationInterceptor.this.interceptor.setShouldReRegisterNext();
                List containersAndAssert = TestFederationInterceptor.this.getContainersAndAssert(3, 3 * 2);
                Assert.assertEquals(1L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                TestFederationInterceptor.this.interceptor.setShouldReRegisterNext();
                TestFederationInterceptor.this.releaseContainersAndAssert(containersAndAssert);
                TestFederationInterceptor.this.interceptor.setShouldReRegisterNext();
                FinishApplicationMasterRequest finishApplicationMasterRequest = (FinishApplicationMasterRequest) Records.newRecord(FinishApplicationMasterRequest.class);
                finishApplicationMasterRequest.setDiagnostics("");
                finishApplicationMasterRequest.setTrackingUrl("");
                finishApplicationMasterRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
                FinishApplicationMasterResponse finishApplicationMaster = TestFederationInterceptor.this.interceptor.finishApplicationMaster(finishApplicationMasterRequest);
                Assert.assertNotNull(finishApplicationMaster);
                Assert.assertEquals(true, Boolean.valueOf(finishApplicationMaster.getIsUnregistered()));
                return null;
            }
        });
    }

    @Test(timeout = 5000)
    public void testConcurrentRegister() throws InterruptedException, ExecutionException {
        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        ExecutorCompletionService executorCompletionService = new ExecutorCompletionService(newCachedThreadPool);
        Object registerSyncObj = MockResourceManagerFacade.getRegisterSyncObj();
        synchronized (registerSyncObj) {
            LOG.info("Starting first register thread");
            executorCompletionService.submit(new ConcurrentRegisterAMCallable());
            try {
                LOG.info("Test main starts waiting for the first thread to block");
                registerSyncObj.wait();
                LOG.info("Test main wait finished");
            } catch (Exception e) {
                LOG.info("Test main wait interrupted", e);
            }
        }
        LOG.info("Starting second register thread");
        executorCompletionService.submit(new ConcurrentRegisterAMCallable());
        LOG.info("Let first blocked register thread move on");
        synchronized (registerSyncObj) {
            registerSyncObj.notifyAll();
        }
        Assert.assertNotNull((RegisterApplicationMasterResponse) executorCompletionService.take().get());
        Assert.assertNotNull((RegisterApplicationMasterResponse) executorCompletionService.take().get());
        newCachedThreadPool.shutdown();
    }

    @Test
    public void testRecoverWithAMRMProxyHA() throws Exception {
        testRecover(this.registry);
    }

    @Test
    public void testRecoverWithoutAMRMProxyHA() throws Exception {
        testRecover(null);
    }

    protected void testRecover(final RegistryOperations registryOperations) throws Exception {
        this.interceptor.getUGIWithToken(this.interceptor.getAttemptId()).doAs(new PrivilegedExceptionAction<Object>() { // from class: org.apache.hadoop.yarn.server.nodemanager.amrmproxy.TestFederationInterceptor.3
            @Override // java.security.PrivilegedExceptionAction
            public Object run() throws Exception {
                TestFederationInterceptor.this.interceptor = new TestableFederationInterceptor();
                TestFederationInterceptor.this.interceptor.init(new AMRMProxyApplicationContextImpl(TestFederationInterceptor.this.nmContext, TestFederationInterceptor.this.getConf(), TestFederationInterceptor.this.attemptId, "test-user", null, null, null, registryOperations));
                TestFederationInterceptor.this.interceptor.cleanupRegistry();
                RegisterApplicationMasterRequest registerApplicationMasterRequest = (RegisterApplicationMasterRequest) Records.newRecord(RegisterApplicationMasterRequest.class);
                registerApplicationMasterRequest.setHost(Integer.toString(TestFederationInterceptor.this.testAppId));
                registerApplicationMasterRequest.setRpcPort(TestFederationInterceptor.this.testAppId);
                registerApplicationMasterRequest.setTrackingUrl("");
                Assert.assertNotNull(TestFederationInterceptor.this.interceptor.registerApplicationMaster(registerApplicationMasterRequest));
                TestFederationInterceptor.this.lastResponseId = 0;
                Assert.assertEquals(0L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance("SC-1"));
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance(TestFederationInterceptor.HOME_SC_ID));
                List containersAndAssert = TestFederationInterceptor.this.getContainersAndAssert(3, 3 * 2);
                Assert.assertEquals(1L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                TestFederationInterceptor.this.interceptor.drainAllAsyncQueue(true);
                Map<String, byte[]> recoverDataMapForAppAttempt = TestFederationInterceptor.this.recoverDataMapForAppAttempt(TestFederationInterceptor.this.nmStateStore, TestFederationInterceptor.this.attemptId);
                if (registryOperations == null) {
                    Assert.assertTrue(recoverDataMapForAppAttempt.containsKey("FederationInterceptor/secondarySC/SC-1"));
                } else {
                    Assert.assertFalse(recoverDataMapForAppAttempt.containsKey("FederationInterceptor/secondarySC/SC-1"));
                }
                TestFederationInterceptor.this.interceptor = new TestableFederationInterceptor(TestFederationInterceptor.this.interceptor.getHomeRM(), TestFederationInterceptor.this.interceptor.getSecondaryRMs());
                TestFederationInterceptor.this.interceptor.init(new AMRMProxyApplicationContextImpl(TestFederationInterceptor.this.nmContext, TestFederationInterceptor.this.getConf(), TestFederationInterceptor.this.attemptId, "test-user", null, null, null, registryOperations));
                TestFederationInterceptor.this.interceptor.recover(recoverDataMapForAppAttempt);
                Assert.assertEquals(1L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                Assert.assertEquals(1L, TestFederationInterceptor.this.interceptor.getTimedOutSCs(true).size());
                try {
                    AllocateRequest allocateRequest = (AllocateRequest) Records.newRecord(AllocateRequest.class);
                    allocateRequest.setResponseId(TestFederationInterceptor.this.lastResponseId);
                    TestFederationInterceptor.this.lastResponseId = TestFederationInterceptor.this.interceptor.allocate(allocateRequest).getResponseId();
                    Assert.fail("Expecting an ApplicationMasterNotRegisteredException   after FederationInterceptor restarts and recovers");
                } catch (ApplicationMasterNotRegisteredException e) {
                }
                TestFederationInterceptor.this.interceptor.registerApplicationMaster(registerApplicationMasterRequest);
                TestFederationInterceptor.this.lastResponseId = 0;
                TestFederationInterceptor.this.releaseContainersAndAssert(containersAndAssert);
                FinishApplicationMasterRequest finishApplicationMasterRequest = (FinishApplicationMasterRequest) Records.newRecord(FinishApplicationMasterRequest.class);
                finishApplicationMasterRequest.setDiagnostics("");
                finishApplicationMasterRequest.setTrackingUrl("");
                finishApplicationMasterRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
                FinishApplicationMasterResponse finishApplicationMaster = TestFederationInterceptor.this.interceptor.finishApplicationMaster(finishApplicationMasterRequest);
                Assert.assertNotNull(finishApplicationMaster);
                Assert.assertEquals(true, Boolean.valueOf(finishApplicationMaster.getIsUnregistered()));
                if (registryOperations != null) {
                    Assert.assertEquals(0L, TestFederationInterceptor.this.interceptor.getRegistryClient().getAllApplications().size());
                    return null;
                }
                Assert.assertFalse(TestFederationInterceptor.this.recoverDataMapForAppAttempt(TestFederationInterceptor.this.nmStateStore, TestFederationInterceptor.this.attemptId).containsKey("FederationInterceptor/secondarySC/SC-1"));
                return null;
            }
        });
    }

    @Test
    public void testRequestInterceptorChainCreation() throws Exception {
        RequestInterceptor createRequestInterceptorChain = super.getAMRMProxyService().createRequestInterceptorChain();
        int i = 0;
        while (createRequestInterceptorChain != null) {
            switch (i) {
                case 0:
                case 1:
                    Assert.assertEquals(PassThroughRequestInterceptor.class.getName(), createRequestInterceptorChain.getClass().getName());
                    break;
                case 2:
                    Assert.assertEquals(TestableFederationInterceptor.class.getName(), createRequestInterceptorChain.getClass().getName());
                    break;
                default:
                    Assert.fail();
                    break;
            }
            createRequestInterceptorChain = createRequestInterceptorChain.getNextInterceptor();
            i++;
        }
        Assert.assertEquals("The number of interceptors in chain does not match", Integer.toString(3), Integer.toString(i));
    }

    @Test
    public void testTwoIdenticalRegisterRequest() throws Exception {
        RegisterApplicationMasterRequest registerApplicationMasterRequest = (RegisterApplicationMasterRequest) Records.newRecord(RegisterApplicationMasterRequest.class);
        registerApplicationMasterRequest.setHost(Integer.toString(this.testAppId));
        registerApplicationMasterRequest.setRpcPort(0);
        registerApplicationMasterRequest.setTrackingUrl("");
        for (int i = 0; i < 2; i++) {
            Assert.assertNotNull(this.interceptor.registerApplicationMaster(registerApplicationMasterRequest));
            this.lastResponseId = 0;
        }
    }

    @Test
    public void testTwoDifferentRegisterRequest() throws Exception {
        RegisterApplicationMasterRequest registerApplicationMasterRequest = (RegisterApplicationMasterRequest) Records.newRecord(RegisterApplicationMasterRequest.class);
        registerApplicationMasterRequest.setHost(Integer.toString(this.testAppId));
        registerApplicationMasterRequest.setRpcPort(0);
        registerApplicationMasterRequest.setTrackingUrl("");
        Assert.assertNotNull(this.interceptor.registerApplicationMaster(registerApplicationMasterRequest));
        this.lastResponseId = 0;
        RegisterApplicationMasterRequest registerApplicationMasterRequest2 = (RegisterApplicationMasterRequest) Records.newRecord(RegisterApplicationMasterRequest.class);
        registerApplicationMasterRequest2.setHost(Integer.toString(this.testAppId));
        registerApplicationMasterRequest2.setRpcPort(0);
        registerApplicationMasterRequest2.setTrackingUrl("different");
        try {
            this.interceptor.registerApplicationMaster(registerApplicationMasterRequest2);
            this.lastResponseId = 0;
            Assert.fail("Should throw if a different request obj is used");
        } catch (YarnException e) {
        }
    }

    @Test
    public void testAllocateResponse() throws Exception {
        this.interceptor.registerApplicationMaster(RegisterApplicationMasterRequest.newInstance((String) null, 0, (String) null));
        AllocateRequest allocateRequest = (AllocateRequest) Records.newRecord(AllocateRequest.class);
        Map<SubClusterId, List<AllocateResponse>> asyncResponseSink = this.interceptor.getAsyncResponseSink();
        ContainerId newContainerId = ContainerId.newContainerId(this.attemptId, 0L);
        ContainerStatus containerStatus = (ContainerStatus) Records.newRecord(ContainerStatus.class);
        containerStatus.setContainerId(newContainerId);
        Container newInstance = Container.newInstance(newContainerId, (NodeId) null, (String) null, (Resource) null, (Priority) null, (Token) null);
        AllocateResponse allocateResponse = (AllocateResponse) Records.newRecord(AllocateResponse.class);
        allocateResponse.setAllocatedContainers(Collections.singletonList(newInstance));
        allocateResponse.setCompletedContainersStatuses(Collections.singletonList(containerStatus));
        allocateResponse.setUpdatedNodes(Collections.singletonList(Records.newRecord(NodeReport.class)));
        allocateResponse.setNMTokens(Collections.singletonList(Records.newRecord(NMToken.class)));
        allocateResponse.setUpdatedContainers(Collections.singletonList(Records.newRecord(UpdatedContainer.class)));
        allocateResponse.setUpdateErrors(Collections.singletonList(Records.newRecord(UpdateContainerError.class)));
        allocateResponse.setAvailableResources((Resource) Records.newRecord(Resource.class));
        allocateResponse.setPreemptionMessage((PreemptionMessage) Records.newRecord(PreemptionMessage.class));
        ArrayList arrayList = new ArrayList();
        arrayList.add(allocateResponse);
        asyncResponseSink.put(SubClusterId.newInstance("SC-1"), arrayList);
        AllocateResponse allocate = this.interceptor.allocate(allocateRequest);
        Assert.assertEquals(1L, allocate.getAllocatedContainers().size());
        Assert.assertNotNull(allocate.getAvailableResources());
        Assert.assertEquals(1L, allocate.getCompletedContainersStatuses().size());
        Assert.assertEquals(1L, allocate.getUpdatedNodes().size());
        Assert.assertNotNull(allocate.getPreemptionMessage());
        Assert.assertEquals(1L, allocate.getNMTokens().size());
        Assert.assertEquals(1L, allocate.getUpdatedContainers().size());
        Assert.assertEquals(1L, allocate.getUpdateErrors().size());
    }

    @Test
    public void testSubClusterTimeOut() throws Exception {
        this.interceptor.getUGIWithToken(this.interceptor.getAttemptId()).doAs(new PrivilegedExceptionAction<Object>() { // from class: org.apache.hadoop.yarn.server.nodemanager.amrmproxy.TestFederationInterceptor.4
            @Override // java.security.PrivilegedExceptionAction
            public Object run() throws Exception {
                RegisterApplicationMasterRequest registerApplicationMasterRequest = (RegisterApplicationMasterRequest) Records.newRecord(RegisterApplicationMasterRequest.class);
                registerApplicationMasterRequest.setHost(Integer.toString(TestFederationInterceptor.this.testAppId));
                registerApplicationMasterRequest.setRpcPort(0);
                registerApplicationMasterRequest.setTrackingUrl("");
                Assert.assertNotNull(TestFederationInterceptor.this.interceptor.registerApplicationMaster(registerApplicationMasterRequest));
                TestFederationInterceptor.this.lastResponseId = 0;
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance("SC-1"));
                TestFederationInterceptor.this.getContainersAndAssert(1, 1);
                Assert.assertEquals(2L, TestFederationInterceptor.this.interceptor.generateBaseAllocationResponse().getNumClusterNodes());
                Assert.assertEquals(0L, TestFederationInterceptor.this.interceptor.getTimedOutSCs(true).size());
                Thread.sleep(800L);
                Assert.assertEquals(2L, TestFederationInterceptor.this.interceptor.generateBaseAllocationResponse().getNumClusterNodes());
                Assert.assertEquals(0L, TestFederationInterceptor.this.interceptor.getTimedOutSCs(true).size());
                AllocateRequest allocateRequest = (AllocateRequest) Records.newRecord(AllocateRequest.class);
                allocateRequest.setResponseId(TestFederationInterceptor.this.lastResponseId - 1);
                TestFederationInterceptor.this.interceptor.allocate(allocateRequest);
                Assert.assertEquals(0L, TestFederationInterceptor.this.interceptor.generateBaseAllocationResponse().getNumClusterNodes());
                Assert.assertEquals(2L, TestFederationInterceptor.this.interceptor.getTimedOutSCs(true).size());
                return null;
            }
        });
    }

    @Test
    public void testSecondAttempt() throws Exception {
        final RegisterApplicationMasterRequest registerApplicationMasterRequest = (RegisterApplicationMasterRequest) Records.newRecord(RegisterApplicationMasterRequest.class);
        registerApplicationMasterRequest.setHost(Integer.toString(this.testAppId));
        registerApplicationMasterRequest.setRpcPort(this.testAppId);
        registerApplicationMasterRequest.setTrackingUrl("");
        this.interceptor.getUGIWithToken(this.interceptor.getAttemptId()).doAs(new PrivilegedExceptionAction<Object>() { // from class: org.apache.hadoop.yarn.server.nodemanager.amrmproxy.TestFederationInterceptor.5
            @Override // java.security.PrivilegedExceptionAction
            public Object run() throws Exception {
                Assert.assertNotNull(TestFederationInterceptor.this.interceptor.registerApplicationMaster(registerApplicationMasterRequest));
                TestFederationInterceptor.this.lastResponseId = 0;
                Assert.assertEquals(0L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance("SC-1"));
                TestFederationInterceptor.this.registerSubCluster(SubClusterId.newInstance(TestFederationInterceptor.HOME_SC_ID));
                Iterator it = TestFederationInterceptor.this.getContainersAndAssert(3, 3 * 2).iterator();
                while (it.hasNext()) {
                    TestFederationInterceptor.LOG.info("Allocated container " + ((Container) it.next()).getId());
                }
                Assert.assertEquals(1L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                TestFederationInterceptor.this.interceptor.drainAllAsyncQueue(true);
                ConcurrentHashMap<String, MockResourceManagerFacade> secondaryRMs = TestFederationInterceptor.this.interceptor.getSecondaryRMs();
                TestFederationInterceptor.this.attemptId = ApplicationAttemptId.newInstance(TestFederationInterceptor.this.attemptId.getApplicationId(), TestFederationInterceptor.this.attemptId.getAttemptId() + 1);
                TestFederationInterceptor.this.interceptor = new TestableFederationInterceptor(null, secondaryRMs);
                TestFederationInterceptor.this.interceptor.init(new AMRMProxyApplicationContextImpl(TestFederationInterceptor.this.nmContext, TestFederationInterceptor.this.getConf(), TestFederationInterceptor.this.attemptId, "test-user", null, null, null, TestFederationInterceptor.this.registry));
                return null;
            }
        });
        this.interceptor.getUGIWithToken(this.interceptor.getAttemptId()).doAs(new PrivilegedExceptionAction<Object>() { // from class: org.apache.hadoop.yarn.server.nodemanager.amrmproxy.TestFederationInterceptor.6
            @Override // java.security.PrivilegedExceptionAction
            public Object run() throws Exception {
                RegisterApplicationMasterResponse registerApplicationMaster = TestFederationInterceptor.this.interceptor.registerApplicationMaster(registerApplicationMasterRequest);
                TestFederationInterceptor.this.lastResponseId = 0;
                Assert.assertEquals(1L, TestFederationInterceptor.this.interceptor.getUnmanagedAMPoolSize());
                Assert.assertEquals(1L, TestFederationInterceptor.this.interceptor.getTimedOutSCs(true).size());
                Assert.assertEquals(3, registerApplicationMaster.getContainersFromPreviousAttempts().size());
                TestFederationInterceptor.this.releaseContainersAndAssert(registerApplicationMaster.getContainersFromPreviousAttempts());
                FinishApplicationMasterRequest finishApplicationMasterRequest = (FinishApplicationMasterRequest) Records.newRecord(FinishApplicationMasterRequest.class);
                finishApplicationMasterRequest.setDiagnostics("");
                finishApplicationMasterRequest.setTrackingUrl("");
                finishApplicationMasterRequest.setFinalApplicationStatus(FinalApplicationStatus.SUCCEEDED);
                FinishApplicationMasterResponse finishApplicationMaster = TestFederationInterceptor.this.interceptor.finishApplicationMaster(finishApplicationMasterRequest);
                Assert.assertNotNull(finishApplicationMaster);
                Assert.assertEquals(true, Boolean.valueOf(finishApplicationMaster.getIsUnregistered()));
                if (TestFederationInterceptor.this.interceptor.getRegistryClient() == null) {
                    return null;
                }
                Assert.assertEquals(0L, TestFederationInterceptor.this.interceptor.getRegistryClient().getAllApplications().size());
                return null;
            }
        });
    }

    @Test
    public void testMergeAllocateResponse() {
        ContainerId newContainerId = ContainerId.newContainerId(this.attemptId, 0L);
        ContainerStatus containerStatus = (ContainerStatus) Records.newRecord(ContainerStatus.class);
        containerStatus.setContainerId(newContainerId);
        Container newInstance = Container.newInstance(newContainerId, (NodeId) null, (String) null, (Resource) null, (Priority) null, (Token) null);
        AllocateResponse allocateResponse = (AllocateResponse) Records.newRecord(AllocateResponse.class);
        allocateResponse.setAllocatedContainers(Collections.singletonList(newInstance));
        allocateResponse.setCompletedContainersStatuses(Collections.singletonList(containerStatus));
        allocateResponse.setUpdatedNodes(Collections.singletonList(Records.newRecord(NodeReport.class)));
        allocateResponse.setNMTokens(Collections.singletonList(Records.newRecord(NMToken.class)));
        allocateResponse.setUpdatedContainers(Collections.singletonList(Records.newRecord(UpdatedContainer.class)));
        allocateResponse.setUpdateErrors(Collections.singletonList(Records.newRecord(UpdateContainerError.class)));
        allocateResponse.setAvailableResources((Resource) Records.newRecord(Resource.class));
        allocateResponse.setPreemptionMessage(createDummyPreemptionMessage(ContainerId.newContainerId(this.attemptId, 0L)));
        AllocateResponse allocateResponse2 = (AllocateResponse) Records.newRecord(AllocateResponse.class);
        allocateResponse2.setAllocatedContainers(Collections.singletonList(newInstance));
        allocateResponse2.setCompletedContainersStatuses(Collections.singletonList(containerStatus));
        allocateResponse2.setUpdatedNodes(Collections.singletonList(Records.newRecord(NodeReport.class)));
        allocateResponse2.setNMTokens(Collections.singletonList(Records.newRecord(NMToken.class)));
        allocateResponse2.setUpdatedContainers(Collections.singletonList(Records.newRecord(UpdatedContainer.class)));
        allocateResponse2.setUpdateErrors(Collections.singletonList(Records.newRecord(UpdateContainerError.class)));
        allocateResponse2.setAvailableResources((Resource) Records.newRecord(Resource.class));
        allocateResponse2.setPreemptionMessage(createDummyPreemptionMessage(ContainerId.newContainerId(this.attemptId, 1L)));
        this.interceptor.mergeAllocateResponse(allocateResponse, allocateResponse2, SubClusterId.newInstance("SC-1"));
        Assert.assertEquals(2L, allocateResponse.getPreemptionMessage().getContract().getContainers().size());
        Assert.assertEquals(2L, allocateResponse.getAllocatedContainers().size());
        Assert.assertEquals(2L, allocateResponse.getUpdatedNodes().size());
        Assert.assertEquals(2L, allocateResponse.getCompletedContainersStatuses().size());
    }

    private PreemptionMessage createDummyPreemptionMessage(ContainerId containerId) {
        PreemptionMessage preemptionMessage = (PreemptionMessage) Records.newRecord(PreemptionMessage.class);
        PreemptionContainer preemptionContainer = (PreemptionContainer) Records.newRecord(PreemptionContainer.class);
        preemptionContainer.setId(containerId);
        HashSet hashSet = new HashSet();
        hashSet.add(preemptionContainer);
        PreemptionContract preemptionContract = (PreemptionContract) Records.newRecord(PreemptionContract.class);
        preemptionContract.setContainers(hashSet);
        preemptionMessage.setContract(preemptionContract);
        return preemptionMessage;
    }
}
