package org.apache.flink.runtime.taskexecutor;

import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.CompletableFuture;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.filecache.FileCache;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.TestingSerialRpcService;
import org.apache.flink.runtime.state.TaskStateHandles;
import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest.class */
public class TaskExecutorTest extends TestLogger {

    /**
     * JUnit rule that exposes the name of the currently running test method;
     * {@code testTaskSubmission} passes it to the {@code JobInformation} constructor.
     */
    @Rule
    public TestName name = new TestName();

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$TestInvokable.class */
    public static class TestInvokable extends AbstractInvokable {
        static final CompletableFuture<Boolean> completableFuture = new FlinkCompletableFuture();

        public void invoke() throws Exception {
            completableFuture.complete(true);
        }
    }

    /**
     * Checks that the task executor disconnects from a job master after registering with it
     * when no heartbeats arrive.
     *
     * <p>The mocked {@link HeartbeatServices} hands out real {@link HeartbeatManagerImpl}
     * instances created with {@code 10L} as their first constructor argument (presumably the
     * heartbeat timeout -- confirm against {@code HeartbeatManagerImpl}). Once the job master
     * leader has been announced, the test expects {@code registerTaskManager} to have been
     * called and then waits up to 500 ms for {@code disconnectTaskManager} to be invoked on the
     * mocked job master gateway.
     *
     * @throws Exception if the test set-up fails or the fatal error handler recorded an error
     */
    @Test
    public void testHeartbeatTimeoutWithJobManager() throws Exception {
        final JobID jobId = new JobID();
        final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(new Configuration());
        final TaskManagerLocation taskManagerLocation =
            new TaskManagerLocation(new ResourceID("tm"), InetAddress.getLoopbackAddress(), 1234);
        final TaskSlotTable taskSlotTable = new TaskSlotTable(
            Arrays.asList(Mockito.mock(ResourceProfile.class)),
            Mockito.mock(TimerService.class));
        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);

        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);
        haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);

        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

        // Return a real heartbeat manager instead of a mock so that timeouts actually fire.
        final HeartbeatServices heartbeatServices = Mockito.mock(HeartbeatServices.class);
        Mockito.when(heartbeatServices.createHeartbeatManager(
                Mockito.eq(taskManagerLocation.getResourceID()),
                Mockito.any(HeartbeatListener.class),
                Mockito.any(ScheduledExecutor.class),
                Mockito.any(Logger.class)))
            .thenAnswer(new Answer<HeartbeatManagerImpl<Void, Void>>() {
                @Override
                public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
                    final Object[] arguments = invocation.getArguments();
                    // The scheduled executor (argument 2) is deliberately passed twice: once as
                    // the plain Executor and once as the ScheduledExecutor.
                    return new HeartbeatManagerImpl<>(
                        10L,
                        taskManagerLocation.getResourceID(),
                        (HeartbeatListener) arguments[1],
                        (Executor) arguments[2],
                        (ScheduledExecutor) arguments[2],
                        (Logger) arguments[3]);
                }
            });

        final UUID jmLeaderId = UUID.randomUUID();
        final ResourceID jmResourceId = new ResourceID("jm");
        final JobMasterGateway jobMasterGateway = Mockito.mock(JobMasterGateway.class);
        Mockito.when(jobMasterGateway.registerTaskManager(
                Mockito.any(String.class),
                Mockito.eq(taskManagerLocation),
                Mockito.eq(jmLeaderId),
                Mockito.any(Time.class)))
            .thenReturn(FlinkCompletableFuture.completed(new JMTMRegistrationSuccess(jmResourceId, 42)));
        Mockito.when(jobMasterGateway.getAddress()).thenReturn("jm");
        Mockito.when(jobMasterGateway.getHostname()).thenReturn("localhost");

        try {
            final TaskExecutor taskExecutor = new TaskExecutor(
                rpc,
                tmConfig,
                taskManagerLocation,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                Mockito.mock(NetworkEnvironment.class),
                haServices,
                heartbeatServices,
                Mockito.mock(MetricRegistry.class),
                Mockito.mock(TaskManagerMetricGroup.class),
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(FileCache.class),
                taskSlotTable,
                new JobManagerTable(),
                jobLeaderService,
                fatalErrorHandler);
            taskExecutor.start();

            rpc.registerGateway("jm", jobMasterGateway);
            jobLeaderService.addJob(jobId, "jm");

            // Announce the job master leader; this triggers the registration.
            jmLeaderRetrievalService.notifyListener("jm", jmLeaderId);

            Mockito.verify(jobMasterGateway).registerTaskManager(
                Mockito.eq(taskExecutor.getAddress()),
                Mockito.eq(taskManagerLocation),
                Mockito.eq(jmLeaderId),
                Mockito.any(Time.class));

            // No heartbeats are sent, so the disconnect must show up within the wait window.
            Mockito.verify(jobMasterGateway, Mockito.timeout(500L)).disconnectTaskManager(
                Mockito.eq(taskManagerLocation.getResourceID()),
                (Exception) Mockito.any(TimeoutException.class));

            fatalErrorHandler.rethrowError();
        } finally {
            // Always stop the RPC service exactly once, whether the test passed or failed.
            rpc.stopService();
        }
    }

    /**
     * Checks that the task executor disconnects from a resource manager after registering with
     * it when no heartbeats arrive.
     *
     * <p>The mocked {@link HeartbeatServices} hands out real {@link HeartbeatManagerImpl}
     * instances created with {@code 10L} as their first constructor argument (presumably the
     * heartbeat timeout -- confirm against {@code HeartbeatManagerImpl}). Once the resource
     * manager leader has been announced, the test expects at least one
     * {@code registerTaskExecutor} call and then waits up to 500 ms for
     * {@code disconnectTaskManager} to be invoked on the mocked resource manager gateway.
     *
     * @throws Exception if the test set-up fails or the fatal error handler recorded an error
     */
    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        final ResourceID rmResourceId = new ResourceID("rm");
        final ResourceID tmResourceId = new ResourceID("tm");
        final UUID rmLeaderId = UUID.randomUUID();

        // Must be typed as ResourceManagerGateway: registerTaskExecutor is stubbed and verified on it.
        final ResourceManagerGateway rmGateway = Mockito.mock(ResourceManagerGateway.class);
        Mockito.when(rmGateway.registerTaskExecutor(
                Mockito.any(UUID.class),
                Mockito.anyString(),
                Mockito.any(ResourceID.class),
                Mockito.any(SlotReport.class),
                Mockito.any(Time.class)))
            .thenReturn(FlinkCompletableFuture.completed(
                new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));

        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        rpc.registerGateway("rm", rmGateway);

        final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);

        final TaskManagerConfiguration tmConfig = Mockito.mock(TaskManagerConfiguration.class);
        Mockito.when(tmConfig.getNumberSlots()).thenReturn(1);

        final TaskManagerLocation taskManagerLocation = Mockito.mock(TaskManagerLocation.class);
        Mockito.when(taskManagerLocation.getResourceID()).thenReturn(tmResourceId);

        final TaskSlotTable taskSlotTable = Mockito.mock(TaskSlotTable.class);
        Mockito.when(taskSlotTable.createSlotReport(Mockito.any(ResourceID.class))).thenReturn(new SlotReport());

        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

        // Return a real heartbeat manager instead of a mock so that timeouts actually fire.
        final HeartbeatServices heartbeatServices = Mockito.mock(HeartbeatServices.class);
        Mockito.when(heartbeatServices.createHeartbeatManager(
                Mockito.eq(taskManagerLocation.getResourceID()),
                Mockito.any(HeartbeatListener.class),
                Mockito.any(ScheduledExecutor.class),
                Mockito.any(Logger.class)))
            .thenAnswer(new Answer<HeartbeatManagerImpl<Void, Void>>() {
                @Override
                public HeartbeatManagerImpl<Void, Void> answer(InvocationOnMock invocation) throws Throwable {
                    final Object[] arguments = invocation.getArguments();
                    // The scheduled executor (argument 2) is deliberately passed twice: once as
                    // the plain Executor and once as the ScheduledExecutor.
                    return new HeartbeatManagerImpl<>(
                        10L,
                        taskManagerLocation.getResourceID(),
                        (HeartbeatListener) arguments[1],
                        (Executor) arguments[2],
                        (ScheduledExecutor) arguments[2],
                        (Logger) arguments[3]);
                }
            });

        try {
            final TaskExecutor taskExecutor = new TaskExecutor(
                rpc,
                tmConfig,
                taskManagerLocation,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                Mockito.mock(NetworkEnvironment.class),
                haServices,
                heartbeatServices,
                Mockito.mock(MetricRegistry.class),
                Mockito.mock(TaskManagerMetricGroup.class),
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(FileCache.class),
                taskSlotTable,
                Mockito.mock(JobManagerTable.class),
                Mockito.mock(JobLeaderService.class),
                fatalErrorHandler);
            taskExecutor.start();

            // Announce the resource manager leader; this triggers the registration.
            rmLeaderRetrievalService.notifyListener("rm", rmLeaderId);

            Mockito.verify(rmGateway, Mockito.atLeast(1)).registerTaskExecutor(
                Mockito.eq(rmLeaderId),
                Mockito.eq(taskExecutor.getAddress()),
                Mockito.eq(tmResourceId),
                Mockito.any(SlotReport.class),
                Mockito.any(Time.class));

            // No heartbeats are sent, so the disconnect must show up within the wait window.
            Mockito.verify(rmGateway, Mockito.timeout(500L)).disconnectTaskManager(
                Mockito.eq(taskManagerLocation.getResourceID()),
                (Exception) Mockito.any(TimeoutException.class));

            fatalErrorHandler.rethrowError();
        } finally {
            // Always stop the RPC service exactly once, whether the test passed or failed.
            rpc.stopService();
        }
    }

    /**
     * Checks that the task executor registers at the resource manager right after start-up when
     * the leader is already known.
     *
     * <p>The leader address comes from {@link StandaloneHaServices}, so no explicit leader
     * notification is sent; the test only starts the task executor and then expects exactly one
     * {@code registerTaskExecutor} call carrying the task executor's address, its resource ID
     * and the slot report produced by the mocked slot table.
     *
     * @throws Exception if the test set-up fails or the fatal error handler recorded an error
     */
    @Test
    public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
        final ResourceID tmResourceId = ResourceID.generate();
        final ResourceID rmResourceId = new ResourceID("/resource/manager/address/one");
        final TestingSerialRpcService rpc = new TestingSerialRpcService();

        try {
            final ResourceManagerGateway rmGateway = Mockito.mock(ResourceManagerGateway.class);
            Mockito.when(rmGateway.registerTaskExecutor(
                    Mockito.any(UUID.class),
                    Mockito.anyString(),
                    Mockito.any(ResourceID.class),
                    Mockito.any(SlotReport.class),
                    Mockito.any(Time.class)))
                .thenReturn(FlinkCompletableFuture.completed(
                    new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 10L)));

            final TaskManagerConfiguration tmConfig = Mockito.mock(TaskManagerConfiguration.class);
            Mockito.when(tmConfig.getNumberSlots()).thenReturn(1);

            rpc.registerGateway("/resource/manager/address/one", rmGateway);

            final TaskManagerLocation taskManagerLocation = Mockito.mock(TaskManagerLocation.class);
            Mockito.when(taskManagerLocation.getResourceID()).thenReturn(tmResourceId);

            final StandaloneHaServices haServices =
                new StandaloneHaServices("/resource/manager/address/one", "localhost");

            final TaskSlotTable taskSlotTable = Mockito.mock(TaskSlotTable.class);
            final SlotReport slotReport = new SlotReport();
            Mockito.when(taskSlotTable.createSlotReport(Mockito.any(ResourceID.class))).thenReturn(slotReport);

            final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

            final TaskExecutor taskExecutor = new TaskExecutor(
                rpc,
                tmConfig,
                taskManagerLocation,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                Mockito.mock(NetworkEnvironment.class),
                haServices,
                Mockito.mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS),
                Mockito.mock(MetricRegistry.class),
                Mockito.mock(TaskManagerMetricGroup.class),
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(FileCache.class),
                taskSlotTable,
                Mockito.mock(JobManagerTable.class),
                Mockito.mock(JobLeaderService.class),
                fatalErrorHandler);
            taskExecutor.start();

            Mockito.verify(rmGateway).registerTaskExecutor(
                Mockito.any(UUID.class),
                Mockito.eq(taskExecutor.getAddress()),
                Mockito.eq(tmResourceId),
                Mockito.eq(slotReport),
                Mockito.any(Time.class));

            fatalErrorHandler.rethrowError();
        } finally {
            // Always stop the RPC service exactly once, whether the test passed or failed.
            rpc.stopService();
        }
    }

    /**
     * Checks that the task executor re-registers whenever the resource manager leader changes.
     *
     * <p>Sequence exercised:
     * <ol>
     *   <li>after start-up, without any leader, there is no resource manager connection;</li>
     *   <li>announcing the first leader leads to a {@code registerTaskExecutor} call on the
     *       first gateway and a non-null connection;</li>
     *   <li>revoking the leader ({@code null, null}) and announcing a second leader leads to a
     *       {@code registerTaskExecutor} call on the second gateway and a non-null connection.</li>
     * </ol>
     *
     * @throws Exception if the test set-up fails or the fatal error handler recorded an error
     */
    @Test
    public void testTriggerRegistrationOnLeaderChange() throws Exception {
        final ResourceID tmResourceId = ResourceID.generate();
        final UUID leaderId1 = UUID.randomUUID();
        final UUID leaderId2 = UUID.randomUUID();
        final ResourceID rmResourceId1 = new ResourceID("/resource/manager/address/one");
        final ResourceID rmResourceId2 = new ResourceID("/resource/manager/address/two");
        final TestingSerialRpcService rpc = new TestingSerialRpcService();

        try {
            final ResourceManagerGateway rmGateway1 = Mockito.mock(ResourceManagerGateway.class);
            final ResourceManagerGateway rmGateway2 = Mockito.mock(ResourceManagerGateway.class);
            Mockito.when(rmGateway1.registerTaskExecutor(
                    Mockito.any(UUID.class),
                    Mockito.anyString(),
                    Mockito.any(ResourceID.class),
                    Mockito.any(SlotReport.class),
                    Mockito.any(Time.class)))
                .thenReturn(FlinkCompletableFuture.completed(
                    new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId1, 10L)));
            Mockito.when(rmGateway2.registerTaskExecutor(
                    Mockito.any(UUID.class),
                    Mockito.anyString(),
                    Mockito.any(ResourceID.class),
                    Mockito.any(SlotReport.class),
                    Mockito.any(Time.class)))
                .thenReturn(FlinkCompletableFuture.completed(
                    new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId2, 10L)));

            rpc.registerGateway("/resource/manager/address/one", rmGateway1);
            rpc.registerGateway("/resource/manager/address/two", rmGateway2);

            final TestingLeaderRetrievalService rmLeaderRetrievalService =
                new TestingLeaderRetrievalService(null, null);
            final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
            haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);

            final TaskManagerConfiguration tmConfig = Mockito.mock(TaskManagerConfiguration.class);
            Mockito.when(tmConfig.getNumberSlots()).thenReturn(1);
            Mockito.when(tmConfig.getConfiguration()).thenReturn(new Configuration());
            Mockito.when(tmConfig.getTmpDirectories()).thenReturn(new String[1]);

            final TaskManagerLocation taskManagerLocation = Mockito.mock(TaskManagerLocation.class);
            Mockito.when(taskManagerLocation.getResourceID()).thenReturn(tmResourceId);
            Mockito.when(taskManagerLocation.getHostname()).thenReturn("foobar");

            final TaskSlotTable taskSlotTable = Mockito.mock(TaskSlotTable.class);
            final SlotReport slotReport = new SlotReport();
            Mockito.when(taskSlotTable.createSlotReport(Mockito.any(ResourceID.class))).thenReturn(slotReport);

            final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

            final TaskExecutor taskExecutor = new TaskExecutor(
                rpc,
                tmConfig,
                taskManagerLocation,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                Mockito.mock(NetworkEnvironment.class),
                haServices,
                Mockito.mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS),
                Mockito.mock(MetricRegistry.class),
                Mockito.mock(TaskManagerMetricGroup.class),
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(FileCache.class),
                taskSlotTable,
                Mockito.mock(JobManagerTable.class),
                Mockito.mock(JobLeaderService.class),
                fatalErrorHandler);
            taskExecutor.start();

            final String taskExecutorAddress = taskExecutor.getAddress();

            // No leader has been announced yet, so there must be no connection.
            Assert.assertNull(taskExecutor.getResourceManagerConnection());

            // First leader: expect a registration at gateway one.
            rmLeaderRetrievalService.notifyListener("/resource/manager/address/one", leaderId1);
            Mockito.verify(rmGateway1).registerTaskExecutor(
                Mockito.eq(leaderId1),
                Mockito.eq(taskExecutorAddress),
                Mockito.eq(tmResourceId),
                Mockito.any(SlotReport.class),
                Mockito.any(Time.class));
            Assert.assertNotNull(taskExecutor.getResourceManagerConnection());

            // Leader is revoked and a new one elected: expect a registration at gateway two.
            rmLeaderRetrievalService.notifyListener(null, null);
            rmLeaderRetrievalService.notifyListener("/resource/manager/address/two", leaderId2);
            Mockito.verify(rmGateway2).registerTaskExecutor(
                Mockito.eq(leaderId2),
                Mockito.eq(taskExecutorAddress),
                Mockito.eq(tmResourceId),
                Mockito.eq(slotReport),
                Mockito.any(Time.class));
            Assert.assertNotNull(taskExecutor.getResourceManagerConnection());

            fatalErrorHandler.rethrowError();
        } finally {
            // Always stop the RPC service exactly once, whether the test passed or failed.
            rpc.stopService();
        }
    }

    /**
     * Checks that a task submitted to the task executor is actually run.
     *
     * <p>A deployment descriptor naming {@link TestInvokable} is submitted via
     * {@code submitTask}; the test then blocks on {@code TestInvokable.completableFuture}, which
     * is completed from {@code TestInvokable.invoke()}. The JUnit timeout of 1000 ms fails the
     * test if the invokable is never run.
     *
     * @throws Exception if the test set-up fails or the fatal error handler recorded an error
     */
    @Test(timeout = 1000)
    public void testTaskSubmission() throws Exception {
        final Configuration configuration = new Configuration();
        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(configuration);
        final JobID jobId = new JobID();
        final AllocationID allocationId = new AllocationID();
        final UUID leaderId = UUID.randomUUID();
        final JobVertexID jobVertexId = new JobVertexID();

        final JobInformation jobInformation = new JobInformation(
            jobId,
            this.name.getMethodName(),
            new SerializedValue<>(new ExecutionConfig()),
            new Configuration(),
            Collections.emptyList(),
            Collections.emptyList());
        final TaskInformation taskInformation = new TaskInformation(
            jobVertexId,
            "test task",
            1,
            1,
            TestInvokable.class.getName(),
            new Configuration());
        final SerializedValue<JobInformation> serializedJobInformation = new SerializedValue<>(jobInformation);
        final SerializedValue<TaskInformation> serializedTaskInformation = new SerializedValue<>(taskInformation);

        final TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
            serializedJobInformation,
            serializedTaskInformation,
            new ExecutionAttemptID(),
            allocationId,
            0,
            0,
            0,
            (TaskStateHandles) null,
            Collections.emptyList(),
            Collections.emptyList());

        // The invokable class must be loadable, so hand out this test's own class loader.
        final LibraryCacheManager libraryCacheManager = Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libraryCacheManager.getClassLoader(Mockito.eq(jobId))).thenReturn(getClass().getClassLoader());

        final JobManagerConnection jobManagerConnection = new JobManagerConnection(
            jobId,
            ResourceID.generate(),
            Mockito.mock(JobMasterGateway.class),
            leaderId,
            Mockito.mock(TaskManagerActions.class),
            Mockito.mock(CheckpointResponder.class),
            libraryCacheManager,
            Mockito.mock(ResultPartitionConsumableNotifier.class),
            Mockito.mock(PartitionProducerStateChecker.class));

        final JobManagerTable jobManagerTable = new JobManagerTable();
        jobManagerTable.put(jobId, jobManagerConnection);

        final TaskSlotTable taskSlotTable = Mockito.mock(TaskSlotTable.class);
        Mockito.when(taskSlotTable.existsActiveSlot(Mockito.eq(jobId), Mockito.eq(allocationId))).thenReturn(true);
        Mockito.when(taskSlotTable.addTask(Mockito.any(Task.class))).thenReturn(true);

        final NetworkEnvironment networkEnvironment = Mockito.mock(NetworkEnvironment.class);
        Mockito.when(networkEnvironment.createKvStateTaskRegistry(Mockito.eq(jobId), Mockito.eq(jobVertexId)))
            .thenReturn(Mockito.mock(TaskKvStateRegistry.class));

        final TaskManagerMetricGroup taskManagerMetricGroup = Mockito.mock(TaskManagerMetricGroup.class);
        final TaskMetricGroup taskMetricGroup = Mockito.mock(TaskMetricGroup.class);
        Mockito.when(taskMetricGroup.getIOMetricGroup()).thenReturn(Mockito.mock(TaskIOMetricGroup.class));
        Mockito.when(taskManagerMetricGroup.addTaskForJob(
                Mockito.any(JobID.class),
                Mockito.anyString(),
                Mockito.any(JobVertexID.class),
                Mockito.any(ExecutionAttemptID.class),
                Mockito.anyString(),
                Mockito.anyInt(),
                Mockito.anyInt()))
            .thenReturn(taskMetricGroup);

        final HighAvailabilityServices haServices = Mockito.mock(HighAvailabilityServices.class);
        Mockito.when(haServices.getResourceManagerLeaderRetriever())
            .thenReturn(Mockito.mock(LeaderRetrievalService.class));

        try {
            final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

            final TaskExecutor taskExecutor = new TaskExecutor(
                rpc,
                tmConfig,
                Mockito.mock(TaskManagerLocation.class),
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                networkEnvironment,
                haServices,
                Mockito.mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS),
                Mockito.mock(MetricRegistry.class),
                taskManagerMetricGroup,
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(FileCache.class),
                taskSlotTable,
                jobManagerTable,
                Mockito.mock(JobLeaderService.class),
                fatalErrorHandler);
            taskExecutor.start();

            taskExecutor.submitTask(tdd, leaderId);

            // Blocks until TestInvokable.invoke() has run; bounded by the JUnit timeout above.
            TestInvokable.completableFuture.get();

            fatalErrorHandler.rethrowError();
        } finally {
            // Always stop the RPC service exactly once, whether the test passed or failed.
            rpc.stopService();
        }
    }

    /**
     * Checks that the task executor offers an allocated slot to the job master once that job
     * master's leadership becomes known.
     *
     * <p>The test announces the resource manager leader, requests a slot for the job, then
     * announces the job master leader and finally expects {@code offerSlots} to be called on the
     * mocked job master gateway with exactly the expected {@link SlotOffer}.
     *
     * @throws Exception if the test set-up fails or the fatal error handler recorded an error
     */
    @Test
    public void testJobLeaderDetection() throws Exception {
        final JobID jobId = new JobID();
        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        final TaskManagerConfiguration tmConfig = TaskManagerConfiguration.fromConfiguration(new Configuration());
        final ResourceID tmResourceId = new ResourceID("foobar");
        final TaskManagerLocation taskManagerLocation =
            new TaskManagerLocation(tmResourceId, InetAddress.getLoopbackAddress(), 1234);
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        final TaskSlotTable taskSlotTable = new TaskSlotTable(
            Arrays.asList(Mockito.mock(ResourceProfile.class)),
            Mockito.mock(TimerService.class));
        final JobManagerTable jobManagerTable = new JobManagerTable();
        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

        final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null);
        haServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
        haServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService);

        final UUID rmLeaderId = UUID.randomUUID();
        final ResourceID rmResourceId = new ResourceID("rm");
        final ResourceManagerGateway rmGateway = Mockito.mock(ResourceManagerGateway.class);
        Mockito.when(rmGateway.registerTaskExecutor(
                Mockito.eq(rmLeaderId),
                Mockito.any(String.class),
                Mockito.eq(tmResourceId),
                Mockito.any(SlotReport.class),
                Mockito.any(Time.class)))
            .thenReturn(FlinkCompletableFuture.completed(
                new TaskExecutorRegistrationSuccess(new InstanceID(), rmResourceId, 1000L)));

        final UUID jmLeaderId = UUID.randomUUID();
        final ResourceID jmResourceId = new ResourceID("jm");
        final JobMasterGateway jobMasterGateway = Mockito.mock(JobMasterGateway.class);
        Mockito.when(jobMasterGateway.registerTaskManager(
                Mockito.any(String.class),
                Mockito.eq(taskManagerLocation),
                Mockito.eq(jmLeaderId),
                Mockito.any(Time.class)))
            .thenReturn(FlinkCompletableFuture.completed(new JMTMRegistrationSuccess(jmResourceId, 42)));
        Mockito.when(jobMasterGateway.getHostname()).thenReturn("jm");
        Mockito.when(jobMasterGateway.offerSlots(
                Mockito.any(ResourceID.class),
                Mockito.any(Iterable.class),
                Mockito.any(UUID.class),
                Mockito.any(Time.class)))
            .thenReturn(Mockito.mock(Future.class, Mockito.RETURNS_MOCKS));

        rpc.registerGateway("rm", rmGateway);
        rpc.registerGateway("jm", jobMasterGateway);

        final AllocationID allocationId = new AllocationID();
        final SlotID slotId = new SlotID(tmResourceId, 0);
        final SlotOffer expectedSlotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN);

        try {
            final TaskExecutor taskExecutor = new TaskExecutor(
                rpc,
                tmConfig,
                taskManagerLocation,
                Mockito.mock(MemoryManager.class),
                Mockito.mock(IOManager.class),
                Mockito.mock(NetworkEnvironment.class),
                haServices,
                Mockito.mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS),
                Mockito.mock(MetricRegistry.class),
                Mockito.mock(TaskManagerMetricGroup.class),
                Mockito.mock(BroadcastVariableManager.class),
                Mockito.mock(FileCache.class),
                taskSlotTable,
                jobManagerTable,
                jobLeaderService,
                fatalErrorHandler);
            taskExecutor.start();

            // Announce the resource manager leader, then request a slot on its behalf.
            rmLeaderRetrievalService.notifyListener("rm", rmLeaderId);
            taskExecutor.requestSlot(slotId, jobId, allocationId, "jm", rmLeaderId);

            // Announce the job master leader; the requested slot should now be offered to it.
            jmLeaderRetrievalService.notifyListener("jm", jmLeaderId);

            // The raw Iterable cast is required: the Hamcrest matcher is typed
            // Iterable<? extends SlotOffer>, which does not convert to Iterable<SlotOffer>.
            Mockito.verify(jobMasterGateway).offerSlots(
                Mockito.any(ResourceID.class),
                (Iterable) Matchers.argThat(org.hamcrest.Matchers.contains(new SlotOffer[]{expectedSlotOffer})),
                Mockito.eq(jmLeaderId),
                Mockito.any(Time.class));

            fatalErrorHandler.rethrowError();
        } finally {
            // Always stop the RPC service exactly once, whether the test passed or failed.
            rpc.stopService();
        }
    }

    /**
     * Checks the task executor's handling of a partially accepted slot offer.
     *
     * <p>Two slots are allocated for the same job, while the mocked job master answers the slot
     * offer with the first allocation only. The test expects the resource manager to be notified
     * that slot 1 is available again, the first allocation to stay active, and slot 1 to be free.
     *
     * @throws Exception if the set-up, one of the verifications, or the fatal error handler's
     *     {@code rethrowError()} throws
     */
    @Test
    public void testSlotAcceptance() throws Exception {
        final JobID jobId = new JobID();
        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        final TaskManagerConfiguration taskManagerConfiguration =
                TaskManagerConfiguration.fromConfiguration(new Configuration());
        final ResourceID taskExecutorResourceId = new ResourceID("foobar");
        final TaskManagerLocation taskManagerLocation =
                new TaskManagerLocation(taskExecutorResourceId, InetAddress.getLoopbackAddress(), 1234);
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        final TaskSlotTable taskSlotTable = new TaskSlotTable(
                Arrays.asList(Mockito.mock(ResourceProfile.class), Mockito.mock(ResourceProfile.class)),
                Mockito.mock(TimerService.class));
        final JobManagerTable jobManagerTable = new JobManagerTable();
        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

        final UUID resourceManagerLeaderId = UUID.randomUUID();
        final ResourceID resourceManagerResourceId = new ResourceID("rm");
        final UUID jobManagerLeaderId = UUID.randomUUID();
        final TestingLeaderRetrievalService resourceManagerLeaderRetriever =
                new TestingLeaderRetrievalService("rm", resourceManagerLeaderId);
        final TestingLeaderRetrievalService jobManagerLeaderRetriever =
                new TestingLeaderRetrievalService("jm", jobManagerLeaderId);
        haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
        haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever);

        // The mocked resource manager answers every registration attempt with a success message.
        final ResourceManagerGateway resourceManagerGateway = Mockito.mock(ResourceManagerGateway.class);
        final InstanceID registrationId = new InstanceID();
        Mockito.when(resourceManagerGateway.registerTaskExecutor(
                        Mockito.eq(resourceManagerLeaderId),
                        Mockito.any(String.class),
                        Mockito.eq(taskExecutorResourceId),
                        Mockito.any(SlotReport.class),
                        Mockito.any(Time.class)))
                .thenReturn(FlinkCompletableFuture.completed(
                        new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));

        final ResourceID jobManagerResourceId = new ResourceID("jm");
        final AllocationID acceptedAllocationId = new AllocationID();
        final AllocationID rejectedAllocationId = new AllocationID();
        final SlotOffer acceptedOffer = new SlotOffer(acceptedAllocationId, 0, ResourceProfile.UNKNOWN);

        final JobMasterGateway jobMasterGateway = Mockito.mock(JobMasterGateway.class);
        Mockito.when(jobMasterGateway.registerTaskManager(
                        Mockito.any(String.class),
                        Mockito.eq(taskManagerLocation),
                        Mockito.eq(jobManagerLeaderId),
                        Mockito.any(Time.class)))
                .thenReturn(FlinkCompletableFuture.completed(
                        new JMTMRegistrationSuccess(jobManagerResourceId, 42)));
        Mockito.when(jobMasterGateway.getHostname()).thenReturn("jm");
        // Whatever is offered, the job master accepts the offer for the first allocation only.
        Mockito.when(jobMasterGateway.offerSlots(
                        Mockito.any(ResourceID.class),
                        Mockito.any(Iterable.class),
                        Mockito.eq(jobManagerLeaderId),
                        Mockito.any(Time.class)))
                .thenReturn(FlinkCompletableFuture.completed(Collections.singleton(acceptedOffer)));

        rpc.registerGateway("rm", resourceManagerGateway);
        rpc.registerGateway("jm", jobMasterGateway);

        try {
            final TaskExecutor taskExecutor = new TaskExecutor(
                    rpc,
                    taskManagerConfiguration,
                    taskManagerLocation,
                    Mockito.mock(MemoryManager.class),
                    Mockito.mock(IOManager.class),
                    Mockito.mock(NetworkEnvironment.class),
                    haServices,
                    Mockito.mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS),
                    Mockito.mock(MetricRegistry.class),
                    Mockito.mock(TaskManagerMetricGroup.class),
                    Mockito.mock(BroadcastVariableManager.class),
                    Mockito.mock(FileCache.class),
                    taskSlotTable,
                    jobManagerTable,
                    jobLeaderService,
                    fatalErrorHandler);
            taskExecutor.start();

            taskSlotTable.allocateSlot(0, jobId, acceptedAllocationId, Time.milliseconds(10000L));
            taskSlotTable.allocateSlot(1, jobId, rejectedAllocationId, Time.milliseconds(10000L));

            // Register the job; the checks below expect this to lead to the slot offer being
            // answered by the mocked job master.
            jobLeaderService.addJob(jobId, "jm");

            // The slot that was not accepted has to be reported back to the resource manager.
            Mockito.verify(resourceManagerGateway).notifySlotAvailable(
                    Mockito.eq(resourceManagerLeaderId),
                    Mockito.eq(registrationId),
                    Mockito.eq(new SlotID(taskExecutorResourceId, 1)),
                    Mockito.eq(rejectedAllocationId));

            Assert.assertTrue(taskSlotTable.existsActiveSlot(jobId, acceptedAllocationId));
            Assert.assertFalse(taskSlotTable.existsActiveSlot(jobId, rejectedAllocationId));
            Assert.assertTrue(taskSlotTable.isSlotFree(1));

            // Surface any error that was reported to the fatal error handler in the meantime.
            fatalErrorHandler.rethrowError();
        } finally {
            // Stop the RPC service exactly once, on success and on failure alike.
            rpc.stopService();
        }
    }

    /**
     * Exercises slot requests against a task executor before and after the resource manager
     * leader is announced a second time. The test is currently disabled via {@code @Ignore}.
     *
     * <p>After the task executor has registered at the mocked resource manager, a request for
     * slot 0 is expected to go through, whereas a request for slot 1 is expected to fail with a
     * {@link SlotAllocationException}. Once the leader retrieval service has notified its
     * listener again, the same request for slot 1 is expected to succeed.
     *
     * @throws Exception if the set-up, one of the verifications, or the fatal error handler's
     *     {@code rethrowError()} throws
     */
    @Test
    @Ignore
    public void testRejectAllocationRequestsForOutOfSyncSlots() throws Exception {
        final String resourceManagerAddress = "/resource/manager/address/one";
        final String jobManagerAddress = "foobar";

        final ResourceID resourceId = ResourceID.generate();
        final UUID leaderId = UUID.randomUUID();
        final JobID jobId = new JobID();
        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        try {
            // Make the mocked resource manager reachable under its address.
            final ResourceManagerGateway resourceManagerGateway = Mockito.mock(ResourceManagerGateway.class);
            rpc.registerGateway(resourceManagerAddress, resourceManagerGateway);

            final TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService(
                    "localhost",
                    HighAvailabilityServices.DEFAULT_LEADER_ID);
            final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
            haServices.setResourceManagerLeaderRetriever(leaderRetrievalService);

            final TaskManagerConfiguration taskManagerConfiguration = Mockito.mock(TaskManagerConfiguration.class);
            Mockito.when(taskManagerConfiguration.getNumberSlots()).thenReturn(1);
            final TaskManagerLocation taskManagerLocation = Mockito.mock(TaskManagerLocation.class);
            Mockito.when(taskManagerLocation.getResourceID()).thenReturn(resourceId);
            final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

            final TaskExecutor taskExecutor = new TaskExecutor(
                    rpc,
                    taskManagerConfiguration,
                    taskManagerLocation,
                    Mockito.mock(MemoryManager.class),
                    Mockito.mock(IOManager.class),
                    Mockito.mock(NetworkEnvironment.class),
                    haServices,
                    Mockito.mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS),
                    Mockito.mock(MetricRegistry.class),
                    Mockito.mock(TaskManagerMetricGroup.class),
                    Mockito.mock(BroadcastVariableManager.class),
                    Mockito.mock(FileCache.class),
                    Mockito.mock(TaskSlotTable.class),
                    Mockito.mock(JobManagerTable.class),
                    Mockito.mock(JobLeaderService.class),
                    fatalErrorHandler);
            taskExecutor.start();
            final String taskExecutorAddress = taskExecutor.getAddress();

            // No resource manager connection is expected before a leader has been announced.
            Assert.assertNull(taskExecutor.getResourceManagerConnection());

            // Announce the resource manager leader; this has to trigger the registration.
            leaderRetrievalService.notifyListener(resourceManagerAddress, leaderId);
            Mockito.verify(resourceManagerGateway).registerTaskExecutor(
                    Mockito.eq(leaderId),
                    Mockito.eq(taskExecutorAddress),
                    Mockito.eq(resourceId),
                    Mockito.any(SlotReport.class),
                    Mockito.any(Time.class));
            Assert.assertNotNull(taskExecutor.getResourceManagerConnection());

            // Requesting slot 0 is expected to work.
            taskExecutor.requestSlot(new SlotID(resourceId, 0), jobId, new AllocationID(), jobManagerAddress, leaderId);

            // Requesting slot 1 is expected to be rejected at this point.
            final SlotID rejectedSlotId = new SlotID(resourceId, 1);
            try {
                taskExecutor.requestSlot(rejectedSlotId, jobId, new AllocationID(), jobManagerAddress, leaderId);
                Assert.fail("The slot request should have failed.");
            } catch (SlotAllocationException expected) {
                // Intentionally ignored: the rejection is exactly what this step checks for.
            }

            // Check the registration call again, then announce the leader a second time.
            Mockito.verify(resourceManagerGateway).registerTaskExecutor(
                    Mockito.eq(leaderId),
                    Mockito.eq(taskExecutorAddress),
                    Mockito.eq(resourceId),
                    Mockito.any(SlotReport.class),
                    Mockito.any(Time.class));
            leaderRetrievalService.notifyListener(resourceManagerAddress, leaderId);

            // The previously rejected request is now expected to succeed.
            taskExecutor.requestSlot(rejectedSlotId, jobId, new AllocationID(), jobManagerAddress, leaderId);

            // Surface any error that was reported to the fatal error handler in the meantime.
            fatalErrorHandler.rethrowError();
        } finally {
            // Stop the RPC service exactly once, on success and on failure alike.
            rpc.stopService();
        }
    }

    /**
     * Checks that a task can be submitted for a slot while the job master's answer to the
     * corresponding slot offer is still outstanding.
     *
     * <p>The mocked job master answers the slot offer with a future that the test completes only
     * after {@code submitTask} has been called for the first allocation. Once the future is
     * completed with the first allocation only, the test expects the resource manager to be
     * notified that slot 1 is available again, the first allocation to stay active, and slot 1
     * to be free.
     *
     * @throws Exception if the set-up, the task submission, one of the verifications, or the
     *     fatal error handler's {@code rethrowError()} throws
     */
    @Test
    public void testSubmitTaskBeforeAcceptSlot() throws Exception {
        final JobID jobId = new JobID();
        final TestingSerialRpcService rpc = new TestingSerialRpcService();
        final TaskManagerConfiguration taskManagerConfiguration =
                TaskManagerConfiguration.fromConfiguration(new Configuration());
        final ResourceID taskExecutorResourceId = new ResourceID("foobar");
        final TaskManagerLocation taskManagerLocation =
                new TaskManagerLocation(taskExecutorResourceId, InetAddress.getLoopbackAddress(), 1234);
        final TestingHighAvailabilityServices haServices = new TestingHighAvailabilityServices();
        final TaskSlotTable taskSlotTable = new TaskSlotTable(
                Arrays.asList(Mockito.mock(ResourceProfile.class), Mockito.mock(ResourceProfile.class)),
                Mockito.mock(TimerService.class));
        final JobManagerTable jobManagerTable = new JobManagerTable();
        final JobLeaderService jobLeaderService = new JobLeaderService(taskManagerLocation);
        final TestingFatalErrorHandler fatalErrorHandler = new TestingFatalErrorHandler();

        final UUID resourceManagerLeaderId = UUID.randomUUID();
        final ResourceID resourceManagerResourceId = new ResourceID("rm");
        final UUID jobManagerLeaderId = UUID.randomUUID();
        final TestingLeaderRetrievalService resourceManagerLeaderRetriever =
                new TestingLeaderRetrievalService("rm", resourceManagerLeaderId);
        final TestingLeaderRetrievalService jobManagerLeaderRetriever =
                new TestingLeaderRetrievalService("jm", jobManagerLeaderId);
        haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
        haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever);

        // The mocked resource manager answers every registration attempt with a success message.
        final ResourceManagerGateway resourceManagerGateway = Mockito.mock(ResourceManagerGateway.class);
        final InstanceID registrationId = new InstanceID();
        Mockito.when(resourceManagerGateway.registerTaskExecutor(
                        Mockito.eq(resourceManagerLeaderId),
                        Mockito.any(String.class),
                        Mockito.eq(taskExecutorResourceId),
                        Mockito.any(SlotReport.class),
                        Mockito.any(Time.class)))
                .thenReturn(FlinkCompletableFuture.completed(
                        new TaskExecutorRegistrationSuccess(registrationId, resourceManagerResourceId, 1000L)));

        final ResourceID jobManagerResourceId = new ResourceID("jm");
        final AllocationID acceptedAllocationId = new AllocationID();
        final AllocationID rejectedAllocationId = new AllocationID();
        final SlotOffer acceptedOffer = new SlotOffer(acceptedAllocationId, 0, ResourceProfile.UNKNOWN);

        final JobMasterGateway jobMasterGateway = Mockito.mock(JobMasterGateway.class);
        Mockito.when(jobMasterGateway.registerTaskManager(
                        Mockito.any(String.class),
                        Mockito.eq(taskManagerLocation),
                        Mockito.eq(jobManagerLeaderId),
                        Mockito.any(Time.class)))
                .thenReturn(FlinkCompletableFuture.completed(
                        new JMTMRegistrationSuccess(jobManagerResourceId, 42)));
        Mockito.when(jobMasterGateway.getHostname()).thenReturn("jm");

        rpc.registerGateway("rm", resourceManagerGateway);
        rpc.registerGateway("jm", jobMasterGateway);

        // Put a job manager connection for the job into the table before the executor starts.
        final LibraryCacheManager libraryCacheManager = Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libraryCacheManager.getClassLoader(Mockito.eq(jobId)))
                .thenReturn(getClass().getClassLoader());
        final JobManagerConnection jobManagerConnection = new JobManagerConnection(
                jobId,
                jobManagerResourceId,
                jobMasterGateway,
                jobManagerLeaderId,
                Mockito.mock(TaskManagerActions.class),
                Mockito.mock(CheckpointResponder.class),
                libraryCacheManager,
                Mockito.mock(ResultPartitionConsumableNotifier.class),
                Mockito.mock(PartitionProducerStateChecker.class));
        jobManagerTable.put(jobId, jobManagerConnection);

        try {
            // The metric group mocks hand out a task metric group with an IO metric group mock.
            final TaskManagerMetricGroup taskManagerMetricGroup = Mockito.mock(TaskManagerMetricGroup.class);
            final TaskMetricGroup taskMetricGroup = Mockito.mock(TaskMetricGroup.class);
            Mockito.when(taskMetricGroup.getIOMetricGroup()).thenReturn(Mockito.mock(TaskIOMetricGroup.class));
            Mockito.when(taskManagerMetricGroup.addTaskForJob(
                            Mockito.any(JobID.class),
                            Mockito.anyString(),
                            Mockito.any(JobVertexID.class),
                            Mockito.any(ExecutionAttemptID.class),
                            Mockito.anyString(),
                            Mockito.anyInt(),
                            Mockito.anyInt()))
                    .thenReturn(taskMetricGroup);

            final TaskExecutor taskExecutor = new TaskExecutor(
                    rpc,
                    taskManagerConfiguration,
                    taskManagerLocation,
                    Mockito.mock(MemoryManager.class),
                    Mockito.mock(IOManager.class),
                    Mockito.mock(NetworkEnvironment.class),
                    haServices,
                    Mockito.mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS),
                    Mockito.mock(MetricRegistry.class),
                    taskManagerMetricGroup,
                    Mockito.mock(BroadcastVariableManager.class),
                    Mockito.mock(FileCache.class),
                    taskSlotTable,
                    jobManagerTable,
                    jobLeaderService,
                    fatalErrorHandler);
            taskExecutor.start();

            taskSlotTable.allocateSlot(0, jobId, acceptedAllocationId, Time.milliseconds(10000L));
            taskSlotTable.allocateSlot(1, jobId, rejectedAllocationId, Time.milliseconds(10000L));

            // Deployment descriptor for a task that runs in the first allocation.
            final TaskDeploymentDescriptor taskDeploymentDescriptor = new TaskDeploymentDescriptor(
                    new SerializedValue(new JobInformation(
                            jobId,
                            this.name.getMethodName(),
                            new SerializedValue(new ExecutionConfig()),
                            new Configuration(),
                            Collections.emptyList(),
                            Collections.emptyList())),
                    new SerializedValue(new TaskInformation(
                            new JobVertexID(),
                            "test task",
                            1,
                            1,
                            TestInvokable.class.getName(),
                            new Configuration())),
                    new ExecutionAttemptID(),
                    acceptedAllocationId,
                    0,
                    0,
                    0,
                    (TaskStateHandles) null,
                    Collections.emptyList(),
                    Collections.emptyList());

            // The answer to the slot offer stays open until the test completes this future.
            final FlinkCompletableFuture offerResultFuture = new FlinkCompletableFuture();
            Mockito.when(jobMasterGateway.offerSlots(
                            Mockito.any(ResourceID.class),
                            Mockito.any(Iterable.class),
                            Mockito.eq(jobManagerLeaderId),
                            Mockito.any(Time.class)))
                    .thenReturn(offerResultFuture);

            jobLeaderService.addJob(jobId, "jm");

            // The slots must have been offered before the task is submitted.
            Mockito.verify(jobMasterGateway).offerSlots(
                    Mockito.any(ResourceID.class),
                    Mockito.any(Iterable.class),
                    Mockito.eq(jobManagerLeaderId),
                    Mockito.any(Time.class));

            // Submit the task while the offer is still unanswered, then accept the first slot only.
            taskExecutor.submitTask(taskDeploymentDescriptor, jobManagerLeaderId);
            offerResultFuture.complete(Collections.singleton(acceptedOffer));

            // The slot that was not accepted has to be reported back to the resource manager.
            Mockito.verify(resourceManagerGateway).notifySlotAvailable(
                    Mockito.eq(resourceManagerLeaderId),
                    Mockito.eq(registrationId),
                    Mockito.eq(new SlotID(taskExecutorResourceId, 1)),
                    Mockito.any(AllocationID.class));

            Assert.assertTrue(taskSlotTable.existsActiveSlot(jobId, acceptedAllocationId));
            Assert.assertFalse(taskSlotTable.existsActiveSlot(jobId, rejectedAllocationId));
            Assert.assertTrue(taskSlotTable.isSlotFree(1));

            // Surface any error that was reported to the fatal error handler in the meantime.
            fatalErrorHandler.rethrowError();
        } finally {
            // Stop the RPC service exactly once, on success and on failure alike.
            rpc.stopService();
        }
    }
}
