/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.resourcemanager;

import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.DefaultJobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
import org.apache.flink.runtime.resourcemanager.TaskManagerInfoWithSlots;
import org.apache.flink.runtime.resourcemanager.TestingJobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManagerBuilder;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManager;
import org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.RecipientUnreachableException;
import org.apache.flink.runtime.slots.ResourceRequirement;
import org.apache.flink.runtime.slots.ResourceRequirements;
import org.apache.flink.runtime.taskexecutor.TaskExecutorMemoryConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutorThreadInfoGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.testutils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.ThrowingConsumer;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

public class ResourceManagerTest
extends TestLogger {
    private static final Time TIMEOUT = Time.minutes((long)2L);
    private static final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
    private static final HeartbeatServices fastHeartbeatServices = new HeartbeatServices(1L, 1L);
    private static final HeartbeatServices failedRpcEnabledHeartbeatServices = new HeartbeatServices(1L, 10000000L, 1);
    private static final HardwareDescription hardwareDescription = new HardwareDescription(42, 1337L, 1337L, 0L);
    private static final int dataPort = 1234;
    private static final int jmxPort = 23456;
    private static TestingRpcService rpcService;
    private TestingHighAvailabilityServices highAvailabilityServices;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private ResourceID resourceManagerResourceId;
    private TestingResourceManager resourceManager;
    private ResourceManagerId resourceManagerId;

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @Before
    public void setup() throws Exception {
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
        this.highAvailabilityServices.setResourceManagerLeaderElectionService(new TestingLeaderElectionService());
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.resourceManagerResourceId = ResourceID.generate();
    }

    @After
    public void after() throws Exception {
        if (this.resourceManager != null) {
            RpcUtils.terminateRpcEndpoint((RpcEndpoint)this.resourceManager, (Time)TIMEOUT);
        }
        if (this.highAvailabilityServices != null) {
            this.highAvailabilityServices.closeAndCleanupAllData();
        }
        if (this.testingFatalErrorHandler.hasExceptionOccurred()) {
            this.testingFatalErrorHandler.rethrowError();
        }
    }

    @AfterClass
    public static void tearDownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcServices((Time)TIMEOUT, (RpcService[])new RpcService[]{rpcService});
        }
    }

    @Test
    public void testRequestTaskManagerInfo() throws Exception {
        ResourceID taskManagerId = ResourceID.generate();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        this.resourceManager = this.createAndStartResourceManager(heartbeatServices);
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        this.registerTaskExecutor(resourceManagerGateway, taskManagerId, taskExecutorGateway.getAddress());
        CompletableFuture taskManagerInfoFuture = resourceManagerGateway.requestTaskManagerDetailsInfo(taskManagerId, TestingUtils.TIMEOUT);
        TaskManagerInfoWithSlots taskManagerInfoWithSlots = (TaskManagerInfoWithSlots)taskManagerInfoFuture.get();
        TaskManagerInfo taskManagerInfo = taskManagerInfoWithSlots.getTaskManagerInfo();
        Assert.assertEquals((Object)taskManagerId, (Object)taskManagerInfo.getResourceId());
        Assert.assertEquals((Object)hardwareDescription, (Object)taskManagerInfo.getHardwareDescription());
        Assert.assertEquals((Object)taskExecutorGateway.getAddress(), (Object)taskManagerInfo.getAddress());
        Assert.assertEquals((long)1234L, (long)taskManagerInfo.getDataPort());
        Assert.assertEquals((long)23456L, (long)taskManagerInfo.getJmxPort());
        Assert.assertEquals((long)0L, (long)taskManagerInfo.getNumberSlots());
        Assert.assertEquals((long)0L, (long)taskManagerInfo.getNumberAvailableSlots());
        Assert.assertThat((Object)taskManagerInfoWithSlots.getAllocatedSlots(), (Matcher)Matchers.is((Matcher)Matchers.empty()));
    }

    @Test
    public void testRequestTaskExecutorGateway() throws Exception {
        ResourceID taskManagerId = ResourceID.generate();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        this.resourceManager = this.createAndStartResourceManager(heartbeatServices);
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        this.registerTaskExecutor(resourceManagerGateway, taskManagerId, taskExecutorGateway.getAddress());
        CompletableFuture taskExecutorGatewayFuture = resourceManagerGateway.requestTaskExecutorThreadInfoGateway(taskManagerId, TestingUtils.TIMEOUT);
        TaskExecutorThreadInfoGateway taskExecutorGatewayResult = (TaskExecutorThreadInfoGateway)taskExecutorGatewayFuture.get();
        Assert.assertEquals((Object)taskExecutorGateway, (Object)taskExecutorGatewayResult);
    }

    private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, ResourceID taskExecutorId, String taskExecutorAddress) throws Exception {
        TaskExecutorRegistration taskExecutorRegistration = new TaskExecutorRegistration(taskExecutorAddress, taskExecutorId, 1234, 23456, hardwareDescription, new TaskExecutorMemoryConfiguration(Long.valueOf(1L), Long.valueOf(2L), Long.valueOf(3L), Long.valueOf(4L), Long.valueOf(5L), Long.valueOf(6L), Long.valueOf(7L), Long.valueOf(8L), Long.valueOf(9L), Long.valueOf(10L)), ResourceProfile.ZERO, ResourceProfile.ZERO);
        CompletableFuture registrationFuture = resourceManagerGateway.registerTaskExecutor(taskExecutorRegistration, TestingUtils.TIMEOUT);
        Assert.assertThat(registrationFuture.get(), (Matcher)Matchers.instanceOf(RegistrationResponse.Success.class));
    }

    @Test
    public void testDisconnectJobManagerClearsRequirements() throws Exception {
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        TestingJobLeaderIdService jobLeaderIdService = TestingJobLeaderIdService.newBuilder().setGetLeaderIdFunction(jobId -> CompletableFuture.completedFuture(jobMasterGateway.getFencingToken())).build();
        CompletableFuture clearRequirementsFuture = new CompletableFuture();
        TestingSlotManager slotManager = new TestingSlotManagerBuilder().setClearRequirementsConsumer(clearRequirementsFuture::complete).createSlotManager();
        this.resourceManager = this.createAndStartResourceManager(heartbeatServices, jobLeaderIdService, slotManager);
        JobID jobId2 = JobID.generate();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        resourceManagerGateway.registerJobManager(jobMasterGateway.getFencingToken(), ResourceID.generate(), jobMasterGateway.getAddress(), jobId2, TIMEOUT).get();
        resourceManagerGateway.declareRequiredResources(jobMasterGateway.getFencingToken(), ResourceRequirements.create((JobID)jobId2, (String)jobMasterGateway.getAddress(), Collections.singleton(ResourceRequirement.create((ResourceProfile)ResourceProfile.UNKNOWN, (int)1))), TIMEOUT).get();
        resourceManagerGateway.disconnectJobManager(jobId2, JobStatus.FINISHED, (Exception)new FlinkException("Test exception"));
        Assert.assertThat(clearRequirementsFuture.get(5L, TimeUnit.SECONDS), (Matcher)Matchers.is((Object)jobId2));
    }

    @Test
    public void testHeartbeatTimeoutWithJobMaster() throws Exception {
        CompletableFuture heartbeatRequestFuture = new CompletableFuture();
        CompletableFuture disconnectFuture = new CompletableFuture();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setResourceManagerHeartbeatFunction(resourceId -> {
            heartbeatRequestFuture.complete(resourceId);
            return FutureUtils.completedVoidFuture();
        }).setDisconnectResourceManagerConsumer(disconnectFuture::complete).build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        JobID jobId = new JobID();
        ResourceID jobMasterResourceId = ResourceID.generate();
        SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(requestedJobId -> {
            Assert.assertThat((Object)requestedJobId, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)jobId)));
            return jobMasterLeaderRetrievalService;
        });
        this.runHeartbeatTimeoutTest((ThrowingConsumer<ResourceManagerGateway, Exception>)((ThrowingConsumer)resourceManagerGateway -> {
            CompletableFuture registrationFuture = resourceManagerGateway.registerJobManager(jobMasterGateway.getFencingToken(), jobMasterResourceId, jobMasterGateway.getAddress(), jobId, TIMEOUT);
            Assert.assertThat(registrationFuture.get(), (Matcher)Matchers.instanceOf(RegistrationResponse.Success.class));
        }), (ThrowingConsumer<ResourceID, Exception>)((ThrowingConsumer)resourceManagerResourceId -> {
            ResourceID optionalHeartbeatRequestOrigin = heartbeatRequestFuture.getNow(null);
            Assert.assertThat((Object)optionalHeartbeatRequestOrigin, (Matcher)Matchers.anyOf((Matcher)Matchers.is((Object)resourceManagerResourceId), (Matcher)Matchers.is((Matcher)Matchers.nullValue())));
            Assert.assertThat(disconnectFuture.get(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)this.resourceManagerId)));
        }));
    }

    @Test
    public void testJobMasterBecomesUnreachableTriggersDisconnect() throws Exception {
        JobID jobId = new JobID();
        ResourceID jobMasterResourceId = ResourceID.generate();
        CompletableFuture disconnectFuture = new CompletableFuture();
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).setResourceManagerHeartbeatFunction(resourceId -> FutureUtils.completedExceptionally((Throwable)new RecipientUnreachableException("sender", "recipient", "task executor is unreachable"))).setDisconnectResourceManagerConsumer(disconnectFuture::complete).build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        SettableLeaderRetrievalService jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(requestedJobId -> {
            Assert.assertThat((Object)requestedJobId, (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)jobId)));
            return jobMasterLeaderRetrievalService;
        });
        this.runHeartbeatTargetBecomesUnreachableTest((ThrowingConsumer<ResourceManagerGateway, Exception>)((ThrowingConsumer)resourceManagerGateway -> {
            CompletableFuture registrationFuture = resourceManagerGateway.registerJobManager(jobMasterGateway.getFencingToken(), jobMasterResourceId, jobMasterGateway.getAddress(), jobId, TIMEOUT);
            Assert.assertThat(registrationFuture.get(), (Matcher)Matchers.instanceOf(RegistrationResponse.Success.class));
        }), (ThrowingConsumer<ResourceID, Exception>)((ThrowingConsumer)resourceManagerResourceId -> Assert.assertThat(disconnectFuture.get(), (Matcher)Matchers.is((Matcher)Matchers.equalTo((Object)this.resourceManagerId)))));
    }

    @Test
    public void testHeartbeatTimeoutWithTaskExecutor() throws Exception {
        ResourceID taskExecutorId = ResourceID.generate();
        CompletableFuture heartbeatRequestFuture = new CompletableFuture();
        CompletableFuture disconnectFuture = new CompletableFuture();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setDisconnectResourceManagerConsumer(disconnectFuture::complete).setHeartbeatResourceManagerFunction(resourceId -> {
            heartbeatRequestFuture.complete(resourceId);
            return FutureUtils.completedVoidFuture();
        }).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        this.runHeartbeatTimeoutTest((ThrowingConsumer<ResourceManagerGateway, Exception>)((ThrowingConsumer)resourceManagerGateway -> this.registerTaskExecutor((ResourceManagerGateway)resourceManagerGateway, taskExecutorId, taskExecutorGateway.getAddress())), (ThrowingConsumer<ResourceID, Exception>)((ThrowingConsumer)resourceManagerResourceId -> {
            ResourceID optionalHeartbeatRequestOrigin = heartbeatRequestFuture.getNow(null);
            Assert.assertThat((Object)optionalHeartbeatRequestOrigin, (Matcher)Matchers.anyOf((Matcher)Matchers.is((Object)resourceManagerResourceId), (Matcher)Matchers.is((Matcher)Matchers.nullValue())));
            Assert.assertThat(disconnectFuture.get(), (Matcher)Matchers.instanceOf(TimeoutException.class));
        }));
    }

    @Test
    public void testTaskExecutorBecomesUnreachableTriggersDisconnect() throws Exception {
        ResourceID taskExecutorId = ResourceID.generate();
        CompletableFuture disconnectFuture = new CompletableFuture();
        TestingTaskExecutorGateway taskExecutorGateway = new TestingTaskExecutorGatewayBuilder().setAddress(UUID.randomUUID().toString()).setDisconnectResourceManagerConsumer(disconnectFuture::complete).setHeartbeatResourceManagerFunction(resourceId -> FutureUtils.completedExceptionally((Throwable)new RecipientUnreachableException("sender", "recipient", "task executor is unreachable"))).createTestingTaskExecutorGateway();
        rpcService.registerGateway(taskExecutorGateway.getAddress(), (RpcGateway)taskExecutorGateway);
        this.runHeartbeatTargetBecomesUnreachableTest((ThrowingConsumer<ResourceManagerGateway, Exception>)((ThrowingConsumer)resourceManagerGateway -> this.registerTaskExecutor((ResourceManagerGateway)resourceManagerGateway, taskExecutorId, taskExecutorGateway.getAddress())), (ThrowingConsumer<ResourceID, Exception>)((ThrowingConsumer)resourceManagerResourceId -> Assert.assertThat(disconnectFuture.get(), (Matcher)Matchers.instanceOf(ResourceManagerException.class))));
    }

    @Test
    public void testDisconnectJobManagerWithTerminalStatusShouldRemoveJob() throws Exception {
        this.testDisconnectJobManager(JobStatus.CANCELED);
    }

    @Test
    public void testDisconnectJobManagerWithNonTerminalStatusShouldNotRemoveJob() throws Exception {
        this.testDisconnectJobManager(JobStatus.FAILING);
    }

    private void testDisconnectJobManager(JobStatus jobStatus) throws Exception {
        TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder().setAddress(UUID.randomUUID().toString()).build();
        rpcService.registerGateway(jobMasterGateway.getAddress(), (RpcGateway)jobMasterGateway);
        OneShotLatch jobAdded = new OneShotLatch();
        OneShotLatch jobRemoved = new OneShotLatch();
        TestingJobLeaderIdService jobLeaderIdService = TestingJobLeaderIdService.newBuilder().setAddJobConsumer(ignored -> jobAdded.trigger()).setRemoveJobConsumer(ignored -> jobRemoved.trigger()).build();
        this.resourceManager = this.createAndStartResourceManager(heartbeatServices, jobLeaderIdService);
        this.highAvailabilityServices.setJobMasterLeaderRetrieverFunction(requestedJobId -> new SettableLeaderRetrievalService(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID()));
        JobID jobId = JobID.generate();
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        resourceManagerGateway.registerJobManager(jobMasterGateway.getFencingToken(), ResourceID.generate(), jobMasterGateway.getAddress(), jobId, TIMEOUT);
        jobAdded.await();
        resourceManagerGateway.disconnectJobManager(jobId, jobStatus, (Exception)new FlinkException("Test exception"));
        if (jobStatus.isGloballyTerminalState()) {
            jobRemoved.await();
        } else {
            try {
                jobRemoved.await(10L, TimeUnit.MILLISECONDS);
                Assert.fail((String)"We should not have removed the job.");
            }
            catch (TimeoutException timeoutException) {
                // empty catch block
            }
        }
    }

    private void runHeartbeatTimeoutTest(ThrowingConsumer<ResourceManagerGateway, Exception> registerComponentAtResourceManager, ThrowingConsumer<ResourceID, Exception> verifyHeartbeatTimeout) throws Exception {
        this.resourceManager = this.createAndStartResourceManager(fastHeartbeatServices);
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        registerComponentAtResourceManager.accept((Object)resourceManagerGateway);
        verifyHeartbeatTimeout.accept((Object)this.resourceManagerResourceId);
    }

    private void runHeartbeatTargetBecomesUnreachableTest(ThrowingConsumer<ResourceManagerGateway, Exception> registerComponentAtResourceManager, ThrowingConsumer<ResourceID, Exception> verifyHeartbeatTimeout) throws Exception {
        this.resourceManager = this.createAndStartResourceManager(failedRpcEnabledHeartbeatServices);
        ResourceManagerGateway resourceManagerGateway = (ResourceManagerGateway)this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
        registerComponentAtResourceManager.accept((Object)resourceManagerGateway);
        verifyHeartbeatTimeout.accept((Object)this.resourceManagerResourceId);
    }

    private TestingResourceManager createAndStartResourceManager(HeartbeatServices heartbeatServices) throws Exception {
        DefaultJobLeaderIdService jobLeaderIdService = new DefaultJobLeaderIdService((HighAvailabilityServices)this.highAvailabilityServices, rpcService.getScheduledExecutor(), TestingUtils.infiniteTime());
        return this.createAndStartResourceManager(heartbeatServices, (JobLeaderIdService)jobLeaderIdService);
    }

    private TestingResourceManager createAndStartResourceManager(HeartbeatServices heartbeatServices, JobLeaderIdService jobLeaderIdService) throws Exception {
        DeclarativeSlotManager slotManager = DeclarativeSlotManagerBuilder.newBuilder().setScheduledExecutor(rpcService.getScheduledExecutor()).build();
        return this.createAndStartResourceManager(heartbeatServices, jobLeaderIdService, (SlotManager)slotManager);
    }

    private TestingResourceManager createAndStartResourceManager(HeartbeatServices heartbeatServices, JobLeaderIdService jobLeaderIdService, SlotManager slotManager) throws Exception {
        this.resourceManagerId = ResourceManagerId.generate();
        TestingResourceManager resourceManager = new TestingResourceManager(rpcService, this.resourceManagerId.toUUID(), this.resourceManagerResourceId, heartbeatServices, slotManager, NoOpResourceManagerPartitionTracker::get, jobLeaderIdService, this.testingFatalErrorHandler, UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
        resourceManager.start();
        resourceManager.getStartedFuture().get(TIMEOUT.getSize(), TIMEOUT.getUnit());
        return resourceManager;
    }
}

